Skip to content
Snippets Groups Projects
Commit 9b88299c authored by brinn's avatar brinn
Browse files

[DMV-40] Avoid correctly copied incoming items to be copied again when deletion fails

fix: moving from incoming to ready-to-move when file / directory is not writable, permissions cannot be changed and incoming is remote

SVN: 18663
parent 303ef29d
No related branches found
No related tags found
No related merge requests found
...@@ -55,6 +55,10 @@ import ch.systemsx.cisd.datamover.utils.LocalBufferDirs; ...@@ -55,6 +55,10 @@ import ch.systemsx.cisd.datamover.utils.LocalBufferDirs;
public final class DataMover public final class DataMover
{ {
static final String STARTED_TRANSFER = "STARTED_TRANSFER";
static final String FINISHED_TRANSFER = "FINISHED_TRANSFER";
private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION,
DataMover.class); DataMover.class);
...@@ -264,7 +268,7 @@ public final class DataMover ...@@ -264,7 +268,7 @@ public final class DataMover
FileStoreFactory.createLocal(sourceDirectory, "ready-to-move", factory, false); FileStoreFactory.createLocal(sourceDirectory, "ready-to-move", factory, false);
final IStoreHandler remoteStoreMover = final IStoreHandler remoteStoreMover =
wrapHandleWithLogging(createOutgoingPathMover(readyToMoveStore, outgoingStore), wrapHandleWithLogging(createOutgoingPathMover(readyToMoveStore, outgoingStore),
null, "FINISHED_TRANSFER"); null, FINISHED_TRANSFER, null);
final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = final HighwaterMarkDirectoryScanningHandler directoryScanningHandler =
new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler( new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler(
sourceDirectory, remoteStoreMover), outgoingStore.getHighwaterMarkWatcher()); sourceDirectory, remoteStoreMover), outgoingStore.getHighwaterMarkWatcher());
...@@ -337,9 +341,12 @@ public final class DataMover ...@@ -337,9 +341,12 @@ public final class DataMover
* item name will be logged before item handling * item name will be logged before item handling
* @param prefixAfterOrNull if not <code>null</code> a message with this prefix and handled item * @param prefixAfterOrNull if not <code>null</code> a message with this prefix and handled item
* name will be logged after item handling * name will be logged after item handling
* @param prefixAfterFailureOnlyOrNull if not <code>null</code> a message with this prefix and handled item
* name will be logged after item handling, if the handling failed
*/ */
public final static IStoreHandler wrapHandleWithLogging(final IStoreHandler originalHandler, public final static IStoreHandler wrapHandleWithLogging(final IStoreHandler originalHandler,
final String prefixBeforeOrNull, final String prefixAfterOrNull) final String prefixBeforeOrNull, final String prefixAfterOrNull,
final String prefixAfterFailureOnlyOrNull)
{ {
return new IStoreHandler() return new IStoreHandler()
{ {
...@@ -355,6 +362,10 @@ public final class DataMover ...@@ -355,6 +362,10 @@ public final class DataMover
operationLog.info(prefixAfterOrNull + " " + item + ": " operationLog.info(prefixAfterOrNull + " " + item + ": "
+ (ok ? "OK" : "FAILED")); + (ok ? "OK" : "FAILED"));
} }
if (ok == false && prefixAfterFailureOnlyOrNull != null)
{
operationLog.info(prefixAfterFailureOnlyOrNull + " " + item + ": FAILED");
}
return ok; return ok;
} }
......
...@@ -75,6 +75,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory ...@@ -75,6 +75,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
private final LocalBufferDirs bufferDirs; private final LocalBufferDirs bufferDirs;
private final IFileStore copyInProgressStore;
private final IFileStore incomingStore; private final IFileStore incomingStore;
private final String prefixForIncoming; private final String prefixForIncoming;
...@@ -120,6 +122,9 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory ...@@ -120,6 +122,9 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
this.successorMarkerFileName = successorMarkerFileName; this.successorMarkerFileName = successorMarkerFileName;
this.prefixForIncoming = parameters.getPrefixForIncoming(); this.prefixForIncoming = parameters.getPrefixForIncoming();
this.incomingStore = parameters.getIncomingStore(factory); this.incomingStore = parameters.getIncomingStore(factory);
final File sourceDirectory = bufferDirs.getCopyInProgressDir();
this.copyInProgressStore =
FileStoreFactory.createLocal(sourceDirectory, "copy-in-progress", factory, false);
this.pathMover = factory.getMover(); this.pathMover = factory.getMover();
this.bufferDirs = bufferDirs; this.bufferDirs = bufferDirs;
this.storeItemFilter = createFilter(timeProvider); this.storeItemFilter = createFilter(timeProvider);
...@@ -148,11 +153,10 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory ...@@ -148,11 +153,10 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
timeProvider, DatamoverConstants.IGNORED_ERROR_COUNT_BEFORE_NOTIFICATION); timeProvider, DatamoverConstants.IGNORED_ERROR_COUNT_BEFORE_NOTIFICATION);
} }
private IStoreHandler createRemotePathMover(final IFileStore sourceDirectory, private IStoreHandler createRemotePathMover(final IFileStore sourceStore,
final IFileStore destinationDirectory) final IFileStore destinationStore)
{ {
return RemoteMonitoredMoverFactory return RemoteMonitoredMoverFactory.create(sourceStore, destinationStore, parameters);
.create(sourceDirectory, destinationDirectory, parameters);
} }
public TimerTask createRecoverableTimerTask() public TimerTask createRecoverableTimerTask()
...@@ -167,7 +171,7 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory ...@@ -167,7 +171,7 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark());
final IStoreHandler pathHandler = final IStoreHandler pathHandler =
DataMover.wrapHandleWithLogging(createIncomingMovingPathHandler(), DataMover.wrapHandleWithLogging(createIncomingMovingPathHandler(),
"STARTED_TRANSFER", null); DataMover.STARTED_TRANSFER, null, DataMover.FINISHED_TRANSFER);
final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = final HighwaterMarkDirectoryScanningHandler directoryScanningHandler =
new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler( new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler(
copyInProgressDir, pathHandler), highwaterMarkWatcher, copyInProgressDir); copyInProgressDir, pathHandler), highwaterMarkWatcher, copyInProgressDir);
...@@ -226,23 +230,30 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory ...@@ -226,23 +230,30 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
private boolean moveFromLocalIncoming(final IExtendedFileStore sourceStore, private boolean moveFromLocalIncoming(final IExtendedFileStore sourceStore,
final StoreItem sourceItem) final StoreItem sourceItem)
{ {
final File finalFile = sourceStore.tryMoveLocal(sourceItem, bufferDirs.getCopyCompleteDir(), final File finalFile =
parameters.getPrefixForIncoming()); sourceStore.tryMoveLocal(sourceItem, bufferDirs.getCopyCompleteDir(),
parameters.getPrefixForIncoming());
return (finalFile != null); return (finalFile != null);
} }
private boolean moveFromRemoteIncoming(final StoreItem sourceItem) private boolean moveFromRemoteIncoming(final StoreItem sourceItem)
{ {
// 1. move from incoming: copy, delete, create copy-finished-marker // 1. move from incoming: copy, delete, create copy-finished-marker
moveFromRemoteToLocal(sourceItem); final boolean succeeded = moveFromRemoteToLocal(sourceItem);
final File copiedFile = new File(bufferDirs.getCopyInProgressDir(), sourceItem.getName()); final File copiedFile = new File(bufferDirs.getCopyInProgressDir(), sourceItem.getName());
if (copiedFile.exists() == false)
final File markerFile = MarkerFile.createCopyFinishedMarker(copiedFile);
if (succeeded == false || copiedFile.exists() == false)
{ {
// undo copying and remove marker as are unable to delete it from the source
if (copiedFile.exists())
{
copyInProgressStore.delete(sourceItem);
}
markerFile.delete();
return false; return false;
} }
// 2. Move to final directory, delete marker // 2. Move to final directory, delete marker
final File markerFile = MarkerFile.createCopyFinishedMarker(copiedFile);
final File finalFile = final File finalFile =
tryMoveFromInProgressToFinished(copiedFile, markerFile, tryMoveFromInProgressToFinished(copiedFile, markerFile,
bufferDirs.getCopyCompleteDir()); bufferDirs.getCopyCompleteDir());
...@@ -259,7 +270,7 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory ...@@ -259,7 +270,7 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
{ {
if (markerFileOrNull.exists() == false) if (markerFileOrNull.exists() == false)
{ {
operationLog.error("Could not find expected copy-finished-mrker file " operationLog.error("Could not find expected copy-finished-marker file "
+ markerFileOrNull.getAbsolutePath()); + markerFileOrNull.getAbsolutePath());
} else } else
{ {
...@@ -273,9 +284,9 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory ...@@ -273,9 +284,9 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
} }
} }
private void moveFromRemoteToLocal(final StoreItem sourceItem) private boolean moveFromRemoteToLocal(final StoreItem sourceItem)
{ {
remotePathMover.handle(sourceItem); return remotePathMover.handle(sourceItem);
} }
private File tryMoveLocal(final File sourceFile, final File destinationDir, private File tryMoveLocal(final File sourceFile, final File destinationDir,
......
...@@ -88,9 +88,9 @@ public final class RemotePathMover implements IStoreHandler ...@@ -88,9 +88,9 @@ public final class RemotePathMover implements IStoreHandler
private static final ConditionalNotificationLogger conditionalLogger = private static final ConditionalNotificationLogger conditionalLogger =
new ConditionalNotificationLogger(machineLog, notificationLog, 3); new ConditionalNotificationLogger(machineLog, notificationLog, 3);
private final IFileStore sourceDirectory; private final IFileStore sourceStore;
private final IFileStore destinationDirectory; private final IFileStore destinationStore;
private final IStoreCopier copier; private final IStoreCopier copier;
...@@ -122,8 +122,8 @@ public final class RemotePathMover implements IStoreHandler ...@@ -122,8 +122,8 @@ public final class RemotePathMover implements IStoreHandler
assert sourceDirectory.tryAsExtended() != null assert sourceDirectory.tryAsExtended() != null
|| destinationDirectory.tryAsExtended() != null; || destinationDirectory.tryAsExtended() != null;
this.sourceDirectory = sourceDirectory; this.sourceStore = sourceDirectory;
this.destinationDirectory = destinationDirectory; this.destinationStore = destinationDirectory;
this.copier = copier; this.copier = copier;
this.intervallToWaitAfterFailure = timingParameters.getIntervalToWaitAfterFailure(); this.intervallToWaitAfterFailure = timingParameters.getIntervalToWaitAfterFailure();
this.maximalNumberOfRetries = timingParameters.getMaximalNumberOfRetries(); this.maximalNumberOfRetries = timingParameters.getMaximalNumberOfRetries();
...@@ -143,7 +143,7 @@ public final class RemotePathMover implements IStoreHandler ...@@ -143,7 +143,7 @@ public final class RemotePathMover implements IStoreHandler
{ {
final InactivityMonitor monitor = final InactivityMonitor monitor =
new InactivityMonitor( new InactivityMonitor(
new RemoteStoreCopyActivitySensor(destinationDirectory, item), new RemoteStoreCopyActivitySensor(destinationStore, item),
new IInactivityObserver() new IInactivityObserver()
{ {
public void update(long inactiveSinceMillis, public void update(long inactiveSinceMillis,
...@@ -160,10 +160,11 @@ public final class RemotePathMover implements IStoreHandler ...@@ -160,10 +160,11 @@ public final class RemotePathMover implements IStoreHandler
return copyStatus; return copyStatus;
} }
private final void removeAndMark(final StoreItem item) private final boolean removeAndMark(final StoreItem item)
{ {
remove(item); final boolean removed = remove(item);
markAsFinished(item); markAsFinished(item);
return removed;
} }
private final boolean checkTargetAvailableAgain() private final boolean checkTargetAvailableAgain()
...@@ -173,7 +174,7 @@ public final class RemotePathMover implements IStoreHandler ...@@ -173,7 +174,7 @@ public final class RemotePathMover implements IStoreHandler
{ {
conditionalLogger.reset(String.format( conditionalLogger.reset(String.format(
"Following store '%s' is again fully accessible to the program.", "Following store '%s' is again fully accessible to the program.",
destinationDirectory)); destinationStore));
return true; return true;
} else } else
{ {
...@@ -184,7 +185,7 @@ public final class RemotePathMover implements IStoreHandler ...@@ -184,7 +185,7 @@ public final class RemotePathMover implements IStoreHandler
private BooleanStatus checkTargetAvailable() private BooleanStatus checkTargetAvailable()
{ {
final BooleanStatus status = final BooleanStatus status =
destinationDirectory destinationStore
.checkDirectoryFullyAccessible(Constants.MILLIS_TO_WAIT_BEFORE_TIMEOUT); .checkDirectoryFullyAccessible(Constants.MILLIS_TO_WAIT_BEFORE_TIMEOUT);
if (status.isSuccess() == false) if (status.isSuccess() == false)
{ {
...@@ -193,20 +194,22 @@ public final class RemotePathMover implements IStoreHandler ...@@ -193,20 +194,22 @@ public final class RemotePathMover implements IStoreHandler
return status; return status;
} }
private final void remove(final StoreItem sourceItem) private final boolean remove(final StoreItem sourceItem)
{ {
final StoreItem removalInProgressMarkerFile = tryMarkAsDeletionInProgress(sourceItem); final StoreItem removalInProgressMarkerFile = tryMarkAsDeletionInProgress(sourceItem);
final Status removalStatus = sourceDirectory.delete(sourceItem); final Status removalStatus = sourceStore.delete(sourceItem);
removeDeletionMarkerFile(removalInProgressMarkerFile); removeDeletionMarkerFile(removalInProgressMarkerFile);
if (Status.OK.equals(removalStatus) == false) if (Status.OK.equals(removalStatus) == false)
{ {
notificationLog.error(String.format(REMOVING_LOCAL_PATH_FAILED_TEMPLATE, notificationLog.error(String.format(REMOVING_LOCAL_PATH_FAILED_TEMPLATE,
getSrcPath(sourceItem), removalStatus)); getSrcPath(sourceItem), removalStatus));
return false;
} else if (operationLog.isInfoEnabled()) } else if (operationLog.isInfoEnabled())
{ {
operationLog.info(String.format(REMOVED_PATH_TEMPLATE, getSrcPath(sourceItem))); operationLog.info(String.format(REMOVED_PATH_TEMPLATE, getSrcPath(sourceItem)));
} }
return true;
} }
private final boolean isDeletionInProgress(final StoreItem item) private final boolean isDeletionInProgress(final StoreItem item)
...@@ -241,17 +244,17 @@ public final class RemotePathMover implements IStoreHandler ...@@ -241,17 +244,17 @@ public final class RemotePathMover implements IStoreHandler
if (status.equals(Status.OK) == false) if (status.equals(Status.OK) == false)
{ {
machineLog.error(String.format("Cannot remove marker file '%s'", getPath( machineLog.error(String.format("Cannot remove marker file '%s'", getPath(
destinationDirectory, markerOrNull))); destinationStore, markerOrNull)));
} }
} }
} }
private final IExtendedFileStore getDeletionMarkerStore() private final IExtendedFileStore getDeletionMarkerStore()
{ {
IExtendedFileStore fileStore = destinationDirectory.tryAsExtended(); IExtendedFileStore fileStore = destinationStore.tryAsExtended();
if (fileStore == null) if (fileStore == null)
{ {
fileStore = sourceDirectory.tryAsExtended(); fileStore = sourceStore.tryAsExtended();
} }
assert fileStore != null; assert fileStore != null;
return fileStore; return fileStore;
...@@ -261,7 +264,7 @@ public final class RemotePathMover implements IStoreHandler ...@@ -261,7 +264,7 @@ public final class RemotePathMover implements IStoreHandler
private final boolean markAsFinished(final StoreItem item) private final boolean markAsFinished(final StoreItem item)
{ {
final StoreItem markerItem = MarkerFile.createCopyFinishedMarker(item); final StoreItem markerItem = MarkerFile.createCopyFinishedMarker(item);
IExtendedFileStore extendedFileStore = destinationDirectory.tryAsExtended(); IExtendedFileStore extendedFileStore = destinationStore.tryAsExtended();
if (extendedFileStore != null) if (extendedFileStore != null)
{ {
// We create the marker directly inside the destination directory // We create the marker directly inside the destination directory
...@@ -270,7 +273,7 @@ public final class RemotePathMover implements IStoreHandler ...@@ -270,7 +273,7 @@ public final class RemotePathMover implements IStoreHandler
{ {
// When destination is remote, we put the item directory in the source directory and // When destination is remote, we put the item directory in the source directory and
// copy it to destination. // copy it to destination.
extendedFileStore = sourceDirectory.tryAsExtended(); extendedFileStore = sourceStore.tryAsExtended();
assert extendedFileStore != null; assert extendedFileStore != null;
return markOnSourceLocalAndCopyToRemoteDestination(extendedFileStore, markerItem); return markOnSourceLocalAndCopyToRemoteDestination(extendedFileStore, markerItem);
} }
...@@ -314,7 +317,7 @@ public final class RemotePathMover implements IStoreHandler ...@@ -314,7 +317,7 @@ public final class RemotePathMover implements IStoreHandler
private String getSrcPath(final StoreItem item) private String getSrcPath(final StoreItem item)
{ {
return getPath(sourceDirectory, item); return getPath(sourceStore, item);
} }
private final static String getPath(final IFileStore directory, final StoreItem item) private final static String getPath(final IFileStore directory, final StoreItem item)
...@@ -337,8 +340,7 @@ public final class RemotePathMover implements IStoreHandler ...@@ -337,8 +340,7 @@ public final class RemotePathMover implements IStoreHandler
operationLog.info(String.format("Detected recovery situation: '%s' has been " operationLog.info(String.format("Detected recovery situation: '%s' has been "
+ "interrupted in deletion phase, finishing up.", getSrcPath(item))); + "interrupted in deletion phase, finishing up.", getSrcPath(item)));
} }
removeAndMark(item); return removeAndMark(item);
return true;
} }
int tryCount = 0; int tryCount = 0;
do do
...@@ -353,11 +355,11 @@ public final class RemotePathMover implements IStoreHandler ...@@ -353,11 +355,11 @@ public final class RemotePathMover implements IStoreHandler
if (tryCount > 0) // This is a retry if (tryCount > 0) // This is a retry
{ {
operationLog.info(String.format(START_COPYING_PATH_RETRY_TEMPLATE, operationLog.info(String.format(START_COPYING_PATH_RETRY_TEMPLATE,
getSrcPath(item), destinationDirectory, tryCount)); getSrcPath(item), destinationStore, tryCount));
} else } else
{ {
operationLog.info(String.format(START_COPYING_PATH_TEMPLATE, getSrcPath(item), operationLog.info(String.format(START_COPYING_PATH_TEMPLATE, getSrcPath(item),
destinationDirectory)); destinationStore));
} }
} }
if (checkTargetAvailableAgain() == false) if (checkTargetAvailableAgain() == false)
...@@ -372,14 +374,13 @@ public final class RemotePathMover implements IStoreHandler ...@@ -372,14 +374,13 @@ public final class RemotePathMover implements IStoreHandler
{ {
final long endTime = System.currentTimeMillis(); final long endTime = System.currentTimeMillis();
operationLog.info(String.format(FINISH_COPYING_PATH_TEMPLATE, getSrcPath(item), operationLog.info(String.format(FINISH_COPYING_PATH_TEMPLATE, getSrcPath(item),
destinationDirectory, (endTime - startTime) / 1000.0)); destinationStore, (endTime - startTime) / 1000.0));
} }
removeAndMark(item); return removeAndMark(item);
return true;
} else } else
{ {
operationLog.warn(String.format(COPYING_PATH_TO_REMOTE_FAILED, getSrcPath(item), operationLog.warn(String.format(COPYING_PATH_TO_REMOTE_FAILED, getSrcPath(item),
destinationDirectory, copyStatus)); destinationStore, copyStatus));
if (StatusFlag.ERROR.equals(copyStatus.getFlag())) if (StatusFlag.ERROR.equals(copyStatus.getFlag()))
{ {
break; break;
...@@ -403,11 +404,11 @@ public final class RemotePathMover implements IStoreHandler ...@@ -403,11 +404,11 @@ public final class RemotePathMover implements IStoreHandler
if (stopped) if (stopped)
{ {
operationLog.warn(String.format(MOVING_PATH_TO_REMOTE_STOPPED_TEMPLATE, operationLog.warn(String.format(MOVING_PATH_TO_REMOTE_STOPPED_TEMPLATE,
getSrcPath(item), destinationDirectory)); getSrcPath(item), destinationStore));
} else } else
{ {
notificationLog.error(String.format(MOVING_PATH_TO_REMOTE_FAILED_TEMPLATE, notificationLog.error(String.format(MOVING_PATH_TO_REMOTE_FAILED_TEMPLATE,
getSrcPath(item), destinationDirectory)); getSrcPath(item), destinationStore));
} }
return false; return false;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment