Skip to content
Snippets Groups Projects
Commit bacf4dcf authored by tpylak's avatar tpylak
Browse files

DMV-19 Introducing timeout for calling IFileStore.lastChanged in the right...

DMV-19 Introducing timeout for calling IFileStore.lastChanged in the right place (rest of the files)

SVN: 6958
parent 23749b8b
No related branches found
No related tags found
No related merge requests found
...@@ -18,19 +18,11 @@ package ch.systemsx.cisd.datamover.filesystem.remote; ...@@ -18,19 +18,11 @@ package ch.systemsx.cisd.datamover.filesystem.remote;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import ch.rinn.restrictions.Private; import ch.rinn.restrictions.Private;
import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
import ch.systemsx.cisd.common.concurrent.ExecutionResult;
import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor;
import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.logging.ISimpleLogger;
import ch.systemsx.cisd.common.logging.Log4jSimpleLogger;
import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.ITerminable;
...@@ -63,9 +55,6 @@ public class CopyActivityMonitor ...@@ -63,9 +55,6 @@ public class CopyActivityMonitor
private final String threadNamePrefix; private final String threadNamePrefix;
private final ExecutorService lastChangedExecutor =
new NamingThreadPoolExecutor("Last Changed Explorer").daemonize();
/** handler to terminate monitored process if the observed store item does not change */ /** handler to terminate monitored process if the observed store item does not change */
private final ITerminable terminable; private final ITerminable terminable;
...@@ -375,7 +364,9 @@ public class CopyActivityMonitor ...@@ -375,7 +364,9 @@ public class CopyActivityMonitor
private NumberStatus lastChanged(StoreItem item) private NumberStatus lastChanged(StoreItem item)
{ {
final NumberStatus lastChanged = lastChanged(destinationStore, item); long stopWhenFindYoungerRelative = minusSafetyMargin(inactivityPeriodMillis);
final NumberStatus lastChanged =
destinationStore.lastChangedRelative(item, stopWhenFindYoungerRelative);
if (lastChanged.isError()) if (lastChanged.isError())
{ {
operationLog.error(lastChanged.tryGetMessage()); operationLog.error(lastChanged.tryGetMessage());
...@@ -394,54 +385,4 @@ public class CopyActivityMonitor ...@@ -394,54 +385,4 @@ public class CopyActivityMonitor
{ {
return Math.max(0L, period - 1000L); return Math.max(0L, period - 1000L);
} }
private NumberStatus lastChanged(IFileStoreMonitor store, StoreItem item)
{
long stopWhenFindYoungerRelative = minusSafetyMargin(inactivityPeriodMillis);
final long timeoutMillis = Math.min(checkIntervallMillis * 3, inactivityPeriodMillis);
final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog);
final Future<NumberStatus> lastChangedFuture =
lastChangedExecutor.submit(createCheckerCallable(store, item,
stopWhenFindYoungerRelative));
ExecutionResult<NumberStatus> executionResult =
ConcurrencyUtilities.getResult(lastChangedFuture, timeoutMillis, simpleMachineLog,
"Check for recent paths");
NumberStatus result = executionResult.tryGetResult();
if (result == null)
{
return NumberStatus.createError(String.format(
"Could not determine \"last changed time\" of %s: time out.", item));
} else
{
return result;
}
}
private static Callable<NumberStatus> createCheckerCallable(final IFileStoreMonitor store,
final StoreItem item, final long stopWhenYoungerThan)
{
return new Callable<NumberStatus>()
{
public NumberStatus call() throws Exception
{
if (machineLog.isTraceEnabled())
{
machineLog
.trace("Starting quick check for recent paths on '" + item + "'.");
}
final NumberStatus lastChanged =
store.lastChangedRelative(item, stopWhenYoungerThan);
if (machineLog.isTraceEnabled())
{
machineLog
.trace(String
.format(
"Finishing quick check for recent paths on '%s', found to be %2$tF %2$tT.",
item, lastChanged));
}
return lastChanged;
}
};
}
} }
...@@ -17,8 +17,6 @@ ...@@ -17,8 +17,6 @@
package ch.systemsx.cisd.datamover.filesystem.remote; package ch.systemsx.cisd.datamover.filesystem.remote;
import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import java.io.File; import java.io.File;
...@@ -28,10 +26,7 @@ import org.testng.annotations.BeforeMethod; ...@@ -28,10 +26,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import ch.rinn.restrictions.Friend; import ch.rinn.restrictions.Friend;
import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.common.logging.LogInitializer;
import ch.systemsx.cisd.common.test.LogMonitoringAppender;
import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler; import ch.systemsx.cisd.common.test.StoringUncaughtExceptionHandler;
import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.ITerminable;
import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.common.utilities.StoreItem;
...@@ -328,49 +323,6 @@ public class CopyActivityMonitorTest ...@@ -328,49 +323,6 @@ public class CopyActivityMonitorTest
} }
} }
@Test(groups =
{ "slow" })
public void testActivityMonitorTimedOut() throws Throwable
{
final PathLastChangedCheckerDelayed checker =
new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS);
final MockTerminable copyProcess = new MockTerminable();
final ITimingParameters parameters = new MyTimingParameters(0);
final CopyActivityMonitor monitor =
new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess,
parameters);
final StoreItem item = createDirectoryInside(workingDirectory);
final LogMonitoringAppender appender =
LogMonitoringAppender.addAppender(LogCategory.OPERATION, String.format(
"Could not determine \"last changed time\" of %s: time out.", item));
monitor.start(item);
Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15);
monitor.stop();
LogMonitoringAppender.removeAppender(appender);
assertTrue(checker.lastCheckInterrupted());
assertTrue(copyProcess.isTerminated());
appender.verifyLogHasHappened();
}
@Test(groups =
{ "slow" })
public void testActivityMonitorOnceTimedOutTheOK() throws Throwable
{
final PathLastChangedCheckerDelayed checker =
new PathLastChangedCheckerDelayed(INACTIVITY_PERIOD_MILLIS, 0L);
final MockTerminable copyProcess = new MockTerminable();
final ITimingParameters parameters = new MyTimingParameters(0);
final CopyActivityMonitor monitor =
new CopyActivityMonitor(asFileStore(workingDirectory, checker), copyProcess,
parameters);
final StoreItem item = createDirectoryInside(workingDirectory);
monitor.start(item);
Thread.sleep(INACTIVITY_PERIOD_MILLIS * 15);
monitor.stop();
assertFalse(checker.lastCheckInterrupted());
assertFalse(copyProcess.isTerminated());
}
private StoreItem createDirectoryInside(File parentDir) private StoreItem createDirectoryInside(File parentDir)
{ {
StoreItem item = new StoreItem("some-directory"); StoreItem item = new StoreItem("some-directory");
...@@ -381,67 +333,6 @@ public class CopyActivityMonitorTest ...@@ -381,67 +333,6 @@ public class CopyActivityMonitorTest
return item; return item;
} }
private final static class PathLastChangedCheckerDelayed implements ILastChangedChecker
{
private final long[] delayMillis;
private int callNumber;
private volatile boolean interrupted;
private int interruptionCount;
public PathLastChangedCheckerDelayed(long... delayMillis)
{
assert delayMillis.length > 0;
this.interrupted = false;
this.interruptionCount = 0;
this.delayMillis = delayMillis;
}
private long timeToSleepMillis()
{
try
{
return delayMillis[callNumber];
} finally
{
if (callNumber < delayMillis.length - 1)
{
++callNumber;
}
}
}
public long lastChangedRelative(StoreItem item, long stopWhenFindYoungerRelative)
{
try
{
Thread.sleep(timeToSleepMillis()); // Wait predefined time.
} catch (InterruptedException e)
{
this.interrupted = true;
++this.interruptionCount;
// That is what we expect if we are terminated.
throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage()));
}
this.interrupted = false;
return System.currentTimeMillis();
}
synchronized boolean lastCheckInterrupted()
{
return interrupted;
}
int getInterruptionCount()
{
return interruptionCount;
}
}
@Test(groups = @Test(groups =
{ "slow" }) { "slow" })
// check if copy is terminated if destination file is never visible // check if copy is terminated if destination file is never visible
......
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.datamover.filesystem.store;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import org.hamcrest.Description;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.api.Action;
import org.jmock.api.Invocation;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import ch.rinn.restrictions.Friend;
import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.logging.LogInitializer;
import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus;
import ch.systemsx.cisd.datamover.filesystem.store.FileStoreRemoteMounted.LastChangeWrapper;
/**
* @author Tomasz Pylak
*/
@Friend(toClasses = FileStoreRemoteMounted.LastChangeWrapper.class)
public class FileStoreRemoteMountedTest
{
private Mockery context;
private IFileStore localStoreMock;
@BeforeMethod
public void setUp()
{
LogInitializer.init();
this.context = new Mockery();
this.localStoreMock = context.mock(IFileStore.class);
}
@AfterMethod
public void tearDown()
{
// To following line of code should also be called at the end of each test method.
// Otherwise one do not known which test failed.
context.assertIsSatisfied();
}
@Test
public void testLastChangeHappyCase() throws Throwable
{
final StoreItem item = new StoreItem("some-directory");
final long age = 212;
long timeout = 100;
LastChangeWrapper lastChangeWrapper =
new FileStoreRemoteMounted.LastChangeWrapper(localStoreMock, timeout);
final PathLastChangedCheckerDelayed lastChangedSleepAction =
new PathLastChangedCheckerDelayed(timeout / 2); // return before timeout
context.checking(new Expectations()
{
{
one(localStoreMock).lastChangedRelative(item, age);
will(lastChangedSleepAction);
}
});
NumberStatus result = lastChangeWrapper.lastChangedInternal(item, age, true);
AssertJUnit.assertFalse(result.isError());
assertFalse(lastChangedSleepAction.lastCheckInterrupted());
context.assertIsSatisfied();
}
@Test
public void testLastChangedBlockedAndTerminated() throws Throwable
{
final StoreItem item = new StoreItem("some-directory");
final long age = 212;
long timeout = 10;
LastChangeWrapper lastChangeWrapper =
new FileStoreRemoteMounted.LastChangeWrapper(localStoreMock, timeout);
final PathLastChangedCheckerDelayed lastChangedSleepAction =
new PathLastChangedCheckerDelayed(timeout * 2);
context.checking(new Expectations()
{
{
one(localStoreMock).lastChanged(item, age);
will(lastChangedSleepAction);
}
});
NumberStatus result = lastChangeWrapper.lastChangedInternal(item, age, false);
assertTrue(result.isError());
assertTrue(lastChangedSleepAction.lastCheckInterrupted());
context.assertIsSatisfied();
}
private final static class PathLastChangedCheckerDelayed implements Action
{
private final long delayMillis;
private volatile boolean interrupted;
public PathLastChangedCheckerDelayed(long delayMillis)
{
this.interrupted = false;
this.delayMillis = delayMillis;
}
public Object invoke(Invocation invocation) throws Throwable
{
try
{
Thread.sleep(delayMillis); // Wait predefined time.
} catch (InterruptedException e)
{
this.interrupted = true;
// That is what we expect if we are terminated.
throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage()));
}
this.interrupted = false;
return NumberStatus.create(System.currentTimeMillis());
}
synchronized boolean lastCheckInterrupted()
{
return interrupted;
}
public void describeTo(Description description)
{
description.appendText(toString());
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment