From 2e620c059ea64e9d13a9a0daba5ed67ac1b7317e Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Thu, 29 Nov 2007 20:15:52 +0000 Subject: [PATCH] merged from branch datamover/1.0.x, r2788 fix: reset trigger when activity monitoring takes a long time and gets terminated change: interrupt monitoring thread if either the monitoring thread or the copy thread gets terminated SVN: 2861 --- .../remote/CopyActivityMonitor.java | 22 +++++ .../remote/CopyActivityMonitorTest.java | 84 +++++++++++++++---- 2 files changed, 90 insertions(+), 16 deletions(-) diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java index 7fbd142c3c4..21822b59292 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; +import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.ITerminable; @@ -179,12 +180,15 @@ public class CopyActivityMonitor { private AtomicBoolean terminated = new AtomicBoolean(false); + private AtomicReference<Thread> timerThread = new AtomicReference<Thread>(null); private ActivityMonitoringTimerTask() { assert pathToBeCopied != null; assert monitoredPathLastChanged != null; assert destinationDirectory != null; + + monitoredPathLastChecked.set(System.currentTimeMillis()); } @Override @@ -201,6 +205,7 @@ public class CopyActivityMonitor operationLog.trace("Start activity monitoring run."); } + timerThread.set(Thread.currentThread()); try { if (operationLog.isTraceEnabled()) @@ -244,8 +249,18 @@ public class CopyActivityMonitor } monitoredPathLastChecked.set(now); monitoredPathLastChanged.set(lastChanged); + } catch (CheckedExceptionTunnel ex) + { + if (ex.getCause() instanceof InterruptedException) + { + operationLog.warn("Activity monitor got terminated."); + } else + { + throw ex; + } } finally { + timerThread.set(null); if (operationLog.isTraceEnabled()) { operationLog.trace("Finished activity monitoring run."); @@ -258,6 +273,11 @@ public class CopyActivityMonitor */ public boolean terminate() { + final Thread timerThreadOrNull = timerThread.get(); + if (timerThreadOrNull != null) + { + timerThreadOrNull.interrupt(); + } terminated.set(true); return true; } @@ -322,6 +342,8 @@ public class CopyActivityMonitor noProgressSinceMillis / 1000.0f)); operationLog.warn(String.format(TERMINATION_LOG_TEMPLATE, terminable.getClass().getName())); terminable.terminate(); + activityMonitoringTimer.cancel(); + activityMonitoringTimerTask.terminate(); stop(); } diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java index faf41223b77..841cff46293 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java @@ -27,7 +27,6 @@ import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; -import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.common.logging.LogMonitoringAppender; import ch.systemsx.cisd.common.utilities.ITerminable; @@ -40,6 +39,8 @@ import ch.systemsx.cisd.datamover.filesystem.store.FileStoreLocal; import ch.systemsx.cisd.datamover.intf.ITimingParameters; import ch.systemsx.cisd.datamover.testhelper.FileOperationsUtil; +import static org.testng.AssertJUnit.*; + /** * Test cases for the {@link CopyActivityMonitor} class. * @@ -318,21 +319,47 @@ public class CopyActivityMonitorTest { LogMonitoringAppender appender = LogMonitoringAppender.addAppender(LogCategory.OPERATION, "Activity monitor got terminated"); - LogFactory.getLogger(LogCategory.OPERATION, CopyActivityMonitor.class).addAppender(appender); - final LastChangedChecker checker = new PathLastChangedCheckerStuck(); + final PathLastChangedCheckerDelayed checker = new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS); final MockTerminable copyProcess = new MockTerminable(); final ITimingParameters parameters = new MyTimingParameters(0); final CopyActivityMonitor monitor = new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters); - StoreItem item = createDirectoryInside(workingDirectory); + final StoreItem item = createDirectoryInside(workingDirectory); monitor.start(item); Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); monitor.stop(); LogMonitoringAppender.removeAppender(appender); - assert copyProcess.isTerminated(); + assertTrue(checker.interrupted()); + assertTrue(copyProcess.isTerminated()); appender.verifyLogHasHappened(); } + @Test(groups = + { "slow" }) + public void testActivityMonitorFirstStuckSecondWorking() throws Throwable + { + LogMonitoringAppender appender = + LogMonitoringAppender.addAppender(LogCategory.OPERATION, "got stuck, starting a new one"); + final PathLastChangedCheckerDelayed checker = + new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS, + (long) (INACTIVITY_PERIOD_MILLIS / 10 * 1.5)); + final MockTerminable copyProcess = new MockTerminable(); + final ITimingParameters parameters = new MyTimingParameters(0); + final CopyActivityMonitor monitor = + new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, parameters); + final File directory = new File(workingDirectory, "some-directory"); + directory.mkdir(); + directory.deleteOnExit(); + final StoreItem item = createDirectoryInside(directory); + monitor.start(item); + Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); + monitor.stop(); + LogMonitoringAppender.removeAppender(appender); + assertFalse(checker.interrupted()); + assertFalse(copyProcess.isTerminated()); + appender.verifyLogHappendNTimes(1); + } + private StoreItem createDirectoryInside(File parentDir) { StoreItem item = new StoreItem("some-directory"); @@ -342,31 +369,56 @@ public class CopyActivityMonitorTest return item; } - private final class PathLastChangedCheckerStuck implements LastChangedChecker + private final class PathLastChangedCheckerDelayed implements LastChangedChecker { - private boolean interrupted = false; + private final long[] delayMillis; + + private int callNumber; + + private boolean interrupted; + + public PathLastChangedCheckerDelayed(long... delayMillis) + { + assert delayMillis.length > 0; + + this.interrupted = false; + this.delayMillis = delayMillis; + } + + private long timeToSleepMillis() + { + try + { + return delayMillis[callNumber]; + } finally + { + if (callNumber < delayMillis.length - 1) + { + ++callNumber; + } + } + } public long lastChanged(StoreItem item) { try { - Thread.sleep(INACTIVITY_PERIOD_MILLIS); // Wait longer than the activity monitor is willing to wait for - // us. + Thread.sleep(timeToSleepMillis()); // Wait predefined time. } catch (InterruptedException e) { - // Can't happen since this method runs in a TimerThread which isn't interrupted. - throw new CheckedExceptionTunnel(e); + this.interrupted = true; + // That is what we expect if we are terminated. + throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage())); } - return 0; // Return value doesn't matter because the TImerTask is already terminated. + this.interrupted = false; + return System.currentTimeMillis(); } - /** - * @return If the checker has been interrupted. - */ - public boolean isInterrupted() + boolean interrupted() { return interrupted; } + } } -- GitLab