Skip to content
Snippets Groups Projects
Commit 99aa906e authored by ribeaudc's avatar ribeaudc
Browse files

[DMV-12] add: - 'mayHandle' to 'IStoreHandler'.

- 'FileScannedStore' (extracted from 'IncomingProcessor').
change: - More property keys moved to 'PropertyNames'.

SVN: 6061
parent 01546820
No related branches found
No related tags found
No related merge requests found
/*
* Copyright 2007 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.datamover;
import java.util.Vector;
import ch.systemsx.cisd.common.Constants;
import ch.systemsx.cisd.common.logging.ISimpleLogger;
import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask.IScannedStore;
import ch.systemsx.cisd.datamover.filesystem.intf.IFileStore;
import ch.systemsx.cisd.datamover.utils.QuietPeriodFileFilter;
/**
* An <code>IScannedStore</code> implementation which is based on {@link IFileStore}.
*
* @author Christian Ribeaud
*/
final class FileScannedStore implements IScannedStore
{
private final IFileStore fileStore;
private final QuietPeriodFileFilter quietPeriodFileFilter;
FileScannedStore(final IFileStore fileStore, final QuietPeriodFileFilter quietPeriodFileFilter)
{
this.fileStore = fileStore;
this.quietPeriodFileFilter = quietPeriodFileFilter;
}
private final StoreItem[] filterReadyToProcess(final StoreItem[] items)
{
final Vector<StoreItem> result = new Vector<StoreItem>();
for (final StoreItem item : items)
{
if (isReadyToProcess(item))
{
result.add(item);
}
}
return result.toArray(StoreItem.EMPTY_ARRAY);
}
private final boolean isReadyToProcess(final StoreItem item)
{
if (item.getName().startsWith(Constants.DELETION_IN_PROGRESS_PREFIX))
{
return false;
}
return quietPeriodFileFilter.accept(item);
}
//
// IScannedStore
//
public final boolean exists(final StoreItem item)
{
return fileStore.exists(item);
}
public final String getLocationDescription(final StoreItem item)
{
return fileStore.getLocationDescription(item);
}
public final StoreItem[] tryListSortedReadyToProcess(final ISimpleLogger loggerOrNull)
{
// Older items will be handled before newer items.
// This becomes important when doing online quality control of measurements.
final StoreItem[] items = fileStore.tryListSortByLastModified(loggerOrNull);
if (items == null)
{
return null;
}
return filterReadyToProcess(items);
}
}
\ No newline at end of file
......@@ -18,11 +18,9 @@ package ch.systemsx.cisd.datamover;
import java.io.File;
import java.util.TimerTask;
import java.util.Vector;
import org.apache.log4j.Logger;
import ch.systemsx.cisd.common.Constants;
import ch.systemsx.cisd.common.logging.ISimpleLogger;
import ch.systemsx.cisd.common.logging.Log4jSimpleLogger;
import ch.systemsx.cisd.common.logging.LogCategory;
......@@ -31,7 +29,6 @@ import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask;
import ch.systemsx.cisd.common.utilities.FileUtilities;
import ch.systemsx.cisd.common.utilities.IStoreHandler;
import ch.systemsx.cisd.common.utilities.StoreItem;
import ch.systemsx.cisd.common.utilities.DirectoryScanningTimerTask.IScannedStore;
import ch.systemsx.cisd.datamover.common.MarkerFile;
import ch.systemsx.cisd.datamover.filesystem.FileStoreFactory;
import ch.systemsx.cisd.datamover.filesystem.RemoteMonitoredMoverFactory;
......@@ -74,16 +71,16 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
private final QuietPeriodFileFilter quietPeriodFileFilter;
public static final DataMoverProcess createMovingProcess(Parameters parameters,
IFileSysOperationsFactory factory, LocalBufferDirs bufferDirs)
public static final DataMoverProcess createMovingProcess(final Parameters parameters,
final IFileSysOperationsFactory factory, final LocalBufferDirs bufferDirs)
{
final IncomingProcessor processor = new IncomingProcessor(parameters, factory, bufferDirs);
return processor.create();
}
private IncomingProcessor(Parameters parameters, IFileSysOperationsFactory factory,
LocalBufferDirs bufferDirs)
private IncomingProcessor(final Parameters parameters, final IFileSysOperationsFactory factory,
final LocalBufferDirs bufferDirs)
{
this.parameters = parameters;
this.prefixForIncoming = parameters.getPrefixForIncoming();
......@@ -99,73 +96,33 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
return new IncomingProcessorRecoveryTask();
}
private DataMoverProcess create()
private final DataMoverProcess create()
{
final IStoreHandler pathHandler = createIncomingMovingPathHandler();
final DirectoryScanningTimerTask movingTask =
new DirectoryScanningTimerTask(createIncomingStoreScanner(), bufferDirs
.getCopyInProgressDir(), pathHandler, NUMBER_OF_ERRORS_IN_LISTING_IGNORED);
new DirectoryScanningTimerTask(new FileScannedStore(incomingStore,
quietPeriodFileFilter), bufferDirs.getCopyInProgressDir(), pathHandler,
NUMBER_OF_ERRORS_IN_LISTING_IGNORED);
return new DataMoverProcess(movingTask, "Mover of Incoming Data", this);
}
private IScannedStore createIncomingStoreScanner()
private IStoreHandler createIncomingMovingPathHandler()
{
return new IScannedStore()
return new IStoreHandler()
{
public boolean exists(StoreItem item)
{
return incomingStore.exists(item);
}
public String getLocationDescription(StoreItem item)
{
return incomingStore.getLocationDescription(item);
}
//
// IStoreHandler
//
public StoreItem[] tryListSortedReadyToProcess(ISimpleLogger loggerOrNull)
public final boolean mayHandle(final StoreItem item)
{
// Older items will be handled before newer items.
// This becomes important when doing online quality control of measurements.
StoreItem[] items = incomingStore.tryListSortByLastModified(loggerOrNull);
if (items == null)
{
return null;
}
return filterReadyToProcess(items);
return true;
}
};
}
private StoreItem[] filterReadyToProcess(StoreItem[] items)
{
Vector<StoreItem> result = new Vector<StoreItem>();
for (StoreItem item : items)
{
if (isReadyToProcess(item))
{
result.add(item);
}
}
return result.toArray(StoreItem.EMPTY_ARRAY);
}
private boolean isReadyToProcess(StoreItem item)
{
if (item.getName().startsWith(Constants.DELETION_IN_PROGRESS_PREFIX))
{
return false;
}
return quietPeriodFileFilter.accept(item);
}
private IStoreHandler createIncomingMovingPathHandler()
{
return new IStoreHandler()
{
public void handle(StoreItem sourceItem)
public final void handle(final StoreItem sourceItem)
{
IExtendedFileStore extendedFileStore = incomingStore.tryAsExtended();
final IExtendedFileStore extendedFileStore = incomingStore.tryAsExtended();
if (extendedFileStore == null)
{
moveFromRemoteIncoming(sourceItem);
......@@ -177,13 +134,14 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
};
}
private void moveFromLocalIncoming(IExtendedFileStore sourceStore, StoreItem sourceItem)
private void moveFromLocalIncoming(final IExtendedFileStore sourceStore,
final StoreItem sourceItem)
{
sourceStore.tryMoveLocal(sourceItem, bufferDirs.getCopyCompleteDir(), parameters
.getPrefixForIncoming());
}
private void moveFromRemoteIncoming(StoreItem sourceItem)
private void moveFromRemoteIncoming(final StoreItem sourceItem)
{
// 1. move from incoming: copy, delete, create copy-finished-marker
final File copyInProgressDir = bufferDirs.getCopyInProgressDir();
......@@ -199,8 +157,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
tryMoveFromInProgressToFinished(copiedFile, markerFile, bufferDirs.getCopyCompleteDir());
}
private File tryMoveFromInProgressToFinished(File copiedFile, File markerFileOrNull,
File copyCompleteDir)
private File tryMoveFromInProgressToFinished(final File copiedFile,
final File markerFileOrNull, final File copyCompleteDir)
{
final File finalFile = tryMoveLocal(copiedFile, copyCompleteDir, prefixForIncoming);
if (finalFile != null)
......@@ -223,48 +181,35 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
}
}
private void moveFromRemoteToLocal(StoreItem sourceItem, IFileStore sourceStore,
File localDestDir)
private void moveFromRemoteToLocal(final StoreItem sourceItem, final IFileStore sourceStore,
final File localDestDir)
{
createRemotePathMover(sourceStore,
FileStoreFactory.createLocal(localDestDir, "local", factory)).handle(sourceItem);
}
private IStoreHandler createRemotePathMover(IFileStore sourceDirectory,
FileStore destinationDirectory)
private IStoreHandler createRemotePathMover(final IFileStore sourceDirectory,
final FileStore destinationDirectory)
{
return RemoteMonitoredMoverFactory
.create(sourceDirectory, destinationDirectory, parameters);
}
private File tryMoveLocal(File sourceFile, File destinationDir, String prefixTemplate)
private File tryMoveLocal(final File sourceFile, final File destinationDir,
final String prefixTemplate)
{
return pathMover.tryMove(sourceFile, destinationDir, prefixTemplate);
}
// ------------------- recovery ------------------------
//
// Helper classes
//
class IncomingProcessorRecoveryTask extends TimerTask
private final class IncomingProcessorRecoveryTask extends TimerTask
{
@Override
public void run()
{
if (operationLog.isDebugEnabled())
{
operationLog.debug("Recovery starts.");
}
if (incomingStore.isRemote())
{
recoverIncomingInProgress(bufferDirs.getCopyInProgressDir(), bufferDirs
.getCopyCompleteDir());
}
if (operationLog.isDebugEnabled())
{
operationLog.debug("Recovery is finished.");
}
}
private void recoverIncomingInProgress(File copyInProgressDir, File copyCompleteDir)
private final void recoverIncomingInProgress(final File copyInProgressDir,
final File copyCompleteDir)
{
final File[] files = FileUtilities.tryListFiles(copyInProgressDir, simpleOperationLog);
if (files == null || files.length == 0)
......@@ -272,7 +217,7 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
return; // directory is empty, no recovery is needed
}
for (File file : files)
for (final File file : files)
{
if (MarkerFile.isDeletionInProgressMarker(file))
{
......@@ -282,7 +227,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
}
}
private void recoverIncomingAfterShutdown(File unfinishedFile, File copyCompleteDir)
private final void recoverIncomingAfterShutdown(final File unfinishedFile,
final File copyCompleteDir)
{
if (MarkerFile.isCopyFinishedMarker(unfinishedFile))
{
......@@ -319,5 +265,27 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
}
}
}
//
// TimerTask
//
@Override
public final void run()
{
if (operationLog.isDebugEnabled())
{
operationLog.debug("Recovery starts.");
}
if (incomingStore.isRemote())
{
recoverIncomingInProgress(bufferDirs.getCopyInProgressDir(), bufferDirs
.getCopyCompleteDir());
}
if (operationLog.isDebugEnabled())
{
operationLog.debug("Recovery is finished.");
}
}
}
}
......@@ -53,7 +53,7 @@ import ch.systemsx.cisd.datamover.intf.ITimingParameters;
*
* @author Bernd Rinn
*/
public class Parameters implements ITimingParameters, IFileSysParameters
public final class Parameters implements ITimingParameters, IFileSysParameters
{
private static final Logger operationLog =
LogFactory.getLogger(LogCategory.OPERATION, Parameters.class);
......@@ -225,7 +225,7 @@ public class Parameters implements ITimingParameters, IFileSysParameters
/**
* The remote host to copy the data to (only with rsync, will use an ssh tunnel).
*/
@Option(longName = "outgoing-host", metaVar = "HOST", usage = "The remote host to move the data to (only "
@Option(longName = PropertyNames.OUTGOING_HOST, metaVar = "HOST", usage = "The remote host to move the data to (only "
+ "with rsync).")
private String outgoingHost = null;
......@@ -241,7 +241,7 @@ public class Parameters implements ITimingParameters, IFileSysParameters
* The regular expression to use for cleansing on the incoming directory before moving it to the
* buffer.
*/
@Option(longName = "cleansing-regex", usage = "The regular expression to use for cleansing before "
@Option(longName = PropertyNames.CLEANSING_REGEX, usage = "The regular expression to use for cleansing before "
+ "moving to outgoing.")
private Pattern cleansingRegex = null;
......@@ -249,7 +249,7 @@ public class Parameters implements ITimingParameters, IFileSysParameters
* The regular expression to use for deciding whether a path in the incoming directory needs
* manual intervention.
*/
@Option(longName = "manual-intervention-regex", usage = "The regular expression to use for deciding whether an "
@Option(longName = PropertyNames.MANUAL_INTERVENTION_REGEX, usage = "The regular expression to use for deciding whether an "
+ "incoming paths needs manual intervention. ")
private Pattern manualInterventionRegex = null;
......@@ -259,7 +259,7 @@ public class Parameters implements ITimingParameters, IFileSysParameters
*/
@Option(longName = PropertyNames.PREFIX_FOR_INCOMING, usage = "A string that all incoming items will be prepended with, "
+ "'%t' will be replaced with the current time stamp.")
private String prefixForIncoming;
private String prefixForIncoming = "";
/**
* The command line parser.
......@@ -387,7 +387,8 @@ public class Parameters implements ITimingParameters, IFileSysParameters
PropertyUtils.getProperty(serviceProperties, PropertyNames.HARD_LINK_EXECUTABLE,
hardLinkExecutable);
checkIntervalMillis =
PropertyUtils.getPosLong(serviceProperties, PropertyNames.CHECK_INTERVAL, checkIntervalMillis);
PropertyUtils.getPosLong(serviceProperties, PropertyNames.CHECK_INTERVAL,
checkIntervalMillis);
checkIntervalInternalMillis =
PropertyUtils.getPosLong(serviceProperties, PropertyNames.CHECK_INTERVAL_INTERNAL,
checkIntervalInternalMillis);
......@@ -395,18 +396,23 @@ public class Parameters implements ITimingParameters, IFileSysParameters
PropertyUtils.getPosLong(serviceProperties, PropertyNames.INACTIVITY_PERIOD,
inactivityPeriodMillis);
quietPeriodMillis =
PropertyUtils.getPosLong(serviceProperties, PropertyNames.QUIET_PERIOD, quietPeriodMillis);
PropertyUtils.getPosLong(serviceProperties, PropertyNames.QUIET_PERIOD,
quietPeriodMillis);
intervalToWaitAfterFailureMillis =
PropertyUtils.getPosLong(serviceProperties, PropertyNames.FAILURE_INTERVAL,
intervalToWaitAfterFailureMillis);
maximalNumberOfRetries =
PropertyUtils.getPosInt(serviceProperties, PropertyNames.MAX_RETRIES, maximalNumberOfRetries);
PropertyUtils.getPosInt(serviceProperties, PropertyNames.MAX_RETRIES,
maximalNumberOfRetries);
treatIncomingAsRemote =
PropertyUtils.getBoolean(serviceProperties, PropertyNames.TREAT_INCOMING_AS_REMOTE,
treatIncomingAsRemote);
prefixForIncoming = serviceProperties.getProperty(PropertyNames.PREFIX_FOR_INCOMING, "").trim();
incomingDirectory = tryCreateFile(serviceProperties, PropertyNames.INCOMING_DIR);
incomingHost = serviceProperties.getProperty(PropertyNames.INCOMING_HOST);
prefixForIncoming =
PropertyUtils.getProperty(serviceProperties, PropertyNames.PREFIX_FOR_INCOMING,
prefixForIncoming);
incomingDirectory =
tryCreateFile(serviceProperties, PropertyNames.INCOMING_DIR, incomingDirectory);
incomingHost = PropertyUtils.getProperty(serviceProperties, PropertyNames.INCOMING_HOST);
if (serviceProperties.getProperty(PropertyNames.BUFFER_DIR) != null)
{
bufferDirectory =
......@@ -414,35 +420,38 @@ public class Parameters implements ITimingParameters, IFileSysParameters
PropertyNames.BUFFER_DIR);
}
manualInterventionDirectoryOrNull =
tryCreateFile(serviceProperties, PropertyNames.MANUAL_INTERVENTION_DIR);
tryCreateFile(serviceProperties, PropertyNames.MANUAL_INTERVENTION_DIR,
manualInterventionDirectoryOrNull);
if (serviceProperties.getProperty(PropertyNames.OUTGOING_DIR) != null)
{
outgoingDirectory =
FileWithHighwaterMark.fromProperties(serviceProperties,
PropertyNames.OUTGOING_DIR);
}
outgoingHost = serviceProperties.getProperty("outgoing-host");
extraCopyDirectory = tryCreateFile(serviceProperties, PropertyNames.EXTRA_COPY_DIR);
if (serviceProperties.getProperty("cleansing-regex") != null)
outgoingHost = serviceProperties.getProperty(PropertyNames.OUTGOING_HOST);
extraCopyDirectory =
tryCreateFile(serviceProperties, PropertyNames.EXTRA_COPY_DIR, extraCopyDirectory);
if (serviceProperties.getProperty(PropertyNames.CLEANSING_REGEX) != null)
{
cleansingRegex = Pattern.compile(serviceProperties.getProperty("cleansing-regex"));
cleansingRegex = Pattern.compile(serviceProperties.getProperty(PropertyNames.CLEANSING_REGEX));
}
if (serviceProperties.getProperty("manual-intervention-regex") != null)
if (serviceProperties.getProperty(PropertyNames.MANUAL_INTERVENTION_REGEX) != null)
{
manualInterventionRegex =
Pattern.compile(serviceProperties.getProperty("manual-intervention-regex"));
Pattern.compile(serviceProperties.getProperty(PropertyNames.MANUAL_INTERVENTION_REGEX));
}
}
private final File tryCreateFile(final Properties serviceProperties, final String propertyKey)
private final File tryCreateFile(final Properties serviceProperties, final String propertyKey,
final File defaultValue)
{
final String propertyValue = serviceProperties.getProperty(propertyKey);
final String propertyValue = PropertyUtils.getProperty(serviceProperties, propertyKey);
if (propertyValue != null)
{
return new File(propertyValue.trim());
return new File(propertyValue);
} else
{
return null;
return defaultValue;
}
}
......
......@@ -33,6 +33,8 @@ public final class PropertyNames
static final String CHECK_INTERVAL_INTERNAL = "check-interval-internal";
static final String CLEANSING_REGEX = "cleansing-regex";
/** The local directory where we create additional copy of the incoming data. */
static final String EXTRA_COPY_DIR = "extra-copy-dir";
......@@ -52,11 +54,15 @@ public final class PropertyNames
/** The local directory to store paths that need manual intervention. */
static final String MANUAL_INTERVENTION_DIR = "manual-intervention-dir";
static final String MANUAL_INTERVENTION_REGEX = "manual-intervention-regex";
static final String MAX_RETRIES = "max-retries";
/** The remote directory to move the data to. */
static final String OUTGOING_DIR = "outgoing-dir";
static final String OUTGOING_HOST = "outgoing-host";
static final String PREFIX_FOR_INCOMING = "prefix-for-incoming";
static final String QUIET_PERIOD = "quiet-period";
......
......@@ -103,9 +103,9 @@ public final class RemotePathMover implements IStoreHandler
* situations.
* @throws ConfigurationFailureException If the destination directory is not fully accessible.
*/
public RemotePathMover(IFileStore sourceDirectory, IFileStore destinationDirectory,
IStoreCopier copier, CopyActivityMonitor monitor, ITimingParameters timingParameters)
throws ConfigurationFailureException
public RemotePathMover(final IFileStore sourceDirectory, final IFileStore destinationDirectory,
final IStoreCopier copier, final CopyActivityMonitor monitor,
final ITimingParameters timingParameters) throws ConfigurationFailureException
{
assert sourceDirectory != null;
assert destinationDirectory != null;
......@@ -115,7 +115,8 @@ public final class RemotePathMover implements IStoreHandler
|| destinationDirectory.tryAsExtended() != null;
final String errorMsg =
destinationDirectory.tryCheckDirectoryFullyAccessible(DIRECTORY_ACCESSIBLE_TIMEOUT_MILLIS);
destinationDirectory
.tryCheckDirectoryFullyAccessible(DIRECTORY_ACCESSIBLE_TIMEOUT_MILLIS);
if (StringUtils.isNotBlank(errorMsg))
{
throw new ConfigurationFailureException(errorMsg);
......@@ -131,87 +132,7 @@ public final class RemotePathMover implements IStoreHandler
assert maximalNumberOfRetries >= 0;
}
public void handle(StoreItem item)
{
if (isDeletionInProgress(item))
{
// This is a recovery situation: we have been interrupted removing the path and now
// finish the job.
if (operationLog.isInfoEnabled())
{
operationLog
.info(String
.format(
"Detected recovery situation: '%s' has been interrupted in deletion phase, finishing up.",
getSrcPath(item)));
}
removeAndMark(item);
return;
}
int tryCount = 0;
do
{
if (operationLog.isInfoEnabled())
{
if (tryCount > 0) // This is a retry
{
operationLog.info(String.format(START_COPYING_PATH_RETRY_TEMPLATE,
getSrcPath(item), destinationDirectory, tryCount));
} else
{
operationLog.info(String.format(START_COPYING_PATH_TEMPLATE, getSrcPath(item),
destinationDirectory));
}
}
// TODO 2008-03-17, Bernd Rinn: There needs to be a limit for how often we take this
// exit without sending
// a notification email.
if (checkTargetAvailable() == false)
{
return;
}
final long startTime = System.currentTimeMillis();
final Status copyStatus = copyAndMonitor(item);
if (StatusFlag.OK.equals(copyStatus.getFlag()))
{
if (operationLog.isInfoEnabled())
{
final long endTime = System.currentTimeMillis();
operationLog.info(String.format(FINISH_COPYING_PATH_TEMPLATE, getSrcPath(item),
destinationDirectory, (endTime - startTime) / 1000.0));
}
removeAndMark(item);
return;
} else
{
operationLog.warn(String.format(COPYING_PATH_TO_REMOTE_FAILED, getSrcPath(item),
destinationDirectory, copyStatus));
if (StatusFlag.FATAL_ERROR.equals(copyStatus.getFlag()))
{
break;
}
}
// Leave the loop if we have re-tried it too often.
++tryCount;
if (tryCount > maximalNumberOfRetries)
{
break;
}
try
{
Thread.sleep(intervallToWaitAfterFailure);
} catch (InterruptedException e)
{
// We don't expect to get interrupted, but even if, there is no need to handle this
// here.
}
} while (true);
notificationLog.error(String.format(MOVING_PATH_TO_REMOTE_FAILED_TEMPLATE,
getSrcPath(item), destinationDirectory));
}
private Status copyAndMonitor(StoreItem item)
private final Status copyAndMonitor(final StoreItem item)
{
monitor.start(item);
final Status copyStatus = copier.copy(item);
......@@ -219,16 +140,17 @@ public final class RemotePathMover implements IStoreHandler
return copyStatus;
}
private void removeAndMark(StoreItem item)
private final void removeAndMark(final StoreItem item)
{
remove(item);
markAsFinished(item);
}
private boolean checkTargetAvailable()
private final boolean checkTargetAvailable()
{
final String msg =
destinationDirectory.tryCheckDirectoryFullyAccessible(DIRECTORY_ACCESSIBLE_TIMEOUT_MILLIS);
destinationDirectory
.tryCheckDirectoryFullyAccessible(DIRECTORY_ACCESSIBLE_TIMEOUT_MILLIS);
if (msg != null)
{
machineLog.error(msg);
......@@ -237,7 +159,7 @@ public final class RemotePathMover implements IStoreHandler
return true;
}
private void remove(StoreItem sourceItem)
private final void remove(final StoreItem sourceItem)
{
final StoreItem removalInProgressMarkerFile = tryMarkAsDeletionInProgress(sourceItem);
final Status removalStatus = sourceDirectory.delete(sourceItem);
......@@ -253,14 +175,14 @@ public final class RemotePathMover implements IStoreHandler
}
}
private boolean isDeletionInProgress(StoreItem item)
private final boolean isDeletionInProgress(final StoreItem item)
{
StoreItem markDeletionInProgressMarkerFile =
final StoreItem markDeletionInProgressMarkerFile =
MarkerFile.createDeletionInProgressMarker(item);
return getDeletionMarkerStore().exists(markDeletionInProgressMarkerFile);
}
private StoreItem tryMarkAsDeletionInProgress(StoreItem item)
private final StoreItem tryMarkAsDeletionInProgress(final StoreItem item)
{
final StoreItem markDeletionInProgressMarkerFile =
MarkerFile.createDeletionInProgressMarker(item);
......@@ -276,7 +198,7 @@ public final class RemotePathMover implements IStoreHandler
}
}
private void removeDeletionMarkerFile(StoreItem markerOrNull)
private final void removeDeletionMarkerFile(final StoreItem markerOrNull)
{
if (markerOrNull != null)
{
......@@ -289,7 +211,7 @@ public final class RemotePathMover implements IStoreHandler
}
}
private IExtendedFileStore getDeletionMarkerStore()
private final IExtendedFileStore getDeletionMarkerStore()
{
IExtendedFileStore fileStore = destinationDirectory.tryAsExtended();
if (fileStore == null)
......@@ -301,9 +223,9 @@ public final class RemotePathMover implements IStoreHandler
}
// Creates a finish-marker inside destination directory.
private boolean markAsFinished(StoreItem item)
private final boolean markAsFinished(final StoreItem item)
{
StoreItem markerItem = MarkerFile.createCopyFinishedMarker(item);
final StoreItem markerItem = MarkerFile.createCopyFinishedMarker(item);
IExtendedFileStore extendedFileStore = destinationDirectory.tryAsExtended();
if (extendedFileStore != null)
{
......@@ -319,8 +241,8 @@ public final class RemotePathMover implements IStoreHandler
}
}
private boolean markOnSourceLocalAndCopyToRemoteDestination(IExtendedFileStore sourceFileStore,
StoreItem markerFile)
private final boolean markOnSourceLocalAndCopyToRemoteDestination(
final IExtendedFileStore sourceFileStore, final StoreItem markerFile)
{
try
{
......@@ -344,9 +266,10 @@ public final class RemotePathMover implements IStoreHandler
}
}
private static boolean createFileInside(IExtendedFileStore directory, StoreItem item)
private final static boolean createFileInside(final IExtendedFileStore directory,
final StoreItem item)
{
boolean success = directory.createNewFile(item);
final boolean success = directory.createNewFile(item);
if (success == false)
{
machineLog.error(String.format(FAILED_TO_CREATE_FILE_TEMPLATE, item, directory));
......@@ -354,13 +277,103 @@ public final class RemotePathMover implements IStoreHandler
return success;
}
private String getSrcPath(StoreItem item)
private String getSrcPath(final StoreItem item)
{
return getPath(sourceDirectory, item);
}
private static String getPath(IFileStore directory, StoreItem item)
private final static String getPath(final IFileStore directory, final StoreItem item)
{
return item + " inside " + directory;
}
//
// IStoreHandler
//
public final boolean mayHandle(StoreItem item)
{
return true;
}
public final void handle(final StoreItem item)
{
if (isDeletionInProgress(item))
{
// This is a recovery situation: we have been interrupted removing the path and now
// finish the job.
if (operationLog.isInfoEnabled())
{
operationLog
.info(String
.format(
"Detected recovery situation: '%s' has been interrupted in deletion phase, finishing up.",
getSrcPath(item)));
}
removeAndMark(item);
return;
}
int tryCount = 0;
do
{
if (operationLog.isInfoEnabled())
{
if (tryCount > 0) // This is a retry
{
operationLog.info(String.format(START_COPYING_PATH_RETRY_TEMPLATE,
getSrcPath(item), destinationDirectory, tryCount));
} else
{
operationLog.info(String.format(START_COPYING_PATH_TEMPLATE, getSrcPath(item),
destinationDirectory));
}
}
// TODO 2008-03-17, Bernd Rinn: There needs to be a limit for how often we take this
// exit without sending
// a notification email.
if (checkTargetAvailable() == false)
{
return;
}
final long startTime = System.currentTimeMillis();
final Status copyStatus = copyAndMonitor(item);
if (StatusFlag.OK.equals(copyStatus.getFlag()))
{
if (operationLog.isInfoEnabled())
{
final long endTime = System.currentTimeMillis();
operationLog.info(String.format(FINISH_COPYING_PATH_TEMPLATE, getSrcPath(item),
destinationDirectory, (endTime - startTime) / 1000.0));
}
removeAndMark(item);
return;
} else
{
operationLog.warn(String.format(COPYING_PATH_TO_REMOTE_FAILED, getSrcPath(item),
destinationDirectory, copyStatus));
if (StatusFlag.FATAL_ERROR.equals(copyStatus.getFlag()))
{
break;
}
}
// Leave the loop if we have re-tried it too often.
++tryCount;
if (tryCount > maximalNumberOfRetries)
{
break;
}
try
{
Thread.sleep(intervallToWaitAfterFailure);
} catch (final InterruptedException e)
{
// We don't expect to get interrupted, but even if, there is no need to handle this
// here.
}
} while (true);
notificationLog.error(String.format(MOVING_PATH_TO_REMOTE_FAILED_TEMPLATE,
getSrcPath(item), destinationDirectory));
}
}
......@@ -114,7 +114,7 @@ public final class FileStoreRemoteMounted extends FileStore
}
@Override
public void check() throws EnvironmentFailureException, ConfigurationFailureException
public final void check() throws EnvironmentFailureException, ConfigurationFailureException
{
localImpl.check();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment