From 079ddd60f7dd28fe3f719f1edb2d9e5c4c1b9674 Mon Sep 17 00:00:00 2001
From: brinn <brinn>
Date: Wed, 24 Oct 2012 11:30:12 +0000
Subject: [PATCH] Avoid continued logging of a faulty entry which is set to be
 reprocessed.

SVN: 27358
---
 .../ch/systemsx/cisd/etlserver/ETLDaemon.java | 56 +++++++++++++------
 1 file changed, 38 insertions(+), 18 deletions(-)

diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java
index a1d7263520b..a88e60a20e8 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java
@@ -20,8 +20,11 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FilenameFilter;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Timer;
@@ -43,9 +46,6 @@ import ch.systemsx.cisd.common.filesystem.DirectoryScanningTimerTask;
 import ch.systemsx.cisd.common.filesystem.DirectoryScanningTimerTask.IScannedStore;
 import ch.systemsx.cisd.common.filesystem.FaultyPathDirectoryScanningHandler;
 import ch.systemsx.cisd.common.filesystem.FaultyPathDirectoryScanningHandler.IFaultyPathDirectoryScanningHandlerDelegate;
-import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkDirectoryScanningHandler;
-import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkWatcher;
-import ch.systemsx.cisd.common.filesystem.highwatermark.HostAwareFileWithHighwaterMark;
 import ch.systemsx.cisd.common.filesystem.FileConstants;
 import ch.systemsx.cisd.common.filesystem.FileUtilities;
 import ch.systemsx.cisd.common.filesystem.HostAwareFile;
@@ -56,6 +56,9 @@ import ch.systemsx.cisd.common.filesystem.LastModificationChecker;
 import ch.systemsx.cisd.common.filesystem.QueueingPathRemoverService;
 import ch.systemsx.cisd.common.filesystem.QuietPeriodFileFilter;
 import ch.systemsx.cisd.common.filesystem.StoreItem;
+import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkDirectoryScanningHandler;
+import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkWatcher;
+import ch.systemsx.cisd.common.filesystem.highwatermark.HostAwareFileWithHighwaterMark;
 import ch.systemsx.cisd.common.logging.Log4jSimpleLogger;
 import ch.systemsx.cisd.common.logging.LogCategory;
 import ch.systemsx.cisd.common.logging.LogFactory;
@@ -346,7 +349,8 @@ public final class ETLDaemon
         final HighwaterMarkDirectoryScanningHandler directoryScanningHandler =
                 createDirectoryScanningHandler(pathHandler, highwaterMarkWatcher,
                         hostAwareIncomingDataDirectory, hostAwareRecoveryStateDirectory,
-                        threadParameters.reprocessFaultyDatasets(), pathHandler);
+                        threadParameters.reprocessFaultyDatasets(),
+                        parameters.getCheckIntervalMillis(), pathHandler);
         FileFilter fileFilter =
                 createFileFilter(incomingDataDirectory, threadParameters.useIsFinishedMarkerFile(),
                         parameters);
@@ -576,12 +580,12 @@ public final class ETLDaemon
     private final static HighwaterMarkDirectoryScanningHandler createDirectoryScanningHandler(
             final IStopSignaler stopSignaler, final HighwaterMarkWatcher highwaterMarkWatcher,
             final HostAwareFile incomingDataDirectory, final HostAwareFile recoveryStateDirectory,
-            boolean reprocessFaultyDatasets,
+            boolean reprocessFaultyDatasets, final long checkIntervalMillis,
             IFaultyPathDirectoryScanningHandlerDelegate faultyPathHandlerDelegate)
     {
         final IDirectoryScanningHandler faultyPathHandler =
                 createFaultyPathHandler(stopSignaler, incomingDataDirectory.getLocalFile(),
-                        reprocessFaultyDatasets, faultyPathHandlerDelegate);
+                        reprocessFaultyDatasets, checkIntervalMillis, faultyPathHandlerDelegate);
         return new HighwaterMarkDirectoryScanningHandler(faultyPathHandler, highwaterMarkWatcher,
                 new HostAwareFile[]
                     { incomingDataDirectory, recoveryStateDirectory });
@@ -589,12 +593,12 @@ public final class ETLDaemon
 
     private static IDirectoryScanningHandler createFaultyPathHandler(
             final IStopSignaler stopSignaler, final File incomingDataDirectory,
-            boolean reprocessFaultyDatasets,
+            boolean reprocessFaultyDatasets, final long checkIntervalMillis,
             IFaultyPathDirectoryScanningHandlerDelegate faultyPathHandlerDelegate)
     {
         if (reprocessFaultyDatasets)
         {
-            return createDummyFaultyPathHandler();
+            return createDummyFaultyPathHandler(checkIntervalMillis);
         } else
         {
             return new FaultyPathDirectoryScanningHandler(incomingDataDirectory, stopSignaler,
@@ -603,15 +607,27 @@ public final class ETLDaemon
     }
 
     // returns the handler which does not check if the path was faulty
-    private static IDirectoryScanningHandler createDummyFaultyPathHandler()
+    private static IDirectoryScanningHandler createDummyFaultyPathHandler(
+            final long checkIntervalMillis)
     {
         return new IDirectoryScanningHandler()
             {
+                private Map<String, Long> faultyItems = new HashMap<String, Long>();
 
                 @Override
                 public void init(IScannedStore scannedStore)
                 {
-                    // do nothing
+                    final long now = System.currentTimeMillis();
+                    // Clean up item map to avoid accumulating trash.
+                    final Iterator<Map.Entry<String, Long>> it = faultyItems.entrySet().iterator();
+                    while (it.hasNext())
+                    {
+                        final Map.Entry<String, Long> e = it.next();
+                        if (now - e.getValue() > 3 * checkIntervalMillis)
+                        {
+                            it.remove();
+                        }
+                    }
                 }
 
                 @Override
@@ -625,14 +641,18 @@ public final class ETLDaemon
                 {
                     if (scannedStore.existsOrError(storeItem))
                     {
-                        StringBuffer sb = new StringBuffer();
-                        sb.append("The thread configuration setting "
-                                + ch.systemsx.cisd.etlserver.ThreadParameters.REPROCESS_FAULTY_DATASETS_NAME
-                                + " = true.");
-                        sb.append(" File "
-                                + storeItem
-                                + " not written to faulty paths. It will be reprocessed during the next iteration.");
-                        operationLog.info(sb.toString());
+                        if (faultyItems.containsValue(storeItem.getName()) == false)
+                        {
+                            StringBuffer sb = new StringBuffer();
+                            sb.append("The thread configuration setting "
+                                    + ch.systemsx.cisd.etlserver.ThreadParameters.REPROCESS_FAULTY_DATASETS_NAME
+                                    + " = true.");
+                            sb.append(" File "
+                                    + storeItem
+                                    + " not written to faulty paths. It will be reprocessed until successfull or removed.");
+                            operationLog.info(sb.toString());
+                        }
+                        faultyItems.put(storeItem.getName(), System.currentTimeMillis());
                     }
                     return Status.OK;
                 }
-- 
GitLab