Skip to content
Snippets Groups Projects
Commit 30f98612 authored by brinn's avatar brinn
Browse files

refactor: simplify threading model of CopyActivityMonitor

SVN: 3108
parent 0340229e
No related branches found
No related tags found
No related merge requests found
...@@ -18,13 +18,15 @@ package ch.systemsx.cisd.datamover.filesystem.remote; ...@@ -18,13 +18,15 @@ package ch.systemsx.cisd.datamover.filesystem.remote;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.Future;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.logging.ISimpleLogger;
import ch.systemsx.cisd.common.logging.Log4jSimpleLogger;
import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.ITerminable;
...@@ -33,7 +35,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.FileStore; ...@@ -33,7 +35,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.FileStore;
import ch.systemsx.cisd.datamover.intf.ITimingParameters; import ch.systemsx.cisd.datamover.intf.ITimingParameters;
/** /**
* A <code>CopyActivityMonitor</code> monitors write activity on a <var>destinationPath</var> and triggers an alarm * A <code>CopyActivityMonitor</code> monitors write activity on a <var>destinationStore</var> and triggers an alarm
* if there was a period of inactivity that exceeds a given inactivity period. * if there was a period of inactivity that exceeds a given inactivity period.
* *
* @author Bernd Rinn * @author Bernd Rinn
...@@ -45,14 +47,18 @@ public class CopyActivityMonitor ...@@ -45,14 +47,18 @@ public class CopyActivityMonitor
private static final Logger machineLog = LogFactory.getLogger(LogCategory.MACHINE, CopyActivityMonitor.class); private static final Logger machineLog = LogFactory.getLogger(LogCategory.MACHINE, CopyActivityMonitor.class);
private final FileStore destinationDirectory; private final FileStore destinationStore;
private final long checkIntervallMillis; private final long checkIntervallMillis;
private final long quickCheckActivityMillis;
private final long inactivityPeriodMillis; private final long inactivityPeriodMillis;
private final String threadNamePrefix; private final String threadNamePrefix;
private final ITerminable terminable;
/** /**
* We need to keep a reference to the timer since we want to be able to cancel it and because otherwise the timer * We need to keep a reference to the timer since we want to be able to cancel it and because otherwise the timer
* thread will terminate. * thread will terminate.
...@@ -60,59 +66,45 @@ public class CopyActivityMonitor ...@@ -60,59 +66,45 @@ public class CopyActivityMonitor
private Timer activityMonitoringTimer; private Timer activityMonitoringTimer;
/** /**
* Unfortunately there is no way for a {@link TimerTask} to know whether it has been cancelled. So we have to keep * Unfortunately there is no way for a {@link TimerTask} to know whether it has been canceled. So we have to keep it
* it around in order to be able to terminate it directly. * around in order to be able to terminate it directly.
*/ */
private ActivityMonitoringTimerTask activityMonitoringTimerTask; private ActivityMonitoringTimerTask activityMonitoringTimerTask;
/** /**
* The current number of the activity monitor. Starts with 1 is in increased by 1 every time a new activity monitor * Creates a monitor. Uses 20% of <code>timingParameters.getCheckIntervalMillis()</code> for the quick check.
* is started because the old one has get stuck. *
*/ * @param destinationStore The file store to monitor for write access.
private int currentNumberOfActivityMonitor; * @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy
* process gets stuck.
/** * @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from.
* We need to keep a reference to the timer since otherwise the timer thread will terminate.
*/
private final Timer inactivityReportingTimer;
/**
* A <code>null</code> reference means: no monitoring.
*/
private final AtomicReference<StoreItem> pathToBeCopied;
/**
* The time in milliseconds since start of the epoch when the monitored path has last been changed.
*/
private final AtomicLong monitoredPathLastChanged;
/**
* The time in milliseconds since start of the epoch when the monitored path has last been checked for changes.
*/ */
private final AtomicLong monitoredPathLastChecked; public CopyActivityMonitor(FileStore destinationStore, ITerminable copyProcess, ITimingParameters timingParameters)
{
this(destinationStore, copyProcess, timingParameters, (long) (timingParameters.getCheckIntervalMillis() * 0.2));
}
/** /**
* Creates a monitor. * Creates a monitor.
* *
* @param destinationDirectory The directory to monitor for write access. * @param destinationStore The file store to monitor for write access.
* @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy * @param copyProcess The {@link ITerminable} representing the copy process. This will get terminated if the copy
* process gets stuck. * process gets stuck.
* @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from. * @param timingParameters The {@link ITimingParameters} to get the check interval and the inactivity period from.
* @param quickCheckActivityMillis The time to give the monitor for quickly check recently changed files.
*/ */
public CopyActivityMonitor(FileStore destinationDirectory, ITerminable copyProcess, public CopyActivityMonitor(FileStore destinationStore, ITerminable copyProcess, ITimingParameters timingParameters,
ITimingParameters timingParameters) long quickCheckActivityMillis)
{ {
this.monitoredPathLastChecked = new AtomicLong(0); assert destinationStore != null;
this.monitoredPathLastChanged = new AtomicLong(0);
this.pathToBeCopied = new AtomicReference<StoreItem>(null);
assert destinationDirectory != null;
assert copyProcess != null; assert copyProcess != null;
assert timingParameters != null; assert timingParameters != null;
this.destinationDirectory = destinationDirectory; this.destinationStore = destinationStore;
this.terminable = copyProcess;
this.checkIntervallMillis = timingParameters.getCheckIntervalMillis(); this.checkIntervallMillis = timingParameters.getCheckIntervalMillis();
this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis(); this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis();
this.quickCheckActivityMillis = quickCheckActivityMillis;
assert this.checkIntervallMillis > 0; assert this.checkIntervallMillis > 0;
...@@ -124,136 +116,109 @@ public class CopyActivityMonitor ...@@ -124,136 +116,109 @@ public class CopyActivityMonitor
{ {
this.threadNamePrefix = currentThreadName + " - "; this.threadNamePrefix = currentThreadName + " - ";
} }
this.currentNumberOfActivityMonitor = 0;
startNewActivityMonitor();
this.inactivityReportingTimer = new Timer(threadNamePrefix + "Inactivity Reporter", true);
this.inactivityReportingTimer.schedule(new InactivityReportingTimerTask(copyProcess), 0, timingParameters
.getCheckIntervalMillis());
}
/**
* Starts a new activity monitor and {@link Timer#cancel()}s the old one if any.
*/
private void startNewActivityMonitor()
{
if (activityMonitoringTimer != null)
{
// This may or may not help to get rid of the thread, but at least we try it.
activityMonitoringTimer.cancel();
activityMonitoringTimerTask.terminate();
}
++currentNumberOfActivityMonitor;
activityMonitoringTimer =
new Timer(threadNamePrefix + "Activity Monitor " + currentNumberOfActivityMonitor, true);
activityMonitoringTimerTask = new ActivityMonitoringTimerTask();
activityMonitoringTimer.schedule(activityMonitoringTimerTask, 0, checkIntervallMillis);
} }
/** /**
* Starts the activity monitoring. * Starts the activity monitoring.
* *
* @param newPathToBeCopied The path that will be copied to the destination directory and whose write progress * @param itemToBeCopied The item that will be copied to the destination file store and whose write progress
* should be monitored. * should be monitored.
*/ */
public void start(StoreItem newPathToBeCopied) public void start(StoreItem itemToBeCopied)
{ {
assert newPathToBeCopied != null; assert itemToBeCopied != null;
// Ensure the alarm won't be run before the copier has a chance to get active. activityMonitoringTimer = new Timer(threadNamePrefix + "Activity Monitor", true);
final long now = System.currentTimeMillis(); activityMonitoringTimerTask = new ActivityMonitoringTimerTask(itemToBeCopied);
monitoredPathLastChecked.set(now); activityMonitoringTimer.schedule(activityMonitoringTimerTask, 0, checkIntervallMillis);
monitoredPathLastChanged.set(now);
pathToBeCopied.set(newPathToBeCopied);
} }
/** /**
* Stops the activity monitoring. * Stops the activity monitoring. The activity monitor must not be used after calling this method.
*/ */
public void stop() public void stop()
{ {
pathToBeCopied.set(null); activityMonitoringTimer.cancel();
} }
/** /**
* A {@link TimerTask} that monitors writing activity on a directory. * A {@link TimerTask} that monitors writing activity on a directory.
*/ */
private final class ActivityMonitoringTimerTask extends TimerTask implements ITerminable private final class ActivityMonitoringTimerTask extends TimerTask
{ {
private final static long SAFETY_MARGIN_MILLIS = 20000L; private static final String TERMINATION_LOG_TEMPLATE = "Terminating %s due to a lack of activity.";
private static final String INACTIVITY_REPORT_TEMPLATE =
"No progress on copying '%s' to '%s' for %f seconds - network connection might be stalled.";
private final ExecutorService lastChangedExecutor =
ConcurrencyUtilities.newNamedPool("Last Changed Explorer", 1, Integer.MAX_VALUE);
private AtomicBoolean terminated = new AtomicBoolean(false); private final StoreItem itemToBeCopied;
private AtomicReference<Thread> timerThread = new AtomicReference<Thread>(null);
private long monitoredItemLastChanged;
private ActivityMonitoringTimerTask() private ActivityMonitoringTimerTask(StoreItem itemToBeCopied)
{ {
assert pathToBeCopied != null; assert terminable != null;
assert monitoredPathLastChanged != null; assert itemToBeCopied != null;
assert destinationDirectory != null; assert destinationStore != null;
monitoredPathLastChecked.set(System.currentTimeMillis()); // Ensure the alarm won't be run before the copier has a chance to get active.
this.monitoredItemLastChanged = System.currentTimeMillis();
this.itemToBeCopied = itemToBeCopied;
} }
@Override @Override
public void run() public void run()
{ {
final StoreItem item = pathToBeCopied.get();
if (item == null)
{
return;
}
if (operationLog.isTraceEnabled()) if (operationLog.isTraceEnabled())
{ {
operationLog.trace("Start activity monitoring run."); operationLog.trace("Start activity monitoring run.");
} }
timerThread.set(Thread.currentThread());
try try
{ {
if (operationLog.isTraceEnabled()) if (operationLog.isTraceEnabled())
{ {
operationLog.trace(String.format("Asking for last change time of '%s' inside '%s'.", item, operationLog.trace(String.format("Asking for last change time of '%s' inside '%s'.",
destinationDirectory)); itemToBeCopied, destinationStore));
} }
if (destinationDirectory.exists(item) == false) if (destinationStore.exists(itemToBeCopied) == false)
{ {
operationLog.warn(String.format("File or directory '%s' inside '%s' does not (yet?) exist.", item, operationLog.warn(String.format("File or directory '%s' inside '%s' does not (yet?) exist.",
destinationDirectory)); itemToBeCopied, destinationStore));
monitoredPathLastChecked.set(System.currentTimeMillis());
return; return;
} }
final long lastChangedAsFoundByPathChecker = lastChanged(destinationDirectory, item); final long lastChangedAsFoundByPathChecker =
lastChanged(destinationStore, itemToBeCopied, monitoredItemLastChanged);
if (operationLog.isTraceEnabled()) if (operationLog.isTraceEnabled())
{ {
operationLog.trace(String.format( operationLog.trace(String.format(
"Checker reported last changed time of '%s' inside '%s' to be %3$tF %3$tT.", item, "Checker reported last changed time of '%s' inside '%s' to be %3$tF %3$tT.",
destinationDirectory, lastChangedAsFoundByPathChecker)); itemToBeCopied, destinationStore, lastChangedAsFoundByPathChecker));
}
if (terminated.get()) // Don't modify the time variables any more if we got terminated.
{
operationLog.warn("Activity monitor got terminated.");
return;
} }
final long lastChecked = monitoredPathLastChecked.get();
final long lastLastChanged = monitoredPathLastChanged.get();
// This catches the case where since the last check copying a files has been finished (and consequently // This catches the case where since the last check copying a files has been finished (and consequently
// the // the "last changed" time has been set to that of the source file), but copying of the next file has
// "last changed" time has been set to that of the source file), but copying of the next file has not // not yet been started.
// yet been final long lastChanged = Math.max(lastChangedAsFoundByPathChecker, monitoredItemLastChanged);
// started.
final long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();
final long lastChanged =
Math.max(lastChangedAsFoundByPathChecker, lastLastChanged + (now - lastChecked) - 1);
if (lastChanged > now) // That can happen if the system clock of the data producer is screwed up. if (lastChanged > now) // That can happen if the system clock of the data producer is screwed up.
{ {
machineLog.error(String.format("Found \"last changed time\" in the future (%1$tF %1$tT), " machineLog.error(String.format("Found \"last changed time\" in the future (%1$tF %1$tT), "
+ "check system clock of data producer.", lastChanged)); + "check system clock of data producer.", lastChanged));
} }
monitoredPathLastChecked.set(now); monitoredItemLastChanged = lastChanged;
monitoredPathLastChanged.set(lastChanged); final long noProgressSinceMillis = now - lastChanged;
if (noProgressSinceMillis > inactivityPeriodMillis)
{
machineLog.error(String.format(INACTIVITY_REPORT_TEMPLATE, itemToBeCopied, destinationStore,
noProgressSinceMillis / 1000.0f));
operationLog.warn(String.format(TERMINATION_LOG_TEMPLATE, terminable.getClass().getName()));
terminable.terminate();
stop();
}
} catch (CheckedExceptionTunnel ex) } catch (CheckedExceptionTunnel ex)
{ {
if (ex.getCause() instanceof InterruptedException) if (ex.getCause() instanceof InterruptedException)
...@@ -265,7 +230,6 @@ public class CopyActivityMonitor ...@@ -265,7 +230,6 @@ public class CopyActivityMonitor
} }
} finally } finally
{ {
timerThread.set(null);
if (operationLog.isTraceEnabled()) if (operationLog.isTraceEnabled())
{ {
operationLog.trace("Finished activity monitoring run."); operationLog.trace("Finished activity monitoring run.");
...@@ -273,102 +237,74 @@ public class CopyActivityMonitor ...@@ -273,102 +237,74 @@ public class CopyActivityMonitor
} }
} }
private long lastChanged(FileStore store, StoreItem item) private long lastChanged(FileStore store, StoreItem item, long lastLastChanged)
{ {
final long now = System.currentTimeMillis(); // Give the system quickCheckActivityMillis to find recently changed files, otherwise perform full check
final long stopWhenYoungerThan = now - (inactivityPeriodMillis - SAFETY_MARGIN_MILLIS); final long stopWhenYoungerThan =
final long lastChangedAsFoundByPathChecker = System.currentTimeMillis() - (inactivityPeriodMillis - 2 * quickCheckActivityMillis);
store.lastChanged(item, stopWhenYoungerThan); final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog);
// Check if it took too long to find the value. final Future<Long> quickCheckLastChangedFuture =
if (System.currentTimeMillis() - now > SAFETY_MARGIN_MILLIS / 2) lastChangedExecutor.submit(createCheckerCallable(store, item, stopWhenYoungerThan));
final Long quickLastChanged =
ConcurrencyUtilities.tryGetResult(quickCheckLastChangedFuture, quickCheckActivityMillis,
simpleMachineLog, "Quick check for recent paths");
if (quickLastChanged == null)
{ {
return store.lastChanged(item, 0L); if (machineLog.isDebugEnabled())
{
machineLog.debug("Performing full check for most recent path now.");
}
final Future<Long> lastChangedFuture =
lastChangedExecutor.submit(createCheckerCallable(store, item, 0L));
final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis);
final Long lastChanged =
ConcurrencyUtilities.tryGetResult(lastChangedFuture, timeoutMillis, simpleMachineLog,
"Check for recent paths");
if (lastChanged == null)
{
operationLog.error(String
.format("Could not determine \"last changed time\" of %s: time out.", item));
return lastLastChanged;
}
return lastChanged;
} else } else
{ {
return lastChangedAsFoundByPathChecker; return quickLastChanged;
} }
} }
/**
* @return Always <code>true</code>.
*/
public boolean terminate()
{
final Thread timerThreadOrNull = timerThread.get();
if (timerThreadOrNull != null)
{
timerThreadOrNull.interrupt();
}
terminated.set(true);
return true;
}
}
/**
* A {@link TimerTask} that reports a lack of write activity on a directory which is supposed to be a stall in the
* copy operation of an item to this directory.
*/
private final class InactivityReportingTimerTask extends TimerTask
{
private static final String ACTIVITY_MONITOR_STUCK_TEMPLATE =
"The activity monitor timer thread %d got stuck, starting a new one.";
private static final String TERMINATION_LOG_TEMPLATE = "Terminating %s due to a lack of activity.";
private static final String INACTIVITY_REPORT_TEMPLATE =
"No progress on copying '%s' to '%s' for %f seconds - network connection might be stalled.";
private final ITerminable terminable;
public InactivityReportingTimerTask(ITerminable terminable) private Callable<Long> createCheckerCallable(final FileStore store, final StoreItem item,
final long stopWhenYoungerThan)
{ {
assert terminable != null; return new Callable<Long>()
assert inactivityPeriodMillis > 0; {
assert monitoredPathLastChanged != null; public Long call() throws Exception
assert destinationDirectory != null; {
if (machineLog.isTraceEnabled())
this.terminable = terminable; {
} machineLog.trace("Starting quick check for recent paths on '" + item + "'.");
}
@Override try
public void run() {
{ final long lastChanged = store.lastChanged(item, stopWhenYoungerThan);
final StoreItem path = pathToBeCopied.get(); if (machineLog.isTraceEnabled())
if (path == null) {
{ machineLog.trace(String.format(
return; "Finishing quick check for recent paths on '%s', found to be %2$tF %2$tT.",
} item, lastChanged));
}
if (operationLog.isTraceEnabled()) return lastChanged;
{ } catch (RuntimeException ex)
operationLog.trace("Start inactivity reporting run."); {
} if (machineLog.isTraceEnabled())
{
final long now = System.currentTimeMillis(); final Throwable th = (ex instanceof CheckedExceptionTunnel) ? ex.getCause() : ex;
final long noCheckSinceMillis = now - monitoredPathLastChecked.get(); machineLog.trace("Failed on quick check for recent paths on '" + item + "'.", th);
final long noProgressSinceMillis = now - monitoredPathLastChanged.get(); }
if (noCheckSinceMillis > Math.min(checkIntervallMillis * 3, inactivityPeriodMillis)) throw ex;
{ }
operationLog.warn(String.format(ACTIVITY_MONITOR_STUCK_TEMPLATE, currentNumberOfActivityMonitor)); }
startNewActivityMonitor(); };
}
if (noProgressSinceMillis > inactivityPeriodMillis)
{
machineLog.error(String.format(INACTIVITY_REPORT_TEMPLATE, path, destinationDirectory,
noProgressSinceMillis / 1000.0f));
operationLog.warn(String.format(TERMINATION_LOG_TEMPLATE, terminable.getClass().getName()));
terminable.terminate();
activityMonitoringTimer.cancel();
activityMonitoringTimerTask.terminate();
stop();
}
if (operationLog.isTraceEnabled())
{
operationLog.trace("Finished inactivity reporting run.");
}
} }
} }
......
...@@ -88,12 +88,12 @@ public class CopyActivityMonitorTest ...@@ -88,12 +88,12 @@ public class CopyActivityMonitorTest
} }
} }
private static interface LastChangedChecker private static interface ILastChangedChecker
{ {
public long lastChanged(StoreItem item, long stopWhenFindYounger); public long lastChanged(StoreItem item, long stopWhenFindYounger);
} }
private final class HappyPathLastChangedChecker implements LastChangedChecker private final class HappyPathLastChangedChecker implements ILastChangedChecker
{ {
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChanged(StoreItem item, long stopWhenFindYounger)
{ {
...@@ -104,26 +104,34 @@ public class CopyActivityMonitorTest ...@@ -104,26 +104,34 @@ public class CopyActivityMonitorTest
private final class MyTimingParameters implements ITimingParameters private final class MyTimingParameters implements ITimingParameters
{ {
private final long inactivityPeriodMillis;
private final int maximalNumberOfRetries; private final int maximalNumberOfRetries;
MyTimingParameters(int maximalNumberOfRetries) MyTimingParameters(int maximalNumberOfRetries)
{ {
this(maximalNumberOfRetries, INACTIVITY_PERIOD_MILLIS);
}
MyTimingParameters(int maximalNumberOfRetries, long inactivityPeriodMillis)
{
this.inactivityPeriodMillis = inactivityPeriodMillis;
this.maximalNumberOfRetries = maximalNumberOfRetries; this.maximalNumberOfRetries = maximalNumberOfRetries;
} }
public long getCheckIntervalMillis() public long getCheckIntervalMillis()
{ {
return INACTIVITY_PERIOD_MILLIS / 10; return inactivityPeriodMillis / 10;
} }
public long getQuietPeriodMillis() public long getQuietPeriodMillis()
{ {
return INACTIVITY_PERIOD_MILLIS / 10; return inactivityPeriodMillis / 10;
} }
public long getInactivityPeriodMillis() public long getInactivityPeriodMillis()
{ {
return INACTIVITY_PERIOD_MILLIS; return inactivityPeriodMillis;
} }
public long getIntervalToWaitAfterFailure() public long getIntervalToWaitAfterFailure()
...@@ -137,13 +145,13 @@ public class CopyActivityMonitorTest ...@@ -137,13 +145,13 @@ public class CopyActivityMonitorTest
} }
} }
private FileStore asFileStore(File directory, final LastChangedChecker checker) private FileStore asFileStore(File directory, final ILastChangedChecker checker)
{ {
IFileSysOperationsFactory factory = FileOperationsUtil.createTestFatory(); IFileSysOperationsFactory factory = FileOperationsUtil.createTestFatory();
return asFileStore(directory, checker, factory); return asFileStore(directory, checker, factory);
} }
private FileStore asFileStore(File directory, final LastChangedChecker checker, IFileSysOperationsFactory factory) private FileStore asFileStore(File directory, final ILastChangedChecker checker, IFileSysOperationsFactory factory)
{ {
final FileStoreLocal localImpl = new FileStoreLocal(directory, "input-test", factory); final FileStoreLocal localImpl = new FileStoreLocal(directory, "input-test", factory);
return new FileStore(directory, null, false, "input-test", factory) return new FileStore(directory, null, false, "input-test", factory)
...@@ -234,7 +242,7 @@ public class CopyActivityMonitorTest ...@@ -234,7 +242,7 @@ public class CopyActivityMonitorTest
{ "slow" }) { "slow" })
public void testHappyPath() throws Throwable public void testHappyPath() throws Throwable
{ {
final LastChangedChecker checker = new HappyPathLastChangedChecker(); final ILastChangedChecker checker = new HappyPathLastChangedChecker();
final ITerminable dummyTerminable = new DummyTerminable(); final ITerminable dummyTerminable = new DummyTerminable();
final ITimingParameters parameters = new MyTimingParameters(0); final ITimingParameters parameters = new MyTimingParameters(0);
final CopyActivityMonitor monitor = final CopyActivityMonitor monitor =
...@@ -249,7 +257,7 @@ public class CopyActivityMonitorTest ...@@ -249,7 +257,7 @@ public class CopyActivityMonitorTest
{ "slow" }) { "slow" })
public void testCopyStalled() throws Throwable public void testCopyStalled() throws Throwable
{ {
final LastChangedChecker checker = new PathLastChangedCheckerStalled(); final ILastChangedChecker checker = new PathLastChangedCheckerStalled();
final MockTerminable copyProcess = new MockTerminable(); final MockTerminable copyProcess = new MockTerminable();
final ITimingParameters parameters = new MyTimingParameters(0); final ITimingParameters parameters = new MyTimingParameters(0);
final CopyActivityMonitor monitor = final CopyActivityMonitor monitor =
...@@ -261,7 +269,7 @@ public class CopyActivityMonitorTest ...@@ -261,7 +269,7 @@ public class CopyActivityMonitorTest
assert copyProcess.isTerminated(); assert copyProcess.isTerminated();
} }
private final class SimulateShortInterruptionChangedChecker implements LastChangedChecker private final class SimulateShortInterruptionChangedChecker implements ILastChangedChecker
{ {
private int numberOfTimesCalled = 0; private int numberOfTimesCalled = 0;
...@@ -293,7 +301,7 @@ public class CopyActivityMonitorTest ...@@ -293,7 +301,7 @@ public class CopyActivityMonitorTest
{ "slow" }) { "slow" })
public void testCopySeemsStalledButActuallyIsFine() throws Throwable public void testCopySeemsStalledButActuallyIsFine() throws Throwable
{ {
final LastChangedChecker checker = new SimulateShortInterruptionChangedChecker(); final ILastChangedChecker checker = new SimulateShortInterruptionChangedChecker();
final MockTerminable copyProcess = new MockTerminable(); final MockTerminable copyProcess = new MockTerminable();
final ITimingParameters parameters = new MyTimingParameters(0); final ITimingParameters parameters = new MyTimingParameters(0);
final CopyActivityMonitor monitor = final CopyActivityMonitor monitor =
...@@ -305,7 +313,7 @@ public class CopyActivityMonitorTest ...@@ -305,7 +313,7 @@ public class CopyActivityMonitorTest
assert copyProcess.isTerminated() == false; assert copyProcess.isTerminated() == false;
} }
private final class PathLastChangedCheckerStalled implements LastChangedChecker private final class PathLastChangedCheckerStalled implements ILastChangedChecker
{ {
public long lastChanged(StoreItem item, long stopWhenFindYounger) public long lastChanged(StoreItem item, long stopWhenFindYounger)
{ {
...@@ -315,48 +323,41 @@ public class CopyActivityMonitorTest ...@@ -315,48 +323,41 @@ public class CopyActivityMonitorTest
@Test(groups = @Test(groups =
{ "slow" }) { "slow" })
public void testActivityMonitorStuck() throws Throwable public void testActivityMonitorTimedOut() throws Throwable
{ {
LogMonitoringAppender appender =
LogMonitoringAppender.addAppender(LogCategory.OPERATION, "Activity monitor got terminated");
final PathLastChangedCheckerDelayed checker = new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS); final PathLastChangedCheckerDelayed checker = new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS);
final MockTerminable copyProcess = new MockTerminable(); final MockTerminable copyProcess = new MockTerminable();
final ITimingParameters parameters = new MyTimingParameters(0); final ITimingParameters parameters = new MyTimingParameters(0);
final CopyActivityMonitor monitor = final CopyActivityMonitor monitor =
new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters); new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters);
final StoreItem item = createDirectoryInside(workingDirectory); final StoreItem item = createDirectoryInside(workingDirectory);
final LogMonitoringAppender appender =
LogMonitoringAppender.addAppender(LogCategory.OPERATION,
String.format("Could not determine \"last changed time\" of %s: time out.", item));
monitor.start(item); monitor.start(item);
Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15);
monitor.stop(); monitor.stop();
LogMonitoringAppender.removeAppender(appender); LogMonitoringAppender.removeAppender(appender);
assertTrue(checker.interrupted()); assertTrue(checker.lastCheckInterrupted());
assertTrue(copyProcess.isTerminated()); assertTrue(copyProcess.isTerminated());
appender.verifyLogHasHappened(); appender.verifyLogHasHappened();
} }
@Test(groups = @Test(groups =
{ "slow" }) { "slow" })
public void testActivityMonitorFirstStuckSecondWorking() throws Throwable public void testActivityMonitorOnceTimedOutTheOK() throws Throwable
{ {
LogMonitoringAppender appender = final PathLastChangedCheckerDelayed checker = new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS, 0L);
LogMonitoringAppender.addAppender(LogCategory.OPERATION, "got stuck, starting a new one");
final PathLastChangedCheckerDelayed checker =
new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS, 0L);
final MockTerminable copyProcess = new MockTerminable(); final MockTerminable copyProcess = new MockTerminable();
final ITimingParameters parameters = new MyTimingParameters(0); final ITimingParameters parameters = new MyTimingParameters(0);
final CopyActivityMonitor monitor = final CopyActivityMonitor monitor =
new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters); new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters);
final File directory = new File(workingDirectory, "some-directory"); final StoreItem item = createDirectoryInside(workingDirectory);
directory.mkdir();
directory.deleteOnExit();
final StoreItem item = createDirectoryInside(directory);
monitor.start(item); monitor.start(item);
Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15);
monitor.stop(); monitor.stop();
LogMonitoringAppender.removeAppender(appender); assertFalse(checker.lastCheckInterrupted());
assertFalse(checker.interrupted());
assertFalse(copyProcess.isTerminated()); assertFalse(copyProcess.isTerminated());
appender.verifyLogHappendNTimes(1);
} }
private StoreItem createDirectoryInside(File parentDir) private StoreItem createDirectoryInside(File parentDir)
...@@ -368,19 +369,49 @@ public class CopyActivityMonitorTest ...@@ -368,19 +369,49 @@ public class CopyActivityMonitorTest
return item; return item;
} }
private final class PathLastChangedCheckerDelayed implements LastChangedChecker @Test(groups = "slow")
public void testTriggerFullCheck() throws Throwable
{
final LogMonitoringAppender appender =
LogMonitoringAppender.addAppender(LogCategory.MACHINE,
"Performing full check for most recent path now.");
final PathLastChangedCheckerDelayed checker =
new PathLastChangedCheckerDelayed(0L, (long) (INACTIVITY_PERIOD_MILLIS / 10 * 1.5), 0L);
final MockTerminable copyProcess = new MockTerminable();
final ITimingParameters parameters = new MyTimingParameters(0, INACTIVITY_PERIOD_MILLIS);
final CopyActivityMonitor monitor =
new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters,
INACTIVITY_PERIOD_MILLIS / 10);
final File directory = new File(workingDirectory, "some-directory");
directory.mkdir();
directory.deleteOnExit();
final StoreItem item = createDirectoryInside(directory);
monitor.start(item);
Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15);
monitor.stop();
LogMonitoringAppender.removeAppender(appender);
assertFalse(checker.lastCheckInterrupted());
assertEquals(1, checker.getInterruptionCount());
assertFalse(copyProcess.isTerminated());
appender.verifyLogHappendNTimes(1);
}
private final class PathLastChangedCheckerDelayed implements ILastChangedChecker
{ {
private final long[] delayMillis; private final long[] delayMillis;
private int callNumber; private int callNumber;
private boolean interrupted; private volatile boolean interrupted;
private int interruptionCount;
public PathLastChangedCheckerDelayed(long... delayMillis) public PathLastChangedCheckerDelayed(long... delayMillis)
{ {
assert delayMillis.length > 0; assert delayMillis.length > 0;
this.interrupted = false; this.interrupted = false;
this.interruptionCount = 0;
this.delayMillis = delayMillis; this.delayMillis = delayMillis;
} }
...@@ -406,6 +437,7 @@ public class CopyActivityMonitorTest ...@@ -406,6 +437,7 @@ public class CopyActivityMonitorTest
} catch (InterruptedException e) } catch (InterruptedException e)
{ {
this.interrupted = true; this.interrupted = true;
++this.interruptionCount;
// That is what we expect if we are terminated. // That is what we expect if we are terminated.
throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage())); throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage()));
} }
...@@ -413,11 +445,16 @@ public class CopyActivityMonitorTest ...@@ -413,11 +445,16 @@ public class CopyActivityMonitorTest
return System.currentTimeMillis(); return System.currentTimeMillis();
} }
boolean interrupted() synchronized boolean lastCheckInterrupted()
{ {
return interrupted; return interrupted;
} }
int getInterruptionCount()
{
return interruptionCount;
}
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment