Skip to content
Snippets Groups Projects
Commit 079ddd60 authored by brinn's avatar brinn
Browse files

Avoid continued logging of a faulty entry which is set to be reprocessed.

SVN: 27358
parent 12290dc7
No related branches found
No related tags found
No related merge requests found
......@@ -20,8 +20,11 @@ import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Timer;
......@@ -43,9 +46,6 @@ import ch.systemsx.cisd.common.filesystem.DirectoryScanningTimerTask;
import ch.systemsx.cisd.common.filesystem.DirectoryScanningTimerTask.IScannedStore;
import ch.systemsx.cisd.common.filesystem.FaultyPathDirectoryScanningHandler;
import ch.systemsx.cisd.common.filesystem.FaultyPathDirectoryScanningHandler.IFaultyPathDirectoryScanningHandlerDelegate;
import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkDirectoryScanningHandler;
import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkWatcher;
import ch.systemsx.cisd.common.filesystem.highwatermark.HostAwareFileWithHighwaterMark;
import ch.systemsx.cisd.common.filesystem.FileConstants;
import ch.systemsx.cisd.common.filesystem.FileUtilities;
import ch.systemsx.cisd.common.filesystem.HostAwareFile;
......@@ -56,6 +56,9 @@ import ch.systemsx.cisd.common.filesystem.LastModificationChecker;
import ch.systemsx.cisd.common.filesystem.QueueingPathRemoverService;
import ch.systemsx.cisd.common.filesystem.QuietPeriodFileFilter;
import ch.systemsx.cisd.common.filesystem.StoreItem;
import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkDirectoryScanningHandler;
import ch.systemsx.cisd.common.filesystem.highwatermark.HighwaterMarkWatcher;
import ch.systemsx.cisd.common.filesystem.highwatermark.HostAwareFileWithHighwaterMark;
import ch.systemsx.cisd.common.logging.Log4jSimpleLogger;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
......@@ -346,7 +349,8 @@ public final class ETLDaemon
final HighwaterMarkDirectoryScanningHandler directoryScanningHandler =
createDirectoryScanningHandler(pathHandler, highwaterMarkWatcher,
hostAwareIncomingDataDirectory, hostAwareRecoveryStateDirectory,
threadParameters.reprocessFaultyDatasets(), pathHandler);
threadParameters.reprocessFaultyDatasets(),
parameters.getCheckIntervalMillis(), pathHandler);
FileFilter fileFilter =
createFileFilter(incomingDataDirectory, threadParameters.useIsFinishedMarkerFile(),
parameters);
......@@ -576,12 +580,12 @@ public final class ETLDaemon
private final static HighwaterMarkDirectoryScanningHandler createDirectoryScanningHandler(
final IStopSignaler stopSignaler, final HighwaterMarkWatcher highwaterMarkWatcher,
final HostAwareFile incomingDataDirectory, final HostAwareFile recoveryStateDirectory,
boolean reprocessFaultyDatasets,
boolean reprocessFaultyDatasets, final long checkIntervalMillis,
IFaultyPathDirectoryScanningHandlerDelegate faultyPathHandlerDelegate)
{
final IDirectoryScanningHandler faultyPathHandler =
createFaultyPathHandler(stopSignaler, incomingDataDirectory.getLocalFile(),
reprocessFaultyDatasets, faultyPathHandlerDelegate);
reprocessFaultyDatasets, checkIntervalMillis, faultyPathHandlerDelegate);
return new HighwaterMarkDirectoryScanningHandler(faultyPathHandler, highwaterMarkWatcher,
new HostAwareFile[]
{ incomingDataDirectory, recoveryStateDirectory });
......@@ -589,12 +593,12 @@ public final class ETLDaemon
private static IDirectoryScanningHandler createFaultyPathHandler(
final IStopSignaler stopSignaler, final File incomingDataDirectory,
boolean reprocessFaultyDatasets,
boolean reprocessFaultyDatasets, final long checkIntervalMillis,
IFaultyPathDirectoryScanningHandlerDelegate faultyPathHandlerDelegate)
{
if (reprocessFaultyDatasets)
{
return createDummyFaultyPathHandler();
return createDummyFaultyPathHandler(checkIntervalMillis);
} else
{
return new FaultyPathDirectoryScanningHandler(incomingDataDirectory, stopSignaler,
......@@ -603,15 +607,27 @@ public final class ETLDaemon
}
// returns the handler which does not check if the path was faulty
private static IDirectoryScanningHandler createDummyFaultyPathHandler()
private static IDirectoryScanningHandler createDummyFaultyPathHandler(
final long checkIntervalMillis)
{
return new IDirectoryScanningHandler()
{
private Map<String, Long> faultyItems = new HashMap<String, Long>();
@Override
public void init(IScannedStore scannedStore)
{
// do nothing
final long now = System.currentTimeMillis();
// Clean up item map to avoid accumulating trash.
final Iterator<Map.Entry<String, Long>> it = faultyItems.entrySet().iterator();
while (it.hasNext())
{
final Map.Entry<String, Long> e = it.next();
if (now - e.getValue() > 3 * checkIntervalMillis)
{
it.remove();
}
}
}
@Override
......@@ -625,14 +641,18 @@ public final class ETLDaemon
{
if (scannedStore.existsOrError(storeItem))
{
StringBuffer sb = new StringBuffer();
sb.append("The thread configuration setting "
+ ch.systemsx.cisd.etlserver.ThreadParameters.REPROCESS_FAULTY_DATASETS_NAME
+ " = true.");
sb.append(" File "
+ storeItem
+ " not written to faulty paths. It will be reprocessed during the next iteration.");
operationLog.info(sb.toString());
if (faultyItems.containsValue(storeItem.getName()) == false)
{
StringBuffer sb = new StringBuffer();
sb.append("The thread configuration setting "
+ ch.systemsx.cisd.etlserver.ThreadParameters.REPROCESS_FAULTY_DATASETS_NAME
+ " = true.");
sb.append(" File "
+ storeItem
+ " not written to faulty paths. It will be reprocessed until successfull or removed.");
operationLog.info(sb.toString());
}
faultyItems.put(storeItem.getName(), System.currentTimeMillis());
}
return Status.OK;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment