From b638ebc6bd3fd9d353b3a4ac894e637d34bc910c Mon Sep 17 00:00:00 2001 From: ribeaudc <ribeaudc> Date: Thu, 15 May 2008 09:18:35 +0000 Subject: [PATCH] [DMV-12] add: - 'StoreHandlerInterceptor' - 'IFileStore.getHighwaterMarkWatcher' change: - 'asItems'/'asItem' methods moved from 'DirectoryScannedStore' to 'StoreItem'. SVN: 6065 --- .../java/ch/systemsx/cisd/datamover/DataMover.java | 12 ++++++++---- .../cisd/datamover/filesystem/intf/FileStore.java | 5 ++--- .../cisd/datamover/filesystem/intf/IFileStore.java | 6 ++++++ .../datamover/filesystem/store/FileStoreLocal.java | 12 ++++++++++-- .../datamover/filesystem/store/FileStoreRemote.java | 6 ++---- .../filesystem/store/FileStoreRemoteMounted.java | 6 ++++++ .../filesystem/remote/CopyActivityMonitorTest.java | 6 ++++++ 7 files changed, 40 insertions(+), 13 deletions(-) diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index a8daa22f446..9b6a635a376 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -22,6 +22,7 @@ import java.util.Timer; import ch.systemsx.cisd.common.Constants; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.highwatermark.PathHandlerInterceptor; +import ch.systemsx.cisd.common.highwatermark.StoreHandlerInterceptor; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.IStoreHandler; @@ -133,7 +134,7 @@ public class DataMover return IncomingProcessor.createMovingProcess(parameters, factory, bufferDirs); } - private DataMoverProcess createLocalProcessor() + private final DataMoverProcess createLocalProcessor() { final HighwaterMarkWatcher highwaterMarkWatcher = new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); @@ -150,7 +151,7 @@ public class DataMover return new DataMoverProcess(localProcessingTask, "Local Processor", localProcessor); } - private DataMoverProcess createOutgoingMovingProcess() + private final DataMoverProcess createOutgoingMovingProcess() { final FileStore outgoingStore = parameters.getOutgoingStore(factory); final File readyToMoveDir = bufferDirs.getReadyToMoveDir(); @@ -158,10 +159,13 @@ public class DataMover FileStoreFactory.createLocal(readyToMoveDir, "ready-to-move", factory); final IStoreHandler remoteStoreMover = createRemotePathMover(readyToMoveStore, outgoingStore); - + final StoreHandlerInterceptor storeHandlerInterceptor = + new StoreHandlerInterceptor(remoteStoreMover); final DirectoryScanningTimerTask outgoingMovingTask = new DirectoryScanningTimerTask(readyToMoveDir, FileUtilities.ACCEPT_ALL_FILTER, - remoteStoreMover); + storeHandlerInterceptor); + outgoingStore.getHighwaterMarkWatcher().addChangeListener(storeHandlerInterceptor); + storeHandlerInterceptor.setDirectoryScanning(outgoingMovingTask); return new DataMoverProcess(outgoingMovingTask, "Final Destination Mover"); } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java index 9fec5a272b8..598824bfb88 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/FileStore.java @@ -28,7 +28,6 @@ import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkSelfTestable; -import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.filesystem.remote.RemotePathMover; @@ -169,8 +168,8 @@ public abstract class FileStore implements IFileStore { throw new ConfigurationFailureException(errorMessage); } - new HighwaterMarkSelfTestable(fileWithHighwaterMark.getFile(), new HighwaterMarkWatcher( - fileWithHighwaterMark.getHighwaterMark())).check(); + new HighwaterMarkSelfTestable(fileWithHighwaterMark.getFile(), getHighwaterMarkWatcher()) + .check(); } // diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IFileStore.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IFileStore.java index 211e844ad9f..99312d347a8 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IFileStore.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IFileStore.java @@ -17,6 +17,7 @@ package ch.systemsx.cisd.datamover.filesystem.intf; import ch.systemsx.cisd.common.exceptions.Status; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.utilities.ISelfTestable; import ch.systemsx.cisd.common.utilities.StoreItem; @@ -102,4 +103,9 @@ public interface IFileStore extends ISelfTestable public String getLocationDescription(StoreItem item); public IExtendedFileStore tryAsExtended(); + + /** + * Returns the <code>HighwaterMarkWatcher</code> for this implementation. + */ + public HighwaterMarkWatcher getHighwaterMarkWatcher(); } \ No newline at end of file diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java index 7a3aa1aa390..caf40cf1f88 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreLocal.java @@ -23,10 +23,10 @@ import org.apache.log4j.Logger; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; -import ch.systemsx.cisd.common.utilities.DirectoryScannedStore; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.common.MarkerFile; @@ -53,12 +53,15 @@ public class FileStoreLocal extends ExtendedFileStore private final IPathRemover remover; + private final HighwaterMarkWatcher highwaterMarkWatcher; + public FileStoreLocal(final FileWithHighwaterMark file, final String desription, final IFileSysOperationsFactory factory) { super(file, null, false, desription, factory); this.remover = factory.getRemover(); this.mover = factory.getMover(); + this.highwaterMarkWatcher = new HighwaterMarkWatcher(file.getHighwaterMark()); } // @@ -168,13 +171,18 @@ public class FileStoreLocal extends ExtendedFileStore if (files != null) { FileUtilities.sortByLastModified(files); - return DirectoryScannedStore.asItems(files); + return StoreItem.asItems(files); } else { return null; } } + public final HighwaterMarkWatcher getHighwaterMarkWatcher() + { + return highwaterMarkWatcher; + } + // ------ /** diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java index f95c41c2820..5595f74b7ca 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemote.java @@ -16,11 +16,10 @@ package ch.systemsx.cisd.datamover.filesystem.store; -import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; -import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.NotImplementedException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.filesystem.intf.FileStore; @@ -108,8 +107,7 @@ public class FileStoreRemote extends FileStore throw new NotImplementedException(); } - @Override - public final void check() throws EnvironmentFailureException, ConfigurationFailureException + public final HighwaterMarkWatcher getHighwaterMarkWatcher() { throw new NotImplementedException(); } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java index 7579a9b98ea..bf2c3862129 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/store/FileStoreRemoteMounted.java @@ -20,6 +20,7 @@ import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.utilities.StoreItem; import ch.systemsx.cisd.datamover.filesystem.intf.FileStore; @@ -118,4 +119,9 @@ public final class FileStoreRemoteMounted extends FileStore { localImpl.check(); } + + public final HighwaterMarkWatcher getHighwaterMarkWatcher() + { + return localImpl.getHighwaterMarkWatcher(); + } } diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java index 67f7be0ad24..b55759fb4ec 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java @@ -30,6 +30,7 @@ import org.testng.annotations.Test; import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.highwatermark.FileWithHighwaterMark; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogInitializer; @@ -228,6 +229,11 @@ public class CopyActivityMonitorTest { return localImpl.tryListSortByLastModified(loggerOrNull); } + + public final HighwaterMarkWatcher getHighwaterMarkWatcher() + { + return localImpl.getHighwaterMarkWatcher(); + } }; } -- GitLab