diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java index 610bb2ee2c18b47210b2dd81e0d58afad742639c..babd7a6e0e3d90da617605a8cb287f50740a9311 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java @@ -178,11 +178,12 @@ public abstract class FileStore * Returns the last time when there was a write access to <var>item</var>. * * @param item The {@link StoreItem} to check. - * @param stopWhenFindYounger If > 0, the recursive search for younger file will be stopped when a file or - * directory is found that is as young as or younger than the time specified in this parameter. + * @param stopWhenFindYoungerRelative If > 0, the recursive search for younger file will be stopped when a file + * or directory is found that is as young as or younger than + * <code>System.currentTimeMillis() - stopWhenYoungerRelative</code>. * @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was last changed. */ - public abstract long lastChanged(StoreItem item, long stopWhenFindYounger); + public abstract long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative); /** * List files in the scanned store. Sort in order of "oldest first". diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java index d37ef5d5ac071bfd1701b1d60901bcb022117b19..51e0944afd814aa463e68ceff051c40d49983d50 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java @@ -51,8 +51,6 @@ public class CopyActivityMonitor private final long checkIntervallMillis; - private final long quickCheckActivityMillis; - private final long inactivityPeriodMillis; private final String threadNamePrefix; @@ -71,19 +69,6 @@ public class CopyActivityMonitor */ private ActivityMonitoringTimerTask activityMonitoringTimerTask; - /** - * Creates a monitor. Uses 20% of <code>timingParameters.getCheckIntervalMillis()</code> for the quick check. - * - * @param destinationStore The file store to monitor for write access. - * @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy - * process gets stuck. - * @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from. - */ - public CopyActivityMonitor(FileStore destinationStore, ITerminable copyProcess, ITimingParameters timingParameters) - { - this(destinationStore, copyProcess, timingParameters, (long) (timingParameters.getCheckIntervalMillis() * 0.2)); - } - /** * Creates a monitor. * @@ -91,10 +76,8 @@ public class CopyActivityMonitor * @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy * process gets stuck. * @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from. - * @param quickCheckActivityMillis The time to give the monitor for quickly check recently changed files. */ - public CopyActivityMonitor(FileStore destinationStore, ITerminable copyProcess, ITimingParameters timingParameters, - long quickCheckActivityMillis) + public CopyActivityMonitor(FileStore destinationStore, ITerminable copyProcess, ITimingParameters timingParameters) { assert destinationStore != null; assert copyProcess != null; @@ -104,7 +87,6 @@ public class CopyActivityMonitor this.terminable = copyProcess; this.checkIntervallMillis = timingParameters.getCheckIntervalMillis(); this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis(); - this.quickCheckActivityMillis = quickCheckActivityMillis; assert this.checkIntervallMillis > 0; @@ -121,8 +103,8 @@ public class CopyActivityMonitor /** * Starts the activity monitoring. * - * @param itemToBeCopied The item that will be copied to the destination file store and whose write progress - * should be monitored. + * @param itemToBeCopied The item that will be copied to the destination file store and whose write progress should + * be monitored. */ public void start(StoreItem itemToBeCopied) { @@ -156,7 +138,7 @@ public class CopyActivityMonitor ConcurrencyUtilities.newNamedPool("Last Changed Explorer", 1, Integer.MAX_VALUE); private final StoreItem itemToBeCopied; - + private long monitoredItemLastChanged; private ActivityMonitoringTimerTask(StoreItem itemToBeCopied) @@ -239,38 +221,25 @@ public class CopyActivityMonitor private long lastChanged(FileStore store, StoreItem item, long lastLastChanged) { - // Give the system quickCheckActivityMillis to find recently changed files, otherwise perform full check - final long stopWhenYoungerThan = - System.currentTimeMillis() - (inactivityPeriodMillis - 2 * quickCheckActivityMillis); final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog); - final Future<Long> quickCheckLastChangedFuture = - lastChangedExecutor.submit(createCheckerCallable(store, item, stopWhenYoungerThan)); - final Long quickLastChanged = - ConcurrencyUtilities.tryGetResult(quickCheckLastChangedFuture, quickCheckActivityMillis, - simpleMachineLog, "Quick check for recent paths"); - if (quickLastChanged == null) + final Future<Long> lastChangedFuture = + lastChangedExecutor.submit(createCheckerCallable(store, item, + minusSafetyMargin(inactivityPeriodMillis))); + final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis); + final Long lastChanged = + ConcurrencyUtilities.tryGetResult(lastChangedFuture, timeoutMillis, simpleMachineLog, + "Check for recent paths"); + if (lastChanged == null) { - if (machineLog.isDebugEnabled()) - { - machineLog.debug("Performing full check for most recent path now."); - } - final Future<Long> lastChangedFuture = - lastChangedExecutor.submit(createCheckerCallable(store, item, 0L)); - final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis); - final Long lastChanged = - ConcurrencyUtilities.tryGetResult(lastChangedFuture, timeoutMillis, simpleMachineLog, - "Check for recent paths"); - if (lastChanged == null) - { - operationLog.error(String - .format("Could not determine \"last changed time\" of %s: time out.", item)); - return lastLastChanged; - } - return lastChanged; - } else - { - return quickLastChanged; + operationLog.error(String.format("Could not determine \"last changed time\" of %s: time out.", item)); + return lastLastChanged; } + return lastChanged; + } + + private long minusSafetyMargin(long period) + { + return Math.max(0L, period - 1000L); } private Callable<Long> createCheckerCallable(final FileStore store, final StoreItem item, @@ -286,7 +255,7 @@ public class CopyActivityMonitor } try { - final long lastChanged = store.lastChanged(item, stopWhenYoungerThan); + final long lastChanged = store.lastChangedRelative(item, stopWhenYoungerThan); if (machineLog.isTraceEnabled()) { machineLog.trace(String.format( diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java index 25a28d47d0f9e6c9bce8097480d3190943dfdccb..cfc0a8c7fccd9f62a899efa6bf3851337f8e1f40 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java @@ -66,9 +66,9 @@ public class FileStoreLocal extends ExtendedFileStore } @Override - public long lastChanged(StoreItem item, long stopWhenFindYounger) + public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) { - return FileUtilities.lastChanged(getChildFile(item), true, stopWhenFindYounger); + return FileUtilities.lastChangedRelative(getChildFile(item), true, stopWhenFindYoungerRelative); } @Override diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java index 89c8533bcfbf0a5e1659990f7d27cdb59f9017ff..dd4d8cf6fc527c86ba6d989cc2dc0570122e15b5 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java @@ -62,7 +62,7 @@ public class FileStoreRemote extends FileStore } @Override - public long lastChanged(StoreItem item, long stopWhenFindYounger) + public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) { // TODO 2007-10-09, Tomasz Pylak: implement ssh tunneling mode return 0; diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java index 5cd3e48ee160da7f5704808ab030789e80e6bbdc..6379f79267468b073381fbf57376c600cfb342a8 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java @@ -77,9 +77,9 @@ public class FileStoreRemoteMounted extends FileStore } @Override - public long lastChanged(StoreItem item, long stopWhenFindYounger) + public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) { - return localImpl.lastChanged(item, stopWhenFindYounger); + return localImpl.lastChangedRelative(item, stopWhenFindYoungerRelative); } @Override diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/utils/QuietPeriodFileFilter.java b/datamover/source/java/ch/systemsx/cisd/datamover/utils/QuietPeriodFileFilter.java index ffd03aa74c21de0baa48417cb318dc5166d78f7d..bacd0cd3cc5fddefb37539be3a9456a5b7bad7ce 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/utils/QuietPeriodFileFilter.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/utils/QuietPeriodFileFilter.java @@ -44,9 +44,9 @@ public class QuietPeriodFileFilter */ public QuietPeriodFileFilter(FileStore store, ITimingParameters timingParameters) { - assert store != null; - assert timingParameters != null; - + assert store != null; + assert timingParameters != null; + this.store = store; this.quietPeriodMillis = timingParameters.getQuietPeriodMillis(); assert quietPeriodMillis > 0; @@ -54,9 +54,8 @@ public class QuietPeriodFileFilter public boolean accept(StoreItem item) { - final long now = System.currentTimeMillis(); - final long stopWhenFindYounger = now - (quietPeriodMillis - SAFETY_MARGIN_MILLIS); - return (System.currentTimeMillis() - store.lastChanged(item, stopWhenFindYounger)) > quietPeriodMillis; + final long stopWhenFindYoungerRelative = quietPeriodMillis - SAFETY_MARGIN_MILLIS; + return (System.currentTimeMillis() - store.lastChangedRelative(item, stopWhenFindYoungerRelative)) > quietPeriodMillis; } } diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java index 6780a0c7486923dad3c4e6867e4ae88eb99f39e6..fba98d6fe12e3c622535d7793e045087b9ea8ebc 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java @@ -90,13 +90,21 @@ public class CopyActivityMonitorTest private static interface ILastChangedChecker { - public long lastChanged(StoreItem item, long stopWhenFindYounger); + public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative); } private final class HappyPathLastChangedChecker implements ILastChangedChecker { - public long lastChanged(StoreItem item, long stopWhenFindYounger) + private final long stopWhenFindYoungerRelativeExpected; + + public HappyPathLastChangedChecker(long stopWhenFindYoungerRelativeExpected) { + this.stopWhenFindYoungerRelativeExpected = stopWhenFindYoungerRelativeExpected; + } + + public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) + { + assertEquals(stopWhenFindYoungerRelativeExpected, stopWhenFindYoungerRelative); return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS / 2; } } @@ -169,9 +177,9 @@ public class CopyActivityMonitorTest } @Override - public long lastChanged(StoreItem item, long stopWhenFindYounger) + public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) { - return checker.lastChanged(item, stopWhenFindYounger); + return checker.lastChangedRelative(item, stopWhenFindYoungerRelative); } @Override @@ -242,9 +250,10 @@ public class CopyActivityMonitorTest { "slow" }) public void testHappyPath() throws Throwable { - final ILastChangedChecker checker = new HappyPathLastChangedChecker(); final ITerminable dummyTerminable = new DummyTerminable(); - final ITimingParameters parameters = new MyTimingParameters(0); + final long inactivityPeriodMillis = 5000L; + final ITimingParameters parameters = new MyTimingParameters(0, inactivityPeriodMillis); + final ILastChangedChecker checker = new HappyPathLastChangedChecker(inactivityPeriodMillis - 1000L); final CopyActivityMonitor monitor = new CopyActivityMonitor(asFileStore(workingDirectory, checker), dummyTerminable, parameters); StoreItem item = createDirectoryInside(workingDirectory); @@ -273,7 +282,7 @@ public class CopyActivityMonitorTest { private int numberOfTimesCalled = 0; - public long lastChanged(StoreItem item, long stopWhenFindYounger) + public long lastChangedRelative(StoreItem item, long stopWhenFindYounger) { ++numberOfTimesCalled; if (numberOfTimesCalled == 2) @@ -292,10 +301,10 @@ public class CopyActivityMonitorTest /** * This test case catches a case that I first hadn't thought of: since we use <code>rsync</code> in a mode where * at the end of copying a file they set the "last modified" time back to the one of the source file, there is a - * short time interval after finishing copying one file anst starting copying the next file where the copy monitor + * short time interval after finishing copying one file and starting copying the next file where the copy monitor * could be tempted to trigger false alarm: the just finished file will have already the "last modified" time of the * source file (which is when the data produce finished writing the source file). In fact everything is fine but - * still the copy process will be cancelled. + * still the copy process will be canceled. */ @Test(groups = { "slow" }) @@ -315,7 +324,7 @@ public class CopyActivityMonitorTest private final class PathLastChangedCheckerStalled implements ILastChangedChecker { - public long lastChanged(StoreItem item, long stopWhenFindYounger) + public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) { return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS * 2; } @@ -369,33 +378,6 @@ public class CopyActivityMonitorTest return item; } - @Test(groups = "slow") - public void testTriggerFullCheck() throws Throwable - { - final LogMonitoringAppender appender = - LogMonitoringAppender.addAppender(LogCategory.MACHINE, - "Performing full check for most recent path now."); - final PathLastChangedCheckerDelayed checker = - new PathLastChangedCheckerDelayed(0L, (long) (INACTIVITY_PERIOD_MILLIS / 10 * 1.5), 0L); - final MockTerminable copyProcess = new MockTerminable(); - final ITimingParameters parameters = new MyTimingParameters(0, INACTIVITY_PERIOD_MILLIS); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters, - INACTIVITY_PERIOD_MILLIS / 10); - final File directory = new File(workingDirectory, "some-directory"); - directory.mkdir(); - directory.deleteOnExit(); - final StoreItem item = createDirectoryInside(directory); - monitor.start(item); - Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); - monitor.stop(); - LogMonitoringAppender.removeAppender(appender); - assertFalse(checker.lastCheckInterrupted()); - assertEquals(1, checker.getInterruptionCount()); - assertFalse(copyProcess.isTerminated()); - appender.verifyLogHappendNTimes(1); - } - private final class PathLastChangedCheckerDelayed implements ILastChangedChecker { private final long[] delayMillis; @@ -429,7 +411,7 @@ public class CopyActivityMonitorTest } } - public long lastChanged(StoreItem item, long stopWhenFindYounger) + public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) { try {