From 0ed137918aa393157e52938972306c6dc6d7d5d8 Mon Sep 17 00:00:00 2001 From: ribeaudc <ribeaudc> Date: Fri, 23 May 2008 15:14:50 +0000 Subject: [PATCH] [SE-47] change: - Implement the 'high water mark' stuff for incoming -> buffer and NOT for buffer -> buffer. SVN: 6283 --- .../java/ch/systemsx/cisd/datamover/DataMover.java | 14 ++++---------- .../systemsx/cisd/datamover/IncomingProcessor.java | 14 +++++++++++--- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index b39ff953b75..7cf0f584104 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -21,7 +21,6 @@ import java.util.Timer; import ch.systemsx.cisd.common.Constants; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkDirectoryScanningHandler; -import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.utilities.FaultyPathDirectoryScanningHandler; import ch.systemsx.cisd.common.utilities.FileUtilities; @@ -136,18 +135,13 @@ public class DataMover private final DataMoverProcess createLocalProcessor() { - final HighwaterMarkWatcher highwaterMarkWatcher = - new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); final LocalProcessor localProcessor = new LocalProcessor(parameters, bufferDirs, factory.getImmutableCopier(), factory .getMover()); final File sourceDirectory = bufferDirs.getCopyCompleteDir(); - final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = - new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler(sourceDirectory), - highwaterMarkWatcher, bufferDirs.getReadyToMoveDir()); final DirectoryScanningTimerTask localProcessingTask = new DirectoryScanningTimerTask(sourceDirectory, FileUtilities.ACCEPT_ALL_FILTER, - localProcessor, directoryScanningHandler); + localProcessor); return new DataMoverProcess(localProcessingTask, "Local Processor", localProcessor); } @@ -159,9 +153,9 @@ public class DataMover FileStoreFactory.createLocal(sourceDirectory, "ready-to-move", factory); final IStoreHandler remoteStoreMover = createRemotePathMover(readyToMoveStore, outgoingStore); - final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = new HighwaterMarkDirectoryScanningHandler( - new FaultyPathDirectoryScanningHandler(sourceDirectory), readyToMoveStore - .getHighwaterMarkWatcher()); + final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = + new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler( + sourceDirectory), readyToMoveStore.getHighwaterMarkWatcher()); final DirectoryScanningTimerTask outgoingMovingTask = new DirectoryScanningTimerTask(sourceDirectory, FileUtilities.ACCEPT_ALL_FILTER, remoteStoreMover, directoryScanningHandler); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java index 4ca518871e6..a3fe5afea17 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java @@ -23,11 +23,14 @@ import java.util.TimerTask; import org.apache.log4j.Logger; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkDirectoryScanningHandler; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; +import ch.systemsx.cisd.common.utilities.FaultyPathDirectoryScanningHandler; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.IStoreHandler; import ch.systemsx.cisd.common.utilities.ITimeProvider; @@ -124,12 +127,17 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory private final DataMoverProcess create() { + final File copyInProgressDir = bufferDirs.getCopyInProgressDir(); + final HighwaterMarkWatcher highwaterMarkWatcher = + new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); + final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = + new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler( + copyInProgressDir), highwaterMarkWatcher, copyInProgressDir); final IStoreHandler pathHandler = createIncomingMovingPathHandler(); final DirectoryScanningTimerTask movingTask = new DirectoryScanningTimerTask( - new FileScannedStore(incomingStore, storeItemFilter), bufferDirs - .getCopyInProgressDir(), pathHandler, - NUMBER_OF_ERRORS_IN_LISTING_IGNORED); + new FileScannedStore(incomingStore, storeItemFilter), + directoryScanningHandler, pathHandler, NUMBER_OF_ERRORS_IN_LISTING_IGNORED); return new DataMoverProcess(movingTask, "Mover of Incoming Data", this); } -- GitLab