diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index a5d8e01637960967d92372083c27dd4d2b748823..4f77930ef25f60e9f94e13b222090f1329aeda78 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -55,6 +55,10 @@ import ch.systemsx.cisd.datamover.utils.LocalBufferDirs; public final class DataMover { + static final String STARTED_TRANSFER = "STARTED_TRANSFER"; + + static final String FINISHED_TRANSFER = "FINISHED_TRANSFER"; + private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, DataMover.class); @@ -264,7 +268,7 @@ public final class DataMover FileStoreFactory.createLocal(sourceDirectory, "ready-to-move", factory, false); final IStoreHandler remoteStoreMover = wrapHandleWithLogging(createOutgoingPathMover(readyToMoveStore, outgoingStore), - null, "FINISHED_TRANSFER"); + null, FINISHED_TRANSFER, null); final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler( sourceDirectory, remoteStoreMover), outgoingStore.getHighwaterMarkWatcher()); @@ -337,9 +341,12 @@ public final class DataMover * item name will be logged before item handling * @param prefixAfterOrNull if not <code>null</code> a message with this prefix and handled item * name will be logged after item handling + * @param prefixAfterFailureOnlyOrNull if not <code>null</code> a message with this prefix and handled item + * name will be logged after item handling, if the handling failed */ public final static IStoreHandler wrapHandleWithLogging(final IStoreHandler originalHandler, - final String prefixBeforeOrNull, final String prefixAfterOrNull) + final String prefixBeforeOrNull, final String prefixAfterOrNull, + final String prefixAfterFailureOnlyOrNull) { return new IStoreHandler() { @@ -355,6 +362,10 @@ public final class DataMover operationLog.info(prefixAfterOrNull + " " + item + ": " + (ok ? "OK" : "FAILED")); } + if (ok == false && prefixAfterFailureOnlyOrNull != null) + { + operationLog.info(prefixAfterFailureOnlyOrNull + " " + item + ": FAILED"); + } return ok; } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java index 44d8a033ed55d15fb569bb1ba272ea496e09d7f3..44966f604cc643b045641a6550b755a37c4fef2e 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java @@ -75,6 +75,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory private final LocalBufferDirs bufferDirs; + private final IFileStore copyInProgressStore; + private final IFileStore incomingStore; private final String prefixForIncoming; @@ -120,6 +122,9 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory this.successorMarkerFileName = successorMarkerFileName; this.prefixForIncoming = parameters.getPrefixForIncoming(); this.incomingStore = parameters.getIncomingStore(factory); + final File sourceDirectory = bufferDirs.getCopyInProgressDir(); + this.copyInProgressStore = + FileStoreFactory.createLocal(sourceDirectory, "copy-in-progress", factory, false); this.pathMover = factory.getMover(); this.bufferDirs = bufferDirs; this.storeItemFilter = createFilter(timeProvider); @@ -148,11 +153,10 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory timeProvider, DatamoverConstants.IGNORED_ERROR_COUNT_BEFORE_NOTIFICATION); } - private IStoreHandler createRemotePathMover(final IFileStore sourceDirectory, - final IFileStore destinationDirectory) + private IStoreHandler createRemotePathMover(final IFileStore sourceStore, + final IFileStore destinationStore) { - return RemoteMonitoredMoverFactory - .create(sourceDirectory, destinationDirectory, parameters); + return RemoteMonitoredMoverFactory.create(sourceStore, destinationStore, parameters); } public TimerTask createRecoverableTimerTask() @@ -167,7 +171,7 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory new HighwaterMarkWatcher(bufferDirs.getBufferDirHighwaterMark()); final IStoreHandler pathHandler = DataMover.wrapHandleWithLogging(createIncomingMovingPathHandler(), - "STARTED_TRANSFER", null); + DataMover.STARTED_TRANSFER, null, DataMover.FINISHED_TRANSFER); final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = new HighwaterMarkDirectoryScanningHandler(new FaultyPathDirectoryScanningHandler( copyInProgressDir, pathHandler), highwaterMarkWatcher, copyInProgressDir); @@ -226,23 +230,30 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory private boolean moveFromLocalIncoming(final IExtendedFileStore sourceStore, final StoreItem sourceItem) { - final File finalFile = sourceStore.tryMoveLocal(sourceItem, bufferDirs.getCopyCompleteDir(), - parameters.getPrefixForIncoming()); + final File finalFile = + sourceStore.tryMoveLocal(sourceItem, bufferDirs.getCopyCompleteDir(), + parameters.getPrefixForIncoming()); return (finalFile != null); } private boolean moveFromRemoteIncoming(final StoreItem sourceItem) { // 1. move from incoming: copy, delete, create copy-finished-marker - moveFromRemoteToLocal(sourceItem); + final boolean succeeded = moveFromRemoteToLocal(sourceItem); final File copiedFile = new File(bufferDirs.getCopyInProgressDir(), sourceItem.getName()); - if (copiedFile.exists() == false) + + final File markerFile = MarkerFile.createCopyFinishedMarker(copiedFile); + if (succeeded == false || copiedFile.exists() == false) { + // undo copying and remove marker as are unable to delete it from the source + if (copiedFile.exists()) + { + copyInProgressStore.delete(sourceItem); + } + markerFile.delete(); return false; } - // 2. Move to final directory, delete marker - final File markerFile = MarkerFile.createCopyFinishedMarker(copiedFile); final File finalFile = tryMoveFromInProgressToFinished(copiedFile, markerFile, bufferDirs.getCopyCompleteDir()); @@ -259,7 +270,7 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory { if (markerFileOrNull.exists() == false) { - operationLog.error("Could not find expected copy-finished-mrker file " + operationLog.error("Could not find expected copy-finished-marker file " + markerFileOrNull.getAbsolutePath()); } else { @@ -273,9 +284,9 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory } } - private void moveFromRemoteToLocal(final StoreItem sourceItem) + private boolean moveFromRemoteToLocal(final StoreItem sourceItem) { - remotePathMover.handle(sourceItem); + return remotePathMover.handle(sourceItem); } private File tryMoveLocal(final File sourceFile, final File destinationDir, diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java index 4d6bad162ac9457bfa71e08f06f28305a7953ef7..fe1c7b6fedf2dc1f76ba6e44e30772a6b2c1709a 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java @@ -88,9 +88,9 @@ public final class RemotePathMover implements IStoreHandler private static final ConditionalNotificationLogger conditionalLogger = new ConditionalNotificationLogger(machineLog, notificationLog, 3); - private final IFileStore sourceDirectory; + private final IFileStore sourceStore; - private final IFileStore destinationDirectory; + private final IFileStore destinationStore; private final IStoreCopier copier; @@ -122,8 +122,8 @@ public final class RemotePathMover implements IStoreHandler assert sourceDirectory.tryAsExtended() != null || destinationDirectory.tryAsExtended() != null; - this.sourceDirectory = sourceDirectory; - this.destinationDirectory = destinationDirectory; + this.sourceStore = sourceDirectory; + this.destinationStore = destinationDirectory; this.copier = copier; this.intervallToWaitAfterFailure = timingParameters.getIntervalToWaitAfterFailure(); this.maximalNumberOfRetries = timingParameters.getMaximalNumberOfRetries(); @@ -143,7 +143,7 @@ public final class RemotePathMover implements IStoreHandler { final InactivityMonitor monitor = new InactivityMonitor( - new RemoteStoreCopyActivitySensor(destinationDirectory, item), + new RemoteStoreCopyActivitySensor(destinationStore, item), new IInactivityObserver() { public void update(long inactiveSinceMillis, @@ -160,10 +160,11 @@ public final class RemotePathMover implements IStoreHandler return copyStatus; } - private final void removeAndMark(final StoreItem item) + private final boolean removeAndMark(final StoreItem item) { - remove(item); + final boolean removed = remove(item); markAsFinished(item); + return removed; } private final boolean checkTargetAvailableAgain() @@ -173,7 +174,7 @@ public final class RemotePathMover implements IStoreHandler { conditionalLogger.reset(String.format( "Following store '%s' is again fully accessible to the program.", - destinationDirectory)); + destinationStore)); return true; } else { @@ -184,7 +185,7 @@ public final class RemotePathMover implements IStoreHandler private BooleanStatus checkTargetAvailable() { final BooleanStatus status = - destinationDirectory + destinationStore .checkDirectoryFullyAccessible(Constants.MILLIS_TO_WAIT_BEFORE_TIMEOUT); if (status.isSuccess() == false) { @@ -193,20 +194,22 @@ public final class RemotePathMover implements IStoreHandler return status; } - private final void remove(final StoreItem sourceItem) + private final boolean remove(final StoreItem sourceItem) { final StoreItem removalInProgressMarkerFile = tryMarkAsDeletionInProgress(sourceItem); - final Status removalStatus = sourceDirectory.delete(sourceItem); + final Status removalStatus = sourceStore.delete(sourceItem); removeDeletionMarkerFile(removalInProgressMarkerFile); if (Status.OK.equals(removalStatus) == false) { notificationLog.error(String.format(REMOVING_LOCAL_PATH_FAILED_TEMPLATE, getSrcPath(sourceItem), removalStatus)); + return false; } else if (operationLog.isInfoEnabled()) { operationLog.info(String.format(REMOVED_PATH_TEMPLATE, getSrcPath(sourceItem))); } + return true; } private final boolean isDeletionInProgress(final StoreItem item) @@ -241,17 +244,17 @@ public final class RemotePathMover implements IStoreHandler if (status.equals(Status.OK) == false) { machineLog.error(String.format("Cannot remove marker file '%s'", getPath( - destinationDirectory, markerOrNull))); + destinationStore, markerOrNull))); } } } private final IExtendedFileStore getDeletionMarkerStore() { - IExtendedFileStore fileStore = destinationDirectory.tryAsExtended(); + IExtendedFileStore fileStore = destinationStore.tryAsExtended(); if (fileStore == null) { - fileStore = sourceDirectory.tryAsExtended(); + fileStore = sourceStore.tryAsExtended(); } assert fileStore != null; return fileStore; @@ -261,7 +264,7 @@ public final class RemotePathMover implements IStoreHandler private final boolean markAsFinished(final StoreItem item) { final StoreItem markerItem = MarkerFile.createCopyFinishedMarker(item); - IExtendedFileStore extendedFileStore = destinationDirectory.tryAsExtended(); + IExtendedFileStore extendedFileStore = destinationStore.tryAsExtended(); if (extendedFileStore != null) { // We create the marker directly inside the destination directory @@ -270,7 +273,7 @@ public final class RemotePathMover implements IStoreHandler { // When destination is remote, we put the item directory in the source directory and // copy it to destination. - extendedFileStore = sourceDirectory.tryAsExtended(); + extendedFileStore = sourceStore.tryAsExtended(); assert extendedFileStore != null; return markOnSourceLocalAndCopyToRemoteDestination(extendedFileStore, markerItem); } @@ -314,7 +317,7 @@ public final class RemotePathMover implements IStoreHandler private String getSrcPath(final StoreItem item) { - return getPath(sourceDirectory, item); + return getPath(sourceStore, item); } private final static String getPath(final IFileStore directory, final StoreItem item) @@ -337,8 +340,7 @@ public final class RemotePathMover implements IStoreHandler operationLog.info(String.format("Detected recovery situation: '%s' has been " + "interrupted in deletion phase, finishing up.", getSrcPath(item))); } - removeAndMark(item); - return true; + return removeAndMark(item); } int tryCount = 0; do @@ -353,11 +355,11 @@ public final class RemotePathMover implements IStoreHandler if (tryCount > 0) // This is a retry { operationLog.info(String.format(START_COPYING_PATH_RETRY_TEMPLATE, - getSrcPath(item), destinationDirectory, tryCount)); + getSrcPath(item), destinationStore, tryCount)); } else { operationLog.info(String.format(START_COPYING_PATH_TEMPLATE, getSrcPath(item), - destinationDirectory)); + destinationStore)); } } if (checkTargetAvailableAgain() == false) @@ -372,14 +374,13 @@ public final class RemotePathMover implements IStoreHandler { final long endTime = System.currentTimeMillis(); operationLog.info(String.format(FINISH_COPYING_PATH_TEMPLATE, getSrcPath(item), - destinationDirectory, (endTime - startTime) / 1000.0)); + destinationStore, (endTime - startTime) / 1000.0)); } - removeAndMark(item); - return true; + return removeAndMark(item); } else { operationLog.warn(String.format(COPYING_PATH_TO_REMOTE_FAILED, getSrcPath(item), - destinationDirectory, copyStatus)); + destinationStore, copyStatus)); if (StatusFlag.ERROR.equals(copyStatus.getFlag())) { break; @@ -403,11 +404,11 @@ public final class RemotePathMover implements IStoreHandler if (stopped) { operationLog.warn(String.format(MOVING_PATH_TO_REMOTE_STOPPED_TEMPLATE, - getSrcPath(item), destinationDirectory)); + getSrcPath(item), destinationStore)); } else { notificationLog.error(String.format(MOVING_PATH_TO_REMOTE_FAILED_TEMPLATE, - getSrcPath(item), destinationDirectory)); + getSrcPath(item), destinationStore)); } return false; }