diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java index cfe9552c4eb7190a4e0444f5aa00de9c2decd7bd..658317ed8b733c716d97cc3c2213bc4aafd634f6 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java @@ -300,7 +300,8 @@ public final class Parameters implements ITimingParameters, IFileSysParameters @Option(longName = "help", skipForExample = true, usage = "Prints out a description of the options.") void printHelp(final boolean exit) { - parser.printHelp("Datamover", "<required options> [option [...]]", "[status|mstatus]", ExampleMode.ALL); + parser.printHelp("Datamover", "<required options> [option [...]]", "[status|mstatus]", + ExampleMode.ALL); if (exit) { System.exit(0); @@ -657,9 +658,10 @@ public final class Parameters implements ITimingParameters, IFileSysParameters public final IFileStore getIncomingStore(final IFileSysOperationsFactory factory) { return FileStoreFactory.createStore(incomingTarget, INCOMING_KIND_DESC, - treatIncomingAsRemote, factory, incomingHostFindExecutableOrNull); + treatIncomingAsRemote, factory, incomingHostFindExecutableOrNull, + checkIntervalMillis); } - + public final HostAwareFileWithHighwaterMark getOutgoingTarget() { return outgoingTarget; @@ -679,7 +681,7 @@ public final class Parameters implements ITimingParameters, IFileSysParameters public final IFileStore getOutgoingStore(final IFileSysOperationsFactory factory) { return FileStoreFactory.createStore(outgoingTarget, OUTGOING_KIND_DESC, true, factory, - outgoingHostFindExecutableOrNull); + outgoingHostFindExecutableOrNull, checkIntervalMillis); } /** @@ -730,13 +732,12 @@ public final class Parameters implements ITimingParameters, IFileSysParameters { return prefixForIncoming; } - + public final List<String> getArgs() { return Collections.unmodifiableList(arguments); } - /** * Logs the current parameters to the {@link LogCategory#OPERATION} log. */ diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/FileStoreFactory.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/FileStoreFactory.java index 7783a9a0d91f2d9d350687d8ecbcaf9c11ca9b14..a37128a4e83b4905e42a74c51a0298b395686157 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/FileStoreFactory.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/FileStoreFactory.java @@ -56,13 +56,6 @@ public final class FileStoreFactory return createLocal(new HostAwareFileWithHighwaterMark(readyToMoveDir), string, factory); } - /** use when file store is on a remote share mounted on local host */ - public static final IFileStore createRemoteShare(final HostAwareFileWithHighwaterMark path, - final String kind, final IFileSysOperationsFactory factory) - { - return new FileStoreRemoteMounted(path, kind, factory); - } - /** * use when file store is on a remote share mounted on local host */ @@ -83,17 +76,6 @@ public final class FileStoreFactory return createRemoteHost(new HostAwareFileWithHighwaterMark(host, path), kind, factory, null); } - /** - * Returns the most convenient <code>IFileStore</code> implementation with given <var>values</var>. - */ - public final static IFileStore createStore(final File path, final String kind, - final String hostOrNull, final boolean isRemote, - final IFileSysOperationsFactory factory, String findExecutableOrNull) - { - return createStore(new HostAwareFileWithHighwaterMark(hostOrNull, path), kind, isRemote, - factory, findExecutableOrNull); - } - /** * Returns the most convenient <code>IFileStore</code> implementation with given <var>values</var>. * @@ -101,7 +83,7 @@ public final class FileStoreFactory */ public final static IFileStore createStore(final HostAwareFileWithHighwaterMark path, final String kind, final boolean isRemote, final IFileSysOperationsFactory factory, - final String findExecutableOrNull) + final String findExecutableOrNull, final long checkIntervalMillis) { if (path.tryGetHost() != null) { @@ -110,7 +92,7 @@ public final class FileStoreFactory { if (isRemote) { - return createRemoteShare(path, kind, factory); + return new FileStoreRemoteMounted(path, kind, factory, checkIntervalMillis * 3); } else { return createLocal(path, kind, factory); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java index ee0f5173b750d83a5be1e78b0dfdc0599224f844..39fc4024e71f70a96f3df1ec95baeff3146a5299 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java @@ -16,12 +16,25 @@ package ch.systemsx.cisd.datamover.filesystem.store; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.log4j.Logger; + +import ch.rinn.restrictions.Private; +import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; +import ch.systemsx.cisd.common.concurrent.ExecutionResult; +import ch.systemsx.cisd.common.concurrent.NamingThreadPoolExecutor; import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark; import ch.systemsx.cisd.common.logging.ISimpleLogger; +import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.filesystem.intf.BooleanStatus; import ch.systemsx.cisd.datamover.filesystem.intf.FileStore; @@ -41,13 +54,24 @@ import ch.systemsx.cisd.datamover.filesystem.intf.NumberStatus; */ public final class FileStoreRemoteMounted extends FileStore { - private final FileStoreLocal localImpl; + private static final Logger machineLog = + LogFactory.getLogger(LogCategory.MACHINE, FileStoreRemoteMounted.class); + + private final IFileStore localImpl; + + private final LastChangeWrapper lastChangeInvoker; + /** + * @param lastChangedTimeoutMillis number of milliseconds after which checking last modification + * time of the item will be terminated and will return with an error. + */ public FileStoreRemoteMounted(final HostAwareFileWithHighwaterMark file, - final String desription, final IFileSysOperationsFactory factory) + final String desription, final IFileSysOperationsFactory factory, + long lastChangedTimeoutMillis) { super(file, desription, factory); this.localImpl = new FileStoreLocal(file, desription, factory); + this.lastChangeInvoker = new LastChangeWrapper(localImpl, lastChangedTimeoutMillis); } // @@ -82,13 +106,13 @@ public final class FileStoreRemoteMounted extends FileStore public final NumberStatus lastChanged(final StoreItem item, final long stopWhenFindYounger) { - return localImpl.lastChanged(item, stopWhenFindYounger); + return lastChangeInvoker.lastChangedInternal(item, stopWhenFindYounger, false); } public final NumberStatus lastChangedRelative(final StoreItem item, final long stopWhenFindYoungerRelative) { - return localImpl.lastChangedRelative(item, stopWhenFindYoungerRelative); + return lastChangeInvoker.lastChangedInternal(item, stopWhenFindYoungerRelative, true); } public final BooleanStatus tryCheckDirectoryFullyAccessible(final long timeOutMillis) @@ -122,4 +146,79 @@ public final class FileStoreRemoteMounted extends FileStore final String pathStr = getPath().getPath(); return "[mounted remote fs] " + pathStr; } + + // ----- + + @Private + static final class LastChangeWrapper + { + private final ExecutorService lastChangedExecutor = + new NamingThreadPoolExecutor("Last Changed Explorer").daemonize(); + + private final long lastChangedTimeoutMillis; + + private final IFileStore localImpl; + + public LastChangeWrapper(IFileStore localImpl, long lastChangedTimeoutMillis) + { + this.lastChangedTimeoutMillis = lastChangedTimeoutMillis; + this.localImpl = localImpl; + } + + // call checking last change in a separate thread with timeout + public NumberStatus lastChangedInternal(StoreItem item, long stopWhenFindYoungerAge, + boolean isAgeRelative) + { + Callable<NumberStatus> callable = + createLastChangedCallable(localImpl, item, stopWhenFindYoungerAge, + isAgeRelative); + final ISimpleLogger simpleMachineLog = new Log4jSimpleLogger(machineLog); + final Future<NumberStatus> future = lastChangedExecutor.submit(callable); + ExecutionResult<NumberStatus> executionResult = + ConcurrencyUtilities.getResult(future, lastChangedTimeoutMillis, + simpleMachineLog, "Check for recent paths"); + NumberStatus result = executionResult.tryGetResult(); + if (result == null) + { + return NumberStatus.createError(String.format( + "Could not determine \"last changed time\" of %s: time out.", item)); + } else + { + return result; + } + } + + private Callable<NumberStatus> createLastChangedCallable(final IFileStore store, + final StoreItem item, final long stopWhenFindYoungerAge, final boolean isAgeRelative) + { + return new Callable<NumberStatus>() + { + public NumberStatus call() throws Exception + { + if (machineLog.isTraceEnabled()) + { + machineLog.trace("Starting quick check for recent paths on '" + item + + "'."); + } + final NumberStatus lastChanged; + if (isAgeRelative) + { + lastChanged = store.lastChangedRelative(item, stopWhenFindYoungerAge); + } else + { + lastChanged = store.lastChanged(item, stopWhenFindYoungerAge); + } + if (machineLog.isTraceEnabled()) + { + machineLog + .trace(String + .format( + "Finishing quick check for recent paths on '%s', found to be %2$tF %2$tT.", + item, lastChanged)); + } + return lastChanged; + } + }; + } + } }