From dd9ea97581964c35716783d4fbab3b6d23203b0b Mon Sep 17 00:00:00 2001
From: brinn <brinn>
Date: Mon, 14 Jul 2008 07:52:35 +0000
Subject: [PATCH] [DMV-31] add support for detection and signaling of errors

SVN: 7200
---
 .../concurrent/DummyTimerTaskListener.java    |  4 +-
 .../common/concurrent/ITimerTaskListener.java | 13 +++-
 .../concurrent/TimerTaskWithListeners.java    | 12 ++-
 datamover/dist/datamover.sh                   |  5 ++
 .../ch/systemsx/cisd/datamover/DataMover.java | 46 ++++++++----
 .../cisd/datamover/IncomingProcessor.java     | 41 ++++++-----
 ...imerTaskListenerForMarkerFileProtocol.java | 73 ++++++++++++++-----
 .../cisd/datamover/IncomingProcessorTest.java | 62 +++++++++++++++-
 8 files changed, 197 insertions(+), 59 deletions(-)

diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/DummyTimerTaskListener.java b/common/source/java/ch/systemsx/cisd/common/concurrent/DummyTimerTaskListener.java
index 0e6df4cf16d..ba7e07e26c4 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/DummyTimerTaskListener.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/DummyTimerTaskListener.java
@@ -16,6 +16,8 @@
 
 package ch.systemsx.cisd.common.concurrent;
 
+import ch.systemsx.cisd.common.utilities.ITimerTaskStatusProvider;
+
 /**
  * Dummy implementation of the timer task listener interface. Useful for subclassing.
  *
@@ -29,7 +31,7 @@ public class DummyTimerTaskListener implements ITimerTaskListener
     }
     
     /** Does nothing. */
-    public void finishRunning()
+    public void finishRunning(ITimerTaskStatusProvider statusProviderOrNull)
     {
     }
 
diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/ITimerTaskListener.java b/common/source/java/ch/systemsx/cisd/common/concurrent/ITimerTaskListener.java
index 94aada3deb0..4a28aa3e356 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/ITimerTaskListener.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/ITimerTaskListener.java
@@ -18,9 +18,11 @@ package ch.systemsx.cisd.common.concurrent;
 
 import java.util.TimerTask;
 
+import ch.systemsx.cisd.common.utilities.ITimerTaskStatusProvider;
+
 /**
  * Listener for {@link TimerTask} events.
- *
+ * 
  * @author Franz-Josef Elmer
  */
 public interface ITimerTaskListener
@@ -29,13 +31,16 @@ public interface ITimerTaskListener
      * Starts running. This method is invoked before {@link TimerTask#run()}.
      */
     public void startRunning();
-    
+
     /**
      * Finishes running. This method is invoked after {@link TimerTask#run()} even if an exception
      * is thrown.
+     * 
+     * @param statusProviderOrNull The status provider for the timer task, or <code>null</code>,
+     *            if no status provider is available.
      */
-    public void finishRunning();
-    
+    public void finishRunning(ITimerTaskStatusProvider statusProviderOrNull);
+
     /**
      * Canceling the task. This method is invoked before {@link TimerTask#cancel()}.
      */
diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/TimerTaskWithListeners.java b/common/source/java/ch/systemsx/cisd/common/concurrent/TimerTaskWithListeners.java
index 3d848813a61..2fdfb0fb0ac 100644
--- a/common/source/java/ch/systemsx/cisd/common/concurrent/TimerTaskWithListeners.java
+++ b/common/source/java/ch/systemsx/cisd/common/concurrent/TimerTaskWithListeners.java
@@ -20,6 +20,8 @@ import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.TimerTask;
 
+import ch.systemsx.cisd.common.utilities.ITimerTaskStatusProvider;
+
 /**
  * Decorator of a {@link TimerTask} objects which allows to add {@link ITimerTaskListener} objects
  * which will be notified for certain type of events.
@@ -29,6 +31,7 @@ import java.util.TimerTask;
 public class TimerTaskWithListeners extends TimerTask
 {
     private final TimerTask timerTask;
+    private final ITimerTaskStatusProvider statusProviderOrNull;
     private final Set<ITimerTaskListener> listeners = new LinkedHashSet<ITimerTaskListener>();
 
     /**
@@ -43,6 +46,13 @@ public class TimerTaskWithListeners extends TimerTask
             throw new IllegalArgumentException("Unspecified timer task.");
         }
         this.timerTask = timerTask;
+        if (timerTask instanceof ITimerTaskStatusProvider)
+        {
+            this.statusProviderOrNull = (ITimerTaskStatusProvider) timerTask;
+        } else
+        {
+            this.statusProviderOrNull = null;
+        }
     }
     
     /**
@@ -73,7 +83,7 @@ public class TimerTaskWithListeners extends TimerTask
         {
             for (ITimerTaskListener listener : listeners)
             {
-                listener.finishRunning();
+                listener.finishRunning(statusProviderOrNull);
             }
         }
     }
diff --git a/datamover/dist/datamover.sh b/datamover/dist/datamover.sh
index abe03f6b9ab..75ddca29935 100755
--- a/datamover/dist/datamover.sh
+++ b/datamover/dist/datamover.sh
@@ -61,6 +61,8 @@ getStatus()
 			if [ -f .MARKER_shutdown ]; then
 				STATUS=SHUTDOWN
 				return 1
+			elif [ "`ls -a1 | awk '/\.MARKER_.*_error/ {print $1}'`" = "" ]; then
+				STATUS=ERROR
 			elif [ "`ls -a1 | awk '/\.MARKER_.*_processing/ {print $1}'`" = "" ]; then
 				STATUS=IDLE
 			else
@@ -82,6 +84,9 @@ printStatus()
 	PID=`cat $PIDFILE`
 	MSG_PREFIX="Datamover (pid $PID) is"
 	case "$1" in
+		ERROR)
+			echo "$MSG_PREFIX running and in error state"
+			;;
 		PROCESSING)
 			echo "$MSG_PREFIX running and in processing state"
 			;;
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java
index 0c56c4cde37..1a0298af053 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/DataMover.java
@@ -67,6 +67,8 @@ public final class DataMover
     private static final String PROCESSING_MARKER_TEMPLATE =
             PROCESS_MARKER_PREFIX + "%s_processing";
 
+    private static final String ERROR_MARKER_TEMPLATE = PROCESS_MARKER_PREFIX + "%s_error";
+
     @Private
     static final String INCOMING_PROCESS_MARKER_FILENAME =
             String.format(PROCESSING_MARKER_TEMPLATE, "incoming");
@@ -79,6 +81,17 @@ public final class DataMover
     static final String LOCAL_PROCESS_MARKER_FILENAME =
             String.format(PROCESSING_MARKER_TEMPLATE, "local");
 
+    @Private
+    static final String INCOMING_ERROR_MARKER_FILENAME =
+            String.format(ERROR_MARKER_TEMPLATE, "incoming");
+
+    @Private
+    static final String OUTGOING_ERROR_MARKER_FILENAME =
+            String.format(ERROR_MARKER_TEMPLATE, "outgoing");
+
+    @Private
+    static final String LOCAL_ERROR_MARKER_FILENAME = String.format(ERROR_MARKER_TEMPLATE, "local");
+
     @Private
     static final String RECOVERY_PROCESS_MARKER_FILENAME =
             String.format(PROCESSING_MARKER_TEMPLATE, "recovery");
@@ -91,8 +104,9 @@ public final class DataMover
 
     private static final String[] PROCESS_MARKER_FILENAMES =
                 { INCOMING_PROCESS_MARKER_FILENAME, OUTGOING_PROCESS_MARKER_FILENAME,
-                        LOCAL_PROCESS_MARKER_FILENAME, RECOVERY_PROCESS_MARKER_FILENAME,
-                        SHUTDOWN_PROCESS_MARKER_FILENAME };
+                        LOCAL_PROCESS_MARKER_FILENAME, INCOMING_ERROR_MARKER_FILENAME,
+                        OUTGOING_ERROR_MARKER_FILENAME, LOCAL_ERROR_MARKER_FILENAME,
+                        RECOVERY_PROCESS_MARKER_FILENAME, SHUTDOWN_PROCESS_MARKER_FILENAME };
 
     private final Parameters parameters;
 
@@ -118,21 +132,26 @@ public final class DataMover
     }
 
     static TimerTask createTimerTaskForMarkerFileProtocol(final TimerTask timerTask,
-            final String startMarkerFileName, final String endMarkerFileName)
+            final String markerFileName, final String errorFileNameOrNull,
+            final String successorMarkerFileNameOrNull)
     {
         final TimerTaskWithListeners timerTaskWithListeners = new TimerTaskWithListeners(timerTask);
         timerTaskWithListeners.addListener(new TimerTaskListenerForMarkerFileProtocol(
-                startMarkerFileName, endMarkerFileName));
+                markerFileName, errorFileNameOrNull, successorMarkerFileNameOrNull));
         return timerTaskWithListeners;
     }
 
-    static TimerTask createTimerTaskForMarkerFileProtocol(final TimerTask timerTask,
-            final String startMarkerFileName)
+    private static TimerTask createTimerTaskForMarkerFileProtocol(final TimerTask timerTask,
+            final String markerFileName, final String errorMarkerFileName)
     {
-        final TimerTaskWithListeners timerTaskWithListeners = new TimerTaskWithListeners(timerTask);
-        timerTaskWithListeners.addListener(new TimerTaskListenerForMarkerFileProtocol(
-                startMarkerFileName, null));
-        return timerTaskWithListeners;
+        return createTimerTaskForMarkerFileProtocol(timerTask, markerFileName, errorMarkerFileName,
+                null);
+    }
+
+    private static TimerTask createTimerTaskForMarkerFileProtocol(final TimerTask timerTask,
+            final String markerFileName)
+    {
+        return createTimerTaskForMarkerFileProtocol(timerTask, markerFileName, null, null);
     }
 
     private static LocalBufferDirs createLocalBufferDirs(final Parameters parameters)
@@ -201,7 +220,7 @@ public final class DataMover
     private final DataMoverProcess createIncomingProcess()
     {
         return IncomingProcessor.createMovingProcess(parameters, INCOMING_PROCESS_MARKER_FILENAME,
-                LOCAL_PROCESS_MARKER_FILENAME, factory, bufferDirs);
+                INCOMING_ERROR_MARKER_FILENAME, LOCAL_PROCESS_MARKER_FILENAME, factory, bufferDirs);
     }
 
     private final DataMoverProcess createLocalProcess()
@@ -215,7 +234,8 @@ public final class DataMover
                         localProcessor);
         final TimerTask timerTask =
                 createTimerTaskForMarkerFileProtocol(localProcessingTask,
-                        LOCAL_PROCESS_MARKER_FILENAME, OUTGOING_PROCESS_MARKER_FILENAME);
+                        LOCAL_PROCESS_MARKER_FILENAME, LOCAL_ERROR_MARKER_FILENAME,
+                        OUTGOING_PROCESS_MARKER_FILENAME);
         final DataMoverProcess dataMoverProcess =
                 new RunOnceMoreAfterTerminateDataMoverProcess(timerTask, "Local Processor",
                         localProcessor);
@@ -238,7 +258,7 @@ public final class DataMover
                         remoteStoreMover, directoryScanningHandler);
         final TimerTask timerTask =
                 createTimerTaskForMarkerFileProtocol(outgoingMovingTask,
-                        OUTGOING_PROCESS_MARKER_FILENAME);
+                        OUTGOING_PROCESS_MARKER_FILENAME, OUTGOING_ERROR_MARKER_FILENAME);
         final DataMoverProcess outgoingProcess =
                 new RunOnceMoreAfterTerminateDataMoverProcess(timerTask, "Final Destination Mover");
         outgoingProcess.startup(0L, parameters.getCheckIntervalInternalMillis());
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java
index b797a3a349f..28519602540 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/IncomingProcessor.java
@@ -83,36 +83,41 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
 
     private final IStoreItemFilter storeItemFilter;
 
-    private final String startMarkerFileName;
+    private final String markerFileName;
+    
+    private final String errorMarkerFileName;
 
-    private final String endMarkerFileName;
+    private final String successorMarkerFileName;
 
     public static final DataMoverProcess createMovingProcess(final Parameters parameters,
-            final String startMarkerFile, final String endMarkerFile,
-            final IFileSysOperationsFactory factory, final LocalBufferDirs bufferDirs)
+            final String markerFile, final String errorMarkerFile,
+            final String successorMarkerFile, final IFileSysOperationsFactory factory,
+            final LocalBufferDirs bufferDirs)
     {
-        return createMovingProcess(parameters, startMarkerFile, endMarkerFile, factory,
-                SYSTEM_TIME_PROVIDER, bufferDirs);
+        return createMovingProcess(parameters, markerFile, errorMarkerFile, successorMarkerFile,
+                factory, SYSTEM_TIME_PROVIDER, bufferDirs);
     }
 
     static final DataMoverProcess createMovingProcess(final Parameters parameters,
-            final String startMarkerFile, final String endMarkerFile,
-            final IFileSysOperationsFactory factory, final ITimeProvider timeProvider,
-            final LocalBufferDirs bufferDirs)
+            final String markerFile, final String errorMarkerFile,
+            final String successorMarkerFile, final IFileSysOperationsFactory factory,
+            final ITimeProvider timeProvider, final LocalBufferDirs bufferDirs)
     {
         final IncomingProcessor processor =
-                new IncomingProcessor(parameters, startMarkerFile, endMarkerFile, factory,
-                        timeProvider, bufferDirs);
+                new IncomingProcessor(parameters, markerFile, errorMarkerFile, successorMarkerFile,
+                        factory, timeProvider, bufferDirs);
         return processor.create();
     }
 
-    private IncomingProcessor(final Parameters parameters, final String startMarkerFileName,
-            final String endMarkerFileName, final IFileSysOperationsFactory factory,
-            final ITimeProvider timeProvider, final LocalBufferDirs bufferDirs)
+    private IncomingProcessor(final Parameters parameters, final String markerFileName,
+            final String errorMarkerFileName, final String successorMarkerFileName,
+            final IFileSysOperationsFactory factory, final ITimeProvider timeProvider,
+            final LocalBufferDirs bufferDirs)
     {
         this.parameters = parameters;
-        this.startMarkerFileName = startMarkerFileName;
-        this.endMarkerFileName = endMarkerFileName;
+        this.markerFileName = markerFileName;
+        this.errorMarkerFileName = errorMarkerFileName;
+        this.successorMarkerFileName = successorMarkerFileName;
         this.prefixForIncoming = parameters.getPrefixForIncoming();
         this.incomingStore = parameters.getIncomingStore(factory);
         this.pathMover = factory.getMover();
@@ -153,8 +158,8 @@ public class IncomingProcessor implements IRecoverableTimerTaskFactory
                         new FileScannedStore(incomingStore, storeItemFilter),
                         directoryScanningHandler, pathHandler, NUMBER_OF_ERRORS_IN_LISTING_IGNORED);
         final TimerTask timerTask =
-                DataMover.createTimerTaskForMarkerFileProtocol(movingTask, startMarkerFileName,
-                        endMarkerFileName);
+                DataMover.createTimerTaskForMarkerFileProtocol(movingTask, markerFileName,
+                        errorMarkerFileName, successorMarkerFileName);
         return new DataMoverProcess(timerTask, "Mover of Incoming Data", this)
             {
 
diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/TimerTaskListenerForMarkerFileProtocol.java b/datamover/source/java/ch/systemsx/cisd/datamover/TimerTaskListenerForMarkerFileProtocol.java
index 2f88932b089..69f3ce42181 100644
--- a/datamover/source/java/ch/systemsx/cisd/datamover/TimerTaskListenerForMarkerFileProtocol.java
+++ b/datamover/source/java/ch/systemsx/cisd/datamover/TimerTaskListenerForMarkerFileProtocol.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import ch.systemsx.cisd.common.concurrent.DummyTimerTaskListener;
 import ch.systemsx.cisd.common.concurrent.ITimerTaskListener;
 import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
+import ch.systemsx.cisd.common.utilities.ITimerTaskStatusProvider;
 
 /**
  * An implementation of {@link ITimerTaskListener} which creates an empty marker file before the
@@ -35,32 +36,48 @@ import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
  */
 public class TimerTaskListenerForMarkerFileProtocol extends DummyTimerTaskListener
 {
-    private final File startMarkerFile;
+    private final File markerFile;
 
-    private final File endMarkerFilOrNull;
+    private final File errorMarkerFileOrNull;
+
+    private final File successorMarkerFilOrNull;
 
     /**
      * Creates an instance for the specified marker file.
      * 
-     * @throws IllegalArgumentException if the argument is <code>null</code> or it denotes a
-     *             directory.
+     * @param markerFileName The name of the marker file that indicates that the
+     *            {@link java.util.TimerTask} is currently running.
+     * @param errorMarkerFileNameOrNull The name of the file indicating that the last time the timer
+     *            task was running it produced an error.
+     * @param successorMarkerFileNameOrNull The name of the file to indicate that the successor of
+     *            this timer task has some work to do.
+     * @throws IllegalArgumentException if the <var>markerFileName</var> is <code>null</code> or
+     *             it denotes a directory.
      */
-    public TimerTaskListenerForMarkerFileProtocol(String startMarkerFileName,
-            String endMarkerFileNameOrNull)
+    public TimerTaskListenerForMarkerFileProtocol(String markerFileName,
+            String errorMarkerFileNameOrNull, String successorMarkerFileNameOrNull)
     {
-        if (startMarkerFileName == null)
+        if (markerFileName == null)
         {
             throw new IllegalArgumentException("Unspecified start marker file name.");
         }
-        startMarkerFile = new File(startMarkerFileName);
-        failIfDirectory(startMarkerFile);
-        if (endMarkerFileNameOrNull != null)
+        markerFile = new File(markerFileName);
+        failIfDirectory(markerFile);
+        if (errorMarkerFileNameOrNull != null)
+        {
+            errorMarkerFileOrNull = new File(errorMarkerFileNameOrNull);
+            failIfDirectory(errorMarkerFileOrNull);
+        } else
+        {
+            errorMarkerFileOrNull = null;
+        }
+        if (successorMarkerFileNameOrNull != null)
         {
-            endMarkerFilOrNull = new File(endMarkerFileNameOrNull);
-            failIfDirectory(startMarkerFile);
+            successorMarkerFilOrNull = new File(successorMarkerFileNameOrNull);
+            failIfDirectory(successorMarkerFilOrNull);
         } else
         {
-            endMarkerFilOrNull = null;
+            successorMarkerFilOrNull = null;
         }
     }
 
@@ -72,20 +89,40 @@ public class TimerTaskListenerForMarkerFileProtocol extends DummyTimerTaskListen
     @Override
     public void startRunning()
     {
-        touch(startMarkerFile);
+        touch(markerFile);
     }
 
     /**
      * Deletes the marker file.
      */
     @Override
-    public void finishRunning()
+    public void finishRunning(ITimerTaskStatusProvider statusProviderOrNull)
     {
-        if (endMarkerFilOrNull != null)
+        if (successorMarkerFilOrNull != null && hasPerformedMeaningfullWork(statusProviderOrNull))
+        {
+            touch(successorMarkerFilOrNull);
+        }
+        if (errorMarkerFileOrNull != null && hasErrors(statusProviderOrNull))
         {
-            touch(endMarkerFilOrNull);
+            touch(errorMarkerFileOrNull);
         }
-        startMarkerFile.delete();
+        // Avoid deleting the marker file when it is used as error marker file, too, and an error
+        // occurred.
+        if (markerFile.equals(errorMarkerFileOrNull) == false
+                || hasErrors(statusProviderOrNull) == false)
+        {
+            markerFile.delete();
+        }
+    }
+
+    private boolean hasPerformedMeaningfullWork(ITimerTaskStatusProvider statusProviderOrNull)
+    {
+        return (statusProviderOrNull == null) || statusProviderOrNull.hasPerformedMeaningfulWork();
+    }
+
+    private boolean hasErrors(ITimerTaskStatusProvider statusProviderOrNull)
+    {
+        return (statusProviderOrNull != null) && statusProviderOrNull.hasErrors();
     }
 
     private static void failIfDirectory(File markerFile)
diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java
index ef1634c48e2..eab28d57612 100644
--- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java
+++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/IncomingProcessorTest.java
@@ -18,6 +18,7 @@ package ch.systemsx.cisd.datamover;
 
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.fail;
 
 import java.io.File;
@@ -30,6 +31,8 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.jmock.Expectations;
 import org.jmock.Mockery;
+import org.jmock.api.Invocation;
+import org.jmock.lib.action.CustomAction;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -46,6 +49,7 @@ import ch.systemsx.cisd.common.process.ProcessExecutionHelper;
 import ch.systemsx.cisd.common.test.LogMonitoringAppender;
 import ch.systemsx.cisd.common.utilities.FileUtilities;
 import ch.systemsx.cisd.common.utilities.IExitHandler;
+import ch.systemsx.cisd.common.utilities.ITimerTaskStatusProvider;
 import ch.systemsx.cisd.common.utilities.MockTimeProvider;
 import ch.systemsx.cisd.common.utilities.OSUtilities;
 import ch.systemsx.cisd.datamover.filesystem.intf.IFileSysOperationsFactory;
@@ -64,6 +68,8 @@ public final class IncomingProcessorTest
 
     private static final String MARKER_FILE = ".marker";
 
+    private static final String ERROR_MARKER_FILE = ".error";
+
     private static final File TEST_FOLDER = new File("targets/unit-test/IncomingProcessorTest");
 
     private static final String INCOMING_DIR = "incoming";
@@ -110,7 +116,7 @@ public final class IncomingProcessorTest
                 assertEquals("Missing marker file " + markerFile, true, markerFile.exists());
             }
 
-            public void finishRunning()
+            public void finishRunning(ITimerTaskStatusProvider statusProviderOrNull)
             {
                 File markerFile = new File(MARKER_FILE);
                 assertEquals("Marker file " + markerFile + " still there", false, markerFile
@@ -185,17 +191,62 @@ public final class IncomingProcessorTest
         context.assertIsSatisfied();
     }
 
+    @Test
+    public void testFailureMarker() throws IOException
+    {
+        final File testDataFile = new File(incomingDir, "test-data.txt");
+        final File errorMarker = new File(ERROR_MARKER_FILE);
+        errorMarker.delete();
+        assertFalse(errorMarker.exists());
+        testDataFile.createNewFile();
+        context.checking(new Expectations()
+            {
+                {
+                    one(mover).tryMove(testDataFile, copyCompleteDir, "");
+                    will(returnValue(new File(copyCompleteDir, testDataFile.getName())));
+                }
+            });
+
+        final DataMoverProcess process =
+                createProcess("--" + PropertyNames.INCOMING_TARGET, incomingDir.toString(), "-q",
+                        "1");
+        final TimerTask dataMoverTimerTask = getInstrumentedTimerTaskFrom(process);
+
+        final LogMonitoringAppender operationAppender =
+                LogMonitoringAppender.addAppender(LogCategory.OPERATION,
+                        "has been added to faulty paths file");
+        dataMoverTimerTask.run(); // 1. round finds a file to process
+        assertFalse(errorMarker.exists());
+        dataMoverTimerTask.run(); // 2. round finds that quiet period is over
+        assertTrue(errorMarker.exists());
+        operationAppender.verifyLogHasHappened();
+        LogMonitoringAppender.removeAppender(operationAppender);
+        context.assertIsSatisfied();
+    }
+
     @Test
     public void testWithDataCompletedScript() throws IOException
     {
         createExampleScript(EXAMPLE_SCRIPT);
         final File testDataFile = new File(incomingDir, "test-data.txt");
         testDataFile.createNewFile();
+        final File errorMarker = new File(ERROR_MARKER_FILE);
+        errorMarker.delete();
+        assertFalse(errorMarker.exists());
         context.checking(new Expectations()
             {
                 {
                     one(mover).tryMove(testDataFile, copyCompleteDir, "");
-                    will(returnValue(new File(copyCompleteDir, testDataFile.getName())));
+                    will(new CustomAction("move file")
+                        {
+                            public Object invoke(Invocation invocation) throws Throwable
+                            {
+                                final File result =
+                                        new File(copyCompleteDir, testDataFile.getName());
+                                testDataFile.renameTo(result);
+                                return result;
+                            }
+                        });
                 }
             });
 
@@ -211,12 +262,15 @@ public final class IncomingProcessorTest
 
         final TimerTask dataMoverTimerTask = getInstrumentedTimerTaskFrom(process);
         dataMoverTimerTask.run(); // 1. round finds a file to process
+        assertFalse(errorMarker.exists());
         dataMoverTimerTask.run(); // 2. round finds that quiet period is over
+        assertFalse(errorMarker.exists());
         notifyAppender.verifyLogHasHappened();
         operationAppender.verifyLogHasHappened();
 
         logRecorder.resetLogContent();
         dataMoverTimerTask.run(); // 3. round does not change status, thus no log
+        assertFalse(errorMarker.exists());
         assertEquals("", logRecorder.getLogContent());
         context.assertIsSatisfied();
     }
@@ -301,8 +355,8 @@ public final class IncomingProcessorTest
                     will(returnValue(remover));
                 }
             });
-        return IncomingProcessor.createMovingProcess(parameters, MARKER_FILE, null,
-                fileSysOpertationFactory, new MockTimeProvider(), localBufferDirs);
+        return IncomingProcessor.createMovingProcess(parameters, MARKER_FILE, ERROR_MARKER_FILE,
+                null, fileSysOpertationFactory, new MockTimeProvider(), localBufferDirs);
 
     }
 }
-- 
GitLab