From ca25e9895725f2c8ec6ac0e3673b3be3ec95cc98 Mon Sep 17 00:00:00 2001
From: brinn <brinn>
Date: Sun, 30 Sep 2007 20:28:58 +0000
Subject: [PATCH] add: support for recovery in DirectoryScanningTimerTask and
 the QueuingPathHandler

SVN: 1878
---
 .../ch/systemsx/cisd/common/Constants.java    |  3 +
 .../utilities/DirectoryScanningTimerTask.java | 52 +++++++++++++--
 .../cisd/common/utilities/IRecoverable.java   | 32 +++++++++
 .../common/utilities/QueuingPathHandler.java  | 66 ++++++++++++++++---
 .../DirectoryScanningTimerTaskTest.java       | 41 +++++++++++-
 .../utilities/QueuingPathHandlerTest.java     | 34 +++++++++-
 6 files changed, 205 insertions(+), 23 deletions(-)
 create mode 100644 common/source/java/ch/systemsx/cisd/common/utilities/IRecoverable.java

diff --git a/common/source/java/ch/systemsx/cisd/common/Constants.java b/common/source/java/ch/systemsx/cisd/common/Constants.java
index 1306a277a0d..cdce5159413 100644
--- a/common/source/java/ch/systemsx/cisd/common/Constants.java
+++ b/common/source/java/ch/systemsx/cisd/common/Constants.java
@@ -32,5 +32,8 @@ public class Constants
 
     /** The prefix of marker files that indicate that the processing of some path is finished. */
     public static final String DELETION_IN_PROGRESS_PREFIX = MARKER_PREFIX + "deletion_in_progress_";
+    
+    /** Name of marker file to trigger recovery operations. */
+    public static final String RECOVERY_MARKER_FIILENAME = MARKER_PREFIX + "recovery";
 
 }
diff --git a/common/source/java/ch/systemsx/cisd/common/utilities/DirectoryScanningTimerTask.java b/common/source/java/ch/systemsx/cisd/common/utilities/DirectoryScanningTimerTask.java
index ea7a2ff4d6c..ff549bed204 100644
--- a/common/source/java/ch/systemsx/cisd/common/utilities/DirectoryScanningTimerTask.java
+++ b/common/source/java/ch/systemsx/cisd/common/utilities/DirectoryScanningTimerTask.java
@@ -26,6 +26,7 @@ import java.util.TimerTask;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import ch.systemsx.cisd.common.Constants;
 import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException;
 import ch.systemsx.cisd.common.logging.ISimpleLogger;
 import ch.systemsx.cisd.common.logging.LogCategory;
@@ -62,8 +63,12 @@ public final class DirectoryScanningTimerTask extends TimerTask implements ISelf
 
     private final IPathHandler handler;
 
+    private final IRecoverable recoverableOrNull;
+
     private final File sourceDirectory;
 
+    private final File recoveryTriggerFile;
+
     private boolean errorReadingDirectory;
 
     private final FileFilter filter;
@@ -74,25 +79,43 @@ public final class DirectoryScanningTimerTask extends TimerTask implements ISelf
 
     private long faultyPathsLastChanged;
 
+    /**
+     * Convenience form of
+     * {@link DirectoryScanningTimerTask#DirectoryScanningTimerTask(File, FileFilter, IPathHandler, IRecoverable)} with
+     * <var>recoverableOrNull</var> equal to <code>null</code>.
+     */
+    public DirectoryScanningTimerTask(File sourceDirectory, FileFilter filter, IPathHandler handler)
+    {
+        this(sourceDirectory, filter, handler, null);
+    }
+
     /**
      * Creates a <var>DirectoryScanningTimerTask</var>.
+     * <p>
+     * Performs a recovery if <var>recoverableOrNull</var> is not <code>null</code>.
      * 
      * @param sourceDirectory The directory to scan for entries.
      * @param filter The file filter that picks the entries to handle.
      * @param handler The handler that is used for treating the matching paths.
+     * @param recoverableOrNull The handler that can perform a recovery action, or <code>null</code> if there is no
+     *            recovery action available.
      */
-    public DirectoryScanningTimerTask(File sourceDirectory, FileFilter filter, IPathHandler handler)
+    public DirectoryScanningTimerTask(File sourceDirectory, FileFilter filter, IPathHandler handler,
+            IRecoverable recoverableOrNull)
     {
         assert sourceDirectory != null;
         assert filter != null;
         assert handler != null;
 
         this.sourceDirectory = sourceDirectory;
+        this.recoveryTriggerFile = new File(sourceDirectory, Constants.RECOVERY_MARKER_FIILENAME);
         this.filter = filter;
         this.handler = handler;
+        this.recoverableOrNull = recoverableOrNull;
         this.faultyPaths = new HashSet<File>();
         this.faultyPathsFile = new File(sourceDirectory, FAULTY_PATH_FILENAME);
         faultyPathsFile.delete();
+        runRecover();
     }
 
     /**
@@ -103,16 +126,17 @@ public final class DirectoryScanningTimerTask extends TimerTask implements ISelf
     {
         try
         {
+            if (recoveryTriggerFile.exists())
+            {
+                runRecover();
+                recoveryTriggerFile.delete();
+            }
             if (operationLog.isTraceEnabled())
             {
                 operationLog.trace("Start scanning directory " + sourceDirectory + ".");
             }
             checkForFaultyPathsFileChanged();
             final File[] paths = listFiles();
-            if (paths == null) // Means: error reading directory listing
-            {
-                return;
-            }
             // Sort in order of "oldest first" in order to move older items before newer items. This becomes important
             // when doing online quality control of measurements.
             Arrays.sort(paths, FileComparator.BY_LAST_MODIFIED);
@@ -134,6 +158,19 @@ public final class DirectoryScanningTimerTask extends TimerTask implements ISelf
         }
     }
 
+    private void runRecover()
+    {
+        if (recoverableOrNull != null)
+        {
+            if (operationLog.isInfoEnabled())
+            {
+                operationLog.info("Triggering recovery on directory '" + sourceDirectory + "'.");
+            }
+            recoverableOrNull.recover();
+        }
+
+    }
+
     private void checkForFaultyPathsFileChanged()
     {
         if (faultyPathsFile.exists())
@@ -161,9 +198,9 @@ public final class DirectoryScanningTimerTask extends TimerTask implements ISelf
         boolean logErrors = (errorReadingDirectory == false);
         final ISimpleLogger errorLogger = logErrors ? createSimpleErrorLogger() : null;
 
-        File[] paths = FileUtilities.tryListFiles(sourceDirectory, filter, errorLogger);
+        final File[] paths = FileUtilities.tryListFiles(sourceDirectory, filter, errorLogger);
         errorReadingDirectory = (paths == null); // Avoid mailbox flooding.
-        return paths;
+        return (paths == null) ? new File[0] : paths;
     }
 
     private ISimpleLogger createSimpleErrorLogger()
@@ -219,4 +256,5 @@ public final class DirectoryScanningTimerTask extends TimerTask implements ISelf
             throw new ConfigurationFailureException(errorMessage);
         }
     }
+
 }
diff --git a/common/source/java/ch/systemsx/cisd/common/utilities/IRecoverable.java b/common/source/java/ch/systemsx/cisd/common/utilities/IRecoverable.java
new file mode 100644
index 00000000000..6ddcec7fd0b
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/utilities/IRecoverable.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2007 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.utilities;
+
+/**
+ * A role that allows to recover from a premature shutdown or crash. 
+ *
+ * @author Bernd Rinn
+ */
+public interface IRecoverable
+{
+
+    /**
+     * Perform a recovery.
+     */
+    public void recover();
+    
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java b/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java
index 5be0a05bdf1..8cbb742dcb2 100644
--- a/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java
+++ b/common/source/java/ch/systemsx/cisd/common/utilities/QueuingPathHandler.java
@@ -31,10 +31,10 @@ import ch.systemsx.cisd.common.logging.LogFactory;
  * 
  * @author Tomasz Pylak on Aug 24, 2007
  */
-public class QueuingPathHandler implements ITerminable, IPathHandler
+public class QueuingPathHandler implements ITerminable, IPathHandler, IRecoverable
 {
     private static final Logger notificationLog = LogFactory.getLogger(LogCategory.NOTIFY, QueuingPathHandler.class);
-    
+
     private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, QueuingPathHandler.class);
 
     private final PathHandlerThread thread;
@@ -46,8 +46,18 @@ public class QueuingPathHandler implements ITerminable, IPathHandler
 
     public static QueuingPathHandler create(final IPathHandler handler, String threadName)
     {
-        PathHandlerThread thread = new PathHandlerThread(handler);
+        return create(handler, null, threadName);
+    }
+
+    public static QueuingPathHandler create(final IPathHandler handler, final IRecoverable recoverableOrNull,
+            String threadName)
+    {
+        assert handler != null;
+        assert threadName != null;
+
+        final PathHandlerThread thread = new PathHandlerThread(handler, recoverableOrNull);
         final QueuingPathHandler lazyHandler = new QueuingPathHandler(thread);
+        lazyHandler.recover();
         thread.setName(threadName);
         thread.start();
         return lazyHandler;
@@ -55,14 +65,19 @@ public class QueuingPathHandler implements ITerminable, IPathHandler
 
     private static class PathHandlerThread extends Thread
     {
+        private static final File DUMMY_FILE = new File(".");
+        
         private final BlockingQueue<File> queue;
 
         private final IPathHandler handler;
 
-        public PathHandlerThread(IPathHandler handler)
+        private final IRecoverable recoverableOrNull;
+
+        public PathHandlerThread(IPathHandler handler, IRecoverable recoverableOrNull)
         {
             this.queue = new LinkedBlockingQueue<File>();
             this.handler = handler;
+            this.recoverableOrNull = recoverableOrNull;
         }
 
         @Override
@@ -79,11 +94,17 @@ public class QueuingPathHandler implements ITerminable, IPathHandler
                             operationLog.trace("Waiting for new element in queue.");
                         }
                         File path = queue.take(); // blocks if empty
-                        if (operationLog.isTraceEnabled())
+                        if (path == DUMMY_FILE)
+                        {
+                            runRecover();
+                        } else
                         {
-                            operationLog.trace("Processing path '" + path + "'");
+                            if (operationLog.isTraceEnabled())
+                            {
+                                operationLog.trace("Processing path '" + path + "'");
+                            }
+                            handler.handle(path);
                         }
-                        handler.handle(path);
                     } catch (InterruptedException ex)
                     {
                         return;
@@ -96,14 +117,33 @@ public class QueuingPathHandler implements ITerminable, IPathHandler
             }
         }
 
-        private synchronized void queue(File resource)
+        synchronized void queue(File resource)
         {
             queue.add(resource);
         }
 
+        private void runRecover()
+        {
+            if (recoverableOrNull != null)
+            {
+                if (operationLog.isInfoEnabled())
+                {
+                    operationLog.info("Triggering recovery.");
+                }
+                recoverableOrNull.recover();
+            }
+        }
+
+        void queueRecover()
+        {
+            queue(DUMMY_FILE);
+        }
+
     }
 
-    /** cleans resources */
+    /**
+     * Interrupts the processing thread.
+     */
     public boolean terminate()
     {
         if (operationLog.isInfoEnabled())
@@ -120,11 +160,17 @@ public class QueuingPathHandler implements ITerminable, IPathHandler
     public void handle(File path)
     {
         assert thread.isInterrupted() == false;
-        
+
         if (operationLog.isTraceEnabled())
         {
             operationLog.trace("Queing path '" + path + "'");
         }
         thread.queue(path);
     }
+
+    public void recover()
+    {
+        thread.queueRecover();
+    }
+
 }
diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/utilities/DirectoryScanningTimerTaskTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/utilities/DirectoryScanningTimerTaskTest.java
index 28bbb47e9fd..6ad180e49dd 100644
--- a/common/sourceTest/java/ch/systemsx/cisd/common/utilities/DirectoryScanningTimerTaskTest.java
+++ b/common/sourceTest/java/ch/systemsx/cisd/common/utilities/DirectoryScanningTimerTaskTest.java
@@ -17,6 +17,8 @@
 package ch.systemsx.cisd.common.utilities;
 
 import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.assertFalse;
 import static ch.systemsx.cisd.common.utilities.FileUtilities.ACCEPT_ALL_FILTER;
 
 import java.io.File;
@@ -31,6 +33,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import ch.systemsx.cisd.common.Constants;
 import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException;
 import ch.systemsx.cisd.common.logging.LogCategory;
 import ch.systemsx.cisd.common.logging.LogInitializer;
@@ -180,18 +183,50 @@ public class DirectoryScanningTimerTaskTest
         assert faultyPaths.length() == 0;
     }
 
+    private static class RecordingRecoverable implements IRecoverable
+    {
+        File pathToDeleteOrNull = null;
+        
+        boolean recoveryCalled = false;
+        
+        public void recover()
+        {
+            if (pathToDeleteOrNull != null)
+            {
+                pathToDeleteOrNull.delete();
+            }
+            recoveryCalled = true;
+        }
+    }
+    
     @Test
-    public void testProcessOK() throws IOException
+    public void testRecovery() throws IOException
     {
         final File someFile = new File(workingDirectory, "some_file");
         someFile.createNewFile();
         someFile.deleteOnExit();
+        final RecordingRecoverable mockRecoverable = new RecordingRecoverable();
         final DirectoryScanningTimerTask scanner =
-                new DirectoryScanningTimerTask(workingDirectory, ACCEPT_ALL_FILTER, mockPathHandler);
+                new DirectoryScanningTimerTask(workingDirectory, ACCEPT_ALL_FILTER, mockPathHandler, mockRecoverable);
+        assertTrue(mockRecoverable.recoveryCalled);
+        mockRecoverable.recoveryCalled = false;
         assertEquals(0, mockPathHandler.handledPaths.size());
         scanner.run();
+        assertFalse(mockRecoverable.recoveryCalled);
         assertEquals(1, mockPathHandler.handledPaths.size());
         assertEquals(someFile, mockPathHandler.handledPaths.get(0));
+        final File recoveryFile = new File(workingDirectory, Constants.RECOVERY_MARKER_FIILENAME);
+        recoveryFile.createNewFile();
+        recoveryFile.deleteOnExit();
+        mockRecoverable.recoveryCalled = false;
+        mockRecoverable.pathToDeleteOrNull = someFile;
+        mockPathHandler.handledPaths.clear();
+        someFile.createNewFile();
+        scanner.run();
+        assertTrue(mockRecoverable.recoveryCalled);
+        assertFalse(recoveryFile.exists());
+        assertFalse(someFile.exists());
+        assertEquals(0, mockPathHandler.handledPaths.size());
     }
 
     @Test
@@ -358,5 +393,5 @@ public class DirectoryScanningTimerTaskTest
         LogMonitoringAppender.removeAppender(appender1);
         LogMonitoringAppender.removeAppender(appender2);
     }
-
+    
 }
diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java
index dcc93f68219..27a44f8e1ba 100644
--- a/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java
+++ b/common/sourceTest/java/ch/systemsx/cisd/common/utilities/QueuingPathHandlerTest.java
@@ -45,7 +45,7 @@ public class QueuingPathHandlerTest
         private final int blockBeforeFile;
 
         private final long blockMillis;
-        
+
         private boolean interrupted;
 
         RecordingIPathHandler(int blockBeforeFile, long blockMillis)
@@ -79,7 +79,7 @@ public class QueuingPathHandlerTest
         {
             return handled;
         }
-        
+
         boolean isInterrupted()
         {
             return interrupted;
@@ -124,7 +124,7 @@ public class QueuingPathHandlerTest
         final int FILES_TO_PROCESS = 4;
         for (int i = 0; i < 10; ++i)
         {
-            final File f = new File("File " + i); 
+            final File f = new File("File " + i);
             if (i < FILES_TO_PROCESS)
             {
                 processedFileList.add(f);
@@ -146,4 +146,32 @@ public class QueuingPathHandlerTest
         assertEquals(processedFileList, blocker.getHandledFiles());
     }
 
+    private static class RecordingRecoverable implements IRecoverable
+    {
+        boolean recoverCalled;
+
+        public void recover()
+        {
+            recoverCalled = true;
+        }
+    }
+
+    @Test
+    public void testRecovery() throws InterruptedException
+    {
+        final File testFile = new File("test_file_to_handle");
+        final RecordingIPathHandler recordingHandler = new RecordingIPathHandler();
+        final RecordingRecoverable recordingRecoverable = new RecordingRecoverable();
+        final QueuingPathHandler qPathHandler =
+                QueuingPathHandler.create(recordingHandler, recordingRecoverable, "test-thread");
+        Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH);
+        assertTrue(recordingRecoverable.recoverCalled);
+        recordingRecoverable.recoverCalled = false;
+        qPathHandler.handle(testFile);
+        qPathHandler.recover();
+        Thread.sleep(MILLIS_TO_WAIT_FOR_PROCESSING_TO_FINISH);
+        assertTrue(recordingRecoverable.recoverCalled);
+        assertEquals(Collections.singletonList(testFile), recordingHandler.getHandledFiles());
+    }
+
 }
-- 
GitLab