diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java index a1d7263520b11e5160bd3f0d54609c388d35c543..a88e60a20e88c6fa7afa0ab8751bb5e7a1f10516 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java @@ -20,8 +20,11 @@ import java.io.File; import java.io.FileFilter; import java.io.FilenameFilter; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Timer; @@ -43,9 +46,6 @@ import ch.systemsx.cisd.common.filesystem.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.filesystem.DirectoryScanningTimerTask.IScannedStore; import ch.systemsx.cisd.common.filesystem.FaultyPathDirectoryScanningHandler; import ch.systemsx.cisd.common.filesystem.FaultyPathDirectoryScanningHandler.IFaultyPathDirectoryScanningHandlerDelegate; -import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkDirectoryScanningHandler; -import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkWatcher; -import ch.systemsx.cisd.common.filesystem.highwatermark.HostAwareFileWithHighwaterMark; import ch.systemsx.cisd.common.filesystem.FileConstants; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.filesystem.HostAwareFile; @@ -56,6 +56,9 @@ import ch.systemsx.cisd.common.filesystem.LastModificationChecker; import ch.systemsx.cisd.common.filesystem.QueueingPathRemoverService; import ch.systemsx.cisd.common.filesystem.QuietPeriodFileFilter; import ch.systemsx.cisd.common.filesystem.StoreItem; +import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkDirectoryScanningHandler; +import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkWatcher; +import 
ch.systemsx.cisd.common.filesystem.highwatermark.HostAwareFileWithHighwaterMark; import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; @@ -346,7 +349,8 @@ public final class ETLDaemon final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = createDirectoryScanningHandler(pathHandler, highwaterMarkWatcher, hostAwareIncomingDataDirectory, hostAwareRecoveryStateDirectory, - threadParameters.reprocessFaultyDatasets(), pathHandler); + threadParameters.reprocessFaultyDatasets(), + parameters.getCheckIntervalMillis(), pathHandler); FileFilter fileFilter = createFileFilter(incomingDataDirectory, threadParameters.useIsFinishedMarkerFile(), parameters); @@ -576,12 +580,12 @@ public final class ETLDaemon private final static HighwaterMarkDirectoryScanningHandler createDirectoryScanningHandler( final IStopSignaler stopSignaler, final HighwaterMarkWatcher highwaterMarkWatcher, final HostAwareFile incomingDataDirectory, final HostAwareFile recoveryStateDirectory, - boolean reprocessFaultyDatasets, + boolean reprocessFaultyDatasets, final long checkIntervalMillis, IFaultyPathDirectoryScanningHandlerDelegate faultyPathHandlerDelegate) { final IDirectoryScanningHandler faultyPathHandler = createFaultyPathHandler(stopSignaler, incomingDataDirectory.getLocalFile(), - reprocessFaultyDatasets, faultyPathHandlerDelegate); + reprocessFaultyDatasets, checkIntervalMillis, faultyPathHandlerDelegate); return new HighwaterMarkDirectoryScanningHandler(faultyPathHandler, highwaterMarkWatcher, new HostAwareFile[] { incomingDataDirectory, recoveryStateDirectory }); @@ -589,12 +593,12 @@ public final class ETLDaemon private static IDirectoryScanningHandler createFaultyPathHandler( final IStopSignaler stopSignaler, final File incomingDataDirectory, - boolean reprocessFaultyDatasets, + boolean reprocessFaultyDatasets, final long checkIntervalMillis, 
IFaultyPathDirectoryScanningHandlerDelegate faultyPathHandlerDelegate) { if (reprocessFaultyDatasets) { - return createDummyFaultyPathHandler(); + return createDummyFaultyPathHandler(checkIntervalMillis); } else { return new FaultyPathDirectoryScanningHandler(incomingDataDirectory, stopSignaler, @@ -603,15 +607,27 @@ public final class ETLDaemon } // returns the handler which does not check if the path was faulty - private static IDirectoryScanningHandler createDummyFaultyPathHandler() + private static IDirectoryScanningHandler createDummyFaultyPathHandler( + final long checkIntervalMillis) { return new IDirectoryScanningHandler() { + private Map<String, Long> faultyItems = new HashMap<String, Long>(); @Override public void init(IScannedStore scannedStore) { - // do nothing + final long now = System.currentTimeMillis(); + // Clean up item map to avoid accumulating trash. + final Iterator<Map.Entry<String, Long>> it = faultyItems.entrySet().iterator(); + while (it.hasNext()) + { + final Map.Entry<String, Long> e = it.next(); + if (now - e.getValue() > 3 * checkIntervalMillis) + { + it.remove(); + } + } } @Override @@ -625,14 +641,18 @@ public final class ETLDaemon { if (scannedStore.existsOrError(storeItem)) { - StringBuffer sb = new StringBuffer(); - sb.append("The thread configuration setting " - + ch.systemsx.cisd.etlserver.ThreadParameters.REPROCESS_FAULTY_DATASETS_NAME - + " = true."); - sb.append(" File " - + storeItem - + " not written to faulty paths. It will be reprocessed during the next iteration."); - operationLog.info(sb.toString()); + if (faultyItems.containsKey(storeItem.getName()) == false) /* faultyItems is keyed by item name (values are timestamps): log only the first time an item is seen as faulty */ + { + StringBuffer sb = new StringBuffer(); + sb.append("The thread configuration setting " + + ch.systemsx.cisd.etlserver.ThreadParameters.REPROCESS_FAULTY_DATASETS_NAME + + " = true."); + sb.append(" File " + + storeItem + + " not written to faulty paths. 
It will be reprocessed until successfull or removed."); + operationLog.info(sb.toString()); + } + faultyItems.put(storeItem.getName(), System.currentTimeMillis()); } return Status.OK; }