diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java index 037faf0896a44b65e1f29b628d002207438c34f3..c60820cca736a36e9b53544bf5c8219fd81ca288 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitor.java @@ -18,19 +18,11 @@ package ch.systemsx.cisd.datamover.filesystem.remote; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import org.apache.log4j.Logger; import ch.rinn.restrictions.Private; -import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; -import ch.systemsx.cisd.common.concurrent.ExecutionResult; -import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor; import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; -import ch.systemsx.cisd.common.logging.ISimpleLogger; -import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.ITerminable; @@ -63,9 +55,6 @@ public class CopyActivityMonitor private final String threadNamePrefix; - private final ExecutorService lastChangedExecutor = - new NamingThreadPoolExecutor("Last Changed Explorer").daemonize(); - /** handler to terminate monitored process if the observed store item does not change */ private final ITerminable terminable; @@ -375,7 +364,9 @@ public class CopyActivityMonitor private NumberStatus lastChanged(StoreItem item) { - final NumberStatus lastChanged = lastChanged(destinationStore, item); + long stopWhenFindYoungerRelative = minusSafetyMargin(inactivityPeriodMillis); + final NumberStatus lastChanged = + destinationStore.lastChangedRelative(item, stopWhenFindYoungerRelative); if (lastChanged.isError()) { operationLog.error(lastChanged.tryGetMessage()); @@ -394,54 +385,4 @@ public class CopyActivityMonitor { return Math.max(0L, period - 1000L); } - - private NumberStatus lastChanged(IFileStoreMonitor store, StoreItem item) - { - long stopWhenFindYoungerRelative = minusSafetyMargin(inactivityPeriodMillis); - final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis); - final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog); - final Future<NumberStatus> lastChangedFuture = - lastChangedExecutor.submit(createCheckerCallable(store, item, - stopWhenFindYoungerRelative)); - ExecutionResult<NumberStatus> executionResult = - ConcurrencyUtilities.getResult(lastChangedFuture, timeoutMillis, simpleMachineLog, - "Check for recent paths"); - NumberStatus result = executionResult.tryGetResult(); - if (result == null) - { - return NumberStatus.createError(String.format( - "Could not determine \"last changed time\" of %s: time out.", item)); - } else - { - return result; - } - } - - private static Callable<NumberStatus> createCheckerCallable(final IFileStoreMonitor store, - final StoreItem item, final long stopWhenYoungerThan) - { - return new Callable<NumberStatus>() - { - public NumberStatus call() throws Exception - { - if (machineLog.isTraceEnabled()) - { - machineLog - .trace("Starting quick check for recent paths on '" + item + "'."); - } - final NumberStatus lastChanged = - store.lastChangedRelative(item, stopWhenYoungerThan); - if (machineLog.isTraceEnabled()) - { - machineLog - .trace(String - .format( - "Finishing quick check for recent paths on '%s', found to be %2$tF %2$tT.", - item, lastChanged)); - } - return lastChanged; - } - }; - } - } diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java index a497e97e57e7baed964add5899da0c603f5f931a..3f9debbf41ad4fda7847e33fbc2f8d228992f64f 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java @@ -17,8 +17,6 @@ package ch.systemsx.cisd.datamover.filesystem.remote; import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertFalse; -import static org.testng.AssertJUnit.assertTrue; import java.io.File; @@ -28,10 +26,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import ch.rinn.restrictions.Friend; -import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; -import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogInitializer; -import ch.systemsx.cisd.common.test.LogMonitoringAppender; import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler; import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.StoreItem; @@ -328,49 +323,6 @@ public class CopyActivityMonitorTest } } - @Test(groups = - { "slow" }) - public void testActivityMonitorTimedOut() throws Throwable - { - final PathLastChangedCheckerDelayed checker = - new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS); - final MockTerminable copyProcess = new MockTerminable(); - final ITimingParameters parameters = new MyTimingParameters(0); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, - parameters); - final StoreItem item = createDirectoryInside(workingDirectory); - final LogMonitoringAppender appender = - LogMonitoringAppender.addAppender(LogCategory.OPERATION, String.format( - "Could not determine \"last changed time\" of %s: time out.", item)); - monitor.start(item); - Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); - monitor.stop(); - LogMonitoringAppender.removeAppender(appender); - assertTrue(checker.lastCheckInterrupted()); - assertTrue(copyProcess.isTerminated()); - appender.verifyLogHasHappened(); - } - - @Test(groups = - { "slow" }) - public void testActivityMonitorOnceTimedOutTheOK() throws Throwable - { - final PathLastChangedCheckerDelayed checker = - new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS, 0L); - final MockTerminable copyProcess = new MockTerminable(); - final ITimingParameters parameters = new MyTimingParameters(0); - final CopyActivityMonitor monitor = - new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess, - parameters); - final StoreItem item = createDirectoryInside(workingDirectory); - monitor.start(item); - Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15); - monitor.stop(); - assertFalse(checker.lastCheckInterrupted()); - assertFalse(copyProcess.isTerminated()); - } - private StoreItem createDirectoryInside(File parentDir) { StoreItem item = new StoreItem("some-directory"); @@ -381,67 +333,6 @@ public class CopyActivityMonitorTest return item; } - private final static class PathLastChangedCheckerDelayed implements ILastChangedChecker - { - private final long[] delayMillis; - - private int callNumber; - - private volatile boolean interrupted; - - private int interruptionCount; - - public PathLastChangedCheckerDelayed(long... delayMillis) - { - assert delayMillis.length > 0; - - this.interrupted = false; - this.interruptionCount = 0; - this.delayMillis = delayMillis; - } - - private long timeToSleepMillis() - { - try - { - return delayMillis[callNumber]; - } finally - { - if (callNumber < delayMillis.length - 1) - { - ++callNumber; - } - } - } - - public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative) - { - try - { - Thread.sleep(timeToSleepMillis()); // Wait predefined time. - } catch (InterruptedException e) - { - this.interrupted = true; - ++this.interruptionCount; - // That is what we expect if we are terminated. - throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage())); - } - this.interrupted = false; - return System.currentTimeMillis(); - } - - synchronized boolean lastCheckInterrupted() - { - return interrupted; - } - - int getInterruptionCount() - { - return interruptionCount; - } - - } - @Test(groups = { "slow" }) // check if copy is terminated if destination file is never visible diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMountedTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMountedTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1e2b0c06732c44295dbefb3a43a625ae23a6a91e --- /dev/null +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMountedTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.datamover.filesystem.store; + +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; + +import org.hamcrest.Description; +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.jmock.api.Action; +import org.jmock.api.Invocation; +import org.testng.AssertJUnit; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import ch.rinn.restrictions.Friend; +import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.common.logging.LogInitializer; +import ch.systemsx.cisd.common.utilities.StoreItem; +import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; +import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus; +import ch.systemsx.cisd.datamover.filesystem.store.FileStoreRemoteMounted.LastChangeWrapper; + +/** + * @author Tomasz Pylak + */ +@Friend(toClasses = FileStoreRemoteMounted.LastChangeWrapper.class) +public class FileStoreRemoteMountedTest +{ + private Mockery context; + + private IFileStore localStoreMock; + + @BeforeMethod + public void setUp() + { + LogInitializer.init(); + this.context = new Mockery(); + this.localStoreMock = context.mock(IFileStore.class); + } + + @AfterMethod + public void tearDown() + { + // To following line of code should also be called at the end of each test method. + // Otherwise one do not known which test failed. + context.assertIsSatisfied(); + } + + @Test + public void testLastChangeHappyCase() throws Throwable + { + final StoreItem item = new StoreItem("some-directory"); + final long age = 212; + long timeout = 100; + LastChangeWrapper lastChangeWrapper = + new FileStoreRemoteMounted.LastChangeWrapper(localStoreMock, timeout); + final PathLastChangedCheckerDelayed lastChangedSleepAction = + new PathLastChangedCheckerDelayed(timeout / 2); // return before timeout + context.checking(new Expectations() + { + { + one(localStoreMock).lastChangedRelative(item, age); + will(lastChangedSleepAction); + } + }); + + NumberStatus result = lastChangeWrapper.lastChangedInternal(item, age, true); + AssertJUnit.assertFalse(result.isError()); + assertFalse(lastChangedSleepAction.lastCheckInterrupted()); + context.assertIsSatisfied(); + } + + @Test + public void testLastChangedBlockedAndTerminated() throws Throwable + { + final StoreItem item = new StoreItem("some-directory"); + final long age = 212; + long timeout = 10; + LastChangeWrapper lastChangeWrapper = + new FileStoreRemoteMounted.LastChangeWrapper(localStoreMock, timeout); + final PathLastChangedCheckerDelayed lastChangedSleepAction = + new PathLastChangedCheckerDelayed(timeout * 2); + context.checking(new Expectations() + { + { + one(localStoreMock).lastChanged(item, age); + will(lastChangedSleepAction); + } + }); + + NumberStatus result = lastChangeWrapper.lastChangedInternal(item, age, false); + assertTrue(result.isError()); + assertTrue(lastChangedSleepAction.lastCheckInterrupted()); + context.assertIsSatisfied(); + } + + private final static class PathLastChangedCheckerDelayed implements Action + { + private final long delayMillis; + + private volatile boolean interrupted; + + public PathLastChangedCheckerDelayed(long delayMillis) + { + this.interrupted = false; + this.delayMillis = delayMillis; + } + + public Object invoke(Invocation invocation) throws Throwable + { + try + { + Thread.sleep(delayMillis); // Wait predefined time. + } catch (InterruptedException e) + { + this.interrupted = true; + // That is what we expect if we are terminated. + throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage())); + } + this.interrupted = false; + return NumberStatus.create(System.currentTimeMillis()); + } + + synchronized boolean lastCheckInterrupted() + { + return interrupted; + } + + public void describeTo(Description description) + { + description.appendText(toString()); + } + } +}