diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index 00277d8aea7e7e3436558cc9d8cc22f5fea79608..6af1719841bf703f5121b34117be3d1501ee304e 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -20,10 +20,10 @@ import java.io.File; import java.util.Timer; import ch.systemsx.cisd.common.Constants; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkDirectoryScanningHandler; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; -import ch.systemsx.cisd.common.highwatermark.PathHandlerInterceptor; -import ch.systemsx.cisd.common.highwatermark.StoreHandlerInterceptor; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; +import ch.systemsx.cisd.common.utilities.FaultyPathHandler; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.IStoreHandler; import ch.systemsx.cisd.common.utilities.ITerminable; @@ -67,7 +67,8 @@ public class DataMover * * @return object which can be used to terminate the process and all its threads */ - public static final ITerminable start(final Parameters parameters, final IFileSysOperationsFactory factory) + public static final ITerminable start(final Parameters parameters, + final IFileSysOperationsFactory factory) { return start(parameters, factory, createLocalBufferDirs(parameters)); } @@ -79,8 +80,8 @@ public class DataMover } /** Allows to specify buffer directories. Exposed for testing purposes. */ - public static final ITerminable start(final Parameters parameters, final IFileSysOperationsFactory factory, - final LocalBufferDirs localBufferDirs) + public static final ITerminable start(final Parameters parameters, + final IFileSysOperationsFactory factory, final LocalBufferDirs localBufferDirs) { return new DataMover(parameters, factory, localBufferDirs).start(); } @@ -137,39 +138,38 @@ public class DataMover { final HighwaterMarkWatcher highwaterMarkWatcher = new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); - highwaterMarkWatcher.setPath(bufferDirs.getReadyToMoveDir()); final LocalProcessor localProcessor = new LocalProcessor(parameters, bufferDirs, factory.getImmutableCopier(), factory - .getMover(), highwaterMarkWatcher); - final PathHandlerInterceptor pathHandlerInterceptor = - new PathHandlerInterceptor(localProcessor); - highwaterMarkWatcher.addChangeListener(pathHandlerInterceptor); + .getMover()); + final File sourceDirectory = bufferDirs.getCopyCompleteDir(); + final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = + new HighwaterMarkDirectoryScanningHandler(new FaultyPathHandler(sourceDirectory), + highwaterMarkWatcher, bufferDirs.getReadyToMoveDir()); final DirectoryScanningTimerTask localProcessingTask = - new DirectoryScanningTimerTask(bufferDirs.getCopyCompleteDir(), - FileUtilities.ACCEPT_ALL_FILTER, pathHandlerInterceptor); - pathHandlerInterceptor.setDirectoryScanning(localProcessingTask); + new DirectoryScanningTimerTask(sourceDirectory, FileUtilities.ACCEPT_ALL_FILTER, + localProcessor, directoryScanningHandler); return new DataMoverProcess(localProcessingTask, "Local Processor", localProcessor); } private final DataMoverProcess createOutgoingMovingProcess() { final IFileStore outgoingStore = parameters.getOutgoingStore(factory); - final File readyToMoveDir = bufferDirs.getReadyToMoveDir(); + final File sourceDirectory = bufferDirs.getReadyToMoveDir(); final IFileStore readyToMoveStore = - FileStoreFactory.createLocal(readyToMoveDir, "ready-to-move", factory); + FileStoreFactory.createLocal(sourceDirectory, "ready-to-move", factory); final IStoreHandler remoteStoreMover = createRemotePathMover(readyToMoveStore, outgoingStore); - final StoreHandlerInterceptor storeHandlerInterceptor = - new StoreHandlerInterceptor(remoteStoreMover); + final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = new HighwaterMarkDirectoryScanningHandler( + new FaultyPathHandler(sourceDirectory), readyToMoveStore + .getHighwaterMarkWatcher()); final DirectoryScanningTimerTask outgoingMovingTask = - new DirectoryScanningTimerTask(readyToMoveDir, FileUtilities.ACCEPT_ALL_FILTER, - storeHandlerInterceptor); - outgoingStore.getHighwaterMarkWatcher().addChangeListener(storeHandlerInterceptor); - storeHandlerInterceptor.setDirectoryScanning(outgoingMovingTask); + new DirectoryScanningTimerTask(sourceDirectory, FileUtilities.ACCEPT_ALL_FILTER, + remoteStoreMover, directoryScanningHandler); return new DataMoverProcess(outgoingMovingTask, "Final Destination Mover"); } - private IStoreHandler createRemotePathMover(final IFileStore source, final IFileStore destination) + private IStoreHandler createRemotePathMover(final IFileStore source, + final IFileStore destination) { return RemoteMonitoredMoverFactory.create(source, destination, parameters); } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java index 3a0a2ab7cfa6f3c1f5ca76245b1c1f8d4f2e745a..4ca518871e6d36aef6c9ae60ede62255e3965ebf 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java @@ -142,11 +142,6 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory // IStoreHandler // - public final boolean mayHandle(final StoreItem item) - { - return true; - } - public final void handle(final StoreItem sourceItem) { final IExtendedFileStore extendedFileStore = incomingStore.tryAsExtended(); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java index 56d4e7121e8352c75a24de63947fa85efb806875..e0100129927e486b9fd5919b5b723b9bb8d7bd6d 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java @@ -23,7 +23,6 @@ import java.util.regex.Pattern; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; @@ -78,17 +77,13 @@ public class LocalProcessor implements IPathHandler, IRecoverableTimerTaskFactor private final File extraCopyDirOrNull; - private final HighwaterMarkWatcher highwaterMarkWatcher; - LocalProcessor(final Parameters parameters, final LocalBufferDirs bufferDirs, - final IPathImmutableCopier copier, final IPathMover mover, - final HighwaterMarkWatcher highwaterMarkWatcher) + final IPathImmutableCopier copier, final IPathMover mover) { this.parameters = parameters; this.inputDir = bufferDirs.getCopyCompleteDir(); this.outputDir = bufferDirs.getReadyToMoveDir(); this.tempDir = bufferDirs.getTempDir(); - this.highwaterMarkWatcher = highwaterMarkWatcher; this.extraCopyDirOrNull = parameters.tryGetExtraCopyDir(); this.copier = copier; this.mover = mover; @@ -301,12 +296,6 @@ public class LocalProcessor implements IPathHandler, IRecoverableTimerTaskFactor } } - public final boolean mayHandle(final File path) - { - highwaterMarkWatcher.run(); - return highwaterMarkWatcher.isBelow() == false; - } - // // Helper classes // diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java index 6749a4c700b4d00cb10737509aa5eec01981906c..21eb32a3e22639adeeb3502aa80e7c63aa43df61 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java @@ -23,7 +23,6 @@ import ch.systemsx.cisd.common.Constants; import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.exceptions.StatusFlag; -import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.IStoreHandler; @@ -288,15 +287,6 @@ public final class RemotePathMover implements IStoreHandler // IStoreHandler // - public final boolean mayHandle(final StoreItem item) - { - final HighwaterMarkWatcher highwaterMarkWatcher = - destinationDirectory.getHighwaterMarkWatcher(); - assert highwaterMarkWatcher.getPath() != null : "Remote path not set"; - highwaterMarkWatcher.run(); - return highwaterMarkWatcher.isBelow() == false; - } - public final void handle(final StoreItem item) { if (isDeletionInProgress(item))