From 1b9db3d2dc27700992c9cb5c0e0adb1f9513890a Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Thu, 3 Jul 2008 06:14:11 +0000 Subject: [PATCH] [DMV-19] add: monitoring for all calls that could potentially hang change: use MonitoringProxy for the actual monitoring work remove: unit test class as it has become obsolete SVN: 7019 --- .../store/FileStoreRemoteMounted.java | 176 ++++++++---------- .../store/FileStoreRemoteMountedTest.java | 151 --------------- 2 files changed, 78 insertions(+), 249 deletions(-) delete mode 100644 datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMountedTest.java diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java index 4baafe1953d..6d6f98c118b 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java @@ -16,33 +16,23 @@ package ch.systemsx.cisd.datamover.filesystem.store; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import org.apache.log4j.Logger; - -import ch.rinn.restrictions.Private; -import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; -import ch.systemsx.cisd.common.concurrent.ExecutionResult; -import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor; +import ch.systemsx.cisd.common.concurrent.MonitoringProxy; import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.Status; +import ch.systemsx.cisd.common.exceptions.StatusFlag; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark; import ch.systemsx.cisd.common.logging.ISimpleLogger; -import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; -import ch.systemsx.cisd.common.logging.LogCategory; -import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.common.logging.LogLevel; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.filesystem.intf.AbstractFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus; +import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; import ch.systemsx.cisd.datamover.filesystem.intf.IExtendedFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IStoreCopier; -import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; /** * A <code>FileStore</code> extension for remote paths mounted. @@ -54,12 +44,9 @@ import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; */ public final class FileStoreRemoteMounted extends AbstractFileStore { - private static final Logger machineLog = - LogFactory.getLogger(LogCategory.MACHINE, FileStoreRemoteMounted.class); - private final IFileStore localImpl; - private final LastChangeWrapper lastChangeInvoker; + private final IFileStore localImplMonitored; /** * @param lastChangedTimeoutMillis number of milliseconds after which checking last modification @@ -71,7 +58,9 @@ public final class FileStoreRemoteMounted extends AbstractFileStore { super(file, desription, factory); this.localImpl = new FileStoreLocal(file, desription, factory); - this.lastChangeInvoker = new LastChangeWrapper(localImpl, lastChangedTimeoutMillis); + this.localImplMonitored = + MonitoringProxy.create(IFileStore.class, localImpl).timeoutMillis( + lastChangedTimeoutMillis).get(); } // @@ -96,33 +85,96 @@ public final class FileStoreRemoteMounted extends AbstractFileStore public final Status delete(final StoreItem item) { - return localImpl.delete(item); + final Status statusOrNull = localImplMonitored.delete(item); + if (statusOrNull == null) + { + return new Status(StatusFlag.RETRIABLE_ERROR, "Could not delete '" + item + + "': time out."); + } + return statusOrNull; } public final BooleanStatus exists(final StoreItem item) { - return localImpl.exists(item); + final BooleanStatus statusOrNull = localImplMonitored.exists(item); + if (statusOrNull == null) + { + return BooleanStatus.createError("Could not determine whether '" + item + + "' exists: time out."); + } + return statusOrNull; } public final DateStatus lastChanged(final StoreItem item, final long stopWhenFindYounger) { - return lastChangeInvoker.lastChangedInternal(item, stopWhenFindYounger, false); + final DateStatus statusOrNull = localImplMonitored.lastChanged(item, stopWhenFindYounger); + if (statusOrNull == null) + { + return DateStatus.createError(String.format( + "Could not determine \"last changed time\" of %s: time out.", item)); + } + return statusOrNull; } public final DateStatus lastChangedRelative(final StoreItem item, final long stopWhenFindYoungerRelative) { - return lastChangeInvoker.lastChangedInternal(item, stopWhenFindYoungerRelative, true); + final DateStatus statusOrNull = + localImplMonitored.lastChangedRelative(item, stopWhenFindYoungerRelative); + if (statusOrNull == null) + { + return DateStatus.createError(String.format( + "Could not determine \"last changed time\" of %s: time out.", item)); + } + return statusOrNull; } public final BooleanStatus tryCheckDirectoryFullyAccessible(final long timeOutMillis) { - return localImpl.tryCheckDirectoryFullyAccessible(timeOutMillis); + final BooleanStatus statusOrNull = + localImplMonitored.tryCheckDirectoryFullyAccessible(timeOutMillis); + if (statusOrNull == null) + { + return BooleanStatus.createError("Could not determine whether store '" + toString() + + "' is fully accessible: time out."); + } + return statusOrNull; + } + + private static class StoringLogger implements ISimpleLogger + { + + LogLevel storedLevel = null; + + String storedMessage = null; + + public void log(LogLevel level, String message) + { + this.storedLevel = level; + this.storedMessage = message; + } + } public final StoreItem[] tryListSortByLastModified(final ISimpleLogger loggerOrNull) { - return localImpl.tryListSortByLastModified(loggerOrNull); + final StoringLogger storingLoggerOrNull = + (loggerOrNull == null) ? null : new StoringLogger(); + final StoreItem[] itemsOrNull = + localImplMonitored.tryListSortByLastModified(storingLoggerOrNull); + if (loggerOrNull != null) + { + if (storingLoggerOrNull.storedMessage != null) + { + loggerOrNull + .log(storingLoggerOrNull.storedLevel, storingLoggerOrNull.storedMessage); + } else if (itemsOrNull == null) + { + loggerOrNull.log(LogLevel.ERROR, "Could not get listing of store '" + toString() + + "': time out."); + } + } + return itemsOrNull; } public final HighwaterMarkWatcher getHighwaterMarkWatcher() @@ -131,7 +183,7 @@ public final class FileStoreRemoteMounted extends AbstractFileStore } // - // FileStore + // AbstractFileStore // @Override @@ -147,76 +199,4 @@ public final class FileStoreRemoteMounted extends AbstractFileStore return "[mounted remote fs] " + pathStr; } - // ----- - - @Private - static final class LastChangeWrapper - { - private final ExecutorService lastChangedExecutor = - new NamingThreadPoolExecutor("Last Changed Explorer").daemonize(); - - private final long lastChangedTimeoutMillis; - - private final IFileStore localImpl; - - public LastChangeWrapper(IFileStore localImpl, long lastChangedTimeoutMillis) - { - this.lastChangedTimeoutMillis = lastChangedTimeoutMillis; - this.localImpl = localImpl; - } - - // call checking last change in a separate thread with timeout - public DateStatus lastChangedInternal(StoreItem item, long stopWhenFindYoungerAge, - boolean isAgeRelative) - { - Callable<DateStatus> callable = - createLastChangedCallable(localImpl, item, stopWhenFindYoungerAge, - isAgeRelative); - final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog); - final Future<DateStatus> future = lastChangedExecutor.submit(callable); - ExecutionResult<DateStatus> executionResult = - ConcurrencyUtilities.getResult(future, lastChangedTimeoutMillis, - simpleMachineLog, "Check for recent paths"); - DateStatus result = executionResult.tryGetResult(); - if (result == null) - { - return DateStatus.createError(String.format( - "Could not determine \"last changed time\" of %s: time out.", item)); - } else - { - return result; - } - } - - private Callable<DateStatus> createLastChangedCallable(final IFileStore store, - final StoreItem item, final long stopWhenFindYoungerAge, final boolean isAgeRelative) - { - return new Callable<DateStatus>() - { - public DateStatus call() throws Exception - { - if (machineLog.isTraceEnabled()) - { - machineLog.trace("Starting quick check for recent paths on '" + item - + "'."); - } - final DateStatus lastChanged; - if (isAgeRelative) - { - lastChanged = store.lastChangedRelative(item, stopWhenFindYoungerAge); - } else - { - lastChanged = store.lastChanged(item, stopWhenFindYoungerAge); - } - if (machineLog.isTraceEnabled()) - { - machineLog.trace(String.format( - "Finishing quick check for recent paths on '%s': %s.", item, - lastChanged)); - } - return lastChanged; - } - }; - } - } } diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMountedTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMountedTest.java deleted file mode 100644 index 2e07e6510e9..00000000000 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMountedTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright 2008 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.datamover.filesystem.store; - -import static org.testng.AssertJUnit.assertFalse; -import static org.testng.AssertJUnit.assertTrue; - -import org.hamcrest.Description; -import org.jmock.Expectations; -import org.jmock.Mockery; -import org.jmock.api.Action; -import org.jmock.api.Invocation; -import org.testng.AssertJUnit; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import ch.rinn.restrictions.Friend; -import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; -import ch.systemsx.cisd.common.logging.LogInitializer; -import ch.systemsx.cisd.common.utilities.StoreItem; -import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore; -import ch.systemsx.cisd.datamover.filesystem.intf.DateStatus; -import ch.systemsx.cisd.datamover.filesystem.store.FileStoreRemoteMounted.LastChangeWrapper; - -/** - * @author Tomasz Pylak - */ -@Friend(toClasses = FileStoreRemoteMounted.LastChangeWrapper.class) -public class FileStoreRemoteMountedTest -{ - private Mockery context; - - private IFileStore localStoreMock; - - @BeforeMethod - public void setUp() - { - LogInitializer.init(); - this.context = new Mockery(); - this.localStoreMock = context.mock(IFileStore.class); - } - - @AfterMethod - public void tearDown() - { - // To following line of code should also be called at the end of each test method. - // Otherwise one do not known which test failed. - context.assertIsSatisfied(); - } - - @Test - public void testLastChangeHappyCase() throws Throwable - { - final StoreItem item = new StoreItem("some-directory"); - final long age = 212; - long timeout = 100; - LastChangeWrapper lastChangeWrapper = - new FileStoreRemoteMounted.LastChangeWrapper(localStoreMock, timeout); - final PathLastChangedCheckerDelayed lastChangedSleepAction = - new PathLastChangedCheckerDelayed(timeout / 2); // return before timeout - context.checking(new Expectations() - { - { - one(localStoreMock).lastChangedRelative(item, age); - will(lastChangedSleepAction); - } - }); - - DateStatus result = lastChangeWrapper.lastChangedInternal(item, age, true); - AssertJUnit.assertFalse(result.isError()); - assertFalse(lastChangedSleepAction.lastCheckInterrupted()); - context.assertIsSatisfied(); - } - - @Test(groups = "broken") - public void testLastChangedBlockedAndTerminated() throws Throwable - { - final StoreItem item = new StoreItem("some-directory"); - final long age = 212; - long timeout = 100; - LastChangeWrapper lastChangeWrapper = - new FileStoreRemoteMounted.LastChangeWrapper(localStoreMock, timeout); - final PathLastChangedCheckerDelayed lastChangedSleepAction = - new PathLastChangedCheckerDelayed(timeout * 2); - context.checking(new Expectations() - { - { - one(localStoreMock).lastChanged(item, age); - will(lastChangedSleepAction); - } - }); - - DateStatus result = lastChangeWrapper.lastChangedInternal(item, age, false); - assertTrue(result.isError()); - assertTrue(lastChangedSleepAction.lastCheckInterrupted()); - context.assertIsSatisfied(); - } - - private final static class PathLastChangedCheckerDelayed implements Action - { - private final long delayMillis; - - private volatile boolean interrupted; - - public PathLastChangedCheckerDelayed(long delayMillis) - { - this.interrupted = false; - this.delayMillis = delayMillis; - } - - public Object invoke(Invocation invocation) throws Throwable - { - try - { - Thread.sleep(delayMillis); // Wait predefined time. - } catch (InterruptedException e) - { - this.interrupted = true; - // That is what we expect if we are terminated. - throw new CheckedExceptionTunnel(new InterruptedException(e.getMessage())); - } - this.interrupted = false; - return DateStatus.create(System.currentTimeMillis()); - } - - synchronized boolean lastCheckInterrupted() - { - return interrupted; - } - - public void describeTo(Description description) - { - description.appendText(toString()); - } - } -} -- GitLab