diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java index 14ffe4d38e66533259dad0b6461b8c07e900a8f1..ff3510284eedebb0ffd9e91eb859ef7303c26e03 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java @@ -22,8 +22,8 @@ import static ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetArchiving import java.io.File; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.Set; @@ -83,6 +83,8 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore public static final String SYNCHRONIZE_ARCHIVE = "synchronize-archive"; + public static final String BATCH_SIZE_IN_BYTES = "batch-size-in-bytes"; + private final IStatusChecker archivePrerequisiteOrNull; private final IStatusChecker unarchivePrerequisiteOrNull; @@ -95,6 +97,11 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore transient IDataSetStatusUpdater statusUpdater; + /** + * Total size in bytes of data sets processed in a single batch of archiver. + */ + private final int maximumBatchSizeInBytes; + public AbstractArchiverProcessingPlugin(Properties properties, File storeRoot, IStatusChecker archivePrerequisiteOrNull, IStatusChecker unarchivePrerequisiteOrNull) { @@ -102,6 +109,7 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore this.archivePrerequisiteOrNull = archivePrerequisiteOrNull; this.unarchivePrerequisiteOrNull = unarchivePrerequisiteOrNull; this.synchronizeArchive = PropertyUtils.getBoolean(properties, SYNCHRONIZE_ARCHIVE, true); + this.maximumBatchSizeInBytes = PropertyUtils.getInt(properties, BATCH_SIZE_IN_BYTES, 1024 * 1024 * 1024); } /** @@ -143,17 +151,53 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore DatasetProcessingStatuses finalstatuses = new DatasetProcessingStatuses(); - for (DatasetDescription singleDatset : datasets) + try + { + initializeDatasetSizesIfNeeded(datasets); + } catch (Throwable t) { - List<DatasetDescription> singleBatch = Collections.singletonList(singleDatset); + String errorMessage = "Archiving failed to calculate dataset sizes:" + t.getMessage(); + operationLog.error(errorMessage, t); + Status errorStatus = Status.createError(errorMessage); + return createStatuses(errorStatus, datasets, Operation.ARCHIVE).getProcessingStatus(); + } - archiveSingleBatch(context, removeFromDataStore, finalstatuses, singleBatch); + for (List<DatasetDescription> datasetGroup : splitIntoGroups(datasets, maximumBatchSizeInBytes)) + { + DatasetProcessingStatuses statuses = archiveSingleBatch(context, removeFromDataStore, finalstatuses, datasetGroup); + finalstatuses.addResults(statuses); } return finalstatuses.getProcessingStatus(); } - private void archiveSingleBatch(final ArchiverTaskContext context, boolean removeFromDataStore, DatasetProcessingStatuses finalstatuses, + private List<List<DatasetDescription>> splitIntoGroups(List<DatasetDescription> datasets, long minGroupSize) + { + List<List<DatasetDescription>> results = new LinkedList<List<DatasetDescription>>(); + + List<DatasetDescription> currentResult = new LinkedList<DatasetDescription>(); + + long runningSum = 0; + for (DatasetDescription dataset : datasets) + { + currentResult.add(dataset); + runningSum += minGroupSize; + if (runningSum > minGroupSize) + { + results.add(currentResult); + runningSum = 0; + currentResult = new LinkedList<DatasetDescription>(); + } + } + if (false == currentResult.isEmpty()) + { + results.add(currentResult); + } + return results; + } + + private DatasetProcessingStatuses archiveSingleBatch(final ArchiverTaskContext context, boolean removeFromDataStore, + DatasetProcessingStatuses finalstatuses, List<DatasetDescription> singleBatch) { DatasetProcessingStatuses statuses = safeArchive(singleBatch, context, removeFromDataStore); @@ -163,7 +207,7 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore asyncUpdateStatuses(statuses.getSuccessfulDatasetCodes(), successStatus, true); asyncUpdateStatuses(statuses.getFailedDatasetCodes(), AVAILABLE, false); - finalstatuses.addResults(statuses); + return statuses; } private void initializeDatasetSizesIfNeeded(List<DatasetDescription> datasets) @@ -198,7 +242,6 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore { try { - initializeDatasetSizesIfNeeded(datasets); statuses = unsafeArchive(datasets, context, removeFromDataStore); } catch (Throwable t) { diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetFileOperationsManager.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetFileOperationsManager.java index 1bb5c1d17092eb76b5cbb92fc9f98f913bd694db..79ef2e643f4bf85a43b8c70d0e588ab3b465abf8 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetFileOperationsManager.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetFileOperationsManager.java @@ -44,8 +44,7 @@ import ch.systemsx.cisd.openbis.generic.shared.basic.dto.IDatasetLocation; import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription; /** - * Code based on LocalAndRemoteCopier, able to copy dataset files both ways: to and from - * destination. + * Code based on LocalAndRemoteCopier, able to copy dataset files both ways: to and from destination. * * @author Piotr Buczek */ @@ -54,10 +53,10 @@ public class DataSetFileOperationsManager implements IDataSetFileOperationsManag private static interface IDeleteAction { String getName(); + void delete(File dataSetFolder, String dataSetCode); } - private final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, DataSetFileOperationsManager.class); @@ -150,8 +149,8 @@ public class DataSetFileOperationsManager implements IDataSetFileOperationsManag } /** - * Copies specified dataset's data to destination specified in constructor. The path at the - * destination is defined by the original location of the data set. + * Copies specified dataset's data to destination specified in constructor. The path at the destination is defined by the original location of the + * data set. */ @Override public Status copyToDestination(File originalData, DatasetDescription dataset) @@ -179,8 +178,8 @@ public class DataSetFileOperationsManager implements IDataSetFileOperationsManag } /** - * Retrieves specified datases's data from the destination specified in constructor. The path at - * the destination is defined by original location of the data set. + * Retrieves specified datases's data from the destination specified in constructor. The path at the destination is defined by original location + * of the data set. */ @Override public Status retrieveFromDestination(File originalData, DatasetDescription dataset) @@ -202,8 +201,8 @@ public class DataSetFileOperationsManager implements IDataSetFileOperationsManag } /** - * Deletes specified datases's data from the destination specified in constructor. The path at - * the destination is defined by original location of the data set. + * Deletes specified datases's data from the destination specified in constructor. The path at the destination is defined by original location of + * the data set. */ @Override public Status deleteFromDestination(IDatasetLocation dataset) @@ -268,10 +267,10 @@ public class DataSetFileOperationsManager implements IDataSetFileOperationsManag return ex.getStatus(); } } - + /** - * Checks if specified dataset's data are present and synchronized in the destination specified - * in constructor. The path at the destination is defined by original location of the data set. + * Checks if specified dataset's data are present and synchronized in the destination specified in constructor. The path at the destination is + * defined by original location of the data set. */ @Override public BooleanStatus isSynchronizedWithDestination(File originalData, DatasetDescription dataset) @@ -293,8 +292,8 @@ public class DataSetFileOperationsManager implements IDataSetFileOperationsManag } /** - * Checks if specified dataset's data are present in the destination specified in constructor. - * The path at the destination is defined by original location of the data set. + * Checks if specified dataset's data are present in the destination specified in constructor. The path at the destination is defined by original + * location of the data set. */ @Override public BooleanStatus isPresentInDestination(DatasetDescription dataset) diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/RsyncArchiver.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/RsyncArchiver.java index 59833a8c79c5ff8594c049118a580aafc4113baa..a4ffcd215cac2a2a25325879a089f60fc1a9c621 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/RsyncArchiver.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/RsyncArchiver.java @@ -37,8 +37,7 @@ import ch.systemsx.cisd.openbis.generic.shared.basic.dto.IDatasetLocation; import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription; /** - * Archiver plugin which copies data sets to a destination folder using rsync (if it is remote). The - * destination can be + * Archiver plugin which copies data sets to a destination folder using rsync (if it is remote). The destination can be * <ul> * <li>on the local file system, * <li>a mounted remote folder, diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPluginTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPluginTest.java index 6e957aca81d9aa91d68199b9da167bc55f56874c..3809eb478e03fdf7c62bfe77c5c2370928b34a1d 100644 --- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPluginTest.java +++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPluginTest.java @@ -501,7 +501,7 @@ public class AbstractArchiverProcessingPluginTest extends AbstractFileSystemTest assertEquals("[ds1]", simpleArchiver.getSortedDataSetsToBeArchived().toString()); assertEquals("[]", simpleArchiver.getSortedDataSetsToBeDeletedFromArchive().toString()); assertEquals("[]", status.getErrorStatuses().toString()); - assertEquals(null, status.getDatasetsByStatus(Status.createError())); + assertEquals("[]", status.getDatasetsByStatus(Status.createError()).toString()); assertEquals("INFO OPERATION.AbstractDatastorePlugin - " + "Archiving of the following datasets has been requested: [Dataset 'ds1']", logRecorder.getLogContent());