diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index 7d496c0fc048290d831b8d2d52b37c41d9f1d17f..0c56c4cde37b93eb4b56ae4c5967a55a4b84d31a 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -118,11 +118,20 @@ public final class DataMover } static TimerTask createTimerTaskForMarkerFileProtocol(final TimerTask timerTask, - final String markerFileName) + final String startMarkerFileName, final String endMarkerFileName) { final TimerTaskWithListeners timerTaskWithListeners = new TimerTaskWithListeners(timerTask); timerTaskWithListeners.addListener(new TimerTaskListenerForMarkerFileProtocol( - markerFileName)); + startMarkerFileName, endMarkerFileName)); + return timerTaskWithListeners; + } + + static TimerTask createTimerTaskForMarkerFileProtocol(final TimerTask timerTask, + final String startMarkerFileName) + { + final TimerTaskWithListeners timerTaskWithListeners = new TimerTaskWithListeners(timerTask); + timerTaskWithListeners.addListener(new TimerTaskListenerForMarkerFileProtocol( + startMarkerFileName, null)); return timerTaskWithListeners; } @@ -192,7 +201,7 @@ public final class DataMover private final DataMoverProcess createIncomingProcess() { return IncomingProcessor.createMovingProcess(parameters, INCOMING_PROCESS_MARKER_FILENAME, - factory, bufferDirs); + LOCAL_PROCESS_MARKER_FILENAME, factory, bufferDirs); } private final DataMoverProcess createLocalProcess() @@ -206,7 +215,7 @@ public final class DataMover localProcessor); final TimerTask timerTask = createTimerTaskForMarkerFileProtocol(localProcessingTask, - LOCAL_PROCESS_MARKER_FILENAME); + LOCAL_PROCESS_MARKER_FILENAME, OUTGOING_PROCESS_MARKER_FILENAME); final DataMoverProcess dataMoverProcess = new RunOnceMoreAfterTerminateDataMoverProcess(timerTask, "Local Processor", localProcessor); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java index 8e82ed8aef065ebf264a9f3c64e9c71c2b3a6283..b797a3a349fe57bda9d8c098eb9dc74aca24855c 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java @@ -83,31 +83,36 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory private final IStoreItemFilter storeItemFilter; - private final String markerFileName; + private final String startMarkerFileName; + + private final String endMarkerFileName; public static final DataMoverProcess createMovingProcess(final Parameters parameters, - final String markerFile, final IFileSysOperationsFactory factory, - final LocalBufferDirs bufferDirs) + final String startMarkerFile, final String endMarkerFile, + final IFileSysOperationsFactory factory, final LocalBufferDirs bufferDirs) { - return createMovingProcess(parameters, markerFile, factory, SYSTEM_TIME_PROVIDER, - bufferDirs); + return createMovingProcess(parameters, startMarkerFile, endMarkerFile, factory, + SYSTEM_TIME_PROVIDER, bufferDirs); } static final DataMoverProcess createMovingProcess(final Parameters parameters, - final String markerFile, final IFileSysOperationsFactory factory, - final ITimeProvider timeProvider, final LocalBufferDirs bufferDirs) + final String startMarkerFile, final String endMarkerFile, + final IFileSysOperationsFactory factory, final ITimeProvider timeProvider, + final LocalBufferDirs bufferDirs) { final IncomingProcessor processor = - new IncomingProcessor(parameters, markerFile, factory, timeProvider, bufferDirs); + new IncomingProcessor(parameters, startMarkerFile, endMarkerFile, factory, + timeProvider, bufferDirs); return processor.create(); } - private IncomingProcessor(final Parameters parameters, final String markerFileName, - final IFileSysOperationsFactory factory, final ITimeProvider timeProvider, - final LocalBufferDirs bufferDirs) + private IncomingProcessor(final Parameters parameters, final String startMarkerFileName, + final String endMarkerFileName, final IFileSysOperationsFactory factory, + final ITimeProvider timeProvider, final LocalBufferDirs bufferDirs) { this.parameters = parameters; - this.markerFileName = markerFileName; + this.startMarkerFileName = startMarkerFileName; + this.endMarkerFileName = endMarkerFileName; this.prefixForIncoming = parameters.getPrefixForIncoming(); this.incomingStore = parameters.getIncomingStore(factory); this.pathMover = factory.getMover(); @@ -148,7 +153,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory new FileScannedStore(incomingStore, storeItemFilter), directoryScanningHandler, pathHandler, NUMBER_OF_ERRORS_IN_LISTING_IGNORED); final TimerTask timerTask = - DataMover.createTimerTaskForMarkerFileProtocol(movingTask, markerFileName); + DataMover.createTimerTaskForMarkerFileProtocol(movingTask, startMarkerFileName, + endMarkerFileName); return new DataMoverProcess(timerTask, "Mover of Incoming Data", this) { diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/TimerTaskListenerForMarkerFileProtocol.java b/datamover/source/java/ch/systemsx/cisd/datamover/TimerTaskListenerForMarkerFileProtocol.java index 58e18545b07f50ad7168e1638adea81bd223e16f..2f88932b089c4f6631e37c30dc002b0edbf7cc62 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/TimerTaskListenerForMarkerFileProtocol.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/TimerTaskListenerForMarkerFileProtocol.java @@ -26,14 +26,18 @@ import ch.systemsx.cisd.common.concurrent.ITimerTaskListener; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; /** - * An implementation of {@link ITimerTaskListener} which creates an empty marker file before - * the timer task is executed and which removes this marker file if the task has been finished. - * + * An implementation of {@link ITimerTaskListener} which creates an empty marker file before the + * timer task is executed and which removes this marker file when the task has been finished. + * Optionally, the listener can create another marker file when the task is finished (which it will + * not remove itself). + * * @author Franz-Josef Elmer */ public class TimerTaskListenerForMarkerFileProtocol extends DummyTimerTaskListener { - private final File markerFile; + private final File startMarkerFile; + + private final File endMarkerFilOrNull; /** * Creates an instance for the specified marker file. @@ -41,17 +45,22 @@ public class TimerTaskListenerForMarkerFileProtocol extends DummyTimerTaskListen * @throws IllegalArgumentException if the argument is <code>null</code> or it denotes a * directory. */ - public TimerTaskListenerForMarkerFileProtocol(String markerFileName) + public TimerTaskListenerForMarkerFileProtocol(String startMarkerFileName, + String endMarkerFileNameOrNull) { - if (markerFileName == null) + if (startMarkerFileName == null) { - throw new IllegalArgumentException("Unspecified marker file name."); + throw new IllegalArgumentException("Unspecified start marker file name."); } - markerFile = new File(markerFileName); - if (markerFile.isDirectory()) + startMarkerFile = new File(startMarkerFileName); + failIfDirectory(startMarkerFile); + if (endMarkerFileNameOrNull != null) { - throw new IllegalArgumentException("Marker file is a directory: " - + markerFile.getAbsolutePath()); + endMarkerFilOrNull = new File(endMarkerFileNameOrNull); + failIfDirectory(startMarkerFile); + } else + { + endMarkerFilOrNull = null; } } @@ -62,6 +71,33 @@ public class TimerTaskListenerForMarkerFileProtocol extends DummyTimerTaskListen */ @Override public void startRunning() + { + touch(startMarkerFile); + } + + /** + * Deletes the marker file. + */ + @Override + public void finishRunning() + { + if (endMarkerFilOrNull != null) + { + touch(endMarkerFilOrNull); + } + startMarkerFile.delete(); + } + + private static void failIfDirectory(File markerFile) + { + if (markerFile.isDirectory()) + { + throw new IllegalArgumentException("Marker file is a directory: " + + markerFile.getAbsolutePath()); + } + } + + private static void touch(final File markerFile) throws EnvironmentFailureException { try { @@ -72,15 +108,5 @@ public class TimerTaskListenerForMarkerFileProtocol extends DummyTimerTaskListen + markerFile.getAbsolutePath() + "'.", ex); } } - - /** - * Deletes the marker file. - */ - @Override - public void finishRunning() - { - markerFile.delete(); - } - }