diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java index 42a8606f523a7b986e8b89fa1f2264077e75ca36..2a741df2978b02dabbb7c005bedac2d8e60669f9 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java @@ -173,11 +173,14 @@ public abstract class FileStore public abstract boolean exists(StoreItem item); /** - * Returns the last time when there was a write access to <var>resource</var>. - * + * Returns the last time when there was a write access to <var>item</var>. + * + * @param item The {@link StoreItem} to check. + * @param stopWhenFindYounger If > 0, the recursive search for younger file will be stopped when a file or + * directory is found that is as young as or younger than the time specified in this parameter. * @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was last changed. */ - public abstract long lastChanged(StoreItem item); + public abstract long lastChanged(StoreItem item, long stopWhenFindYounger); /** * List files in the scanned store. Sort in order of "oldest first". diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java index 21822b59292a7e57d3946476128d5e728836dc80..96c5dbc965eb1f0652188f05d9e007a6ff010055 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java @@ -49,6 +49,8 @@ public class CopyActivityMonitor private final long checkIntervallMillis; + private final long inactivityPeriodMillis; + private final String threadNamePrefix; /** @@ -110,6 +112,7 @@ public class CopyActivityMonitor this.destinationDirectory = destinationDirectory; this.checkIntervallMillis = timingParameters.getCheckIntervalMillis(); + this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis(); assert this.checkIntervallMillis > 0; @@ -126,8 +129,8 @@ public class CopyActivityMonitor startNewActivityMonitor(); this.inactivityReportingTimer = new Timer(threadNamePrefix + "Inactivity Reporter", true); - this.inactivityReportingTimer.schedule(new InactivityReportingTimerTask(copyProcess, timingParameters - .getInactivityPeriodMillis()), 0, timingParameters.getCheckIntervalMillis()); + this.inactivityReportingTimer.schedule(new InactivityReportingTimerTask(copyProcess), 0, timingParameters + .getCheckIntervalMillis()); } /** @@ -179,6 +182,8 @@ public class CopyActivityMonitor private final class ActivityMonitoringTimerTask extends TimerTask implements ITerminable { + private final static long SAFETY_MARGIN_MILLIS = 20000L; + private AtomicBoolean terminated = new AtomicBoolean(false); private AtomicReference<Thread> timerThread = new AtomicReference<Thread>(null); @@ -220,11 +225,11 @@ public class CopyActivityMonitor monitoredPathLastChecked.set(System.currentTimeMillis()); return; } - final long lastChangedAsFoundByPathChecker = destinationDirectory.lastChanged(item); + final long lastChangedAsFoundByPathChecker = lastChanged(destinationDirectory, item); if (operationLog.isTraceEnabled()) { operationLog.trace(String.format( - "Reported last changed time of '%s' inside '%s' to be %3$tF %3$tT.", item, + "Checker reported last changed time of '%s' inside '%s' to be %3$tF %3$tT.", item, destinationDirectory, lastChangedAsFoundByPathChecker)); } if (terminated.get()) // Don't modify the time variables any more if we got terminated. @@ -234,12 +239,12 @@ public class CopyActivityMonitor } final long lastChecked = monitoredPathLastChecked.get(); final long lastLastChanged = monitoredPathLastChanged.get(); - final long now = System.currentTimeMillis(); // This catches the case where since the last check copying a files has been finished (and consequently // the // "last changed" time has been set to that of the source file), but copying of the next file has not // yet been // started. + final long now = System.currentTimeMillis(); final long lastChanged = Math.max(lastChangedAsFoundByPathChecker, lastLastChanged + (now - lastChecked) - 1); if (lastChanged > now) // That can happen if the system clock of the data producer is screwed up. @@ -268,6 +273,22 @@ public class CopyActivityMonitor } } + private long lastChanged(FileStore store, StoreItem item) + { + final long now = System.currentTimeMillis(); + final long stopWhenYoungerThan = now - (inactivityPeriodMillis - SAFETY_MARGIN_MILLIS); + final long lastChangedAsFoundByPathChecker = + store.lastChanged(item, stopWhenYoungerThan); + // Check if it took too long to find the value. + if (System.currentTimeMillis() - now > SAFETY_MARGIN_MILLIS / 2) + { + return store.lastChanged(item, 0L); + } else + { + return lastChangedAsFoundByPathChecker; + } + } + /** * @return Always <code>true</code>. */ @@ -299,11 +320,9 @@ public class CopyActivityMonitor private static final String INACTIVITY_REPORT_TEMPLATE = "No progress on copying '%s' to '%s' for %f seconds - network connection might be stalled."; - private final long inactivityPeriodMillis; - private final ITerminable terminable; - public InactivityReportingTimerTask(ITerminable terminable, long inactivityPeriodMillis) + public InactivityReportingTimerTask(ITerminable terminable) { assert terminable != null; assert inactivityPeriodMillis > 0; @@ -311,7 +330,6 @@ public class CopyActivityMonitor assert destinationDirectory != null; this.terminable = terminable; - this.inactivityPeriodMillis = inactivityPeriodMillis; } @Override diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java index 6799da89569fabd4ea69f3de8cc63a128c7168fa..29a3aa290f7df636e3517b310a745566b771d848 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java @@ -66,9 +66,9 @@ public class FileStoreLocal extends ExtendedFileStore } @Override - public long lastChanged(StoreItem item) + public long lastChanged(StoreItem item, long stopWhenFindYounger) { - return FileUtilities.lastChanged(getChildFile(item)); + return FileUtilities.lastChanged(getChildFile(item), true, stopWhenFindYounger); } @Override diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java index 72b179dfb70e1a5cf8195699f31e334b52db5ac9..493e268862e1d37ffcdc9c8590287b270f33e2e3 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java @@ -62,7 +62,7 @@ public class FileStoreRemote extends FileStore } @Override - public long lastChanged(StoreItem item) + public long lastChanged(StoreItem item, long stopWhenFindYounger) { // TODO 2007-10-09, Tomasz Pylak: implement ssh tunneling mode return 0; diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java index 9fdfe169fbc9793d9c16fa3f104750902636c102..25476145f2a4974f4a663b95028c690fccd5fb30 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java @@ -77,9 +77,9 @@ public class FileStoreRemoteMounted extends FileStore } @Override - public long lastChanged(StoreItem item) + public long lastChanged(StoreItem item, long stopWhenFindYounger) { - return localImpl.lastChanged(item); + return localImpl.lastChanged(item, stopWhenFindYounger); } @Override diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/utils/QuietPeriodFileFilter.java b/datamover/source/java/ch/systemsx/cisd/datamover/utils/QuietPeriodFileFilter.java index 0ae1d580b51e86239313e1583b9e2d37c6a983da..ffd03aa74c21de0baa48417cb318dc5166d78f7d 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/utils/QuietPeriodFileFilter.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/utils/QuietPeriodFileFilter.java @@ -29,6 +29,9 @@ import ch.systemsx.cisd.datamover.intf.ITimingParameters; */ public class QuietPeriodFileFilter { + + private static final long SAFETY_MARGIN_MILLIS = 2000L; + private final long quietPeriodMillis; private final FileStore store; @@ -37,10 +40,13 @@ public class QuietPeriodFileFilter * Creates a <var>QuietPeriodFileFilter</var>. * * @param store The store in which items reside - * @param timingParameters The timing paramter object to get the quiet period from. + * @param timingParameters The timing parameter object to get the quiet period from. */ public QuietPeriodFileFilter(FileStore store, ITimingParameters timingParameters) { + assert store != null; + assert timingParameters != null; + this.store = store; this.quietPeriodMillis = timingParameters.getQuietPeriodMillis(); assert quietPeriodMillis > 0; @@ -48,7 +54,9 @@ public class QuietPeriodFileFilter public boolean accept(StoreItem item) { - return (System.currentTimeMillis() - store.lastChanged(item)) > quietPeriodMillis; + final long now = System.currentTimeMillis(); + final long stopWhenFindYounger = now - (quietPeriodMillis - SAFETY_MARGIN_MILLIS); + return (System.currentTimeMillis() - store.lastChanged(item, stopWhenFindYounger)) > quietPeriodMillis; } } diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java index 841cff4629348c3d4b9a89bc49ab0220ee941ff0..2581be93f0a93f3aa010040635aaf3354e5ada9b 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java @@ -90,12 +90,12 @@ public class CopyActivityMonitorTest private static interface LastChangedChecker { - public long lastChanged(StoreItem item); + public long lastChanged(StoreItem item, long stopWhenFindYounger); } private final class HappyPathLastChangedChecker implements LastChangedChecker { - public long lastChanged(StoreItem item) + public long lastChanged(StoreItem item, long stopWhenFindYounger) { return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS / 2; } @@ -161,9 +161,9 @@ public class CopyActivityMonitorTest } @Override - public long lastChanged(StoreItem item) + public long lastChanged(StoreItem item, long stopWhenFindYounger) { - return checker.lastChanged(item); + return checker.lastChanged(item, stopWhenFindYounger); } @Override @@ -265,7 +265,7 @@ public class CopyActivityMonitorTest { private int numberOfTimesCalled = 0; - public long lastChanged(StoreItem item) + public long lastChanged(StoreItem item, long stopWhenFindYounger) { ++numberOfTimesCalled; if (numberOfTimesCalled == 2) @@ -307,7 +307,7 @@ public class CopyActivityMonitorTest private final class PathLastChangedCheckerStalled implements LastChangedChecker { - public long lastChanged(StoreItem item) + public long lastChanged(StoreItem item, long stopWhenFindYounger) { return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS * 2; } @@ -399,7 +399,7 @@ public class CopyActivityMonitorTest } } - public long lastChanged(StoreItem item) + public long lastChanged(StoreItem item, long stopWhenFindYounger) { try {