diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index a8daa22f446e7ca28974a2aafd76eeeb6f86bd52..9b6a635a37632e601575dc2ffab81381d9d13564 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -22,6 +22,7 @@ import java.util.Timer; import ch.systemsx.cisd.common.Constants; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.highwatermark.PathHandlerInterceptor; +import ch.systemsx.cisd.common.highwatermark.StoreHandlerInterceptor; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.IStoreHandler; @@ -133,7 +134,7 @@ public class DataMover return IncomingProcessor.createMovingProcess(parameters, factory, bufferDirs); } - private DataMoverProcess createLocalProcessor() + private final DataMoverProcess createLocalProcessor() { final HighwaterMarkWatcher highwaterMarkWatcher = new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); @@ -150,7 +151,7 @@ public class DataMover return new DataMoverProcess(localProcessingTask, "Local Processor", localProcessor); } - private DataMoverProcess createOutgoingMovingProcess() + private final DataMoverProcess createOutgoingMovingProcess() { final FileStore outgoingStore = parameters.getOutgoingStore(factory); final File readyToMoveDir = bufferDirs.getReadyToMoveDir(); @@ -158,10 +159,13 @@ public class DataMover FileStoreFactory.createLocal(readyToMoveDir, "ready-to-move", factory); final IStoreHandler remoteStoreMover = createRemotePathMover(readyToMoveStore, outgoingStore); - + final StoreHandlerInterceptor storeHandlerInterceptor = + new StoreHandlerInterceptor(remoteStoreMover); final DirectoryScanningTimerTask outgoingMovingTask = new DirectoryScanningTimerTask(readyToMoveDir, FileUtilities.ACCEPT_ALL_FILTER, - remoteStoreMover); + storeHandlerInterceptor); + outgoingStore.getHighwaterMarkWatcher().addChangeListener(storeHandlerInterceptor); + storeHandlerInterceptor.setDirectoryScanning(outgoingMovingTask); return new DataMoverProcess(outgoingMovingTask, "Final Destination Mover"); } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java index 9fec5a272b85222582f784bcfa2cfee9db4ea1c3..598824bfb88daf3eb9297a8a29abc8a8bfdf5913 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java @@ -28,7 +28,6 @@ import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkSelfTestable; -import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.filesystem.remote.RemotePathMover; @@ -169,8 +168,8 @@ public abstract class FileStore implements IFileStore { throw new ConfigurationFailureException(errorMessage); } - new HighwaterMarkSelfTestable(fileWithHighwaterMark.getFile(), new HighwaterMarkWatcher( - fileWithHighwaterMark.getHighwaterMark())).check(); + new HighwaterMarkSelfTestable(fileWithHighwaterMark.getFile(), getHighwaterMarkWatcher()) + .check(); } // diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IFileStore.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IFileStore.java index 211e844ad9f5ffa6231dd42a8a6ffc80da338fd9..99312d347a89d3f66cda85b2db44f2eafd5ed917 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IFileStore.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IFileStore.java @@ -17,6 +17,7 @@ package ch.systemsx.cisd.datamover.filesystem.intf; import ch.systemsx.cisd.common.exceptions.Status; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.utilities.ISelfTestable; import ch.systemsx.cisd.common.utilities.StoreItem; @@ -102,4 +103,9 @@ public interface IFileStore extends ISelfTestable public String getLocationDescription(StoreItem item); public IExtendedFileStore tryAsExtended(); + + /** + * Returns the <code>HighwaterMarkWatcher</code> for this implementation. + */ + public HighwaterMarkWatcher getHighwaterMarkWatcher(); } \ No newline at end of file diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java index 7a3aa1aa390c059e352090e3d3ff32198f6c51a1..caf40cf1f8861a24c615529f05e2e4870949ace0 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java @@ -23,10 +23,10 @@ import org.apache.log4j.Logger; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; -import ch.systemsx.cisd.common.utilities.DirectoryScannedStore; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.common.MarkerFile; @@ -53,12 +53,15 @@ public class FileStoreLocal extends ExtendedFileStore private final IPathRemover remover; + private final HighwaterMarkWatcher highwaterMarkWatcher; + public FileStoreLocal(final FileWithHighwaterMark file, final String desription, final IFileSysOperationsFactory factory) { super(file, null, false, desription, factory); this.remover = factory.getRemover(); this.mover = factory.getMover(); + this.highwaterMarkWatcher = new HighwaterMarkWatcher(file.getHighwaterMark()); } // @@ -168,13 +171,18 @@ public class FileStoreLocal extends ExtendedFileStore if (files != null) { FileUtilities.sortByLastModified(files); - return DirectoryScannedStore.asItems(files); + return StoreItem.asItems(files); } else { return null; } } + public final HighwaterMarkWatcher getHighwaterMarkWatcher() + { + return highwaterMarkWatcher; + } + // ------ /** diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java index f95c41c2820aebb911751d0138b9ccd6e9a136db..5595f74b7ca490c0f2384ed7a99047bdcd43a6db 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java @@ -16,11 +16,10 @@ package ch.systemsx.cisd.datamover.filesystem.store; -import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; -import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.NotImplementedException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.filesystem.intf.FileStore; @@ -108,8 +107,7 @@ public class FileStoreRemote extends FileStore throw new NotImplementedException(); } - @Override - public final void check() throws EnvironmentFailureException, ConfigurationFailureException + public final HighwaterMarkWatcher getHighwaterMarkWatcher() { throw new NotImplementedException(); } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java index 7579a9b98eaea0a7c99d5920b2f76f5389181f29..bf2c386212947167af9fbc09ef9acfe722e3d9bf 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java @@ -20,6 +20,7 @@ import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.filesystem.intf.FileStore; @@ -118,4 +119,9 @@ public final class FileStoreRemoteMounted extends FileStore { localImpl.check(); } + + public final HighwaterMarkWatcher getHighwaterMarkWatcher() + { + return localImpl.getHighwaterMarkWatcher(); + } } diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java index 67f7be0ad241f0da1dfc241dddce9248679411dd..b55759fb4ecb5b623493ac83faa2f0b7ea0b00c3 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java @@ -30,6 +30,7 @@ import org.testng.annotations.Test; import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogInitializer; @@ -228,6 +229,11 @@ public class CopyActivityMonitorTest { return localImpl.tryListSortByLastModified(loggerOrNull); } + + public final HighwaterMarkWatcher getHighwaterMarkWatcher() + { + return localImpl.getHighwaterMarkWatcher(); + } }; }