From 3bf3c93e4b5fc49f590562bf68a468464c0bd827 Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Fri, 12 Nov 2010 11:10:01 +0000 Subject: [PATCH] [DMV-41] Property 'transfer-finished-executable' should not run the executable if the transfer failed SVN: 18652 --- .../cisd/common/filesystem/IStoreHandler.java | 7 +- .../common/filesystem/PathHandlerAdapter.java | 6 +- .../ch/systemsx/cisd/datamover/DataMover.java | 70 ++++++++++--------- .../cisd/datamover/IncomingProcessor.java | 34 +++++---- .../filesystem/intf/IExtendedFileStore.java | 4 ++ .../filesystem/remote/RemotePathMover.java | 9 +-- 6 files changed, 74 insertions(+), 56 deletions(-) diff --git a/common/source/java/ch/systemsx/cisd/common/filesystem/IStoreHandler.java b/common/source/java/ch/systemsx/cisd/common/filesystem/IStoreHandler.java index 708d265dd18..eab14cff96a 100644 --- a/common/source/java/ch/systemsx/cisd/common/filesystem/IStoreHandler.java +++ b/common/source/java/ch/systemsx/cisd/common/filesystem/IStoreHandler.java @@ -31,8 +31,9 @@ import ch.systemsx.cisd.common.utilities.IStopSignaler; public interface IStoreHandler extends IStopSignaler { /** - * Handles given <var>item</var>. Successful handling is indicated by <var>item</var> being gone - * when the method returns. + * Handles given <var>item</var>. + * + * @return <code>true</code> if the handling of the item was successful. */ - void handle(StoreItem item); + boolean handle(StoreItem item); } diff --git a/common/source/java/ch/systemsx/cisd/common/filesystem/PathHandlerAdapter.java b/common/source/java/ch/systemsx/cisd/common/filesystem/PathHandlerAdapter.java index 0857ba4586f..daa17788cde 100644 --- a/common/source/java/ch/systemsx/cisd/common/filesystem/PathHandlerAdapter.java +++ b/common/source/java/ch/systemsx/cisd/common/filesystem/PathHandlerAdapter.java @@ -49,9 +49,11 @@ public class PathHandlerAdapter implements IStoreHandler // IStoreHandler // - public final void handle(final StoreItem item) + public final boolean handle(final StoreItem item) { - pathHandler.handle(asFile(item)); + final File file = asFile(item); + pathHandler.handle(file); + return file.exists() == false; } public boolean isStopped() diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java index d5c2305dabb..a5d8e016379 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java @@ -55,11 +55,11 @@ import ch.systemsx.cisd.datamover.utils.LocalBufferDirs; public final class DataMover { - private static final Logger operationLog = - LogFactory.getLogger(LogCategory.OPERATION, DataMover.class); + private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, + DataMover.class); - private static final Logger machineLog = - LogFactory.getLogger(LogCategory.MACHINE, DataMover.class); + private static final Logger machineLog = LogFactory.getLogger(LogCategory.MACHINE, + DataMover.class); private final static String LOCAL_COPY_IN_PROGRESS_DIR = "copy-in-progress"; @@ -78,49 +78,49 @@ public final class DataMover @Private static final String PROCESS_MARKER_PREFIX = Constants.MARKER_PREFIX + "thread_"; - private static final String PROCESSING_MARKER_TEMPLATE = - PROCESS_MARKER_PREFIX + "%s_processing"; + private static final String PROCESSING_MARKER_TEMPLATE = PROCESS_MARKER_PREFIX + + "%s_processing"; private static final String ERROR_MARKER_TEMPLATE = PROCESS_MARKER_PREFIX + "%s_error"; @Private - static final String INCOMING_PROCESS_MARKER_FILENAME = - String.format(PROCESSING_MARKER_TEMPLATE, "incoming"); + static final String INCOMING_PROCESS_MARKER_FILENAME = String.format( + PROCESSING_MARKER_TEMPLATE, "incoming"); @Private - static final String OUTGOING_PROCESS_MARKER_FILENAME = - String.format(PROCESSING_MARKER_TEMPLATE, "outgoing"); + static final String OUTGOING_PROCESS_MARKER_FILENAME = String.format( + PROCESSING_MARKER_TEMPLATE, "outgoing"); @Private - static final String LOCAL_PROCESS_MARKER_FILENAME = - String.format(PROCESSING_MARKER_TEMPLATE, "local"); + static final String LOCAL_PROCESS_MARKER_FILENAME = String.format(PROCESSING_MARKER_TEMPLATE, + "local"); @Private - static final String INCOMING_ERROR_MARKER_FILENAME = - String.format(ERROR_MARKER_TEMPLATE, "incoming"); + static final String INCOMING_ERROR_MARKER_FILENAME = String.format(ERROR_MARKER_TEMPLATE, + "incoming"); @Private - static final String OUTGOING_ERROR_MARKER_FILENAME = - String.format(ERROR_MARKER_TEMPLATE, "outgoing"); + static final String OUTGOING_ERROR_MARKER_FILENAME = String.format(ERROR_MARKER_TEMPLATE, + "outgoing"); @Private static final String LOCAL_ERROR_MARKER_FILENAME = String.format(ERROR_MARKER_TEMPLATE, "local"); @Private - static final String RECOVERY_PROCESS_MARKER_FILENAME = - String.format(PROCESSING_MARKER_TEMPLATE, "recovery"); + static final String RECOVERY_PROCESS_MARKER_FILENAME = String.format( + PROCESSING_MARKER_TEMPLATE, "recovery"); /** * This marker file indicates that we are in a <i>shutdown</i> mode, started by the program. */ - static final String SHUTDOWN_PROCESS_MARKER_FILENAME = - String.format(PROCESSING_MARKER_TEMPLATE, "shutdown"); + static final String SHUTDOWN_PROCESS_MARKER_FILENAME = String.format( + PROCESSING_MARKER_TEMPLATE, "shutdown"); private static final String[] PROCESS_MARKER_FILENAMES = - { INCOMING_PROCESS_MARKER_FILENAME, OUTGOING_PROCESS_MARKER_FILENAME, - LOCAL_PROCESS_MARKER_FILENAME, INCOMING_ERROR_MARKER_FILENAME, - OUTGOING_ERROR_MARKER_FILENAME, LOCAL_ERROR_MARKER_FILENAME, - RECOVERY_PROCESS_MARKER_FILENAME, SHUTDOWN_PROCESS_MARKER_FILENAME }; + { INCOMING_PROCESS_MARKER_FILENAME, OUTGOING_PROCESS_MARKER_FILENAME, + LOCAL_PROCESS_MARKER_FILENAME, INCOMING_ERROR_MARKER_FILENAME, + OUTGOING_ERROR_MARKER_FILENAME, LOCAL_ERROR_MARKER_FILENAME, + RECOVERY_PROCESS_MARKER_FILENAME, SHUTDOWN_PROCESS_MARKER_FILENAME }; private final Parameters parameters; @@ -240,8 +240,8 @@ public final class DataMover private final DataMoverProcess createLocalProcess() { final LocalProcessor localProcessor = - new LocalProcessor(parameters, bufferDirs, factory.getImmutableCopier(), factory - .getMover()); + new LocalProcessor(parameters, bufferDirs, factory.getImmutableCopier(), + factory.getMover()); final File sourceDirectory = bufferDirs.getCopyCompleteDir(); final DirectoryScanningTimerTask localProcessingTask = new DirectoryScanningTimerTask(sourceDirectory, FileUtilities.ACCEPT_ALL_FILTER, @@ -296,10 +296,14 @@ public final class DataMover return new IStoreHandler() { - public void handle(StoreItem item) + public boolean handle(StoreItem item) { - moveHandler.handle(item); - callScript(transferFinishedExecutable, item); + final boolean ok = moveHandler.handle(item); + if (ok) + { + callScript(transferFinishedExecutable, item); + } + return ok; } public boolean isStopped() @@ -339,17 +343,19 @@ public final class DataMover { return new IStoreHandler() { - public void handle(StoreItem item) + public boolean handle(StoreItem item) { if (prefixBeforeOrNull != null) { operationLog.info(prefixBeforeOrNull + " " + item); } - originalHandler.handle(item); + final boolean ok = originalHandler.handle(item); if (prefixAfterOrNull != null) { - operationLog.info(prefixAfterOrNull + " " + item); + operationLog.info(prefixAfterOrNull + " " + item + ": " + + (ok ? "OK" : "FAILED")); } + return ok; } public boolean isStopped() diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java index 5427f392360..44d8a033ed5 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java @@ -64,8 +64,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory */ private final static int NUMBER_OF_ERRORS_IN_LISTING_IGNORED = 2; - private static final Logger operationLog = - LogFactory.getLogger(LogCategory.OPERATION, IncomingProcessor.class); + private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, + IncomingProcessor.class); private static final ISimpleLogger simpleOperationLog = new Log4jSimpleLogger(operationLog); @@ -124,8 +124,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory this.bufferDirs = bufferDirs; this.storeItemFilter = createFilter(timeProvider); this.remotePathMover = - createRemotePathMover(incomingStore, FileStoreFactory.createLocal(bufferDirs - .getCopyInProgressDir(), "local", factory, false)); + createRemotePathMover(incomingStore, FileStoreFactory.createLocal( + bufferDirs.getCopyInProgressDir(), "local", factory, false)); } @@ -203,15 +203,15 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory // IStoreHandler // - public final void handle(final StoreItem sourceItem) + public final boolean handle(final StoreItem sourceItem) { final IExtendedFileStore extendedFileStore = incomingStore.tryAsExtended(); if (extendedFileStore == null) { - moveFromRemoteIncoming(sourceItem); + return moveFromRemoteIncoming(sourceItem); } else { - moveFromLocalIncoming(extendedFileStore, sourceItem); + return moveFromLocalIncoming(extendedFileStore, sourceItem); } } @@ -223,26 +223,30 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory }; } - private void moveFromLocalIncoming(final IExtendedFileStore sourceStore, + private boolean moveFromLocalIncoming(final IExtendedFileStore sourceStore, final StoreItem sourceItem) { - sourceStore.tryMoveLocal(sourceItem, bufferDirs.getCopyCompleteDir(), parameters - .getPrefixForIncoming()); + final File finalFile = sourceStore.tryMoveLocal(sourceItem, bufferDirs.getCopyCompleteDir(), + parameters.getPrefixForIncoming()); + return (finalFile != null); } - private void moveFromRemoteIncoming(final StoreItem sourceItem) + private boolean moveFromRemoteIncoming(final StoreItem sourceItem) { // 1. move from incoming: copy, delete, create copy-finished-marker moveFromRemoteToLocal(sourceItem); final File copiedFile = new File(bufferDirs.getCopyInProgressDir(), sourceItem.getName()); if (copiedFile.exists() == false) { - return; + return false; } // 2. Move to final directory, delete marker final File markerFile = MarkerFile.createCopyFinishedMarker(copiedFile); - tryMoveFromInProgressToFinished(copiedFile, markerFile, bufferDirs.getCopyCompleteDir()); + final File finalFile = + tryMoveFromInProgressToFinished(copiedFile, markerFile, + bufferDirs.getCopyCompleteDir()); + return (finalFile != null); } private File tryMoveFromInProgressToFinished(final File copiedFile, @@ -359,8 +363,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory { operationLog.debug("Recovery starts."); } - recoverIncomingInProgress(bufferDirs.getCopyInProgressDir(), bufferDirs - .getCopyCompleteDir()); + recoverIncomingInProgress(bufferDirs.getCopyInProgressDir(), + bufferDirs.getCopyCompleteDir()); if (operationLog.isDebugEnabled()) { operationLog.debug("Recovery is finished."); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IExtendedFileStore.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IExtendedFileStore.java index 367280637bb..a95cede91eb 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IExtendedFileStore.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/intf/IExtendedFileStore.java @@ -30,6 +30,10 @@ public interface IExtendedFileStore extends IFileStore public boolean createNewFile(StoreItem item); + /** + * @return the target file of the move, or <code>null</code> if the operation fails. + */ + public File tryMoveLocal(StoreItem sourceItem, File destinationDir, String newFilePrefix); } \ No newline at end of file diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java index c6fe2d60baf..4d6bad162ac 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/RemotePathMover.java @@ -326,7 +326,7 @@ public final class RemotePathMover implements IStoreHandler // IStoreHandler // - public final void handle(final StoreItem item) + public final boolean handle(final StoreItem item) { if (isDeletionInProgress(item)) { @@ -338,7 +338,7 @@ public final class RemotePathMover implements IStoreHandler + "interrupted in deletion phase, finishing up.", getSrcPath(item))); } removeAndMark(item); - return; + return true; } int tryCount = 0; do @@ -362,7 +362,7 @@ public final class RemotePathMover implements IStoreHandler } if (checkTargetAvailableAgain() == false) { - return; + return true; } final long startTime = System.currentTimeMillis(); final Status copyStatus = copyAndMonitor(item); @@ -375,7 +375,7 @@ public final class RemotePathMover implements IStoreHandler destinationDirectory, (endTime - startTime) / 1000.0)); } removeAndMark(item); - return; + return true; } else { operationLog.warn(String.format(COPYING_PATH_TO_REMOTE_FAILED, getSrcPath(item), @@ -409,6 +409,7 @@ public final class RemotePathMover implements IStoreHandler notificationLog.error(String.format(MOVING_PATH_TO_REMOTE_FAILED_TEMPLATE, getSrcPath(item), destinationDirectory)); } + return false; } public boolean isStopped() -- GitLab