diff --git a/common/source/java/ch/systemsx/cisd/common/utilities/IDelegatedActionWithResult.java b/common/source/java/ch/systemsx/cisd/common/utilities/IDelegatedActionWithResult.java new file mode 100644 index 0000000000000000000000000000000000000000..079d404c414353f608eb311168cb2db81e935355 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/utilities/IDelegatedActionWithResult.java @@ -0,0 +1,28 @@ +/* + * Copyright 2009 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.utilities; + +/** + * Use this interface to delegate any kind of action to a different part of code without adding an + * explicit dependency. The action return a result. + * + * @author Tomasz Pylak + */ +public interface IDelegatedActionWithResult<T> +{ + T execute(); +} diff --git a/datastore_server/dist/etc/service.properties b/datastore_server/dist/etc/service.properties index 074cf1ee48c7aebb8d4ea28246c8a2d0466133ed..3255029161d4d88e20f7515f025cc7af8bda42d9 100644 --- a/datastore_server/dist/etc/service.properties +++ b/datastore_server/dist/etc/service.properties @@ -60,6 +60,11 @@ max-retries = 11 # Time (in seconds) to wait after an operation has been timed out before re-trying. failure-interval = 10 +# The period of no write access that needs to pass before an incoming data item is considered +# complete and ready to be processed (in seconds) [default: 300]. +# Valid only when auto-detection method is used to determine if an incoming data are ready to be processed. +# quiet-period = <value in seconds> + # Globally used separator character which separates entities in a data set file name data-set-file-name-entity-separator = _ @@ -74,6 +79,14 @@ inputs=main-thread main-thread.group-code = TEST # The directory to watch for incoming data. main-thread.incoming-dir = data/incoming + +# Determines when the incoming data should be considered complete and ready to be processed. +# Allowed values: +# - auto-detection - when no write access will be detected for a specified 'quite-period' +# - marker-file - when an appropriate marker file for the data exists. +# The default value is 'marker-file'. +main-thread.incoming-data-completeness-condition = marker-file + # The group the samples extracted by this thread belong to. If commented out or empty, then samples # are considered associated to a database instance (not group private). # main-thread.group-code = <change this> diff --git a/datastore_server/etc/service.properties b/datastore_server/etc/service.properties index ef18d20926c67f2f6df786c8a3d497216362bd31..3fe083d39a861513577cedaba8d86f4be107f964 100644 --- a/datastore_server/etc/service.properties +++ b/datastore_server/etc/service.properties @@ -58,6 +58,11 @@ password = doesnotmatter # Globally used separator character which separates entities in a data set file name data-set-file-name-entity-separator = _ +# The period of no write access that needs to pass before an incoming data item is considered +# complete and ready to be processed (in seconds) [default: 300]. +# Valid only when auto-detection method is used to determine if an incoming data are ready to be processed. +quiet-period = 3 + # Comma separated names of processing threads. Each thread should have configuration properties prefixed with its name. # E.g. 'code-extractor' property for the thread 'my-etl' should be specified as 'my-etl.code-extractor' inputs=main-thread @@ -68,6 +73,14 @@ inputs=main-thread # The directory to watch for incoming data. main-thread.incoming-dir = targets/incoming + +# Determines when the incoming data should be considered complete and ready to be processed. +# Allowed values: +# - auto-detection - when no write access will be detected for a specified 'quite-period' +# - marker-file - when an appropriate marker file for the data exists. +# The default value is 'marker-file'. +main-thread.incoming-data-completeness-condition = marker-file + # The group the samples extracted by this thread belong to. If commented out or empty, then samples # are considered associated to a database instance (not group private). # main-thread.group-code = CISD diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java index 70a74ccce2f34b6241688eae6d6fe14c81db9c62..23ebd9bfcb51bad3d0912a6bf37d4ae8fd50d856 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java @@ -17,6 +17,7 @@ package ch.systemsx.cisd.etlserver; import java.io.File; +import java.io.FileFilter; import java.io.FilenameFilter; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; @@ -28,7 +29,6 @@ import java.util.Timer; import java.util.concurrent.TimeUnit; import org.apache.commons.io.filefilter.FileFilterUtils; -import org.apache.commons.io.filefilter.IOFileFilter; import org.apache.commons.io.filefilter.NameFileFilter; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -45,8 +45,12 @@ import ch.systemsx.cisd.common.filesystem.DirectoryScanningTimerTask; import ch.systemsx.cisd.common.filesystem.FaultyPathDirectoryScanningHandler; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.filesystem.IDirectoryScanningHandler; +import ch.systemsx.cisd.common.filesystem.IStoreItemFilter; +import ch.systemsx.cisd.common.filesystem.LastModificationChecker; import ch.systemsx.cisd.common.filesystem.PathPrefixPrepender; import ch.systemsx.cisd.common.filesystem.QueueingPathRemoverService; +import ch.systemsx.cisd.common.filesystem.QuietPeriodFileFilter; +import ch.systemsx.cisd.common.filesystem.StoreItem; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkDirectoryScanningHandler; import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher; import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark; @@ -342,13 +346,16 @@ public final class ETLDaemon final TransferredDataSetHandler pathHandler = new TransferredDataSetHandler(threadParameters.tryGetGroupCode(), plugin, authorizedLimsService, mailProperties, highwaterMarkWatcher, - notifySuccessfulRegistration); + notifySuccessfulRegistration, threadParameters.useIsFinishedMarkerFile()); pathHandler.setProcessorFactories(processorFactories); final HighwaterMarkDirectoryScanningHandler directoryScanningHandler = createDirectoryScanningHandler(pathHandler, highwaterMarkWatcher, incomingDataDirectory, storeRootDir, processorFactories.values()); + FileFilter fileFilter = + createFileFilter(incomingDataDirectory, threadParameters.useIsFinishedMarkerFile(), + parameters); final DirectoryScanningTimerTask dataMonitorTask = - createDirectoryScanningTimerTask(incomingDataDirectory, pathHandler, + new DirectoryScanningTimerTask(incomingDataDirectory, fileFilter, pathHandler, directoryScanningHandler); selfTest(incomingDataDirectory, authorizedLimsService, pathHandler); final String timerThreadName = @@ -359,13 +366,38 @@ public final class ETLDaemon threadParameters.getThreadName()); } - private static DirectoryScanningTimerTask createDirectoryScanningTimerTask( - final File incomingDataDirectory, final TransferredDataSetHandler pathHandler, - final HighwaterMarkDirectoryScanningHandler directoryScanningHandler) + private static FileFilter createFileFilter(File incomingDataDirectory, + boolean useIsFinishedMarkerFile, Parameters parameters) { - IOFileFilter filter = FileFilterUtils.prefixFileFilter(Constants.IS_FINISHED_PREFIX); - return new DirectoryScanningTimerTask(incomingDataDirectory, filter, pathHandler, - directoryScanningHandler); + if (useIsFinishedMarkerFile) + { + return FileFilterUtils.prefixFileFilter(Constants.IS_FINISHED_PREFIX); + } else + { + return createQuietPeriodFilter(incomingDataDirectory, parameters); + } + } + + private static FileFilter createQuietPeriodFilter(final File incomingDataDirectory, + Parameters parameters) + { + int ignoredErrorCountBeforeNotification = 3; + LastModificationChecker lastModificationChecker = + new LastModificationChecker(incomingDataDirectory); + final IStoreItemFilter quietPeriodFilter = + new QuietPeriodFileFilter(lastModificationChecker, parameters + .getQuietPeriodMillis(), ignoredErrorCountBeforeNotification); + return new FileFilter() + { + public boolean accept(File pathname) + { + assert pathname.getParentFile().getAbsolutePath().equals( + incomingDataDirectory.getAbsolutePath()) : "The file should come to the filter only from the incoming directory"; + + StoreItem storeItem = new StoreItem(pathname.getName()); + return quietPeriodFilter.accept(storeItem); + } + }; } private static void addShutdownHookForCleanup(final Timer workerTimer, diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/Parameters.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/Parameters.java index edcc089548cafaea4e066df692257eafd99b158c..e5ef7c142db2471cbeb986e9fe85a26ea25da1a1 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/Parameters.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/Parameters.java @@ -48,6 +48,10 @@ import ch.systemsx.cisd.common.utilities.SystemExit; */ public class Parameters { + private static final String CHECK_INTERVAL_NAME = "check-interval"; + + private static final String QUIET_PERIOD_NAME = "quiet-period"; + private static final String SERVICE_PROPERTIES_FILE = "etc/service.properties"; private static final Logger operationLog = @@ -67,10 +71,20 @@ public class Parameters /** * The interval to wait between to checks for activity (in milliseconds). */ - @Option(name = "c", longName = "check-interval", usage = "The interval to wait between two checks (in seconds) " + @Option(name = "c", longName = CHECK_INTERVAL_NAME, usage = "The interval to wait between two checks (in seconds) " + "[default: 120]") private long checkIntervalSeconds; + /** + * Valid only when {@link ThreadParameters#INCOMING_DATA_COMPLETENESS_CONDITION} is false.<br> + * The period to wait before a file or directory is considered "quiet" (in milliseconds). This + * setting is used when deciding whether a file or directory is ready to be processed. + */ + @Option(name = "q", longName = QUIET_PERIOD_NAME, usage = "The period of no write access that needs to pass before an incoming data item is " + + "considered complete and ready to be processed (in seconds) [default: 300]. " + + "Valid only when auto-detection method is used to determine if an incoming data are ready to be processed.") + private long quietPeriodMillis; + /** * The time-out for clean up work in the shutdown sequence (in seconds). * <p> @@ -181,7 +195,8 @@ public class Parameters username = serviceProperties.getProperty("username"); password = serviceProperties.getProperty("password"); checkIntervalSeconds = - Long.parseLong(serviceProperties.getProperty("check-interval", "120")); + Long.parseLong(serviceProperties.getProperty(CHECK_INTERVAL_NAME, "120")); + quietPeriodMillis = Long.parseLong(serviceProperties.getProperty(QUIET_PERIOD_NAME, "300")); shutdownTimeOutSeconds = Long.parseLong(serviceProperties.getProperty("shutdown-timeout", "30")); mailProperties = createMailProperties(serviceProperties); @@ -314,6 +329,11 @@ public class Parameters return checkIntervalSeconds * DateUtils.MILLIS_PER_SECOND; } + public long getQuietPeriodMillis() + { + return quietPeriodMillis * DateUtils.MILLIS_PER_SECOND; + } + /** * Returns the time-out for clean up work in the shutdown sequence (in seconds). * <p> @@ -384,6 +404,11 @@ public class Parameters } operationLog.info(String.format("Check intervall: %d s.", getCheckIntervalMillis() / 1000)); + operationLog + .info(String + .format( + "Quiet period (valid if auto-detection method is used to determine incoming data completeness): %d s.", + getQuietPeriodMillis() / 1000)); } } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ThreadParameters.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ThreadParameters.java index 6caa7f5af41202125fbc2e13dea7b8aca9cf3736..8475ddd53644e2ee04738b2bfe83c162965099f6 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ThreadParameters.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ThreadParameters.java @@ -17,7 +17,9 @@ package ch.systemsx.cisd.etlserver; import java.io.File; +import java.util.Arrays; import java.util.Properties; +import java.util.Vector; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; @@ -39,6 +41,16 @@ public final class ThreadParameters @Private static final String GROUP_CODE_KEY = "group-code"; + @Private + static final String INCOMING_DATA_COMPLETENESS_CONDITION = + "incoming-data-completeness-condition"; + + @Private + static final String INCOMING_DATA_COMPLETENESS_CONDITION_MARKER_FILE = "marker-file"; + + @Private + static final String INCOMING_DATA_COMPLETENESS_CONDITION_AUTODETECTION = "auto-detection"; + private static final Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, ThreadParameters.class); @@ -56,6 +68,8 @@ public final class ThreadParameters private final String groupCode; + private final boolean useIsFinishedMarkerFile; + /** * @param threadProperties parameters for one processing thread together with general * parameters. @@ -64,10 +78,36 @@ public final class ThreadParameters { this.incomingDataDirectory = extractIncomingDataDir(threadProperties); this.plugin = new PropertiesBasedETLServerPlugin(threadProperties); - groupCode = tryGetGroupCode(threadProperties); + this.groupCode = tryGetGroupCode(threadProperties); + String completenessCondition = + PropertyUtils.getProperty(threadProperties, INCOMING_DATA_COMPLETENESS_CONDITION, + INCOMING_DATA_COMPLETENESS_CONDITION_MARKER_FILE); + this.useIsFinishedMarkerFile = parseCompletenessCondition(completenessCondition); this.threadName = threadName; } + // true if marker file should be used, false if autodetection should be used, exceprion when the + // value is invalid. + private static boolean parseCompletenessCondition(String completenessCondition) + { + if (completenessCondition + .equalsIgnoreCase(INCOMING_DATA_COMPLETENESS_CONDITION_MARKER_FILE)) + { + return true; + } else if (completenessCondition + .equalsIgnoreCase(INCOMING_DATA_COMPLETENESS_CONDITION_AUTODETECTION)) + { + return false; + } else + { + throw new ConfigurationFailureException(String.format( + "Invalid value '%s' for the option '%s'. Allowed values are: '%s', '%s'.", + completenessCondition, INCOMING_DATA_COMPLETENESS_CONDITION, + INCOMING_DATA_COMPLETENESS_CONDITION_MARKER_FILE, + INCOMING_DATA_COMPLETENESS_CONDITION_AUTODETECTION)); + } + } + final void check() { if (incomingDataDirectory.isDirectory() == false) @@ -93,8 +133,12 @@ public final class ThreadParameters @Private static final String tryGetGroupCode(final Properties properties) { - return StringUtils.defaultIfEmpty(PropertyUtils.getProperty(properties, GROUP_CODE_KEY), - null); + return nullIfEmpty(PropertyUtils.getProperty(properties, GROUP_CODE_KEY)); + } + + private static String nullIfEmpty(String value) + { + return StringUtils.defaultIfEmpty(value, null); } /** @@ -105,6 +149,11 @@ public final class ThreadParameters return groupCode; } + public boolean useIsFinishedMarkerFile() + { + return useIsFinishedMarkerFile; + } + /** * Returns The directory to monitor for incoming data. */ @@ -125,22 +174,30 @@ public final class ThreadParameters { if (operationLog.isInfoEnabled()) { - operationLog.info(String.format("[%s] Code extractor: '%s'", threadName, plugin - .getDataSetInfoExtractor().getClass().getName())); - operationLog.info(String.format("[%s] Type extractor: '%s'", threadName, plugin - .getTypeExtractor().getClass().getName())); - operationLog.info(String.format("[%s] Incoming data directory: '%s'.", threadName, - getIncomingDataDirectory().getAbsolutePath())); + logLine("Code extractor: '%s'", plugin.getDataSetInfoExtractor().getClass().getName()); + logLine("Type extractor: '%s'", plugin.getTypeExtractor().getClass().getName()); + logLine("Incoming data directory: '%s'.", getIncomingDataDirectory().getAbsolutePath()); if (groupCode != null) { - operationLog.info(String.format("[%s] Group code: '%s'.", threadName, groupCode)); + logLine("Group code: '%s'.", groupCode); } + String completenessCond = + useIsFinishedMarkerFile ? "marker file exists" + : "no write access for some period"; + logLine("Condition of incoming data completeness: %s.", completenessCond); } } + private void logLine(String format, Object... params) + { + Vector<Object> allParams = new Vector<Object>(); + allParams.add(threadName); + allParams.addAll(Arrays.asList(params)); + operationLog.info(String.format("[%s] " + format, allParams.toArray(new Object[0]))); + } + public String getThreadName() { return threadName; } - } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/TransferredDataSetHandler.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/TransferredDataSetHandler.java index 5112dec921751cbc7f80d531d71d10eba9928bd0..f0cb6579050df96171e857b934daf16d06720181 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/TransferredDataSetHandler.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/TransferredDataSetHandler.java @@ -52,6 +52,7 @@ import ch.systemsx.cisd.common.mail.IMailClient; import ch.systemsx.cisd.common.mail.MailClient; import ch.systemsx.cisd.common.types.BooleanOrUnknown; import ch.systemsx.cisd.common.utilities.BeanUtils; +import ch.systemsx.cisd.common.utilities.IDelegatedActionWithResult; import ch.systemsx.cisd.common.utilities.ISelfTestable; import ch.systemsx.cisd.openbis.dss.generic.shared.IEncapsulatedOpenBISService; import ch.systemsx.cisd.openbis.dss.generic.shared.dto.DataSetInformation; @@ -120,6 +121,8 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta private final boolean notifySuccessfulRegistration; + private final boolean useIsFinishedMarkerFile; + private boolean stopped = false; private Map<String, IProcessorFactory> processorFactories = @@ -127,20 +130,26 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta private DatabaseInstancePE homeDatabaseInstance; + /** + * @param useIsFinishedMarkerFile if true, file/directory is processed when a marker file for it + * appears. Otherwise processing starts if the file/directory is not modified for a + * certain amount of time (so called "quiet period"). + */ public TransferredDataSetHandler(final String groupCode, final IETLServerPlugin plugin, final IEncapsulatedOpenBISService limsService, final Properties mailProperties, final HighwaterMarkWatcher highwaterMarkWatcher, - final boolean notifySuccessfulRegistration) + final boolean notifySuccessfulRegistration, boolean useIsFinishedMarkerFile) { this(groupCode, plugin.getStorageProcessor(), plugin, limsService, new MailClient( - mailProperties), notifySuccessfulRegistration); + mailProperties), notifySuccessfulRegistration, useIsFinishedMarkerFile); } TransferredDataSetHandler(final String groupCode, final IStoreRootDirectoryHolder storeRootDirectoryHolder, final IETLServerPlugin plugin, final IEncapsulatedOpenBISService limsService, - final IMailClient mailClient, final boolean notifySuccessfulRegistration) + final IMailClient mailClient, final boolean notifySuccessfulRegistration, + boolean useIsFinishedMarkerFile) { assert storeRootDirectoryHolder != null : "Given store root directory holder can not be null."; @@ -159,6 +168,7 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta this.notifySuccessfulRegistration = notifySuccessfulRegistration; this.registrationLock = new ReentrantLock(); this.fileOperations = FileOperations.getMonitoredInstanceForCurrentThread(); + this.useIsFinishedMarkerFile = useIsFinishedMarkerFile; } public final void setProcessorFactories(final Map<String, IProcessorFactory> processorFactories) @@ -179,13 +189,13 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta // IPathHandler // - public final void handle(final File isFinishedFile) + public final void handle(final File file) { if (stopped) { return; } - final RegistrationHelper registrationHelper = new RegistrationHelper(isFinishedFile); + final RegistrationHelper registrationHelper = createRegistrationHelper(file); registrationHelper.prepare(); if (registrationHelper.hasDataSetBeenIdentified()) { @@ -248,9 +258,103 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta // Helper class // + private RegistrationHelper createRegistrationHelper(File file) + { + if (useIsFinishedMarkerFile) + { + return createRegistrationHelperWithIsFinishedFile(file); + } else + { + return createRegistrationHelperWithQuietPeriodFilter(file); + } + } + + private RegistrationHelper createRegistrationHelperWithIsFinishedFile(final File isFinishedFile) + { + assert isFinishedFile != null : "Unspecified is-finished file."; + final String name = isFinishedFile.getName(); + assert name.startsWith(IS_FINISHED_PREFIX) : "A finished file must starts with '" + + IS_FINISHED_PREFIX + "'."; + + File incomingDataSetFile = getIncomingDataSetPathFromMarker(isFinishedFile); + IDelegatedActionWithResult<Boolean> cleanAftrewardsAction = + new IDelegatedActionWithResult<Boolean>() + { + public Boolean execute() + { + return deleteAndLogIsFinishedMarkerFile(isFinishedFile); + } + }; + return new RegistrationHelper(incomingDataSetFile, cleanAftrewardsAction); + } + + private RegistrationHelper createRegistrationHelperWithQuietPeriodFilter( + File incomingDataSetFile) + { + IDelegatedActionWithResult<Boolean> cleanAftrewardsAction = + new IDelegatedActionWithResult<Boolean>() + { + public Boolean execute() + { + return true; // do nothing + } + }; + return new RegistrationHelper(incomingDataSetFile, cleanAftrewardsAction); + } + + /** + * From given <var>isFinishedPath</var> gets the incoming data set path and checks it. + * + * @return <code>null</code> if a problem has happened. Otherwise a useful and usable incoming + * data set path is returned. + */ + private final File getIncomingDataSetPathFromMarker(final File isFinishedPath) + { + final File incomingDataSetPath = + FileUtilities.removePrefixFromFileName(isFinishedPath, IS_FINISHED_PREFIX); + if (operationLog.isDebugEnabled()) + { + operationLog.debug(String.format( + "Getting incoming data set path '%s' from is-finished path '%s'", + incomingDataSetPath, isFinishedPath)); + } + final String errorMsg = + fileOperations.checkPathFullyAccessible(incomingDataSetPath, "incoming data set"); + if (errorMsg != null) + { + fileOperations.delete(isFinishedPath); + throw EnvironmentFailureException.fromTemplate(String.format( + "Error moving path '%s' from '%s' to '%s': %s", incomingDataSetPath.getName(), + incomingDataSetPath.getParent(), storeRootDirectoryHolder + .getStoreRootDirectory(), errorMsg)); + } + return incomingDataSetPath; + } + + private boolean deleteAndLogIsFinishedMarkerFile(File isFinishedFile) + { + if (fileOperations.exists(isFinishedFile) == false) + { + return false; + } + final boolean ok = fileOperations.delete(isFinishedFile); + final String absolutePath = isFinishedFile.getAbsolutePath(); + if (ok == false) + { + notificationLog.error(String.format("Removing file '%s' failed.", absolutePath)); + } else + { + if (operationLog.isDebugEnabled()) + { + operationLog.debug(String.format("File '%s' has been removed.", absolutePath)); + } + } + return ok; + } + private final class RegistrationHelper { - private final File isFinishedFile; + private final IDelegatedActionWithResult<Boolean> cleanAftrewardsAction; private final File incomingDataSetFile; @@ -266,26 +370,24 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta private String errorMessageTemplate; - RegistrationHelper(final File isFinishedFile) + private RegistrationHelper(File incomingDataSetFile, + IDelegatedActionWithResult<Boolean> cleanAftrewardsAction) { - assert isFinishedFile != null : "Unspecified is-finished file."; - final String name = isFinishedFile.getName(); - assert name.startsWith(IS_FINISHED_PREFIX) : "A finished file must starts with '" - + IS_FINISHED_PREFIX + "'."; - errorMessageTemplate = DATA_SET_STORAGE_FAILURE_TEMPLATE; - this.isFinishedFile = isFinishedFile; - incomingDataSetFile = getIncomingDataSetPath(isFinishedFile); - dataSetInformation = extractDataSetInformation(incomingDataSetFile); + + this.errorMessageTemplate = DATA_SET_STORAGE_FAILURE_TEMPLATE; + this.incomingDataSetFile = incomingDataSetFile; + this.cleanAftrewardsAction = cleanAftrewardsAction; + this.dataSetInformation = extractDataSetInformation(incomingDataSetFile); if (dataSetInformation.getDataSetCode() == null) { // Extractor didn't extract an externally generated data set code, so request one // from the openBIS server. dataSetInformation.setDataSetCode(limsService.createDataSetCode()); } - dataStoreStrategy = + this.dataStoreStrategy = dataStrategyStore.getDataStoreStrategy(dataSetInformation, incomingDataSetFile); - dataSetType = typeExtractor.getDataSetType(incomingDataSetFile); - storeRoot = storageProcessor.getStoreRootDirectory(); + this.dataSetType = typeExtractor.getDataSetType(incomingDataSetFile); + this.storeRoot = storageProcessor.getStoreRootDirectory(); } final void prepare() @@ -319,7 +421,7 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta operationLog.error("Cannot delete '" + incomingDataSetFile.getAbsolutePath() + "'."); } - deleteAndLogIsFinishedFile(); + clean(); } catch (final Throwable throwable) { rollback(throwable); @@ -361,7 +463,7 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta writeThrowable(throwable); if (moveInCaseOfErrorOk) { - deleteAndLogIsFinishedFile(); + clean(); } } } @@ -404,8 +506,8 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta { errorMessageTemplate = DATA_SET_REGISTRATION_FAILURE_TEMPLATE; plainRegisterDataSet(relativePath, availableFormat, isCompleteFlag); - deleteAndLogIsFinishedFile(); - deleteAndLogIsFinishedFile(); + clean(); + clean(); if (processorOrNull == null) { return; @@ -501,7 +603,7 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta .getTargetFile()); if (ok) { - deleteAndLogIsFinishedFile(); + clean(); } } @@ -568,36 +670,6 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta } } - /** - * From given <var>isFinishedPath</var> gets the incoming data set path and checks it. - * - * @return <code>null</code> if a problem has happened. Otherwise a useful and usable - * incoming data set path is returned. - */ - private final File getIncomingDataSetPath(final File isFinishedPath) - { - final File incomingDataSetPath = - FileUtilities.removePrefixFromFileName(isFinishedPath, IS_FINISHED_PREFIX); - if (operationLog.isDebugEnabled()) - { - operationLog.debug(String.format( - "Getting incoming data set path '%s' from is-finished path '%s'", - incomingDataSetPath, isFinishedPath)); - } - final String errorMsg = - fileOperations.checkPathFullyAccessible(incomingDataSetPath, - "incoming data set"); - if (errorMsg != null) - { - fileOperations.delete(isFinishedPath); - throw EnvironmentFailureException.fromTemplate(String.format( - "Error moving path '%s' from '%s' to '%s': %s", incomingDataSetPath - .getName(), incomingDataSetPath.getParent(), - storeRootDirectoryHolder.getStoreRootDirectory(), errorMsg)); - } - return incomingDataSetPath; - } - /** * From given <var>incomingDataSetPath</var> extracts a <code>DataSetInformation</code>. * @@ -717,25 +789,9 @@ public final class TransferredDataSetHandler implements IPathHandler, ISelfTesta } } - private boolean deleteAndLogIsFinishedFile() + private boolean clean() { - if (fileOperations.exists(isFinishedFile) == false) - { - return false; - } - final boolean ok = fileOperations.delete(isFinishedFile); - final String absolutePath = isFinishedFile.getAbsolutePath(); - if (ok == false) - { - notificationLog.error(String.format("Removing file '%s' failed.", absolutePath)); - } else - { - if (operationLog.isDebugEnabled()) - { - operationLog.debug(String.format("File '%s' has been removed.", absolutePath)); - } - } - return ok; + return cleanAftrewardsAction.execute(); } } diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/TransferredDataSetHandlerTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/TransferredDataSetHandlerTest.java index 4891b8b2a3ae2a20b8df1024361ed0552179ff65..f5ca427307cf5bddd05725a0b2cb7821f5fa57e5 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/TransferredDataSetHandlerTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/etlserver/TransferredDataSetHandlerTest.java @@ -271,7 +271,7 @@ public final class TransferredDataSetHandlerTest extends AbstractFileSystemTestC new EncapsulatedOpenBISService(new SessionTokenManager(), limsService, 0, "u", "p"); handler = new TransferredDataSetHandler(null, storageProcessor, plugin, - authorizedLimsService, mailClient, true); + authorizedLimsService, mailClient, true, true); handler.setProcessorFactories(map); dataSetInformation = new DataSetInformation();