Skip to content
Snippets Groups Projects
Commit d4ea176e authored by brinn's avatar brinn
Browse files

refactor: get rid of internal queues and use the directory listing instead in...

refactor: get rid of internal queues and use the directory listing instead in order to simplify the recovery

SVN: 1937
parent 174e92b0
No related branches found
No related tags found
No related merge requests found
...@@ -20,12 +20,11 @@ import java.io.File; ...@@ -20,12 +20,11 @@ import java.io.File;
import java.util.Timer; import java.util.Timer;
import ch.systemsx.cisd.common.Constants; import ch.systemsx.cisd.common.Constants;
import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask;
import ch.systemsx.cisd.common.utilities.FileUtilities;
import ch.systemsx.cisd.common.utilities.IPathHandler; import ch.systemsx.cisd.common.utilities.IPathHandler;
import ch.systemsx.cisd.common.utilities.IRecoverable;
import ch.systemsx.cisd.common.utilities.ITerminable; import ch.systemsx.cisd.common.utilities.ITerminable;
import ch.systemsx.cisd.common.utilities.ITriggerable; import ch.systemsx.cisd.common.utilities.ITriggerable;
import ch.systemsx.cisd.common.utilities.QueuingPathHandler;
import ch.systemsx.cisd.common.utilities.SynchronizationMonitor;
import ch.systemsx.cisd.common.utilities.TimerHelper; import ch.systemsx.cisd.common.utilities.TimerHelper;
import ch.systemsx.cisd.common.utilities.TriggeringTimerTask; import ch.systemsx.cisd.common.utilities.TriggeringTimerTask;
import ch.systemsx.cisd.datamover.filesystem.RemoteMonitoredMoverFactory; import ch.systemsx.cisd.datamover.filesystem.RemoteMonitoredMoverFactory;
...@@ -90,35 +89,18 @@ public class DataMover ...@@ -90,35 +89,18 @@ public class DataMover
private ITerminable start() private ITerminable start()
{ {
final SynchronizationMonitor monitor = SynchronizationMonitor.create(); final DataMoverProcess outgoingMovingProcess = createOutgoingMovingProcess();
final QueuingPathHandler outgoingProcessingHandler = final DataMoverProcess localProcessor = createLocalProcessor();
startupOutgoingMovingProcess(parameters.getOutgoingStore()); final DataMoverProcess incomingProcess = createIncomingMovingProcess();
final LocalProcessor localProcessor = createLocalProcessor(outgoingProcessingHandler); final ITerminable recoveryProcess = startupRecoveryProcess(localProcessor, incomingProcess);
final QueuingPathHandler localProcessingHandler = QueuingPathHandler.create(localProcessor, "Local Processor"); outgoingMovingProcess.startup(0L, parameters.getCheckIntervalMillis());
final IncomingProcessor.IncomingMovingProcess incomingProcess = localProcessor.startup(parameters.getCheckIntervalMillis() / 2L, parameters.getCheckIntervalMillis());
createIncomingMovingProcess(localProcessingHandler, monitor); incomingProcess.startup(0L, parameters.getCheckIntervalMillis());
final IRecoverable localProcessingRecoverable = new IRecoverable() return createCompoundTerminable(recoveryProcess, outgoingMovingProcess, localProcessor, incomingProcess);
{
public void recover()
{
localProcessingHandler.handle(new QueuingPathHandler.ISpecialCondition()
{
public void handle()
{
localProcessor.recover();
}
}, "recovery", false);
}
};
final ITerminable recoveryProcess =
startupRecoveryProcess(localProcessingRecoverable, incomingProcess.getProcessor(), monitor);
incomingProcess.startup(parameters.getCheckIntervalMillis() / 2L);
return createCompoundTerminable(recoveryProcess, outgoingProcessingHandler, localProcessingHandler,
incomingProcess);
} }
private ITerminable startupRecoveryProcess(final IRecoverable localProcessor, final IRecoverable incomingProcessor, private ITerminable startupRecoveryProcess(final DataMoverProcess localProcessor,
SynchronizationMonitor monitor) final DataMoverProcess incomingProcessor)
{ {
final ITriggerable recoverable = new ITriggerable() final ITriggerable recoverable = new ITriggerable()
{ {
...@@ -131,30 +113,36 @@ public class DataMover ...@@ -131,30 +113,36 @@ public class DataMover
// Trigger initial recovery cycle. // Trigger initial recovery cycle.
recoverable.trigger(); recoverable.trigger();
final TriggeringTimerTask recoveryingTimerTask = final TriggeringTimerTask recoveryingTimerTask =
new TriggeringTimerTask(new File(RECOVERY_MARKER_FIILENAME), recoverable, monitor); new TriggeringTimerTask(new File(RECOVERY_MARKER_FIILENAME), recoverable);
final Timer recoveryTimer = new Timer("Recovery"); final Timer recoveryTimer = new Timer("Recovery");
recoveryTimer.scheduleAtFixedRate(recoveryingTimerTask, 0, parameters.getCheckIntervalMillis()); recoveryTimer.scheduleAtFixedRate(recoveryingTimerTask, 0, parameters.getCheckIntervalMillis());
return TimerHelper.asTerminable(recoveryTimer); return TimerHelper.asTerminable(recoveryTimer);
} }
private IncomingProcessor.IncomingMovingProcess createIncomingMovingProcess(QueuingPathHandler localProcessor, private DataMoverProcess createIncomingMovingProcess()
SynchronizationMonitor monitor)
{ {
return IncomingProcessor.createMovingProcess(parameters, factory, bufferDirs, localProcessor, monitor); return IncomingProcessor.createMovingProcess(parameters, factory, bufferDirs);
} }
private LocalProcessor createLocalProcessor(QueuingPathHandler outgoingHandler) private DataMoverProcess createLocalProcessor()
{ {
final LocalProcessor localProcessor = final LocalProcessor localProcessor =
LocalProcessor.create(parameters, bufferDirs.getCopyCompleteDir(), bufferDirs.getReadyToMoveDir(), LocalProcessor.create(parameters, bufferDirs.getCopyCompleteDir(), bufferDirs.getReadyToMoveDir(),
bufferDirs.getTempDir(), outgoingHandler, factory); bufferDirs.getTempDir(), factory);
return localProcessor; final DirectoryScanningTimerTask localProcessingTask =
new DirectoryScanningTimerTask(bufferDirs.getCopyCompleteDir(), FileUtilities.ACCEPT_ALL_FILTER,
localProcessor);
return new DataMoverProcess(localProcessingTask, "Local Processor", localProcessor);
} }
private QueuingPathHandler startupOutgoingMovingProcess(FileStore outputDir) private DataMoverProcess createOutgoingMovingProcess()
{ {
final IPathHandler remoteMover = createRemotePathMover(null, outputDir.getPath(), outputDir.getHost()); final FileStore outgoingStore = parameters.getOutgoingStore();
return QueuingPathHandler.create(remoteMover, "Final Destination Mover"); final IPathHandler remoteMover = createRemotePathMover(null, outgoingStore.getPath(), outgoingStore.getHost());
final DirectoryScanningTimerTask outgoingMovingTask =
new DirectoryScanningTimerTask(bufferDirs.getReadyToMoveDir(), FileUtilities.ACCEPT_ALL_FILTER,
remoteMover);
return new DataMoverProcess(outgoingMovingTask, "Final Destination Mover");
} }
private IPathHandler createRemotePathMover(String sourceHost, File destinationDirectory, String destinationHost) private IPathHandler createRemotePathMover(String sourceHost, File destinationDirectory, String destinationHost)
......
package ch.systemsx.cisd.datamover;
import java.util.Timer;
import java.util.TimerTask;
import ch.systemsx.cisd.common.utilities.ITerminable;
import ch.systemsx.cisd.common.utilities.TimerHelper;
import ch.systemsx.cisd.datamover.filesystem.intf.IRecoverableTimerTaskFactory;
/**
* A class that represents the incoming moving process.
*/
public class DataMoverProcess implements ITerminable
{
private final Timer timer;
private final TimerTask dataMoverTimerTask;
private final ITerminable terminable;
private final IRecoverableTimerTaskFactory recoverableTimerTaskFactory;
DataMoverProcess(TimerTask timerTask, String taskName)
{
this(timerTask, taskName, null);
}
DataMoverProcess(TimerTask dataMoverTimerTask, String taskName,
IRecoverableTimerTaskFactory recoverableTimerTaskFactory)
{
this.dataMoverTimerTask = dataMoverTimerTask;
this.recoverableTimerTaskFactory = recoverableTimerTaskFactory;
this.timer = new Timer(taskName);
this.terminable = TimerHelper.asTerminable(timer);
}
/**
* Starts up the process with a the given <var>delay</var> and <var>period</var> in milli seconds.
*/
public void startup(long delay, long period)
{
// The moving task is scheduled at fixed rate. It makes sense especially if the task is moving data from the
// remote share. The rationale behind this is that if new items are
// added to the source directory while the incoming timer task has been running for a long time, busy moving
// data, the task shouldn't sit idle for the check time when there is actually work to do.
timer.scheduleAtFixedRate(dataMoverTimerTask, delay, period);
}
public boolean terminate()
{
return terminable.terminate();
}
public void recover()
{
if (recoverableTimerTaskFactory != null)
{
timer.schedule(recoverableTimerTaskFactory.createRecoverableTimerTask(), 0);
}
}
}
\ No newline at end of file
...@@ -18,7 +18,6 @@ package ch.systemsx.cisd.datamover; ...@@ -18,7 +18,6 @@ package ch.systemsx.cisd.datamover;
import java.io.File; import java.io.File;
import java.io.FileFilter; import java.io.FileFilter;
import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import org.apache.log4j.Level; import org.apache.log4j.Level;
...@@ -31,16 +30,13 @@ import ch.systemsx.cisd.common.logging.LogCategory; ...@@ -31,16 +30,13 @@ import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask;
import ch.systemsx.cisd.common.utilities.IPathHandler; import ch.systemsx.cisd.common.utilities.IPathHandler;
import ch.systemsx.cisd.common.utilities.IRecoverable;
import ch.systemsx.cisd.common.utilities.ITerminable;
import ch.systemsx.cisd.common.utilities.NamePrefixFileFilter; import ch.systemsx.cisd.common.utilities.NamePrefixFileFilter;
import ch.systemsx.cisd.common.utilities.SynchronizationMonitor;
import ch.systemsx.cisd.common.utilities.TimerHelper;
import ch.systemsx.cisd.datamover.common.MarkerFile; import ch.systemsx.cisd.datamover.common.MarkerFile;
import ch.systemsx.cisd.datamover.filesystem.RemoteMonitoredMoverFactory; import ch.systemsx.cisd.datamover.filesystem.RemoteMonitoredMoverFactory;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover; import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover;
import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations; import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations;
import ch.systemsx.cisd.datamover.filesystem.intf.IRecoverableTimerTaskFactory;
import ch.systemsx.cisd.datamover.utils.FileStore; import ch.systemsx.cisd.datamover.utils.FileStore;
import ch.systemsx.cisd.datamover.utils.LocalBufferDirs; import ch.systemsx.cisd.datamover.utils.LocalBufferDirs;
import ch.systemsx.cisd.datamover.utils.QuietPeriodFileFilter; import ch.systemsx.cisd.datamover.utils.QuietPeriodFileFilter;
...@@ -48,7 +44,7 @@ import ch.systemsx.cisd.datamover.utils.QuietPeriodFileFilter; ...@@ -48,7 +44,7 @@ import ch.systemsx.cisd.datamover.utils.QuietPeriodFileFilter;
/** /**
* @author Tomasz Pylak on Sep 7, 2007 * @author Tomasz Pylak on Sep 7, 2007
*/ */
public class IncomingProcessor implements IRecoverable public class IncomingProcessor implements IRecoverableTimerTaskFactory
{ {
private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, IncomingProcessor.class); private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, IncomingProcessor.class);
...@@ -62,8 +58,6 @@ public class IncomingProcessor implements IRecoverable ...@@ -62,8 +58,6 @@ public class IncomingProcessor implements IRecoverable
private final IPathMover pathMover; private final IPathMover pathMover;
private final IPathHandler localProcessor;
private final LocalBufferDirs bufferDirs; private final LocalBufferDirs bufferDirs;
private final boolean isIncomingRemote; private final boolean isIncomingRemote;
...@@ -72,55 +66,15 @@ public class IncomingProcessor implements IRecoverable ...@@ -72,55 +66,15 @@ public class IncomingProcessor implements IRecoverable
private final String prefixForIncoming; private final String prefixForIncoming;
/** public static final DataMoverProcess createMovingProcess(Parameters parameters, IFileSysOperationsFactory factory,
* A class that represents the incoming moving process. LocalBufferDirs bufferDirs)
*/
public class IncomingMovingProcess implements ITerminable
{ {
private final Timer movingTimer; final IncomingProcessor processor = new IncomingProcessor(parameters, factory, bufferDirs);
private final TimerTask movingTask;
private final ITerminable terminable;
IncomingMovingProcess(TimerTask movingTask)
{
this.movingTask = movingTask;
this.movingTimer = new Timer("Mover of Incoming Data");
this.terminable = TimerHelper.asTerminable(movingTimer);
}
public IncomingProcessor getProcessor()
{
return IncomingProcessor.this;
}
/** Starts up the process with <var>delay</var> milli seconds. */
public void startup(long delay)
{
// The moving task is scheduled at fixed rate. It makes sense especially if the task is moving data from the
// remote share. The rationale behind this is that if new items are
// added to the source directory while the incoming timer task has been running for a long time, busy moving
// data, the task shouldn't sit idle for the check time when there is actually work to do.
movingTimer.scheduleAtFixedRate(movingTask, delay, parameters.getCheckIntervalMillis());
}
public boolean terminate()
{
return terminable.terminate();
}
return processor.create();
} }
public static final IncomingMovingProcess createMovingProcess(Parameters parameters, private IncomingProcessor(Parameters parameters, IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs)
IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs, final IPathHandler localProcessor,
final SynchronizationMonitor monitor)
{
final IncomingProcessor processor = new IncomingProcessor(parameters, factory, bufferDirs, localProcessor);
return processor.createIncomingMovingProcess(monitor);
}
private IncomingProcessor(Parameters parameters, IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs,
IPathHandler localProcessor)
{ {
this.parameters = parameters; this.parameters = parameters;
this.prefixForIncoming = parameters.getPrefixForIncoming(); this.prefixForIncoming = parameters.getPrefixForIncoming();
...@@ -128,32 +82,24 @@ public class IncomingProcessor implements IRecoverable ...@@ -128,32 +82,24 @@ public class IncomingProcessor implements IRecoverable
this.incomingStore = parameters.getIncomingStore(); this.incomingStore = parameters.getIncomingStore();
this.incomingReadOperations = factory.getReadPathOperations(); this.incomingReadOperations = factory.getReadPathOperations();
this.pathMover = factory.getMover(); this.pathMover = factory.getMover();
this.localProcessor = localProcessor;
this.factory = factory; this.factory = factory;
this.bufferDirs = bufferDirs; this.bufferDirs = bufferDirs;
} }
public void recover()
public TimerTask createRecoverableTimerTask()
{ {
if (operationLog.isDebugEnabled()) return new IncomingProcessorRecoveryTask();
{
operationLog.debug("Recovery cycle starts.");
}
new IncomingProcessorRecovery().recoverIncomingAfterShutdown();
if (operationLog.isDebugEnabled())
{
operationLog.debug("Recovery cycle is finished.");
}
} }
private IncomingMovingProcess createIncomingMovingProcess(SynchronizationMonitor monitor) private DataMoverProcess create()
{ {
final IPathHandler pathHandler = createIncomingMovingPathHandler(incomingStore.getHost()); final IPathHandler pathHandler = createIncomingMovingPathHandler(incomingStore.getHost());
final FileFilter filter = createQuietPeriodFilter(); final FileFilter filter = createQuietPeriodFilter();
final DirectoryScanningTimerTask movingTask = final DirectoryScanningTimerTask movingTask =
new DirectoryScanningTimerTask(incomingStore.getPath(), filter, pathHandler, monitor); new DirectoryScanningTimerTask(incomingStore.getPath(), filter, pathHandler);
return new IncomingMovingProcess(movingTask); return new DataMoverProcess(movingTask, "Mover of Incoming Data", this);
} }
private FileFilter createQuietPeriodFilter() private FileFilter createQuietPeriodFilter()
...@@ -199,7 +145,6 @@ public class IncomingProcessor implements IRecoverable ...@@ -199,7 +145,6 @@ public class IncomingProcessor implements IRecoverable
{ {
return; return;
} }
localProcessor.handle(finalFile);
} }
private void moveFromRemoteIncoming(File source, String sourceHostOrNull) private void moveFromRemoteIncoming(File source, String sourceHostOrNull)
...@@ -223,9 +168,6 @@ public class IncomingProcessor implements IRecoverable ...@@ -223,9 +168,6 @@ public class IncomingProcessor implements IRecoverable
{ {
return; return;
} }
// 3. schedule local processing, always successful
localProcessor.handle(finalFile);
} }
private File tryMoveFromInProgressToFinished(File copiedFile, File markerFileOrNull, File copyCompleteDir) private File tryMoveFromInProgressToFinished(File copiedFile, File markerFileOrNull, File copyCompleteDir)
...@@ -262,15 +204,24 @@ public class IncomingProcessor implements IRecoverable ...@@ -262,15 +204,24 @@ public class IncomingProcessor implements IRecoverable
// ------------------- recovery ------------------------ // ------------------- recovery ------------------------
class IncomingProcessorRecovery class IncomingProcessorRecoveryTask extends TimerTask
{ {
public void recoverIncomingAfterShutdown() @Override
public void run()
{ {
if (operationLog.isDebugEnabled())
{
operationLog.debug("Recovery starts.");
}
if (isIncomingRemote) if (isIncomingRemote)
{ {
recoverIncomingInProgress(bufferDirs.getCopyInProgressDir(), bufferDirs.getCopyCompleteDir()); recoverIncomingInProgress(bufferDirs.getCopyInProgressDir(), bufferDirs.getCopyCompleteDir());
} }
recoverIncomingCopyComplete(bufferDirs.getCopyCompleteDir()); recoverIncomingCopyComplete(bufferDirs.getCopyCompleteDir());
if (operationLog.isDebugEnabled())
{
operationLog.debug("Recovery is finished.");
}
} }
private void recoverIncomingInProgress(File copyInProgressDir, File copyCompleteDir) private void recoverIncomingInProgress(File copyInProgressDir, File copyCompleteDir)
...@@ -339,11 +290,6 @@ public class IncomingProcessor implements IRecoverable ...@@ -339,11 +290,6 @@ public class IncomingProcessor implements IRecoverable
{ {
return; // directory is empty, no recovery is needed return; // directory is empty, no recovery is needed
} }
for (File file : files)
{
localProcessor.handle(file);
}
} }
} }
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package ch.systemsx.cisd.datamover; package ch.systemsx.cisd.datamover;
import java.io.File; import java.io.File;
import java.util.TimerTask;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.log4j.Level; import org.apache.log4j.Level;
...@@ -28,13 +29,13 @@ import ch.systemsx.cisd.common.logging.LogCategory; ...@@ -28,13 +29,13 @@ import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.FileUtilities;
import ch.systemsx.cisd.common.utilities.IPathHandler; import ch.systemsx.cisd.common.utilities.IPathHandler;
import ch.systemsx.cisd.common.utilities.IRecoverable;
import ch.systemsx.cisd.common.utilities.RegexFileFilter; import ch.systemsx.cisd.common.utilities.RegexFileFilter;
import ch.systemsx.cisd.common.utilities.RegexFileFilter.PathType; import ch.systemsx.cisd.common.utilities.RegexFileFilter.PathType;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory; import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
import ch.systemsx.cisd.datamover.filesystem.intf.IPathImmutableCopier; import ch.systemsx.cisd.datamover.filesystem.intf.IPathImmutableCopier;
import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover; import ch.systemsx.cisd.datamover.filesystem.intf.IPathMover;
import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations; import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations;
import ch.systemsx.cisd.datamover.filesystem.intf.IRecoverableTimerTaskFactory;
/** /**
* Processing of the files on the local machine. This class does not scan its input directory, all resources must * Processing of the files on the local machine. This class does not scan its input directory, all resources must
...@@ -42,7 +43,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations; ...@@ -42,7 +43,7 @@ import ch.systemsx.cisd.datamover.filesystem.intf.IReadPathOperations;
* *
* @author Tomasz Pylak on Aug 24, 2007 * @author Tomasz Pylak on Aug 24, 2007
*/ */
public class LocalProcessor implements IPathHandler, IRecoverable public class LocalProcessor implements IPathHandler, IRecoverableTimerTaskFactory
{ {
private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, LocalProcessor.class); private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, LocalProcessor.class);
...@@ -72,33 +73,42 @@ public class LocalProcessor implements IPathHandler, IRecoverable ...@@ -72,33 +73,42 @@ public class LocalProcessor implements IPathHandler, IRecoverable
private final File extraCopyDirOrNull; private final File extraCopyDirOrNull;
private final IPathHandler outgoingHandler;
private LocalProcessor(Parameters parameters, File inputDir, File outputDir, File tempDir, private LocalProcessor(Parameters parameters, File inputDir, File outputDir, File tempDir,
IPathHandler outgoingHandler, IFileSysOperationsFactory factory) IFileSysOperationsFactory factory)
{ {
this.parameters = parameters; this.parameters = parameters;
this.inputDir = inputDir; this.inputDir = inputDir;
this.outputDir = outputDir; this.outputDir = outputDir;
this.tempDir = tempDir; this.tempDir = tempDir;
this.outgoingHandler = outgoingHandler;
this.extraCopyDirOrNull = parameters.tryGetExtraCopyDir(); this.extraCopyDirOrNull = parameters.tryGetExtraCopyDir();
this.copier = factory.getImmutableCopier(); this.copier = factory.getImmutableCopier();
this.mover = factory.getMover(); this.mover = factory.getMover();
this.readOperations = factory.getReadPathOperations(); this.readOperations = factory.getReadPathOperations();
} }
public static final LocalProcessor create(Parameters parameters, File inputDir, File outputDir, public static final LocalProcessor create(Parameters parameters, File inputDir, File outputDir, File bufferDir,
File bufferDir, IPathHandler lastStepHandler, IFileSysOperationsFactory factory) IFileSysOperationsFactory factory)
{ {
final LocalProcessor handlerAndRecoverable = final LocalProcessor handlerAndRecoverable =
new LocalProcessor(parameters, inputDir, outputDir, bufferDir, lastStepHandler, factory); new LocalProcessor(parameters, inputDir, outputDir, bufferDir, factory);
return handlerAndRecoverable; return handlerAndRecoverable;
} }
// ---------------- // ----------------
public void recover() public TimerTask createRecoverableTimerTask()
{
return new TimerTask()
{
@Override
public void run()
{
recover();
}
};
}
private void recover()
{ {
if (operationLog.isDebugEnabled()) if (operationLog.isDebugEnabled())
{ {
...@@ -151,11 +161,6 @@ public class LocalProcessor implements IPathHandler, IRecoverable ...@@ -151,11 +161,6 @@ public class LocalProcessor implements IPathHandler, IRecoverable
{ {
return; // directory is empty, no recovery is needed return; // directory is empty, no recovery is needed
} }
for (File file : files)
{
outgoingHandler.handle(file);
}
} }
// ---------------- // ----------------
...@@ -179,10 +184,7 @@ public class LocalProcessor implements IPathHandler, IRecoverable ...@@ -179,10 +184,7 @@ public class LocalProcessor implements IPathHandler, IRecoverable
} }
final File movedFile = mover.tryMove(path, outputDir); final File movedFile = mover.tryMove(path, outputDir);
if (movedFile != null) if (movedFile == null)
{
outgoingHandler.handle(movedFile);
} else
{ {
notificationLog.error(String notificationLog.error(String
.format("Moving '%s' to '%s' for final moving process failed.", path, outputDir)); .format("Moving '%s' to '%s' for final moving process failed.", path, outputDir));
......
package ch.systemsx.cisd.datamover.filesystem.intf;
import java.util.TimerTask;
/*
* Copyright 2007 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* A factory for creating {@link TimerTask}s that, when run, perform a recovery operation on the data mover.
*
* @author Bernd Rinn
*/
public interface IRecoverableTimerTaskFactory
{
public TimerTask createRecoverableTimerTask();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment