Skip to content
Snippets Groups Projects
Commit 0ed13791 authored by ribeaudc's avatar ribeaudc
Browse files

[SE-47]

change: - Implement the 'high water mark' stuff for incoming -> buffer and NOT for buffer -> buffer.

SVN: 6283
parent f6381505
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,6 @@ import java.util.Timer;
import ch.systemsx.cisd.common.Constants;
import ch.systemsx.cisd.common.highwatermark.HighwaterMarkDirectoryScanningHandler;
import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher;
import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask;
import ch.systemsx.cisd.common.utilities.FaultyPathDirectoryScanningHandler;
import ch.systemsx.cisd.common.utilities.FileUtilities;
......@@ -136,18 +135,13 @@ public class DataMover
private final DataMoverProcess createLocalProcessor()
{
final HighwaterMarkWatcher highwaterMarkWatcher =
new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark());
final LocalProcessor localProcessor =
new LocalProcessor(parameters, bufferDirs, factory.getImmutableCopier(), factory
.getMover());
final File sourceDirectory = bufferDirs.getCopyCompleteDir();
final HighwaterMarkDirectoryScanningHandler directoryScanningHandler =
new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler(sourceDirectory),
highwaterMarkWatcher, bufferDirs.getReadyToMoveDir());
final DirectoryScanningTimerTask localProcessingTask =
new DirectoryScanningTimerTask(sourceDirectory, FileUtilities.ACCEPT_ALL_FILTER,
localProcessor, directoryScanningHandler);
localProcessor);
return new DataMoverProcess(localProcessingTask, "Local Processor", localProcessor);
}
......@@ -159,9 +153,9 @@ public class DataMover
FileStoreFactory.createLocal(sourceDirectory, "ready-to-move", factory);
final IStoreHandler remoteStoreMover =
createRemotePathMover(readyToMoveStore, outgoingStore);
final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = new HighwaterMarkDirectoryScanningHandler(
new FaultyPathDirectoryScanningHandler(sourceDirectory), readyToMoveStore
.getHighwaterMarkWatcher());
final HighwaterMarkDirectoryScanningHandler directoryScanningHandler =
new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler(
sourceDirectory), readyToMoveStore.getHighwaterMarkWatcher());
final DirectoryScanningTimerTask outgoingMovingTask =
new DirectoryScanningTimerTask(sourceDirectory, FileUtilities.ACCEPT_ALL_FILTER,
remoteStoreMover, directoryScanningHandler);
......
......@@ -23,11 +23,14 @@ import java.util.TimerTask;
import org.apache.log4j.Logger;
import ch.systemsx.cisd.common.highwatermark.HighwaterMarkDirectoryScanningHandler;
import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher;
import ch.systemsx.cisd.common.logging.ISimpleLogger;
import ch.systemsx.cisd.common.logging.Log4jSimpleLogger;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask;
import ch.systemsx.cisd.common.utilities.FaultyPathDirectoryScanningHandler;
import ch.systemsx.cisd.common.utilities.FileUtilities;
import ch.systemsx.cisd.common.utilities.IStoreHandler;
import ch.systemsx.cisd.common.utilities.ITimeProvider;
......@@ -124,12 +127,17 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
private final DataMoverProcess create()
{
final File copyInProgressDir = bufferDirs.getCopyInProgressDir();
final HighwaterMarkWatcher highwaterMarkWatcher =
new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark());
final HighwaterMarkDirectoryScanningHandler directoryScanningHandler =
new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler(
copyInProgressDir), highwaterMarkWatcher, copyInProgressDir);
final IStoreHandler pathHandler = createIncomingMovingPathHandler();
final DirectoryScanningTimerTask movingTask =
new DirectoryScanningTimerTask(
new FileScannedStore(incomingStore, storeItemFilter), bufferDirs
.getCopyInProgressDir(), pathHandler,
NUMBER_OF_ERRORS_IN_LISTING_IGNORED);
new FileScannedStore(incomingStore, storeItemFilter),
directoryScanningHandler, pathHandler, NUMBER_OF_ERRORS_IN_LISTING_IGNORED);
return new DataMoverProcess(movingTask, "Mover of Incoming Data", this);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment