Skip to content
Snippets Groups Projects
Commit 1b9db3d2 authored by brinn's avatar brinn
Browse files

[DMV-19] add: monitoring for all calls that could potentially hang

change: use MonitoringProxy for the actual monitoring work
remove: unit test class as it has become obsolete

SVN: 7019
parent 3dd5d880
No related branches found
No related tags found
No related merge requests found
...@@ -16,33 +16,23 @@ ...@@ -16,33 +16,23 @@
package ch.systemsx.cisd.datamover.filesystem.store; package ch.systemsx.cisd.datamover.filesystem.store;
import java.util.concurrent.Callable; import ch.systemsx.cisd.common.concurrent.MonitoringProxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.log4j.Logger;
import ch.rinn.restrictions.Private;
import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
import ch.systemsx.cisd.common.concurrent.ExecutionResult;
import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor;
import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException;
import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.exceptions.StatusFlag;
import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher;
import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark; import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark;
import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.ISimpleLogger;
import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; import ch.systemsx.cisd.common.logging.LogLevel;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.datamover.filesystem.intf.AbstractFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.AbstractFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus; import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus;
import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus;
import ch.systemsx.cisd.datamover.filesystem.intf.IExtendedFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IExtendedFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier; import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier;
import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus;
/** /**
* A <code>FileStore</code> extension for remote paths mounted. * A <code>FileStore</code> extension for remote paths mounted.
...@@ -54,12 +44,9 @@ import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; ...@@ -54,12 +44,9 @@ import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus;
*/ */
public final class FileStoreRemoteMounted extends AbstractFileStore public final class FileStoreRemoteMounted extends AbstractFileStore
{ {
private static final Logger machineLog =
LogFactory.getLogger(LogCategory.MACHINE, FileStoreRemoteMounted.class);
private final IFileStore localImpl; private final IFileStore localImpl;
private final LastChangeWrapper lastChangeInvoker; private final IFileStore localImplMonitored;
/** /**
* @param lastChangedTimeoutMillis number of milliseconds after which checking last modification * @param lastChangedTimeoutMillis number of milliseconds after which checking last modification
...@@ -71,7 +58,9 @@ public final class FileStoreRemoteMounted extends AbstractFileStore ...@@ -71,7 +58,9 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
{ {
super(file, desription, factory); super(file, desription, factory);
this.localImpl = new FileStoreLocal(file, desription, factory); this.localImpl = new FileStoreLocal(file, desription, factory);
this.lastChangeInvoker = new LastChangeWrapper(localImpl, lastChangedTimeoutMillis); this.localImplMonitored =
MonitoringProxy.create(IFileStore.class, localImpl).timeoutMillis(
lastChangedTimeoutMillis).get();
} }
// //
...@@ -96,33 +85,96 @@ public final class FileStoreRemoteMounted extends AbstractFileStore ...@@ -96,33 +85,96 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
public final Status delete(final StoreItem item) public final Status delete(final StoreItem item)
{ {
return localImpl.delete(item); final Status statusOrNull = localImplMonitored.delete(item);
if (statusOrNull == null)
{
return new Status(StatusFlag.RETRIABLE_ERROR, "Could not delete '" + item
+ "': time out.");
}
return statusOrNull;
} }
public final BooleanStatus exists(final StoreItem item) public final BooleanStatus exists(final StoreItem item)
{ {
return localImpl.exists(item); final BooleanStatus statusOrNull = localImplMonitored.exists(item);
if (statusOrNull == null)
{
return BooleanStatus.createError("Could not determine whether '" + item
+ "' exists: time out.");
}
return statusOrNull;
} }
public final DateStatus lastChanged(final StoreItem item, final long stopWhenFindYounger) public final DateStatus lastChanged(final StoreItem item, final long stopWhenFindYounger)
{ {
return lastChangeInvoker.lastChangedInternal(item, stopWhenFindYounger, false); final DateStatus statusOrNull = localImplMonitored.lastChanged(item, stopWhenFindYounger);
if (statusOrNull == null)
{
return DateStatus.createError(String.format(
"Could not determine \"last changed time\" of %s: time out.", item));
}
return statusOrNull;
} }
public final DateStatus lastChangedRelative(final StoreItem item, public final DateStatus lastChangedRelative(final StoreItem item,
final long stopWhenFindYoungerRelative) final long stopWhenFindYoungerRelative)
{ {
return lastChangeInvoker.lastChangedInternal(item, stopWhenFindYoungerRelative, true); final DateStatus statusOrNull =
localImplMonitored.lastChangedRelative(item, stopWhenFindYoungerRelative);
if (statusOrNull == null)
{
return DateStatus.createError(String.format(
"Could not determine \"last changed time\" of %s: time out.", item));
}
return statusOrNull;
} }
public final BooleanStatus tryCheckDirectoryFullyAccessible(final long timeOutMillis) public final BooleanStatus tryCheckDirectoryFullyAccessible(final long timeOutMillis)
{ {
return localImpl.tryCheckDirectoryFullyAccessible(timeOutMillis); final BooleanStatus statusOrNull =
localImplMonitored.tryCheckDirectoryFullyAccessible(timeOutMillis);
if (statusOrNull == null)
{
return BooleanStatus.createError("Could not determine whether store '" + toString()
+ "' is fully accessible: time out.");
}
return statusOrNull;
}
private static class StoringLogger implements ISimpleLogger
{
LogLevel storedLevel = null;
String storedMessage = null;
public void log(LogLevel level, String message)
{
this.storedLevel = level;
this.storedMessage = message;
}
} }
public final StoreItem[] tryListSortByLastModified(final ISimpleLogger loggerOrNull) public final StoreItem[] tryListSortByLastModified(final ISimpleLogger loggerOrNull)
{ {
return localImpl.tryListSortByLastModified(loggerOrNull); final StoringLogger storingLoggerOrNull =
(loggerOrNull == null) ? null : new StoringLogger();
final StoreItem[] itemsOrNull =
localImplMonitored.tryListSortByLastModified(storingLoggerOrNull);
if (loggerOrNull != null)
{
if (storingLoggerOrNull.storedMessage != null)
{
loggerOrNull
.log(storingLoggerOrNull.storedLevel, storingLoggerOrNull.storedMessage);
} else if (itemsOrNull == null)
{
loggerOrNull.log(LogLevel.ERROR, "Could not get listing of store '" + toString()
+ "': time out.");
}
}
return itemsOrNull;
} }
public final HighwaterMarkWatcher getHighwaterMarkWatcher() public final HighwaterMarkWatcher getHighwaterMarkWatcher()
...@@ -131,7 +183,7 @@ public final class FileStoreRemoteMounted extends AbstractFileStore ...@@ -131,7 +183,7 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
} }
// //
// FileStore // AbstractFileStore
// //
@Override @Override
...@@ -147,76 +199,4 @@ public final class FileStoreRemoteMounted extends AbstractFileStore ...@@ -147,76 +199,4 @@ public final class FileStoreRemoteMounted extends AbstractFileStore
return "[mounted remote fs] " + pathStr; return "[mounted remote fs] " + pathStr;
} }
// -----
@Private
static final class LastChangeWrapper
{
private final ExecutorService lastChangedExecutor =
new NamingThreadPoolExecutor("Last Changed Explorer").daemonize();
private final long lastChangedTimeoutMillis;
private final IFileStore localImpl;
public LastChangeWrapper(IFileStore localImpl, long lastChangedTimeoutMillis)
{
this.lastChangedTimeoutMillis = lastChangedTimeoutMillis;
this.localImpl = localImpl;
}
// call checking last change in a separate thread with timeout
public DateStatus lastChangedInternal(StoreItem item, long stopWhenFindYoungerAge,
boolean isAgeRelative)
{
Callable<DateStatus> callable =
createLastChangedCallable(localImpl, item, stopWhenFindYoungerAge,
isAgeRelative);
final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog);
final Future<DateStatus> future = lastChangedExecutor.submit(callable);
ExecutionResult<DateStatus> executionResult =
ConcurrencyUtilities.getResult(future, lastChangedTimeoutMillis,
simpleMachineLog, "Check for recent paths");
DateStatus result = executionResult.tryGetResult();
if (result == null)
{
return DateStatus.createError(String.format(
"Could not determine \"last changed time\" of %s: time out.", item));
} else
{
return result;
}
}
private Callable<DateStatus> createLastChangedCallable(final IFileStore store,
final StoreItem item, final long stopWhenFindYoungerAge, final boolean isAgeRelative)
{
return new Callable<DateStatus>()
{
public DateStatus call() throws Exception
{
if (machineLog.isTraceEnabled())
{
machineLog.trace("Starting quick check for recent paths on '" + item
+ "'.");
}
final DateStatus lastChanged;
if (isAgeRelative)
{
lastChanged = store.lastChangedRelative(item, stopWhenFindYoungerAge);
} else
{
lastChanged = store.lastChanged(item, stopWhenFindYoungerAge);
}
if (machineLog.isTraceEnabled())
{
machineLog.trace(String.format(
"Finishing quick check for recent paths on '%s': %s.", item,
lastChanged));
}
return lastChanged;
}
};
}
}
} }
/*
* Copyright 2008 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.datamover.filesystem.store;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import org.hamcrest.Description;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.api.Action;
import org.jmock.api.Invocation;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import ch.rinn.restrictions.Friend;
import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.logging.LogInitializer;
import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus;
import ch.systemsx.cisd.datamover.filesystem.store.FileStoreRemoteMounted.LastChangeWrapper;
/**
* @author Tomasz Pylak
*/
@Friend(toClasses = FileStoreRemoteMounted.LastChangeWrapper.class)
public class FileStoreRemoteMountedTest
{
private Mockery context;
private IFileStore localStoreMock;
@BeforeMethod
public void setUp()
{
LogInitializer.init();
this.context = new Mockery();
this.localStoreMock = context.mock(IFileStore.class);
}
@AfterMethod
public void tearDown()
{
// To following line of code should also be called at the end of each test method.
// Otherwise one do not known which test failed.
context.assertIsSatisfied();
}
@Test
public void testLastChangeHappyCase() throws Throwable
{
final StoreItem item = new StoreItem("some-directory");
final long age = 212;
long timeout = 100;
LastChangeWrapper lastChangeWrapper =
new FileStoreRemoteMounted.LastChangeWrapper(localStoreMock, timeout);
final PathLastChangedCheckerDelayed lastChangedSleepAction =
new PathLastChangedCheckerDelayed(timeout / 2); // return before timeout
context.checking(new Expectations()
{
{
one(localStoreMock).lastChangedRelative(item, age);
will(lastChangedSleepAction);
}
});
DateStatus result = lastChangeWrapper.lastChangedInternal(item, age, true);
AssertJUnit.assertFalse(result.isError());
assertFalse(lastChangedSleepAction.lastCheckInterrupted());
context.assertIsSatisfied();
}
@Test(groups = "broken")
public void testLastChangedBlockedAndTerminated() throws Throwable
{
final StoreItem item = new StoreItem("some-directory");
final long age = 212;
long timeout = 100;
LastChangeWrapper lastChangeWrapper =
new FileStoreRemoteMounted.LastChangeWrapper(localStoreMock, timeout);
final PathLastChangedCheckerDelayed lastChangedSleepAction =
new PathLastChangedCheckerDelayed(timeout * 2);
context.checking(new Expectations()
{
{
one(localStoreMock).lastChanged(item, age);
will(lastChangedSleepAction);
}
});
DateStatus result = lastChangeWrapper.lastChangedInternal(item, age, false);
assertTrue(result.isError());
assertTrue(lastChangedSleepAction.lastCheckInterrupted());
context.assertIsSatisfied();
}
private final static class PathLastChangedCheckerDelayed implements Action
{
private final long delayMillis;
private volatile boolean interrupted;
public PathLastChangedCheckerDelayed(long delayMillis)
{
this.interrupted = false;
this.delayMillis = delayMillis;
}
public Object invoke(Invocation invocation) throws Throwable
{
try
{
Thread.sleep(delayMillis); // Wait predefined time.
} catch (InterruptedException e)
{
this.interrupted = true;
// That is what we expect if we are terminated.
throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage()));
}
this.interrupted = false;
return DateStatus.create(System.currentTimeMillis());
}
synchronized boolean lastCheckInterrupted()
{
return interrupted;
}
public void describeTo(Description description)
{
description.appendText(toString());
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment