diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/RemoteMonitoredMoverFactory.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/RemoteMonitoredMoverFactory.java index 0f739f8dfa46d38d8afd1996e884f14083242b68..75da0748c717b522ddba70b18ab11dee032b5636 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/RemoteMonitoredMoverFactory.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/RemoteMonitoredMoverFactory.java @@ -19,7 +19,6 @@ package ch.systemsx.cisd.datamover.filesystem; import ch.systemsx.cisd.common.utilities.IStoreHandler; import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier; -import ch.systemsx.cisd.datamover.filesystem.remote.CopyActivityMonitor; import ch.systemsx.cisd.datamover.filesystem.remote.RemotePathMover; import ch.systemsx.cisd.datamover.intf.ITimingParameters; @@ -45,9 +44,6 @@ public final class RemoteMonitoredMoverFactory final IFileStore destinationDirectory, final ITimingParameters parameters) { final IStoreCopier copier = sourceDirectory.getCopier(destinationDirectory); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(destinationDirectory, copier, parameters); - return new RemotePathMover(sourceDirectory, destinationDirectory, copier, monitor, - parameters); + return new RemotePathMover(sourceDirectory, destinationDirectory, copier, parameters); } } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java deleted file mode 100644 index 6405797c583e3ffb3f48e977eabd709fa24b8299..0000000000000000000000000000000000000000 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java +++ /dev/null @@ -1,388 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover.filesystem.remote; - -import java.util.Timer; -import java.util.TimerTask; - -import org.apache.log4j.Logger; - -import ch.rinn.restrictions.Private; -import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; -import ch.systemsx.cisd.common.logging.LogCategory; -import ch.systemsx.cisd.common.logging.LogFactory; -import ch.systemsx.cisd.common.utilities.ITerminable; -import ch.systemsx.cisd.common.utilities.StoreItem; -import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus; -import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; -import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; -import ch.systemsx.cisd.datamover.intf.ITimingParameters; - -/** - * A <code>CopyActivityMonitor</code> monitors write activity on a <var>destinationStore</var> - * and triggers an alarm if there was a period of inactivity that exceeds a given inactivity period. - * - * @author Bernd Rinn - */ -public class CopyActivityMonitor -{ - - private static final Logger operationLog = - LogFactory.getLogger(LogCategory.OPERATION, CopyActivityMonitor.class); - - private static final Logger machineLog = - LogFactory.getLogger(LogCategory.MACHINE, CopyActivityMonitor.class); - - private final IFileStoreMonitor destinationStore; - - private final long checkIntervallMillis; - - private final long inactivityPeriodMillis; - - private final String threadNamePrefix; - - /** handler to terminate monitored process if the observed store item does not change */ - private final ITerminable terminable; - - /** - * We need to keep a reference to the timer since we want to be able to cancel it and because - * otherwise the timer thread will terminate. - */ - private Timer activityMonitoringTimer; - - /** - * Unfortunately there is no way for a {@link TimerTask} to know whether it has been canceled. - * So we have to keep it around in order to be able to terminate it directly. - */ - private ActivityMonitoringTimerTask activityMonitoringTimerTask; - - /** - * Creates a monitor. - * - * @param destinationStore The file store to monitor for write access. Note that this store - * needs to detect and signal time out conditions itself. <i>If an operation on this - * store hangs, then the CopyActivityMonitor hangs, too!</i> - * @param copyProcess The {@link ITerminable} representing the copy process. This will get - * terminated if the copy process gets stuck. - * @param timingParameters The {@link ITimingParameters} to get the check interval and the - * inactivity period from. - */ - public CopyActivityMonitor(IFileStore destinationStore, ITerminable copyProcess, - ITimingParameters timingParameters) - { - this(createFileStoreMonitor(destinationStore), copyProcess, timingParameters); - } - - @Private - CopyActivityMonitor(IFileStoreMonitor destinationStoreMonitor, ITerminable copyProcess, - ITimingParameters timingParameters) - { - assert destinationStoreMonitor != null; - assert copyProcess != null; - assert timingParameters != null; - - this.destinationStore = destinationStoreMonitor; - this.terminable = copyProcess; - this.checkIntervallMillis = timingParameters.getCheckIntervalMillis(); - this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis(); - - assert this.checkIntervallMillis > 0; - - final String currentThreadName = Thread.currentThread().getName(); - if ("main".equals(currentThreadName)) - { - this.threadNamePrefix = ""; - } else - { - this.threadNamePrefix = currentThreadName + " - "; - } - } - - // Used for all file system operations in this class. - @Private - static interface IFileStoreMonitor - { - DateStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan); - - BooleanStatus exists(StoreItem item); - - // description of the store for logging purposes - String toString(); - } - - /** - * Starts the activity monitoring. - * - * @param itemToBeCopied The item that will be copied to the destination file store and whose - * write progress should be monitored. - */ - public void start(StoreItem itemToBeCopied) - { - assert itemToBeCopied != null; - - activityMonitoringTimer = new Timer(threadNamePrefix + "Activity Monitor", true); - activityMonitoringTimerTask = new ActivityMonitoringTimerTask(itemToBeCopied); - // we start the timer after some delay to let the copy process be started - activityMonitoringTimer.schedule(activityMonitoringTimerTask, checkIntervallMillis / 2, - checkIntervallMillis); - } - - /** - * Stops the activity monitoring. The activity monitor must not be used after calling this - * method. - */ - public void stop() - { - activityMonitoringTimer.cancel(); - } - - /** - * A value object that holds the information about the last check performed for a path. - */ - public static final class PathCheckRecord - { - final private long timeChecked; - - final private DateStatus timeOfLastModification; - - public PathCheckRecord(final long timeChecked, final DateStatus timeLastChanged) - { - this.timeChecked = timeChecked; - this.timeOfLastModification = timeLastChanged; - } - - /** - * The time when the entry was checked. - */ - public long getTimeChecked() - { - return timeChecked; - } - - /** - * The newest last modification time found during the check. - */ - public DateStatus getTimeOfLastModification() - { - return timeOfLastModification; - } - } - - private static IFileStoreMonitor createFileStoreMonitor(final IFileStore destinationStore) - { - return new IFileStoreMonitor() - { - public DateStatus lastChangedRelative(StoreItem item, - long stopWhenFindYoungerRelative) - { - return destinationStore.lastChangedRelative(item, stopWhenFindYoungerRelative); - } - - public BooleanStatus exists(StoreItem item) - { - return destinationStore.exists(item); - } - - @Override - public String toString() - { - return destinationStore.toString(); - } - }; - } - - /** - * A {@link TimerTask} that monitors writing activity on a directory. - */ - private final class ActivityMonitoringTimerTask extends TimerTask - { - - private static final String TERMINATION_LOG_TEMPLATE = - "Terminating %s due to a lack of activity."; - - private static final String INACTIVITY_REPORT_TEMPLATE = - "No progress on copying '%s' to '%s' for %f seconds - network connection might be stalled."; - - private final StoreItem itemToBeCopied; - - private PathCheckRecord lastCheckOrNull; - - private ActivityMonitoringTimerTask(StoreItem itemToBeCopied) - { - assert terminable != null; - assert itemToBeCopied != null; - assert destinationStore != null; - - this.lastCheckOrNull = null; - this.itemToBeCopied = itemToBeCopied; - } - - @Override - public void run() - { - if (operationLog.isTraceEnabled()) - { - operationLog.trace("Start activity monitoring run."); - } - - try - { - if (operationLog.isTraceEnabled()) - { - operationLog.trace(String.format( - "Asking for last change time of '%s' inside '%s'.", itemToBeCopied, - destinationStore)); - } - final long now = System.currentTimeMillis(); - if (isQuietFor(inactivityPeriodMillis, now)) - { - final long noProgressSinceMillis = now - lastCheckOrNull.getTimeChecked(); - machineLog.error(String.format(INACTIVITY_REPORT_TEMPLATE, itemToBeCopied, - destinationStore, noProgressSinceMillis / 1000.0f)); - - operationLog.warn(String.format(TERMINATION_LOG_TEMPLATE, terminable.getClass() - .getName())); - terminable.terminate(); - stop(); - } - } catch (CheckedExceptionTunnel ex) - { - if (ex.getCause() instanceof InterruptedException) - { - operationLog.warn("Activity monitor got terminated."); - } else - { - throw ex; - } - } finally - { - if (operationLog.isTraceEnabled()) - { - operationLog.trace("Finished activity monitoring run."); - } - } - } - - // true if nothing has changed during the specified period - private boolean isQuietFor(long quietPeriodMillis, long now) - { - if (destinationStore.exists(itemToBeCopied).isSuccess() == false) - { - return checkNonexistentPeriod(quietPeriodMillis, now); - } - if (lastCheckOrNull == null) // never checked before - { - setFirstModificationDate(now); - return false; - } else - { - final boolean oldIsUnknown = lastCheckOrNull.getTimeOfLastModification().isError(); - // no need to check yet - if (now - lastCheckOrNull.getTimeChecked() < quietPeriodMillis) - { - // if last check finished with an error, try to redo it and save it with the - // previous check time - if (oldIsUnknown) - { - setFirstModificationDate(lastCheckOrNull.getTimeChecked()); - } - return false; - } else if (oldIsUnknown) - { - // during the whole period modification time could not be fetched. It could be - // unchanged, trying to fetch it now will give us no information. - return true; - } else - { - long prevModificationTime = - lastCheckOrNull.getTimeOfLastModification().getResult(); - return checkIfUnmodifiedAndSet(now, prevModificationTime); - } - } - } - - // Checks how much time elapsed since the last check without looking into file system. - // Returns true if it's more than quietPeriodMillis. - // Used to stop the copy process if file does not appear at all for a long time. - private boolean checkNonexistentPeriod(long quietPeriodMillis, long now) - { - if (lastCheckOrNull == null) - { - lastCheckOrNull = new PathCheckRecord(now, DateStatus.createError()); - return false; - } else - { - if (lastCheckOrNull.getTimeOfLastModification().isError() == false) - { - operationLog.warn(String.format( - "File or directory '%s' has vanished from '%s'.", itemToBeCopied, - destinationStore)); - } else - { - operationLog.warn(String.format( - "File or directory '%s' inside '%s' does not (yet?) exist.", - itemToBeCopied, destinationStore)); - } - return (now - lastCheckOrNull.getTimeChecked() >= quietPeriodMillis); - } - } - - // check if item has been unmodified ("quite") since last check by comparing its current - // modification time to the one acquired in the past. - private boolean checkIfUnmodifiedAndSet(long now, long prevModificationTime) - { - final DateStatus newModificationTime = lastChanged(itemToBeCopied); - if (newModificationTime.isError() == false - && newModificationTime.getResult() != prevModificationTime) - { - lastCheckOrNull = new PathCheckRecord(now, newModificationTime); - return false; - } else - { - return true; // item unchanged or we could not fetch this information - } - } - - private void setFirstModificationDate(final long timeChecked) - { - DateStatus lastChanged = lastChanged(itemToBeCopied); - lastCheckOrNull = new PathCheckRecord(timeChecked, lastChanged); - } - } - - private DateStatus lastChanged(StoreItem item) - { - long stopWhenFindYoungerRelative = minusSafetyMargin(inactivityPeriodMillis); - final DateStatus lastChanged = - destinationStore.lastChangedRelative(item, stopWhenFindYoungerRelative); - if (lastChanged.isError()) - { - operationLog.error(lastChanged.tryGetMessage()); - } else if (operationLog.isTraceEnabled()) - { - String msgTemplate = "Checker reported last changed time of '%s' inside '%s' to be %s."; - String msg = String.format(msgTemplate, item, destinationStore, lastChanged); - operationLog.trace(msg); - } - return lastChanged; - } - - private long minusSafetyMargin(long period) - { - return Math.max(0L, period - 1000L); - } -} diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java index dd93085945831f416a90786c14aa68d786d5093b..cdd4aedf0f82395f3778d7b62633a203c1039020 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java @@ -19,6 +19,8 @@ package ch.systemsx.cisd.datamover.filesystem.remote; import org.apache.log4j.Logger; import ch.systemsx.cisd.common.Constants; +import ch.systemsx.cisd.common.concurrent.InactivityMonitor; +import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IInactivityObserver; import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.exceptions.StatusFlag; @@ -69,6 +71,8 @@ public final class RemotePathMover implements IStoreHandler private static final String FAILED_TO_COPY_FILE_TO_REMOTE_TEMPLATE = "Failed to copy file '%s' from '%s' to remote (%s)"; + private static final String TERMINATING_COPIER_LOG_TEMPLATE = "Terminating copier %s: %s."; + private static final Logger machineLog = LogFactory.getLogger(LogCategory.MACHINE, RemotePathMover.class); @@ -87,10 +91,12 @@ public final class RemotePathMover implements IStoreHandler private final IStoreCopier copier; - private final CopyActivityMonitor monitor; - private final long intervallToWaitAfterFailure; + private final long checkIntervallMillis; + + private final long inactivityPeriodMillis; + private final int maximalNumberOfRetries; /** @@ -99,18 +105,16 @@ public final class RemotePathMover implements IStoreHandler * @param sourceDirectory The directory to move paths from. * @param destinationDirectory The directory to move paths to. * @param copier Copies items from source to destination - * @param monitor The activity monitor to inform about actions. * @param timingParameters The timing parameters used for monitoring and reporting stall * situations. * @throws ConfigurationFailureException If the destination directory is not fully accessible. */ public RemotePathMover(final IFileStore sourceDirectory, final IFileStore destinationDirectory, - final IStoreCopier copier, final CopyActivityMonitor monitor, + final IStoreCopier copier, final ITimingParameters timingParameters) throws ConfigurationFailureException { assert sourceDirectory != null; assert destinationDirectory != null; - assert monitor != null; assert timingParameters != null; assert sourceDirectory.tryAsExtended() != null || destinationDirectory.tryAsExtended() != null; @@ -118,9 +122,10 @@ public final class RemotePathMover implements IStoreHandler this.sourceDirectory = sourceDirectory; this.destinationDirectory = destinationDirectory; this.copier = copier; - this.monitor = monitor; this.intervallToWaitAfterFailure = timingParameters.getIntervalToWaitAfterFailure(); this.maximalNumberOfRetries = timingParameters.getMaximalNumberOfRetries(); + this.checkIntervallMillis = timingParameters.getCheckIntervalMillis(); + this.inactivityPeriodMillis = timingParameters.getInactivityPeriodMillis(); assert intervallToWaitAfterFailure >= 0; assert maximalNumberOfRetries >= 0; @@ -133,7 +138,19 @@ public final class RemotePathMover implements IStoreHandler private final Status copyAndMonitor(final StoreItem item) { - monitor.start(item); + final InactivityMonitor monitor = + new InactivityMonitor( + new RemoteStoreCopyActivitySensor(destinationDirectory, item), + new IInactivityObserver() + { + public void update(long inactiveSinceMillis, + String descriptionOfInactivity) + { + operationLog.warn(String.format(TERMINATING_COPIER_LOG_TEMPLATE, + copier.getClass().getName(), descriptionOfInactivity)); + copier.terminate(); + } + }, checkIntervallMillis, inactivityPeriodMillis, true); final Status copyStatus = copier.copy(item); monitor.stop(); return copyStatus; diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemoteStoreCopyActivitySensor.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemoteStoreCopyActivitySensor.java new file mode 100644 index 0000000000000000000000000000000000000000..a65c768d18ed256de5fd1ddb44dced66200d9c2c --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemoteStoreCopyActivitySensor.java @@ -0,0 +1,143 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover.filesystem.remote; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.time.DurationFormatUtils; +import org.apache.log4j.Logger; + +import ch.systemsx.cisd.common.concurrent.InactivityMonitor.IActivitySensor; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.common.utilities.StoreItem; +import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; +import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; + +/** + * A {@link IActivitySensor} that senses changes in copy operations to a {@link StoreItem} in a + * remote store. + * + * @author Bernd Rinn + */ +public class RemoteStoreCopyActivitySensor implements IActivitySensor +{ + private static final Logger machineLog = + LogFactory.getLogger(LogCategory.MACHINE, RemoteStoreCopyActivitySensor.class); + + private static final int MAX_ERROR_COUNT = 1; + + private final int maxErrorsToIgnore; + + private final IFileStore destinationStore; + + private final StoreItem copyItem; + + private final long timeOfCreation = System.currentTimeMillis(); + + private long timeOfLastConfirmedActivity = timeOfCreation; + + private long timeOfLastReportedActivity = timeOfCreation; + + private long lastNonErrorResult = -1L; + + private DateStatus currentResult; + + private int errorCount = 0; + + public RemoteStoreCopyActivitySensor(IFileStore destinationStore, StoreItem copyItem) + { + this(destinationStore, copyItem, MAX_ERROR_COUNT); + } + + public RemoteStoreCopyActivitySensor(IFileStore destinationStore, StoreItem copyItem, + int maxErrorsToIgnore) + { + this.destinationStore = destinationStore; + this.copyItem = copyItem; + this.maxErrorsToIgnore = maxErrorsToIgnore; + this.currentResult = + DateStatus.createError(String.format( + "Last activity on item '%s' of store '%s' never checked", copyItem, + destinationStore)); + } + + public long getTimeOfLastActivityMoreRecentThan(long thresholdMillis) + { + currentResult = destinationStore.lastChangedRelative(copyItem, thresholdMillis); + final long now = System.currentTimeMillis(); + if (currentResult.isError()) + { + ++errorCount; + if (errorCount <= maxErrorsToIgnore) + { + timeOfLastReportedActivity = now; + machineLog.error(describeInactivity(now) + + String.format(" (error count: %d <= %d, goes unreported)", errorCount, + maxErrorsToIgnore)); + } else + { + machineLog.error(describeInactivity(now) + + " (error count: %s, reported to monitor)"); + } + } else + { + if (currentResult.getResult() != lastNonErrorResult) + { + timeOfLastConfirmedActivity = now; + lastNonErrorResult = currentResult.getResult(); + if (machineLog.isDebugEnabled()) + { + machineLog.debug(String.format( + "Observing write activity on item '%s' in store '%s'", copyItem, + destinationStore)); + } + } + // Implementation note: This means we can report an older time of activity than what we + // reported the last time if the last time we had an error. This is on purpose as it + // helps avoiding a situation where error and non-error situations do "flip-flop" and we + // could report progress where there is no progress. + timeOfLastReportedActivity = timeOfLastConfirmedActivity; + errorCount = 0; + } + + return timeOfLastReportedActivity; + } + + public String describeInactivity(long now) + { + if (currentResult.isError()) + { + final String msg = currentResult.tryGetMessage(); + if (StringUtils.isBlank(msg)) + { + return String.format("Error: Unable to determine the time of write activity " + + "on item '%s' in store '%s'", copyItem, destinationStore); + } else + { + return String.format("Error [%s]: Unable to determine the time of write activity " + + "on item '%s' in store '%s'", msg, copyItem, destinationStore); + } + } else + { + final String inactivityPeriod = + DurationFormatUtils.formatDurationHMS(now - timeOfLastConfirmedActivity); + return String.format("No write activity on item '%s' in store '%s' for %s", copyItem, + destinationStore, inactivityPeriod); + } + } + +} diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java deleted file mode 100644 index 12de2e66f869431d9273fcd15ceade84fdb539d0..0000000000000000000000000000000000000000 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Copyright 2007 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover.filesystem.remote; - -import static org.testng.AssertJUnit.assertEquals; - -import java.io.File; - -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import ch.rinn.restrictions.Friend; -import ch.systemsx.cisd.common.logging.LogInitializer; -import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler; -import ch.systemsx.cisd.common.utilities.ITerminable; -import ch.systemsx.cisd.common.utilities.StoreItem; -import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus; -import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; -import ch.systemsx.cisd.datamover.filesystem.remote.CopyActivityMonitor.IFileStoreMonitor; -import ch.systemsx.cisd.datamover.intf.ITimingParameters; - -/** - * Test cases for the {@link CopyActivityMonitor} class. - * - * @author Bernd Rinn - */ -@Friend(toClasses = - { CopyActivityMonitor.class, CopyActivityMonitor.IFileStoreMonitor.class }) -public class CopyActivityMonitorTest -{ - - private static final File unitTestRootDirectory = - new File("targets" + File.separator + "unit-test-wd"); - - private static final File workingDirectory = - new File(unitTestRootDirectory, "CopyActivityMonitorTest"); - - private static final int INACTIVITY_PERIOD_MILLIS = 50; - - private final StoringUncaughtExceptionHandler exceptionHandler = - new StoringUncaughtExceptionHandler(); - - // //////////////////////////////////////// - // Some mock and dummy implementations. - // - - private final static class DummyTerminable implements ITerminable - { - public boolean terminate() - { - throw new AssertionError("call not expected"); - } - } - - private final static class MockTerminable implements ITerminable - { - private boolean terminated = false; - - public boolean terminate() - { - terminated = true; - return true; - } - - /** - * @return <code>true</code> if {@link #terminate} has been called. - */ - public boolean isTerminated() - { - return terminated; - } - } - - private static interface ILastChangedChecker - { - public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative); - } - - private final static class HappyPathLastChangedChecker implements ILastChangedChecker - { - private final long stopWhenFindYoungerRelativeExpected; - - public HappyPathLastChangedChecker(long stopWhenFindYoungerRelativeExpected) - { - this.stopWhenFindYoungerRelativeExpected = stopWhenFindYoungerRelativeExpected; - } - - public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) - { - if (stopWhenFindYoungerRelative != 0) - { - assertEquals(stopWhenFindYoungerRelativeExpected, stopWhenFindYoungerRelative); - } - return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS / 2; - } - } - - private final static class MyTimingParameters implements ITimingParameters - { - - private final long inactivityPeriodMillis; - - private final int maximalNumberOfRetries; - - MyTimingParameters(int maximalNumberOfRetries) - { - this(maximalNumberOfRetries, INACTIVITY_PERIOD_MILLIS); - } - - MyTimingParameters(int maximalNumberOfRetries, long inactivityPeriodMillis) - { - this.inactivityPeriodMillis = inactivityPeriodMillis; - this.maximalNumberOfRetries = maximalNumberOfRetries; - } - - public long getCheckIntervalMillis() - { - return inactivityPeriodMillis / 10; - } - - public long getQuietPeriodMillis() - { - return inactivityPeriodMillis / 10; - } - - public long getInactivityPeriodMillis() - { - return inactivityPeriodMillis; - } - - public long getIntervalToWaitAfterFailure() - { - return 0; - } - - public int getMaximalNumberOfRetries() - { - return maximalNumberOfRetries; - } - } - - abstract private static class AlwaysExistsStoreMonitor implements IFileStoreMonitor - { - private final StoreItem expectedItem; - - public AlwaysExistsStoreMonitor(StoreItem item) - { - this.expectedItem = item; - } - - public BooleanStatus exists(StoreItem item) - { - assertEquals(this.expectedItem, item); - return BooleanStatus.createTrue(); - } - } - - private IFileStoreMonitor asFileStore(final File directory, final ILastChangedChecker checker) - { - return new IFileStoreMonitor() - { - public BooleanStatus exists(StoreItem item) - { - return BooleanStatus.createTrue(); - } - - public DateStatus lastChangedRelative(StoreItem item, - long stopWhenFindYoungerRelative) - { - long lastChanged = - checker.lastChangedRelative(item, stopWhenFindYoungerRelative); - return DateStatus.create(lastChanged); - } - - @Override - public String toString() - { - return "[test store] " + directory.getPath(); - } - }; - } - - // //////////////////////////////////////// - // Initialization methods. - // - - @BeforeClass - public void init() - { - LogInitializer.init(); - unitTestRootDirectory.mkdirs(); - assert unitTestRootDirectory.isDirectory(); - Thread.setDefaultUncaughtExceptionHandler(exceptionHandler); - } - - @BeforeMethod - public void setUp() - { - workingDirectory.delete(); - workingDirectory.mkdirs(); - workingDirectory.deleteOnExit(); - exceptionHandler.reset(); - } - - @AfterMethod - public void checkException() throws Throwable - { - exceptionHandler.checkAndRethrowException(); - } - - // //////////////////////////////////////// - // Test cases. - // - - @Test(groups = - { "slow" }) - public void testHappyPath() throws Throwable - { - final ITerminable dummyTerminable = new DummyTerminable(); - final long inactivityPeriodMillis = 5000L; - final ITimingParameters parameters = new MyTimingParameters(0, inactivityPeriodMillis); - final ILastChangedChecker checker = - new HappyPathLastChangedChecker(inactivityPeriodMillis - 1000L); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(asFileStore(workingDirectory, checker), dummyTerminable, - parameters); - StoreItem item = createDirectoryInside(workingDirectory); - monitor.start(item); - Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); - monitor.stop(); - } - - @Test(groups = - { "slow" }) - public void testCopyStalled() throws Throwable - { - final ILastChangedChecker checker = new PathLastChangedCheckerStalled(); - final MockTerminable copyProcess = new MockTerminable(); - final ITimingParameters parameters = new MyTimingParameters(0); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, - parameters); - StoreItem item = createDirectoryInside(workingDirectory); - monitor.start(item); - Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); - monitor.stop(); - assert copyProcess.isTerminated(); - } - - private final static class SimulateShortInterruptionChangedChecker implements - ILastChangedChecker - { - private int numberOfTimesCalled = 0; - - public long lastChangedRelative(StoreItem item, long stopWhenFindYounger) - { - ++numberOfTimesCalled; - if (numberOfTimesCalled == 2) - { - // Here we simulate the rare case where one file has been finished but the next file - // hasn't yet been - // started. - return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS * 2; - } else - { - // Here we simulate normal activity. - return System.currentTimeMillis() - INACTIVITY_PERIOD_MILLIS / 2; - } - } - } - - /** - * This test case catches a case that I first hadn't thought of: since we use <code>rsync</code> - * in a mode where at the end of copying a file they set the "last modified" time back to the - * one of the source file, there is a short time interval after finishing copying one file and - * starting copying the next file where the copy monitor could be tempted to trigger false - * alarm: the just finished file will have already the "last modified" time of the source file - * (which is when the data produce finished writing the source file). In fact everything is fine - * but still the copy process will be canceled. - */ - @Test(groups = - { "slow" }) - public void testCopySeemsStalledButActuallyIsFine() throws Throwable - { - final ILastChangedChecker checker = new SimulateShortInterruptionChangedChecker(); - final MockTerminable copyProcess = new MockTerminable(); - final ITimingParameters parameters = new MyTimingParameters(0); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, - parameters); - StoreItem item = createDirectoryInside(workingDirectory); - monitor.start(item); - Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); - monitor.stop(); - assert copyProcess.isTerminated() == false; - } - - private final static class PathLastChangedCheckerStalled implements ILastChangedChecker - { - private final static long MAX_TIME = - System.currentTimeMillis() + INACTIVITY_PERIOD_MILLIS * 2; - - public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) - { - long now = System.currentTimeMillis(); - return now < MAX_TIME ? now : MAX_TIME; - } - } - - private StoreItem createDirectoryInside(File parentDir) - { - StoreItem item = new StoreItem("some-directory"); - final File directory = new File(parentDir, item.getName()); - directory.mkdir(); - assert directory.isDirectory() : "directory could not be created: " + directory; - directory.deleteOnExit(); - return item; - } - - @Test(groups = - { "slow" }) - // check if copy is terminated if destination file is never visible - public void testActivityFileNeverExistsFail() throws Throwable - { - final StoreItem dummyItem = createDummyItem(); - final IFileStoreMonitor store = new IFileStoreMonitor() - { - public BooleanStatus exists(StoreItem item) - { - assertEquals(dummyItem, item); - return BooleanStatus.createFalse(); - } - - public DateStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan) - { - throw new UnsupportedOperationException(); // should be never called - } - - }; - checkCopyTerminated(store, dummyItem); - } - - @Test(groups = - { "slow" }) - // check if copy is terminated if lastChange always fails - public void testActivityLastChangeUnavailableFail() throws Throwable - { - final StoreItem dummyItem = createDummyItem(); - final IFileStoreMonitor store = new AlwaysExistsStoreMonitor(dummyItem) - { - public DateStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan) - { - assertEquals(dummyItem, item); - return DateStatus.createError("mock: lastChange error"); - } - }; - checkCopyTerminated(store, dummyItem); - } - - @Test(groups = - { "slow" }) - // check if copy is terminated if lastChange fails on even calls and returns the unchanged value - // on odd calls - public void testActivityLastChangeUnavailableOftenFail() throws Throwable - { - final StoreItem dummyItem = createDummyItem(); - final IFileStoreMonitor store = new AlwaysExistsStoreMonitor(dummyItem) - { - private boolean oddCall = true; - - public DateStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan) - { - assertEquals(dummyItem, item); - oddCall = !oddCall; - // error or unchanged value - return oddCall ? DateStatus.create(10) : DateStatus - .createError("mock: simulate error while getting last change"); - } - }; - checkCopyTerminated(store, dummyItem); - } - - @Test(groups = - { "slow" }) - // happy case - check if copy is not terminated if lastChange returns changing values - public void testActivityChangingCopyCompletes() throws Throwable - { - final StoreItem dummyItem = createDummyItem(); - final IFileStoreMonitor store = new AlwaysExistsStoreMonitor(dummyItem) - { - private int counter = 1; - - public DateStatus lastChangedRelative(StoreItem item, long stopWhenYoungerThan) - { - return DateStatus.create(counter++); - } - }; - checkCopyTerminationStatus(store, dummyItem, false); - } - - private void checkCopyTerminated(final IFileStoreMonitor store, StoreItem dummyItem) - throws InterruptedException - { - checkCopyTerminationStatus(store, dummyItem, true); - } - - private void checkCopyTerminationStatus(final IFileStoreMonitor store, StoreItem dummyItem, - boolean expectedIsTerminated) throws InterruptedException - { - final MockTerminable copyProcess = new MockTerminable(); - final ITimingParameters parameters = new MyTimingParameters(0); - final CopyActivityMonitor monitor = new CopyActivityMonitor(store, copyProcess, parameters); - monitor.start(dummyItem); - Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); - monitor.stop(); - assertEquals(expectedIsTerminated, copyProcess.isTerminated()); - } - - private static StoreItem createDummyItem() - { - return new StoreItem("dummy-item"); - } - -} diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/RemoteStoreCopyActivitySensorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/RemoteStoreCopyActivitySensorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1fce22a6fe60e744a129de3c2858a1931266c973 --- /dev/null +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/RemoteStoreCopyActivitySensorTest.java @@ -0,0 +1,169 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover.filesystem.remote; + +import static org.testng.AssertJUnit.*; + +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; +import ch.systemsx.cisd.common.logging.LogInitializer; +import ch.systemsx.cisd.common.utilities.StoreItem; +import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; +import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; + +/** + * Test cases for the {@link RemoteStoreCopyActivitySensor}. + * + * @author Bernd Rinn + */ +public class RemoteStoreCopyActivitySensorTest +{ + + private static final String ITEM_NAME = "I am probed"; + + private static final long THRESHOLD = 123L; + + private static final long MAX_DELTA = 5L; + + private Mockery context; + + private IFileStore destinationStore; + + private StoreItem copyItem; + + private RemoteStoreCopyActivitySensor sensorUnderTest; + + @BeforeClass + public void init() + { + LogInitializer.init(); + } + + @BeforeMethod + public final void beforeMethod() + { + context = new Mockery(); + destinationStore = context.mock(IFileStore.class); + copyItem = new StoreItem(ITEM_NAME); + sensorUnderTest = new RemoteStoreCopyActivitySensor(destinationStore, copyItem); + } + + @AfterMethod + public final void afterMethod() throws Throwable + { + // To following lines of code should also be called at the end of each test method. + // Otherwise one do not known which test failed. + context.assertIsSatisfied(); + } + + @Test + public void testHappyCase() + { + final long lastChanged = 1111L; + context.checking(new Expectations() + { + { + one(destinationStore).lastChangedRelative(copyItem, THRESHOLD); + will(returnValue(DateStatus.create(lastChanged))); + } + }); + final long delta = + System.currentTimeMillis() + - sensorUnderTest.getTimeOfLastActivityMoreRecentThan(THRESHOLD); + assertTrue("Delta is " + delta, delta < MAX_DELTA); + final String msg = sensorUnderTest.describeInactivity(System.currentTimeMillis()); + assertTrue( + msg, + msg + .startsWith("No write activity on item 'I am probed' in store 'iFileStore' for 0:00:00.0")); + } + + @Test + public void testOneError() + { + final String errorMsg = "ERROR message"; + context.checking(new Expectations() + { + { + one(destinationStore).lastChangedRelative(copyItem, THRESHOLD); + will(returnValue(DateStatus.createError(errorMsg))); + } + }); + final long now = System.currentTimeMillis(); + final long delta = now - sensorUnderTest.getTimeOfLastActivityMoreRecentThan(THRESHOLD); + assertTrue("Delta is " + delta, delta < MAX_DELTA); + assertEquals("Error [ERROR message]: Unable to determine the time of write activity on " + + "item 'I am probed' in store 'iFileStore'", sensorUnderTest + .describeInactivity(now)); + } + + @Test + public void testThreeErrors() + { + final String errorMsg = "ERROR message"; + context.checking(new Expectations() + { + { + exactly(3).of(destinationStore).lastChangedRelative(copyItem, THRESHOLD); + will(returnValue(DateStatus.createError(errorMsg))); + } + }); + ConcurrencyUtilities.sleep(10L); + final long now = System.currentTimeMillis(); + final long lastActivity1 = sensorUnderTest.getTimeOfLastActivityMoreRecentThan(THRESHOLD); + assertEquals(now, lastActivity1); + ConcurrencyUtilities.sleep(10L); + final long lastActivity2 = sensorUnderTest.getTimeOfLastActivityMoreRecentThan(THRESHOLD); + assertEquals(now, lastActivity2); + ConcurrencyUtilities.sleep(10L); + final long lastActivity3 = sensorUnderTest.getTimeOfLastActivityMoreRecentThan(THRESHOLD); + assertEquals(now, lastActivity3); + } + + @Test + public void testValueErrorValue() + { + final String errorMsg = "ERROR message"; + context.checking(new Expectations() + { + { + exactly(3).of(destinationStore).lastChangedRelative(copyItem, THRESHOLD); + will(onConsecutiveCalls(returnValue(DateStatus.create(17L)), + returnValue(DateStatus.createError(errorMsg)), returnValue(DateStatus + .create(17L)))); + } + }); + ConcurrencyUtilities.sleep(10L); + final long now1 = System.currentTimeMillis(); + final long lastActivity1 = sensorUnderTest.getTimeOfLastActivityMoreRecentThan(THRESHOLD); + assertEquals(now1, lastActivity1); + ConcurrencyUtilities.sleep(10L); + final long now2 = System.currentTimeMillis(); + final long lastActivity2 = sensorUnderTest.getTimeOfLastActivityMoreRecentThan(THRESHOLD); + assertEquals(now2, lastActivity2); + ConcurrencyUtilities.sleep(10L); + final long lastActivity3 = sensorUnderTest.getTimeOfLastActivityMoreRecentThan(THRESHOLD); + assertEquals(now1, lastActivity3); + } + +}