diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index 1bf2c4f7b40b10afa1d75132e94ddf4c3174849c..024c2226e8ef5d9758e1d4f05c151646ae909b73 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -20,12 +20,11 @@ import java.io.File; import java.util.Timer; import ch.systemsx.cisd.common.Constants; +import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; +import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.IPathHandler; -import ch.systemsx.cisd.common.utilities.IRecoverable; import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.ITriggerable; -import ch.systemsx.cisd.common.utilities.QueuingPathHandler; -import ch.systemsx.cisd.common.utilities.SynchronizationMonitor; import ch.systemsx.cisd.common.utilities.TimerHelper; import ch.systemsx.cisd.common.utilities.TriggeringTimerTask; import ch.systemsx.cisd.datamover.filesystem.RemoteMonitoredMoverFactory; @@ -90,35 +89,18 @@ public class DataMover private ITerminable start() { - final SynchronizationMonitor monitor = SynchronizationMonitor.create(); - final QueuingPathHandler outgoingProcessingHandler = - startupOutgoingMovingProcess(parameters.getOutgoingStore()); - final LocalProcessor localProcessor = createLocalProcessor(outgoingProcessingHandler); - final QueuingPathHandler localProcessingHandler = QueuingPathHandler.create(localProcessor, "Local Processor"); - final IncomingProcessor.IncomingMovingProcess incomingProcess = - createIncomingMovingProcess(localProcessingHandler, monitor); - final IRecoverable localProcessingRecoverable = new IRecoverable() - { - public void recover() - { - localProcessingHandler.handle(new QueuingPathHandler.ISpecialCondition() - { - public void handle() - { - localProcessor.recover(); - } - }, "recovery", false); - } - }; - final ITerminable recoveryProcess = - startupRecoveryProcess(localProcessingRecoverable, incomingProcess.getProcessor(), monitor); - incomingProcess.startup(parameters.getCheckIntervalMillis() / 2L); - return createCompoundTerminable(recoveryProcess, outgoingProcessingHandler, localProcessingHandler, - incomingProcess); + final DataMoverProcess outgoingMovingProcess = createOutgoingMovingProcess(); + final DataMoverProcess localProcessor = createLocalProcessor(); + final DataMoverProcess incomingProcess = createIncomingMovingProcess(); + final ITerminable recoveryProcess = startupRecoveryProcess(localProcessor, incomingProcess); + outgoingMovingProcess.startup(0L, parameters.getCheckIntervalMillis()); + localProcessor.startup(parameters.getCheckIntervalMillis() / 2L, parameters.getCheckIntervalMillis()); + incomingProcess.startup(0L, parameters.getCheckIntervalMillis()); + return createCompoundTerminable(recoveryProcess, outgoingMovingProcess, localProcessor, incomingProcess); } - private ITerminable startupRecoveryProcess(final IRecoverable localProcessor, final IRecoverable incomingProcessor, - SynchronizationMonitor monitor) + private ITerminable startupRecoveryProcess(final DataMoverProcess localProcessor, + final DataMoverProcess incomingProcessor) { final ITriggerable recoverable = new ITriggerable() { @@ -131,30 +113,36 @@ public class DataMover // Trigger initial recovery cycle. recoverable.trigger(); final TriggeringTimerTask recoveryingTimerTask = - new TriggeringTimerTask(new File(RECOVERY_MARKER_FIILENAME), recoverable, monitor); + new TriggeringTimerTask(new File(RECOVERY_MARKER_FIILENAME), recoverable); final Timer recoveryTimer = new Timer("Recovery"); recoveryTimer.scheduleAtFixedRate(recoveryingTimerTask, 0, parameters.getCheckIntervalMillis()); return TimerHelper.asTerminable(recoveryTimer); } - private IncomingProcessor.IncomingMovingProcess createIncomingMovingProcess(QueuingPathHandler localProcessor, - SynchronizationMonitor monitor) + private DataMoverProcess createIncomingMovingProcess() { - return IncomingProcessor.createMovingProcess(parameters, factory, bufferDirs, localProcessor, monitor); + return IncomingProcessor.createMovingProcess(parameters, factory, bufferDirs); } - private LocalProcessor createLocalProcessor(QueuingPathHandler outgoingHandler) + private DataMoverProcess createLocalProcessor() { final LocalProcessor localProcessor = LocalProcessor.create(parameters, bufferDirs.getCopyCompleteDir(), bufferDirs.getReadyToMoveDir(), - bufferDirs.getTempDir(), outgoingHandler, factory); - return localProcessor; + bufferDirs.getTempDir(), factory); + final DirectoryScanningTimerTask localProcessingTask = + new DirectoryScanningTimerTask(bufferDirs.getCopyCompleteDir(), FileUtilities.ACCEPT_ALL_FILTER, + localProcessor); + return new DataMoverProcess(localProcessingTask, "Local Processor", localProcessor); } - private QueuingPathHandler startupOutgoingMovingProcess(FileStore outputDir) + private DataMoverProcess createOutgoingMovingProcess() { - final IPathHandler remoteMover = createRemotePathMover(null, outputDir.getPath(), outputDir.getHost()); - return QueuingPathHandler.create(remoteMover, "Final Destination Mover"); + final FileStore outgoingStore = parameters.getOutgoingStore(); + final IPathHandler remoteMover = createRemotePathMover(null, outgoingStore.getPath(), outgoingStore.getHost()); + final DirectoryScanningTimerTask outgoingMovingTask = + new DirectoryScanningTimerTask(bufferDirs.getReadyToMoveDir(), FileUtilities.ACCEPT_ALL_FILTER, + remoteMover); + return new DataMoverProcess(outgoingMovingTask, "Final Destination Mover"); } private IPathHandler createRemotePathMover(String sourceHost, File destinationDirectory, String destinationHost) diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMoverProcess.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMoverProcess.java new file mode 100644 index 0000000000000000000000000000000000000000..fc982da3218d3eaa3a058674043b4f73c851b0c9 --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMoverProcess.java @@ -0,0 +1,62 @@ +package ch.systemsx.cisd.datamover; + +import java.util.Timer; +import java.util.TimerTask; + +import ch.systemsx.cisd.common.utilities.ITerminable; +import ch.systemsx.cisd.common.utilities.TimerHelper; +import ch.systemsx.cisd.datamover.filesystem.intf.IRecoverableTimerTaskFactory; + +/** + * A class that represents the incoming moving process. + */ +public class DataMoverProcess implements ITerminable +{ + private final Timer timer; + + private final TimerTask dataMoverTimerTask; + + private final ITerminable terminable; + + private final IRecoverableTimerTaskFactory recoverableTimerTaskFactory; + + DataMoverProcess(TimerTask timerTask, String taskName) + { + this(timerTask, taskName, null); + } + + DataMoverProcess(TimerTask dataMoverTimerTask, String taskName, + IRecoverableTimerTaskFactory recoverableTimerTaskFactory) + { + this.dataMoverTimerTask = dataMoverTimerTask; + this.recoverableTimerTaskFactory = recoverableTimerTaskFactory; + this.timer = new Timer(taskName); + this.terminable = TimerHelper.asTerminable(timer); + } + + /** + * Starts up the process with a the given <var>delay</var> and <var>period</var> in milli seconds. + */ + public void startup(long delay, long period) + { + // The moving task is scheduled at fixed rate. It makes sense especially if the task is moving data from the + // remote share. The rationale behind this is that if new items are + // added to the source directory while the incoming timer task has been running for a long time, busy moving + // data, the task shouldn't sit idle for the check time when there is actually work to do. + timer.scheduleAtFixedRate(dataMoverTimerTask, delay, period); + } + + public boolean terminate() + { + return terminable.terminate(); + } + + public void recover() + { + if (recoverableTimerTaskFactory != null) + { + timer.schedule(recoverableTimerTaskFactory.createRecoverableTimerTask(), 0); + } + } + +} \ No newline at end of file diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java index e2c5c6afdedd8eff6adbefb79f20b4bb60784fa9..d4013da54c992234ac97e8839d066185c2abd063 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java @@ -18,7 +18,6 @@ package ch.systemsx.cisd.datamover; import java.io.File; import java.io.FileFilter; -import java.util.Timer; import java.util.TimerTask; import org.apache.log4j.Level; @@ -31,16 +30,13 @@ import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.utilities.IPathHandler; -import ch.systemsx.cisd.common.utilities.IRecoverable; -import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.NamePrefixFileFilter; -import ch.systemsx.cisd.common.utilities.SynchronizationMonitor; -import ch.systemsx.cisd.common.utilities.TimerHelper; import ch.systemsx.cisd.datamover.common.MarkerFile; import ch.systemsx.cisd.datamover.filesystem.RemoteMonitoredMoverFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover; import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations; +import ch.systemsx.cisd.datamover.filesystem.intf.IRecoverableTimerTaskFactory; import ch.systemsx.cisd.datamover.utils.FileStore; import ch.systemsx.cisd.datamover.utils.LocalBufferDirs; import ch.systemsx.cisd.datamover.utils.QuietPeriodFileFilter; @@ -48,7 +44,7 @@ import ch.systemsx.cisd.datamover.utils.QuietPeriodFileFilter; /** * @author Tomasz Pylak on Sep 7, 2007 */ -public class IncomingProcessor implements IRecoverable +public class IncomingProcessor implements IRecoverableTimerTaskFactory { private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, IncomingProcessor.class); @@ -62,8 +58,6 @@ public class IncomingProcessor implements IRecoverable private final IPathMover pathMover; - private final IPathHandler localProcessor; - private final LocalBufferDirs bufferDirs; private final boolean isIncomingRemote; @@ -72,55 +66,15 @@ public class IncomingProcessor implements IRecoverable private final String prefixForIncoming; - /** - * A class that represents the incoming moving process. - */ - public class IncomingMovingProcess implements ITerminable + public static final DataMoverProcess createMovingProcess(Parameters parameters, IFileSysOperationsFactory factory, + LocalBufferDirs bufferDirs) { - private final Timer movingTimer; - private final TimerTask movingTask; - private final ITerminable terminable; - - IncomingMovingProcess(TimerTask movingTask) - { - this.movingTask = movingTask; - this.movingTimer = new Timer("Mover of Incoming Data"); - this.terminable = TimerHelper.asTerminable(movingTimer); - } - - public IncomingProcessor getProcessor() - { - return IncomingProcessor.this; - } - - /** Starts up the process with <var>delay</var> milli seconds. */ - public void startup(long delay) - { - // The moving task is scheduled at fixed rate. It makes sense especially if the task is moving data from the - // remote share. The rationale behind this is that if new items are - // added to the source directory while the incoming timer task has been running for a long time, busy moving - // data, the task shouldn't sit idle for the check time when there is actually work to do. - movingTimer.scheduleAtFixedRate(movingTask, delay, parameters.getCheckIntervalMillis()); - } - - public boolean terminate() - { - return terminable.terminate(); - } + final IncomingProcessor processor = new IncomingProcessor(parameters, factory, bufferDirs); + return processor.create(); } - public static final IncomingMovingProcess createMovingProcess(Parameters parameters, - IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs, final IPathHandler localProcessor, - final SynchronizationMonitor monitor) - { - final IncomingProcessor processor = new IncomingProcessor(parameters, factory, bufferDirs, localProcessor); - - return processor.createIncomingMovingProcess(monitor); - } - - private IncomingProcessor(Parameters parameters, IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs, - IPathHandler localProcessor) + private IncomingProcessor(Parameters parameters, IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs) { this.parameters = parameters; this.prefixForIncoming = parameters.getPrefixForIncoming(); @@ -128,32 +82,24 @@ public class IncomingProcessor implements IRecoverable this.incomingStore = parameters.getIncomingStore(); this.incomingReadOperations = factory.getReadPathOperations(); this.pathMover = factory.getMover(); - this.localProcessor = localProcessor; this.factory = factory; this.bufferDirs = bufferDirs; } - public void recover() + + public TimerTask createRecoverableTimerTask() { - if (operationLog.isDebugEnabled()) - { - operationLog.debug("Recovery cycle starts."); - } - new IncomingProcessorRecovery().recoverIncomingAfterShutdown(); - if (operationLog.isDebugEnabled()) - { - operationLog.debug("Recovery cycle is finished."); - } + return new IncomingProcessorRecoveryTask(); } - private IncomingMovingProcess createIncomingMovingProcess(SynchronizationMonitor monitor) + private DataMoverProcess create() { final IPathHandler pathHandler = createIncomingMovingPathHandler(incomingStore.getHost()); final FileFilter filter = createQuietPeriodFilter(); final DirectoryScanningTimerTask movingTask = - new DirectoryScanningTimerTask(incomingStore.getPath(), filter, pathHandler, monitor); - return new IncomingMovingProcess(movingTask); + new DirectoryScanningTimerTask(incomingStore.getPath(), filter, pathHandler); + return new DataMoverProcess(movingTask, "Mover of Incoming Data", this); } private FileFilter createQuietPeriodFilter() @@ -199,7 +145,6 @@ public class IncomingProcessor implements IRecoverable { return; } - localProcessor.handle(finalFile); } private void moveFromRemoteIncoming(File source, String sourceHostOrNull) @@ -223,9 +168,6 @@ public class IncomingProcessor implements IRecoverable { return; } - - // 3. schedule local processing, always successful - localProcessor.handle(finalFile); } private File tryMoveFromInProgressToFinished(File copiedFile, File markerFileOrNull, File copyCompleteDir) @@ -262,15 +204,24 @@ public class IncomingProcessor implements IRecoverable // ------------------- recovery ------------------------ - class IncomingProcessorRecovery + class IncomingProcessorRecoveryTask extends TimerTask { - public void recoverIncomingAfterShutdown() + @Override + public void run() { + if (operationLog.isDebugEnabled()) + { + operationLog.debug("Recovery starts."); + } if (isIncomingRemote) { recoverIncomingInProgress(bufferDirs.getCopyInProgressDir(), bufferDirs.getCopyCompleteDir()); } recoverIncomingCopyComplete(bufferDirs.getCopyCompleteDir()); + if (operationLog.isDebugEnabled()) + { + operationLog.debug("Recovery is finished."); + } } private void recoverIncomingInProgress(File copyInProgressDir, File copyCompleteDir) @@ -339,11 +290,6 @@ public class IncomingProcessor implements IRecoverable { return; // directory is empty, no recovery is needed } - - for (File file : files) - { - localProcessor.handle(file); - } } } } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java index c823d5415649da5aceb90bb2b4ae6005c2e8a98d..0aaabb0e77c5d08329c86b9a0033fb323d67db97 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/LocalProcessor.java @@ -17,6 +17,7 @@ package ch.systemsx.cisd.datamover; import java.io.File; +import java.util.TimerTask; import java.util.regex.Pattern; import org.apache.log4j.Level; @@ -28,13 +29,13 @@ import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.IPathHandler; -import ch.systemsx.cisd.common.utilities.IRecoverable; import ch.systemsx.cisd.common.utilities.RegexFileFilter; import ch.systemsx.cisd.common.utilities.RegexFileFilter.PathType; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IPathImmutableCopier; import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover; import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations; +import ch.systemsx.cisd.datamover.filesystem.intf.IRecoverableTimerTaskFactory; /** * Processing of the files on the local machine. This class does not scan its input directory, all resources must @@ -42,7 +43,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations; * * @author Tomasz Pylak on Aug 24, 2007 */ -public class LocalProcessor implements IPathHandler, IRecoverable +public class LocalProcessor implements IPathHandler, IRecoverableTimerTaskFactory { private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, LocalProcessor.class); @@ -72,33 +73,42 @@ public class LocalProcessor implements IPathHandler, IRecoverable private final File extraCopyDirOrNull; - private final IPathHandler outgoingHandler; - private LocalProcessor(Parameters parameters, File inputDir, File outputDir, File tempDir, - IPathHandler outgoingHandler, IFileSysOperationsFactory factory) + IFileSysOperationsFactory factory) { this.parameters = parameters; this.inputDir = inputDir; this.outputDir = outputDir; this.tempDir = tempDir; - this.outgoingHandler = outgoingHandler; this.extraCopyDirOrNull = parameters.tryGetExtraCopyDir(); this.copier = factory.getImmutableCopier(); this.mover = factory.getMover(); this.readOperations = factory.getReadPathOperations(); } - public static final LocalProcessor create(Parameters parameters, File inputDir, File outputDir, - File bufferDir, IPathHandler lastStepHandler, IFileSysOperationsFactory factory) + public static final LocalProcessor create(Parameters parameters, File inputDir, File outputDir, File bufferDir, + IFileSysOperationsFactory factory) { final LocalProcessor handlerAndRecoverable = - new LocalProcessor(parameters, inputDir, outputDir, bufferDir, lastStepHandler, factory); + new LocalProcessor(parameters, inputDir, outputDir, bufferDir, factory); return handlerAndRecoverable; } // ---------------- - public void recover() + public TimerTask createRecoverableTimerTask() + { + return new TimerTask() + { + @Override + public void run() + { + recover(); + } + }; + } + + private void recover() { if (operationLog.isDebugEnabled()) { @@ -151,11 +161,6 @@ public class LocalProcessor implements IPathHandler, IRecoverable { return; // directory is empty, no recovery is needed } - - for (File file : files) - { - outgoingHandler.handle(file); - } } // ---------------- @@ -179,10 +184,7 @@ public class LocalProcessor implements IPathHandler, IRecoverable } final File movedFile = mover.tryMove(path, outputDir); - if (movedFile != null) - { - outgoingHandler.handle(movedFile); - } else + if (movedFile == null) { notificationLog.error(String .format("Moving '%s' to '%s' for final moving process failed.", path, outputDir)); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IRecoverableTimerTaskFactory.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IRecoverableTimerTaskFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..8dc784475ac4e4336abf2227003b123617addb28 --- /dev/null +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IRecoverableTimerTaskFactory.java @@ -0,0 +1,30 @@ +package ch.systemsx.cisd.datamover.filesystem.intf; +import java.util.TimerTask; + +/* + * Copyright 2007 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A factory for creating {@link TimerTask}s that, when run, perform a recovery operation on the data mover. + * + * @author Bernd Rinn + */ +public interface IRecoverableTimerTaskFactory +{ + + public TimerTask createRecoverableTimerTask(); + +}