diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index bcf6b5b99a352fd9f40c1747baf2d63e0a574c72..a8daa22f446e7ca28974a2aafd76eeeb6f86bd52 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -20,6 +20,7 @@ import java.io.File; import java.util.Timer; import ch.systemsx.cisd.common.Constants; +import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.highwatermark.PathHandlerInterceptor; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.utilities.FileUtilities; @@ -134,11 +135,14 @@ public class DataMover private DataMoverProcess createLocalProcessor() { + final HighwaterMarkWatcher highwaterMarkWatcher = + new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); final LocalProcessor localProcessor = new LocalProcessor(parameters, bufferDirs, factory.getImmutableCopier(), factory - .getMover()); + .getMover(), highwaterMarkWatcher); final PathHandlerInterceptor pathHandlerInterceptor = new PathHandlerInterceptor(localProcessor); + highwaterMarkWatcher.addChangeListener(pathHandlerInterceptor); final DirectoryScanningTimerTask localProcessingTask = new DirectoryScanningTimerTask(bufferDirs.getCopyCompleteDir(), FileUtilities.ACCEPT_ALL_FILTER, pathHandlerInterceptor); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java index f3a3b61a9c4a5087856aa0b13f8916d6147e799a..c53071feee16e68c983f1205e31e4250c6829eb2 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java @@ -81,13 +81,14 @@ public class LocalProcessor implements IPathHandler, IRecoverableTimerTaskFactor private final HighwaterMarkWatcher highwaterMarkWatcher; LocalProcessor(final Parameters parameters, final LocalBufferDirs bufferDirs, - final IPathImmutableCopier copier, final IPathMover mover) + final IPathImmutableCopier copier, final IPathMover mover, + final HighwaterMarkWatcher highwaterMarkWatcher) { this.parameters = parameters; this.inputDir = bufferDirs.getCopyCompleteDir(); this.outputDir = bufferDirs.getReadyToMoveDir(); this.tempDir = bufferDirs.getTempDir(); - highwaterMarkWatcher = new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); + this.highwaterMarkWatcher = highwaterMarkWatcher; this.extraCopyDirOrNull = parameters.tryGetExtraCopyDir(); this.copier = copier; this.mover = mover;