Skip to content
Snippets Groups Projects
Commit ed5f1b85 authored by brinn's avatar brinn
Browse files

refactor: simplify CopyActivityMonitor by introducing cutoff periods instead...

refactor: simplify CopyActivityMonitor by introducing cutoff periods instead of cutoff times when checking for the last change time

SVN: 3122
parent b4235f68
No related branches found
No related tags found
No related merge requests found
...@@ -178,11 +178,12 @@ public abstract class FileStore ...@@ -178,11 +178,12 @@ public abstract class FileStore
* Returns the last time when there was a write access to <var>item</var>. * Returns the last time when there was a write access to <var>item</var>.
* *
* @param item The {@link StoreItem} to check. * @param item The {@link StoreItem} to check.
* @param stopWhenFindYounger If &gt; 0, the recursive search for younger file will be stopped when a file or * @param stopWhenFindYoungerRelative If &gt; 0, the recursive search for younger file will be stopped when a file
* directory is found that is as young as or younger than the time specified in this parameter. * or directory is found that is as young as or younger than
* <code>System.currentTimeMillis() - stopWhenYoungerRelative</code>.
* @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was last changed. * @return The time (in milliseconds since the start of the epoch) when <var>resource</var> was last changed.
*/ */
public abstract long lastChanged(StoreItem item, long stopWhenFindYounger); public abstract long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative);
/** /**
* List files in the scanned store. Sort in order of "oldest first". * List files in the scanned store. Sort in order of "oldest first".
......
...@@ -51,8 +51,6 @@ public class CopyActivityMonitor ...@@ -51,8 +51,6 @@ public class CopyActivityMonitor
private final long checkIntervallMillis; private final long checkIntervallMillis;
private final long quickCheckActivityMillis;
private final long inactivityPeriodMillis; private final long inactivityPeriodMillis;
private final String threadNamePrefix; private final String threadNamePrefix;
...@@ -71,19 +69,6 @@ public class CopyActivityMonitor ...@@ -71,19 +69,6 @@ public class CopyActivityMonitor
*/ */
private ActivityMonitoringTimerTask activityMonitoringTimerTask; private ActivityMonitoringTimerTask activityMonitoringTimerTask;
/**
* Creates a monitor. Uses 20% of <code>timingParameters.getCheckIntervalMillis()</code> for the quick check.
*
* @param destinationStore The file store to monitor for write access.
* @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy
* process gets stuck.
* @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from.
*/
public CopyActivityMonitor(FileStore destinationStore, ITerminable copyProcess, ITimingParameters timingParameters)
{
this(destinationStore, copyProcess, timingParameters, (long) (timingParameters.getCheckIntervalMillis() * 0.2));
}
/** /**
* Creates a monitor. * Creates a monitor.
* *
...@@ -91,10 +76,8 @@ public class CopyActivityMonitor ...@@ -91,10 +76,8 @@ public class CopyActivityMonitor
* @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy * @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy
* process gets stuck. * process gets stuck.
* @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from. * @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from.
* @param quickCheckActivityMillis The time to give the monitor for quickly check recently changed files.
*/ */
public CopyActivityMonitor(FileStore destinationStore, ITerminable copyProcess, ITimingParameters timingParameters, public CopyActivityMonitor(FileStore destinationStore, ITerminable copyProcess, ITimingParameters timingParameters)
long quickCheckActivityMillis)
{ {
assert destinationStore != null; assert destinationStore != null;
assert copyProcess != null; assert copyProcess != null;
...@@ -104,7 +87,6 @@ public class CopyActivityMonitor ...@@ -104,7 +87,6 @@ public class CopyActivityMonitor
this.terminable = copyProcess; this.terminable = copyProcess;
this.checkIntervallMillis = timingParameters.getCheckIntervalMillis(); this.checkIntervallMillis = timingParameters.getCheckIntervalMillis();
this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis(); this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis();
this.quickCheckActivityMillis = quickCheckActivityMillis;
assert this.checkIntervallMillis > 0; assert this.checkIntervallMillis > 0;
...@@ -121,8 +103,8 @@ public class CopyActivityMonitor ...@@ -121,8 +103,8 @@ public class CopyActivityMonitor
/** /**
* Starts the activity monitoring. * Starts the activity monitoring.
* *
* @param itemToBeCopied The item that will be copied to the destination file store and whose write progress * @param itemToBeCopied The item that will be copied to the destination file store and whose write progress should
* should be monitored. * be monitored.
*/ */
public void start(StoreItem itemToBeCopied) public void start(StoreItem itemToBeCopied)
{ {
...@@ -156,7 +138,7 @@ public class CopyActivityMonitor ...@@ -156,7 +138,7 @@ public class CopyActivityMonitor
ConcurrencyUtilities.newNamedPool("Last Changed Explorer", 1, Integer.MAX_VALUE); ConcurrencyUtilities.newNamedPool("Last Changed Explorer", 1, Integer.MAX_VALUE);
private final StoreItem itemToBeCopied; private final StoreItem itemToBeCopied;
private long monitoredItemLastChanged; private long monitoredItemLastChanged;
private ActivityMonitoringTimerTask(StoreItem itemToBeCopied) private ActivityMonitoringTimerTask(StoreItem itemToBeCopied)
...@@ -239,38 +221,25 @@ public class CopyActivityMonitor ...@@ -239,38 +221,25 @@ public class CopyActivityMonitor
private long lastChanged(FileStore store, StoreItem item, long lastLastChanged) private long lastChanged(FileStore store, StoreItem item, long lastLastChanged)
{ {
// Give the system quickCheckActivityMillis to find recently changed files, otherwise perform full check
final long stopWhenYoungerThan =
System.currentTimeMillis() - (inactivityPeriodMillis - 2 * quickCheckActivityMillis);
final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog); final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog);
final Future<Long> quickCheckLastChangedFuture = final Future<Long> lastChangedFuture =
lastChangedExecutor.submit(createCheckerCallable(store, item, stopWhenYoungerThan)); lastChangedExecutor.submit(createCheckerCallable(store, item,
final Long quickLastChanged = minusSafetyMargin(inactivityPeriodMillis)));
ConcurrencyUtilities.tryGetResult(quickCheckLastChangedFuture, quickCheckActivityMillis, final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis);
simpleMachineLog, "Quick check for recent paths"); final Long lastChanged =
if (quickLastChanged == null) ConcurrencyUtilities.tryGetResult(lastChangedFuture, timeoutMillis, simpleMachineLog,
"Check for recent paths");
if (lastChanged == null)
{ {
if (machineLog.isDebugEnabled()) operationLog.error(String.format("Could not determine \"last changed time\" of %s: time out.", item));
{ return lastLastChanged;
machineLog.debug("Performing full check for most recent path now.");
}
final Future<Long> lastChangedFuture =
lastChangedExecutor.submit(createCheckerCallable(store, item, 0L));
final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis);
final Long lastChanged =
ConcurrencyUtilities.tryGetResult(lastChangedFuture, timeoutMillis, simpleMachineLog,
"Check for recent paths");
if (lastChanged == null)
{
operationLog.error(String
.format("Could not determine \"last changed time\" of %s: time out.", item));
return lastLastChanged;
}
return lastChanged;
} else
{
return quickLastChanged;
} }
return lastChanged;
}
private long minusSafetyMargin(long period)
{
return Math.max(0L, period - 1000L);
} }
private Callable<Long> createCheckerCallable(final FileStore store, final StoreItem item, private Callable<Long> createCheckerCallable(final FileStore store, final StoreItem item,
...@@ -286,7 +255,7 @@ public class CopyActivityMonitor ...@@ -286,7 +255,7 @@ public class CopyActivityMonitor
} }
try try
{ {
final long lastChanged = store.lastChanged(item, stopWhenYoungerThan); final long lastChanged = store.lastChangedRelative(item, stopWhenYoungerThan);
if (machineLog.isTraceEnabled()) if (machineLog.isTraceEnabled())
{ {
machineLog.trace(String.format( machineLog.trace(String.format(
......
...@@ -66,9 +66,9 @@ public class FileStoreLocal extends ExtendedFileStore ...@@ -66,9 +66,9 @@ public class FileStoreLocal extends ExtendedFileStore
} }
@Override @Override
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
{ {
return FileUtilities.lastChanged(getChildFile(item), true, stopWhenFindYounger); return FileUtilities.lastChangedRelative(getChildFile(item), true, stopWhenFindYoungerRelative);
} }
@Override @Override
......
...@@ -62,7 +62,7 @@ public class FileStoreRemote extends FileStore ...@@ -62,7 +62,7 @@ public class FileStoreRemote extends FileStore
} }
@Override @Override
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
{ {
// TODO 2007-10-09, Tomasz Pylak: implement ssh tunneling mode // TODO 2007-10-09, Tomasz Pylak: implement ssh tunneling mode
return 0; return 0;
......
...@@ -77,9 +77,9 @@ public class FileStoreRemoteMounted extends FileStore ...@@ -77,9 +77,9 @@ public class FileStoreRemoteMounted extends FileStore
} }
@Override @Override
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
{ {
return localImpl.lastChanged(item, stopWhenFindYounger); return localImpl.lastChangedRelative(item, stopWhenFindYoungerRelative);
} }
@Override @Override
......
...@@ -44,9 +44,9 @@ public class QuietPeriodFileFilter ...@@ -44,9 +44,9 @@ public class QuietPeriodFileFilter
*/ */
public QuietPeriodFileFilter(FileStore store, ITimingParameters timingParameters) public QuietPeriodFileFilter(FileStore store, ITimingParameters timingParameters)
{ {
assert store != null; assert store != null;
assert timingParameters != null; assert timingParameters != null;
this.store = store; this.store = store;
this.quietPeriodMillis = timingParameters.getQuietPeriodMillis(); this.quietPeriodMillis = timingParameters.getQuietPeriodMillis();
assert quietPeriodMillis > 0; assert quietPeriodMillis > 0;
...@@ -54,9 +54,8 @@ public class QuietPeriodFileFilter ...@@ -54,9 +54,8 @@ public class QuietPeriodFileFilter
public boolean accept(StoreItem item) public boolean accept(StoreItem item)
{ {
final long now = System.currentTimeMillis(); final long stopWhenFindYoungerRelative = quietPeriodMillis - SAFETY_MARGIN_MILLIS;
final long stopWhenFindYounger = now - (quietPeriodMillis - SAFETY_MARGIN_MILLIS); return (System.currentTimeMillis() - store.lastChangedRelative(item, stopWhenFindYoungerRelative)) > quietPeriodMillis;
return (System.currentTimeMillis() - store.lastChanged(item, stopWhenFindYounger)) > quietPeriodMillis;
} }
} }
...@@ -90,13 +90,21 @@ public class CopyActivityMonitorTest ...@@ -90,13 +90,21 @@ public class CopyActivityMonitorTest
private static interface ILastChangedChecker private static interface ILastChangedChecker
{ {
public long lastChanged(StoreItem item, long stopWhenFindYounger); public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative);
} }
private final class HappyPathLastChangedChecker implements ILastChangedChecker private final class HappyPathLastChangedChecker implements ILastChangedChecker
{ {
public long lastChanged(StoreItem item, long stopWhenFindYounger) private final long stopWhenFindYoungerRelativeExpected;
public HappyPathLastChangedChecker(long stopWhenFindYoungerRelativeExpected)
{ {
this.stopWhenFindYoungerRelativeExpected = stopWhenFindYoungerRelativeExpected;
}
public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
{
assertEquals(stopWhenFindYoungerRelativeExpected, stopWhenFindYoungerRelative);
return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS / 2; return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS / 2;
} }
} }
...@@ -169,9 +177,9 @@ public class CopyActivityMonitorTest ...@@ -169,9 +177,9 @@ public class CopyActivityMonitorTest
} }
@Override @Override
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
{ {
return checker.lastChanged(item, stopWhenFindYounger); return checker.lastChangedRelative(item, stopWhenFindYoungerRelative);
} }
@Override @Override
...@@ -242,9 +250,10 @@ public class CopyActivityMonitorTest ...@@ -242,9 +250,10 @@ public class CopyActivityMonitorTest
{ "slow" }) { "slow" })
public void testHappyPath() throws Throwable public void testHappyPath() throws Throwable
{ {
final ILastChangedChecker checker = new HappyPathLastChangedChecker();
final ITerminable dummyTerminable = new DummyTerminable(); final ITerminable dummyTerminable = new DummyTerminable();
final ITimingParameters parameters = new MyTimingParameters(0); final long inactivityPeriodMillis = 5000L;
final ITimingParameters parameters = new MyTimingParameters(0, inactivityPeriodMillis);
final ILastChangedChecker checker = new HappyPathLastChangedChecker(inactivityPeriodMillis - 1000L);
final CopyActivityMonitor monitor = final CopyActivityMonitor monitor =
new CopyActivityMonitor(asFileStore(workingDirectory, checker), dummyTerminable, parameters); new CopyActivityMonitor(asFileStore(workingDirectory, checker), dummyTerminable, parameters);
StoreItem item = createDirectoryInside(workingDirectory); StoreItem item = createDirectoryInside(workingDirectory);
...@@ -273,7 +282,7 @@ public class CopyActivityMonitorTest ...@@ -273,7 +282,7 @@ public class CopyActivityMonitorTest
{ {
private int numberOfTimesCalled = 0; private int numberOfTimesCalled = 0;
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChangedRelative(StoreItem item, long stopWhenFindYounger)
{ {
++numberOfTimesCalled; ++numberOfTimesCalled;
if (numberOfTimesCalled == 2) if (numberOfTimesCalled == 2)
...@@ -292,10 +301,10 @@ public class CopyActivityMonitorTest ...@@ -292,10 +301,10 @@ public class CopyActivityMonitorTest
/** /**
* This test case catches a case that I first hadn't thought of: since we use <code>rsync</code> in a mode where * This test case catches a case that I first hadn't thought of: since we use <code>rsync</code> in a mode where
* at the end of copying a file they set the "last modified" time back to the one of the source file, there is a * at the end of copying a file they set the "last modified" time back to the one of the source file, there is a
* short time interval after finishing copying one file anst starting copying the next file where the copy monitor * short time interval after finishing copying one file and starting copying the next file where the copy monitor
* could be tempted to trigger false alarm: the just finished file will have already the "last modified" time of the * could be tempted to trigger false alarm: the just finished file will have already the "last modified" time of the
* source file (which is when the data produce finished writing the source file). In fact everything is fine but * source file (which is when the data produce finished writing the source file). In fact everything is fine but
* still the copy process will be cancelled. * still the copy process will be canceled.
*/ */
@Test(groups = @Test(groups =
{ "slow" }) { "slow" })
...@@ -315,7 +324,7 @@ public class CopyActivityMonitorTest ...@@ -315,7 +324,7 @@ public class CopyActivityMonitorTest
private final class PathLastChangedCheckerStalled implements ILastChangedChecker private final class PathLastChangedCheckerStalled implements ILastChangedChecker
{ {
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
{ {
return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS * 2; return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS * 2;
} }
...@@ -369,33 +378,6 @@ public class CopyActivityMonitorTest ...@@ -369,33 +378,6 @@ public class CopyActivityMonitorTest
return item; return item;
} }
@Test(groups = "slow")
public void testTriggerFullCheck() throws Throwable
{
final LogMonitoringAppender appender =
LogMonitoringAppender.addAppender(LogCategory.MACHINE,
"Performing full check for most recent path now.");
final PathLastChangedCheckerDelayed checker =
new PathLastChangedCheckerDelayed(0L, (long) (INACTIVITY_PERIOD_MILLIS / 10 * 1.5), 0L);
final MockTerminable copyProcess = new MockTerminable();
final ITimingParameters parameters = new MyTimingParameters(0, INACTIVITY_PERIOD_MILLIS);
final CopyActivityMonitor monitor =
new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters,
INACTIVITY_PERIOD_MILLIS / 10);
final File directory = new File(workingDirectory, "some-directory");
directory.mkdir();
directory.deleteOnExit();
final StoreItem item = createDirectoryInside(directory);
monitor.start(item);
Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15);
monitor.stop();
LogMonitoringAppender.removeAppender(appender);
assertFalse(checker.lastCheckInterrupted());
assertEquals(1, checker.getInterruptionCount());
assertFalse(copyProcess.isTerminated());
appender.verifyLogHappendNTimes(1);
}
private final class PathLastChangedCheckerDelayed implements ILastChangedChecker private final class PathLastChangedCheckerDelayed implements ILastChangedChecker
{ {
private final long[] delayMillis; private final long[] delayMillis;
...@@ -429,7 +411,7 @@ public class CopyActivityMonitorTest ...@@ -429,7 +411,7 @@ public class CopyActivityMonitorTest
} }
} }
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
{ {
try try
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment