diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java index df2ec73ebe3ed4c3eeb4bd0be637db9639c5ba79..8b9cb24d7593e917e7aba84e4bfbad9ffe219ff7 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java @@ -280,8 +280,9 @@ public final class ETLDaemon for (final ThreadParameters threadParameters : threads) { File incomingDataDirectory = threadParameters.getIncomingDataDirectory(); + Integer preferredShareIdOrNull = threadParameters.getPreferredShareId(); String shareId = - SegmentedStoreUtils.findIncomingShare(incomingDataDirectory, storeRootDir, + SegmentedStoreUtils.findIncomingShare(incomingDataDirectory, storeRootDir, preferredShareIdOrNull, new Log4jSimpleLogger(operationLog)); incomingShares.add(shareId); } @@ -487,7 +488,8 @@ public final class ETLDaemon private static String getShareId(final ThreadParameters threadParams, final File storeRoot) { File incomingDirectory = threadParams.getIncomingDataDirectory(); - return SegmentedStoreUtils.findIncomingShare(incomingDirectory, storeRoot, + Integer preferredShareIdOrNull = threadParams.getPreferredShareId(); + return SegmentedStoreUtils.findIncomingShare(incomingDirectory, storeRoot, preferredShareIdOrNull, new Log4jSimpleLogger(operationLog)); } diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ThreadParameters.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ThreadParameters.java index b9aa9c08b14bc02f3df0115443eaf2c3af7b6634..5b81a499cd80368d44269637dbb6437490b51d4e 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ThreadParameters.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ThreadParameters.java @@ -31,6 +31,7 @@ import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.properties.PropertyUtils; import ch.systemsx.cisd.etlserver.registrator.DataSetRegistrationPreStagingBehavior; +import ch.systemsx.cisd.openbis.dss.generic.shared.utils.SegmentedStoreUtils; /** * <i>ETL</i> thread specific parameters. @@ -96,6 +97,8 @@ public final class ThreadParameters public static final String DATASET_REGISTRATION_PRE_STAGING_BEHAVIOR = "dataset-registration-prestaging-behavior"; + @Private + public static final String PREFERRED_SHARE_ID = "preferred-share-id"; /* * The properties that control the process of retrying registration by jython dropboxes @@ -166,6 +169,8 @@ public final class ThreadParameters private final int minimumRecoveryPeriod; private final DataSetRegistrationPreStagingBehavior dataSetRegistrationPreStagingBehavior; + + private final Integer preferredShareId; /** * @param threadProperties parameters for one processing thread together with general @@ -207,7 +212,8 @@ public final class ThreadParameters "false")); this.dataSetRegistrationPreStagingBehavior = getOriginalnputDataSetBehaviour(threadProperties); - + this.preferredShareId = tryGetPreferredShareId(threadProperties); + boolean developmentMode = PropertyUtils.getBoolean(threadProperties, RECOVERY_DEVELOPMENT_MODE, false); if (developmentMode) @@ -269,6 +275,11 @@ public final class ThreadParameters return retVal; } + public Integer getPreferredShareId() + { + return preferredShareId; + } + // true if marker file should be used, false if autodetection should be used, exceprion when the // value is invalid. private static boolean parseCompletenessCondition(String completenessCondition) @@ -363,6 +374,21 @@ public final class ThreadParameters } return paths; } + + @Private + static final Integer tryGetPreferredShareId(final Properties properties) + { + String shareId = PropertyUtils.getProperty(properties, PREFERRED_SHARE_ID); + if(StringUtils.isBlank(shareId)) + { + return null; + } + if(SegmentedStoreUtils.SHARE_ID_PATTERN.matcher(shareId).matches() == false) + { + throw new ConfigurationFailureException("Invalid preferred share Id:" + shareId); + } + return Integer.parseInt(shareId); + } private static String nullIfEmpty(String value) { diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java index bfe6e72fbbdd6616c12682e4b5d3e96625f0a9a8..de27d033fa4a835cad89b64ada30ad7fdf76a55a 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtils.java @@ -82,7 +82,7 @@ public class SegmentedStoreUtils private static final String RSYNC_EXEC = "rsync"; - private static final Pattern SHARE_ID_PATTERN = Pattern.compile("[0-9]+"); + public static final Pattern SHARE_ID_PATTERN = Pattern.compile("[0-9]+"); public static final Long MINIMUM_FREE_SCRATCH_SPACE = FileUtils.ONE_GB; @@ -153,10 +153,18 @@ public class SegmentedStoreUtils * Lists all folders in specified store root directory which match share pattern. */ public static File[] getShares(File storeRootDir) + { + return getShares(storeRootDir, FILTER_ON_SHARES); + } + + /** + * Lists all folders in specified store root directory which match pattern given by filter. + */ + public static File[] getShares(File storeRootDir, FileFilter filter) { File[] files = FileOperations.getMonitoredInstanceForCurrentThread().listFiles(storeRootDir, - FILTER_ON_SHARES); + filter); if (files == null) { throw new ConfigurationFailureException( @@ -170,7 +178,7 @@ public class SegmentedStoreUtils * Returns first the id of the first incoming share folder of specified store root which allows to move a file from specified incoming folder to * the incoming share. */ - public static String findIncomingShare(File incomingFolder, File storeRoot, ISimpleLogger logger) + public static String findIncomingShare(File incomingFolder, File storeRoot, Integer preferredShareIdOrNull, ISimpleLogger logger) { final IFileOperations fileOp = FileOperations.getMonitoredInstanceForCurrentThread(); if (fileOp.isDirectory(incomingFolder) == false) @@ -193,12 +201,37 @@ public class SegmentedStoreUtils "Couldn't create a test file in the following incoming folder: " + incomingFolder, ex); } - File matchingShare = findShare(testFile, storeRoot, logger); + File matchingShare = findShare(testFile, storeRoot, preferredShareIdOrNull, logger); return matchingShare.getName(); } - private static File findShare(File testFile, File storeRoot, ISimpleLogger logger) + private static File findShare(File testFile, File storeRoot, final Integer preferredShareIdOrNull, ISimpleLogger logger) { + if( preferredShareIdOrNull != null) + { + File[] shares = getShares(storeRoot, new FileFilter() + { + @Override + public boolean accept(File pathname) + { + if (FileOperations.getMonitoredInstanceForCurrentThread().isDirectory(pathname) == false) + { + return false; + } + String name = pathname.getName(); + Pattern p = Pattern.compile("\\b" + String.valueOf(preferredShareIdOrNull + "\\b")); + return p.matcher(name).matches(); + } + }); + + if(shares.length == 0) + { + throw new ConfigurationFailureException("Preferred share: " + + preferredShareIdOrNull + " could not be found for the following incoming folder: " + testFile.getParentFile().getAbsolutePath()); + } + return shares[0]; + } + for (File share : getShares(storeRoot)) { File destination = new File(share, testFile.getName()); diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java index 78834f67c68ce9bcf22a5d50d607ad1aecdab0c5..a685663376def8294c74bb52d44fbcc9b731d3e8 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/shared/utils/SegmentedStoreUtilsTest.java @@ -692,7 +692,7 @@ public class SegmentedStoreUtilsTest extends AbstractFileSystemTestCase FileUtilities.writeToFile(new File(share1, "share.properties"), ShareFactory.WITHDRAW_SHARE_PROP + " = true"); - String share = SegmentedStoreUtils.findIncomingShare(incomingFolder, store, log); + String share = SegmentedStoreUtils.findIncomingShare(incomingFolder, store, null, log); assertEquals("1", share); assertEquals( @@ -712,7 +712,7 @@ public class SegmentedStoreUtilsTest extends AbstractFileSystemTestCase FileUtilities.writeToFile(new File(share1, "share.properties"), ShareFactory.IGNORED_FOR_SHUFFLING_PROP + " = true"); - String share = SegmentedStoreUtils.findIncomingShare(incomingFolder, store, log); + String share = SegmentedStoreUtils.findIncomingShare(incomingFolder, store, null, log); assertEquals("1", share); assertEquals("", log.toString());