From a72d2bd27b5ef58ae4182f0b3ccedfb80e63aafd Mon Sep 17 00:00:00 2001
From: tpylak <tpylak>
Date: Thu, 27 May 2010 10:47:03 +0000
Subject: [PATCH] LMS-1548 configure high-watermark checker, improve performance

SVN: 16151
---
 .../server/ProcessDatasetsCommand.java        |  41 +++--
 .../server/plugins/demo/DemoArchiver.java     |  16 +-
 .../AbstractArchiverProcessingPlugin.java     | 164 ++++++++++++------
 .../standard/HighWaterMarkChecker.java        |  19 +-
 .../plugins/standard/IStatusChecker.java      |  23 ++-
 .../plugins/tasks/ProcessingStatus.java       |  19 +-
 .../standard/DataSetCopierForUsersTest.java   |  48 ++---
 .../plugins/standard/DataSetCopierTest.java   |  45 +++--
 rtd_yeastx/etc/service.properties             |  52 +++++-
 .../cisd/yeastx/etl/MLArchiverTask.java       | 105 +++++++++--
 10 files changed, 378 insertions(+), 154 deletions(-)

diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ProcessDatasetsCommand.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ProcessDatasetsCommand.java
index b4a5770d615..ce52ab10dce 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ProcessDatasetsCommand.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/ProcessDatasetsCommand.java
@@ -17,6 +17,7 @@ package ch.systemsx.cisd.openbis.dss.generic.server;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -51,7 +52,7 @@ public class ProcessDatasetsCommand implements IDataSetCommand
     private final List<DatasetDescription> datasets;
 
     private final Map<String, String> parameterBindings;
-    
+
     private final String userEmailOrNull;
 
     private final DatastoreServiceDescription serviceDescription;
@@ -149,7 +150,18 @@ public class ProcessDatasetsCommand implements IDataSetCommand
 
     private String getDescription(String prefix)
     {
-        return String.format("%s on data set(s): \n%s", prefix, getDataSetCodes(datasets));
+        return String.format("%s on %d data set(s): \n%s", prefix, datasets.size(),
+                getDataSetCodes(asCodes(datasets)));
+    }
+
+    private static List<String> asCodes(List<DatasetDescription> datasets)
+    {
+        List<String> codes = new ArrayList<String>();
+        for (DatasetDescription dataset : datasets)
+        {
+            codes.add(dataset.getDatasetCode());
+        }
+        return codes;
     }
 
     public String getDescription()
@@ -157,7 +169,7 @@ public class ProcessDatasetsCommand implements IDataSetCommand
         return getDescription(getShortDescription(""));
     }
 
-    private static String getDataSetCodes(List<DatasetDescription> datasets)
+    private static String getDataSetCodes(List<String> datasets)
     {
         if (datasets.isEmpty())
         {
@@ -165,9 +177,9 @@ public class ProcessDatasetsCommand implements IDataSetCommand
         } else
         {
             final StringBuilder sb = new StringBuilder();
-            for (DatasetDescription dataset : datasets)
+            for (String dataset : datasets)
             {
-                sb.append(dataset.getDatasetCode());
+                sb.append(dataset);
                 sb.append(',');
             }
             sb.setLength(sb.length() - 1);
@@ -178,20 +190,27 @@ public class ProcessDatasetsCommand implements IDataSetCommand
     private static String generateDescription(ProcessingStatus processingStatus)
     {
         StringBuilder sb = new StringBuilder();
-        sb.append("Data sets:\n");
-        List<DatasetDescription> successfullyProcessed =
-                processingStatus.getDatasetsByStatus(Status.OK);
+        sb
+                .append("This is an automatically generated report from the completed processing of data sets in openBIS.\n");
+        List<String> successfullyProcessed = processingStatus.getDatasetsByStatus(Status.OK);
         if (successfullyProcessed != null && successfullyProcessed.isEmpty() == false)
         {
-            sb.append("- successfully processed: ");
+            sb.append("- number of successfully processed data sets: ");
+            sb.append(successfullyProcessed.size());
+            sb.append(". Datasets: ");
             sb.append(getDataSetCodes(successfullyProcessed));
             sb.append("\n");
         }
         List<Status> errorStatuses = processingStatus.getErrorStatuses();
         for (Status errorStatus : errorStatuses)
         {
-            sb.append("- " + errorStatus.tryGetErrorMessage() + ": ");
-            sb.append(getDataSetCodes(processingStatus.getDatasetsByStatus(errorStatus)));
+            List<String> unsuccessfullyProcessed =
+                    processingStatus.getDatasetsByStatus(errorStatus);
+            sb.append("- processing of ");
+            sb.append(unsuccessfullyProcessed.size());
+            sb.append(" data set(s) failed because: ");
+            sb.append(errorStatus.tryGetErrorMessage() + ". Datasets: ");
+            sb.append(getDataSetCodes(unsuccessfullyProcessed));
             sb.append("\n");
         }
         return sb.toString();
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/demo/DemoArchiver.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/demo/DemoArchiver.java
index 4e6b846d4ce..ff94d320768 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/demo/DemoArchiver.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/demo/DemoArchiver.java
@@ -17,8 +17,10 @@ package ch.systemsx.cisd.openbis.dss.generic.server.plugins.demo;
 
 import java.io.File;
+import java.util.List;
 import java.util.Properties;
 
+import ch.systemsx.cisd.common.exceptions.Status;
 import ch.systemsx.cisd.common.exceptions.UserFailureException;
 import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.AbstractArchiverProcessingPlugin;
 import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription;
@@ -33,20 +35,22 @@ public class DemoArchiver extends AbstractArchiverProcessingPlugin
     public DemoArchiver(Properties properties, File storeRoot)
     {
         super(properties, storeRoot, null, null);
-        // NOTE using HighWaterMarkChecker before archiving every dataset degrades performance
-        // super(properties, storeRoot, new HighWaterMarkChecker(storeRoot), new HighWaterMarkChecker(storeRoot);
     }
 
     @Override
-    protected void archive(DatasetDescription dataset) throws UserFailureException
+    protected DatasetProcessingStatuses doArchive(List<DatasetDescription> datasets)
+            throws UserFailureException
     {
-        System.out.println("DemoArchiver - Archived: " + dataset);
+        System.out.println("DemoArchiver - Archived: " + datasets);
+        return createStatusesFrom(Status.OK, datasets, true);
     }
 
     @Override
-    protected void unarchive(DatasetDescription dataset) throws UserFailureException
+    protected DatasetProcessingStatuses doUnarchive(List<DatasetDescription> datasets)
+            throws UserFailureException
     {
-        System.out.println("DemoArchiver - Unarchived: " + dataset);
+        System.out.println("DemoArchiver - Unarchived: " + datasets);
+        return createStatusesFrom(Status.OK, datasets, false);
     }
 }
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java
index 7b33fdcbc96..22b6e959174 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java
@@ -28,7 +28,6 @@ import ch.systemsx.cisd.common.exceptions.Status;
 import ch.systemsx.cisd.common.exceptions.UserFailureException;
 import ch.systemsx.cisd.common.logging.LogCategory;
 import ch.systemsx.cisd.common.logging.LogFactory;
-import ch.systemsx.cisd.openbis.dss.generic.server.ProcessDatasetsCommand;
 import ch.systemsx.cisd.openbis.dss.generic.server.plugins.tasks.IArchiverTask;
 import ch.systemsx.cisd.openbis.dss.generic.server.plugins.tasks.ProcessingStatus;
 import ch.systemsx.cisd.openbis.dss.generic.shared.QueueingDataSetStatusUpdaterService;
@@ -45,8 +44,8 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
         IArchiverTask
 {
-    private static final Logger operationLog =
-            LogFactory.getLogger(LogCategory.OPERATION, ProcessDatasetsCommand.class);
+    protected static final Logger operationLog =
+            LogFactory.getLogger(LogCategory.OPERATION, AbstractArchiverProcessingPlugin.class);
 
     private static final long serialVersionUID = 1L;
 
@@ -62,9 +61,11 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
         this.unarchivePrerequisiteOrNull = unarchivePrerequisiteOrNull;
     }
 
-    abstract protected void archive(DatasetDescription dataset) throws UserFailureException;
+    abstract protected DatasetProcessingStatuses doArchive(List<DatasetDescription> datasets)
+            throws UserFailureException;
 
-    abstract protected void unarchive(DatasetDescription dataset) throws UserFailureException;
+    abstract protected DatasetProcessingStatuses doUnarchive(List<DatasetDescription> datasets)
+            throws UserFailureException;
 
     public ProcessingStatus archive(List<DatasetDescription> datasets)
     {
@@ -73,27 +74,17 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
         return handleDatasets(datasets, DataSetArchivingStatus.ARCHIVED,
                 DataSetArchivingStatus.AVAILABLE, new IDatasetDescriptionHandler()
                     {
-
-                        public Status handle(DatasetDescription dataset)
+                        public DatasetProcessingStatuses handle(List<DatasetDescription> allDatasets)
                         {
-                            if (archivePrerequisiteOrNull != null)
-                            {
-                                Status status = archivePrerequisiteOrNull.check();
-                                if (status.isError())
-                                {
-                                    return status;
-                                }
-                            }
-                            try
+                            Status prerequisiteStatus = checkArchivePrerequisite(allDatasets);
+                            if (prerequisiteStatus.isError())
                             {
-                                archive(dataset);
-                            } catch (UserFailureException ex)
+                                return createStatusesFrom(prerequisiteStatus, allDatasets, true);
+                            } else
                             {
-                                return Status.createError(ex.getMessage());
+                                return doArchive(allDatasets);
                             }
-                            return Status.OK;
                         }
-
                     });
     }
 
@@ -104,49 +95,122 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
         return handleDatasets(datasets, DataSetArchivingStatus.AVAILABLE,
                 DataSetArchivingStatus.ARCHIVED, new IDatasetDescriptionHandler()
                     {
-
-                        public Status handle(DatasetDescription dataset)
+                        public DatasetProcessingStatuses handle(List<DatasetDescription> allDatasets)
                         {
-                            if (unarchivePrerequisiteOrNull != null)
-                            {
-                                Status status = unarchivePrerequisiteOrNull.check();
-                                if (status.isError())
-                                {
-                                    return status;
-                                }
-                            }
-                            try
+                            Status prerequisiteStatus = checkUnarchivePrerequisite(allDatasets);
+                            if (prerequisiteStatus.isError())
                             {
-                                unarchive(dataset);
-                            } catch (UserFailureException ex)
+                                return createStatusesFrom(prerequisiteStatus, allDatasets, false);
+                            } else
                             {
-                                return Status
-                                        .createError(ex.getMessage() == null ? "unknown reason"
-                                                : ex.getMessage());
+                                return doUnarchive(allDatasets);
                             }
-                            return Status.OK;
                         }
-
                     });
     }
 
+    protected final Status checkUnarchivePrerequisite(List<DatasetDescription> datasets)
+    {
+        if (unarchivePrerequisiteOrNull != null)
+        {
+            return unarchivePrerequisiteOrNull.check(datasets.size());
+        } else
+        {
+            return Status.OK;
+        }
+    }
+
+    protected final Status checkArchivePrerequisite(List<DatasetDescription> datasets)
+    {
+        if (archivePrerequisiteOrNull != null)
+        {
+            return archivePrerequisiteOrNull.check(datasets.size());
+        } else
+        {
+            return Status.OK;
+        }
+    }
+
+    protected static class DatasetProcessingStatuses
+    {
+        private final List<String> successfulDatasetCodes;
+
+        private final List<String> failedDatasetCodes;
+
+        private final ProcessingStatus processingStatus;
+
+        public DatasetProcessingStatuses()
+        {
+            this.successfulDatasetCodes = new ArrayList<String>();
+            this.failedDatasetCodes = new ArrayList<String>();
+            this.processingStatus = new ProcessingStatus();
+        }
+
+        public void addResult(String datasetCode, Status status, boolean isArchiving)
+        {
+            String logMessage = createLogMessage(datasetCode, status, isArchiving);
+            if (status.isError())
+            {
+                operationLog.error(logMessage);
+                failedDatasetCodes.add(datasetCode);
+            } else
+            {
+                operationLog.debug(logMessage);
+                successfulDatasetCodes.add(datasetCode);
+            }
+            processingStatus.addDatasetStatus(datasetCode, status);
+        }
+
+        private String createLogMessage(String datasetCode, Status status, boolean isArchiving)
+        {
+            String operationDescription = isArchiving ? "Archiving" : "Unarchiving";
+            return String.format("%s for dataset %s finished with the status: %s.",
+                    operationDescription, datasetCode, status);
+        }
+
+        public List<String> getSuccessfulDatasetCodes()
+        {
+            return successfulDatasetCodes;
+        }
+
+        public List<String> getFailedDatasetCodes()
+        {
+            return failedDatasetCodes;
+        }
+
+        public ProcessingStatus getProcessingStatus()
+        {
+            return processingStatus;
+        }
+    }
+
     private ProcessingStatus handleDatasets(List<DatasetDescription> datasets,
             DataSetArchivingStatus success, DataSetArchivingStatus failure,
             IDatasetDescriptionHandler handler)
     {
-        final ProcessingStatus result = new ProcessingStatus();
-        List<String> successful = new ArrayList<String>();
-        List<String> failed = new ArrayList<String>();
+        DatasetProcessingStatuses statuses = handler.handle(datasets);
+        asyncUpdateStatuses(statuses, success, failure);
+        return statuses.getProcessingStatus();
+    }
+
+    // creates the same status for all datasets
+    protected final static DatasetProcessingStatuses createStatusesFrom(Status status,
+            List<DatasetDescription> datasets, boolean isArchiving)
+    {
+        DatasetProcessingStatuses statuses = new DatasetProcessingStatuses();
         for (DatasetDescription dataset : datasets)
         {
-            Status status = handler.handle(dataset);
-            List<String> codes = status.isError() ? failed : successful;
-            codes.add(dataset.getDatasetCode());
-            result.addDatasetStatus(dataset, status);
+            statuses.addResult(dataset.getDatasetCode(), status, isArchiving);
         }
-        asyncUpdateStatuses(successful, success);
-        asyncUpdateStatuses(failed, failure);
-        return result;
+        return statuses;
+    }
+
+    private void asyncUpdateStatuses(DatasetProcessingStatuses statuses,
+            DataSetArchivingStatus success, DataSetArchivingStatus failure)
+    {
+        asyncUpdateStatuses(statuses.getSuccessfulDatasetCodes(), success);
+        asyncUpdateStatuses(statuses.getFailedDatasetCodes(), failure);
+    }
 
     private static void asyncUpdateStatuses(List<String> dataSetCodes,
@@ -158,7 +222,7 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
 
     private interface IDatasetDescriptionHandler
     {
-        public Status handle(DatasetDescription dataset);
+        public DatasetProcessingStatuses handle(List<DatasetDescription> datasets);
     }
 }
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/HighWaterMarkChecker.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/HighWaterMarkChecker.java
index 383177430a1..848cc1a41fc 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/HighWaterMarkChecker.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/HighWaterMarkChecker.java
@@ -6,10 +6,7 @@ import java.io.Serializable;
 import ch.systemsx.cisd.common.exceptions.Status;
 import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher;
 import ch.systemsx.cisd.common.highwatermark.HostAwareFile;
-import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark;
 import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher.HighwaterMarkState;
-import ch.systemsx.cisd.common.utilities.PropertyUtils;
-import ch.systemsx.cisd.openbis.dss.generic.shared.utils.DssPropertyParametersUtil;
 
 /**
  * Checks if the space available is larger than specified value.
@@ -20,29 +17,19 @@ public class HighWaterMarkChecker implements IStatusChecker, Serializable
 {
     private static final long serialVersionUID = 1L;
 
-    private final long highWaterMark;
+    private final long highWaterMark; // amount of free space per item in KB
 
     private final File highWaterMarkPath;
 
-    /**
-     * Loads the high water mark value from global properties -
-     * {@link HostAwareFileWithHighwaterMark#HIGHWATER_MARK_PROPERTY_KEY}.
-     */
-    public HighWaterMarkChecker(File path)
-    {
-        this(PropertyUtils.getLong(DssPropertyParametersUtil.loadServiceProperties(),
-                HostAwareFileWithHighwaterMark.HIGHWATER_MARK_PROPERTY_KEY, -1L), path);
-    }
-
     public HighWaterMarkChecker(long archiveHighWaterMark, File archiveHighWaterMarkPath)
     {
         this.highWaterMark = archiveHighWaterMark;
         this.highWaterMarkPath = archiveHighWaterMarkPath;
     }
 
-    public Status check()
+    public Status check(int numberOfItems)
     {
-        HighwaterMarkWatcher w = new HighwaterMarkWatcher(highWaterMark);
+        HighwaterMarkWatcher w = new HighwaterMarkWatcher(highWaterMark * numberOfItems);
         HighwaterMarkState state = w.getHighwaterMarkState(new HostAwareFile(highWaterMarkPath));
         if (HighwaterMarkWatcher.isBelow(state))
         {
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/IStatusChecker.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/IStatusChecker.java
index ad708037c9f..8096fe2a0e4 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/IStatusChecker.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/IStatusChecker.java
@@ -1,13 +1,28 @@
+/*
+ * Copyright 2010 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard;
 
 import ch.systemsx.cisd.common.exceptions.Status;
 
 /**
- * Checks the status.
- *
  * @author Izabela Adamczyk
  */
 public interface IStatusChecker
 {
-    Status check();
-}
\ No newline at end of file
+    Status check(int size);
+
+}
diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/tasks/ProcessingStatus.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/tasks/ProcessingStatus.java
index 805dc09bd9c..0119297f29a 100644
--- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/tasks/ProcessingStatus.java
+++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/tasks/ProcessingStatus.java
@@ -32,18 +32,18 @@ import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription;
 public class ProcessingStatus
 {
-    private Map<Status, List<DatasetDescription>> datasetByStatus =
-            new LinkedHashMap<Status, List<DatasetDescription>>();
+    private Map<Status, List<String/* dataset code */>> datasetByStatus =
+            new LinkedHashMap<Status, List<String>>();
 
-    public void addDatasetStatus(DatasetDescription dataset, Status status)
+    public void addDatasetStatus(String datasetCode, Status status)
     {
-        List<DatasetDescription> datasets = datasetByStatus.get(status);
+        List<String> datasets = datasetByStatus.get(status);
         if (datasets == null)
         {
-            datasets = new ArrayList<DatasetDescription>();
+            datasets = new ArrayList<String>();
             datasetByStatus.put(status, datasets);
         }
-        datasets.add(dataset);
+        datasets.add(datasetCode);
     }
 
     public List<Status> getErrorStatuses()
@@ -53,9 +53,14 @@ public class ProcessingStatus
         return result;
     }
 
-    public List<DatasetDescription> getDatasetsByStatus(Status status)
+    public List<String/* dataset code */> getDatasetsByStatus(Status status)
     {
         return datasetByStatus.get(status);
     }
 
+    public void addDatasetStatus(DatasetDescription dataset, Status status)
+    {
+        addDatasetStatus(dataset.getDatasetCode(), status);
+    }
+
 }
diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCopierForUsersTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCopierForUsersTest.java
index b399f9365a7..0a3b8158076 100644
--- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCopierForUsersTest.java
+++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCopierForUsersTest.java
@@ -20,6 +20,7 @@ import static ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataS
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -37,17 +38,11 @@ import ch.systemsx.cisd.base.tests.AbstractFileSystemTestCase;
 import ch.systemsx.cisd.common.exceptions.Status;
 import ch.systemsx.cisd.common.filesystem.IPathCopier;
 import ch.systemsx.cisd.common.filesystem.ssh.ISshCommandExecutor;
-import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataSetCopier;
-import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataSetCopierForUsers;
-import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.IPathCopierFactory;
-import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.ISshCommandExecutorFactory;
 import ch.systemsx.cisd.openbis.dss.generic.server.plugins.tasks.ProcessingStatus;
 import ch.systemsx.cisd.openbis.generic.shared.Constants;
 import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription;
 
 /**
- *
- *
  * @author Franz-Josef Elmer
  */
 @Friend(toClasses = DataSetCopier.class)
@@ -138,7 +133,8 @@ public class DataSetCopierForUsersTest extends AbstractFileSystemTestCase
         DataSetCopier dataSetCopier =
                 new DataSetCopierForUsers(properties, storeRoot, pathFactory, sshFactory);
 
-        ProcessingStatus processingStatus = dataSetCopier.process(Arrays.asList(ds), parameterBindings);
+        ProcessingStatus processingStatus =
+                dataSetCopier.process(Arrays.asList(ds), parameterBindings);
 
         assertNoErrors(processingStatus);
         assertSuccessful(processingStatus, ds);
@@ -151,23 +147,24 @@ public class DataSetCopierForUsersTest extends AbstractFileSystemTestCase
         properties.setProperty(DESTINATION_KEY, "tmp/test");
         prepareCreateAndCheckCopier();
         context.checking(new Expectations()
-            {
             {
-                File canonicalFile = getCanonicalFile("tmp/test");
-                one(copier).copyToRemote(dsData, canonicalFile, null, null, null);
-                will(returnValue(Status.OK));
-            }
-            });
+                {
+                    File canonicalFile = getCanonicalFile("tmp/test");
+                    one(copier).copyToRemote(dsData, canonicalFile, null, null, null);
+                    will(returnValue(Status.OK));
+                }
+            });
 
         DataSetCopier dataSetCopier =
-            new DataSetCopierForUsers(properties, storeRoot, pathFactory, sshFactory);
-
-        ProcessingStatus processingStatus = dataSetCopier.process(Arrays.asList(ds), parameterBindings);
+                new DataSetCopierForUsers(properties, storeRoot, pathFactory, sshFactory);
+
+        ProcessingStatus processingStatus =
+                dataSetCopier.process(Arrays.asList(ds), parameterBindings);
 
         assertNoErrors(processingStatus);
         assertSuccessful(processingStatus, ds);
-        
+
         context.assertIsSatisfied();
     }
-    
+
     private void prepareCreateAndCheckCopier()
     {
         context.checking(new Expectations()
@@ -199,10 +196,19 @@ public class DataSetCopierForUsersTest extends AbstractFileSystemTestCase
     private void checkStatus(ProcessingStatus processingStatus, Status status,
             DatasetDescription... expectedDatasets)
     {
-        final List<DatasetDescription> actualDatasets =
-                processingStatus.getDatasetsByStatus(status);
+        final List<String> actualDatasets = processingStatus.getDatasetsByStatus(status);
         assertEquals(expectedDatasets.length, actualDatasets.size());
-        assertTrue(actualDatasets.containsAll(Arrays.asList(expectedDatasets)));
+        assertTrue(actualDatasets.containsAll(asCodes(Arrays.asList(expectedDatasets))));
+    }
+
+    private static List<String> asCodes(List<DatasetDescription> datasets)
+    {
+        List<String> codes = new ArrayList<String>();
+        for (DatasetDescription dataset : datasets)
+        {
+            codes.add(dataset.getDatasetCode());
+        }
+        return codes;
     }
 
     private File getCanonicalFile(String fileName)
diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCopierTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCopierTest.java
index dfdf264cf3b..da7d1f5155b 100644
--- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCopierTest.java
+++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCopierTest.java
@@ -21,6 +21,7 @@ import static ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataS
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
@@ -128,7 +129,7 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
         ds4Data = new File(ds4Folder, "existing");
         ds4Data.mkdirs();
     }
-    
+
     private DatasetDescription createDataSetDescription(String dataSetCode, String location)
     {
         DatasetDescription description = new DatasetDescription();
@@ -205,8 +206,9 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
     {
         properties.setProperty(DESTINATION_KEY, "host:tmp/test");
         prepareCreateAndCheckCopier("host", null, 1, false);
-        DataSetCopier dataSetCopier = new DataSetCopier(properties, storeRoot, pathFactory, sshFactory);
-        
+        DataSetCopier dataSetCopier =
+                new DataSetCopier(properties, storeRoot, pathFactory, sshFactory);
+
         try
         {
             dataSetCopier.process(Arrays.asList(ds1), null);
@@ -225,8 +227,9 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
         properties.setProperty(DESTINATION_KEY, "host:abc:tmp/test");
         properties.setProperty(RSYNC_PASSWORD_FILE_KEY, "abc-password");
         prepareCreateAndCheckCopier("host", "abc", 1, false);
-        DataSetCopier dataSetCopier = new DataSetCopier(properties, storeRoot, pathFactory, sshFactory);
-        
+        DataSetCopier dataSetCopier =
+                new DataSetCopier(properties, storeRoot, pathFactory, sshFactory);
+
         try
         {
             dataSetCopier.process(Arrays.asList(ds1), null);
@@ -441,15 +444,18 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
     }
 
     private void prepareCreateAndCheckCopier(final String hostOrNull,
-            final String rsyncModuleOrNull, final int numberOfExpectedCreations, final boolean checkingResult)
+            final String rsyncModuleOrNull, final int numberOfExpectedCreations,
+            final boolean checkingResult)
     {
         context.checking(new Expectations()
             {
                 {
-                    exactly(numberOfExpectedCreations).of(pathFactory).create(rsyncExecutableDummy, sshExecutableDummy);
+                    exactly(numberOfExpectedCreations).of(pathFactory).create(rsyncExecutableDummy,
+                            sshExecutableDummy);
                     will(returnValue(copier));
 
-                    exactly(numberOfExpectedCreations).of(sshFactory).create(sshExecutableDummy, hostOrNull);
+                    exactly(numberOfExpectedCreations).of(sshFactory).create(sshExecutableDummy,
+                            hostOrNull);
                     will(returnValue(sshCommandExecutor));
 
                     exactly(numberOfExpectedCreations).of(copier).check();
@@ -457,11 +463,13 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
                     {
                         if (rsyncModuleOrNull != null)
                         {
-                            exactly(numberOfExpectedCreations).of(copier).checkRsyncConnectionViaRsyncServer(hostOrNull,
-                                    rsyncModuleOrNull, rsyncModuleOrNull + "-password");
+                            exactly(numberOfExpectedCreations).of(copier)
+                                    .checkRsyncConnectionViaRsyncServer(hostOrNull,
+                                            rsyncModuleOrNull, rsyncModuleOrNull + "-password");
                         } else
                         {
-                            exactly(numberOfExpectedCreations).of(copier).checkRsyncConnectionViaSsh(hostOrNull, null);
+                            exactly(numberOfExpectedCreations).of(copier)
+                                    .checkRsyncConnectionViaSsh(hostOrNull, null);
                         }
                         will(returnValue(checkingResult));
                     }
@@ -491,10 +499,19 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
     private void checkStatus(ProcessingStatus processingStatus, Status status,
             DatasetDescription... expectedDatasets)
    {
-        final List<DatasetDescription> actualDatasets =
-                processingStatus.getDatasetsByStatus(status);
+        final List<String> actualDatasets = processingStatus.getDatasetsByStatus(status);
         assertEquals(expectedDatasets.length, actualDatasets.size());
-        assertTrue(actualDatasets.containsAll(Arrays.asList(expectedDatasets)));
+        assertTrue(actualDatasets.containsAll(asCodes(Arrays.asList(expectedDatasets))));
+    }
+
+    private static List<String> asCodes(List<DatasetDescription> datasets)
+    {
+        List<String> codes = new ArrayList<String>();
+        for (DatasetDescription dataset : datasets)
+        {
+            codes.add(dataset.getDatasetCode());
+        }
+        return codes;
     }
 
     private File getCanonicalFile(String fileName)
diff --git a/rtd_yeastx/etc/service.properties b/rtd_yeastx/etc/service.properties
index 7caa67c9549..f1c916744b3 100644
--- a/rtd_yeastx/etc/service.properties
+++ b/rtd_yeastx/etc/service.properties
@@ -65,14 +65,8 @@ metabol-db.databaseKind = dev
 metabol-db.readOnlyGroup = metabol_readonly
 metabol-db.readWriteGroup = metabol_readwrite
 metabol-db.scriptFolder = sql
-
-# ---------------------------------------------------------------------------
-
-#maintenance-plugins=ds-remover
-
-ds-remover.class = ch.systemsx.cisd.yeastx.etl.MetabolDatabaseUpdater
-ds-remover.interval = 10
-ds-remover.data-source = metabol-db
+metabol-db.maxActive = 15
+metabol-db.maxIdle = 15
 
 # ---------------------------------------------------------------------------
 
@@ -277,7 +271,47 @@ quantml-uploader.storage-processor.unique-experiment-name-property-code = ${expe
 quantml-uploader.storage-processor.data-source = metabol-db
 quantml-uploader.storage-processor.processor = ch.systemsx.cisd.etlserver.DefaultStorageProcessor
+
+# ---------------------------------------------------------------------------
+# maintenance plugins configuration
+# ---------------------------------------------------------------------------
+
+dataset-unarchiving-highwater-mark = 2000
+
+# Comma-separated names of maintenance plugins.
+# Each plugin should have configuration properties prefixed with its name.
+# Mandatory properties for each <plugin> include:
+# <plugin>.class - Fully qualified plugin class name
+# <plugin>.interval - The time between plugin executions (in seconds)
+# Optional properties for each <plugin> include:
+# <plugin>.start - Time of the first execution (HH:mm)
+maintenance-plugins=ds-remover, auto-archiver
+
+ds-remover.class = ch.systemsx.cisd.yeastx.etl.MetabolDatabaseUpdater
+ds-remover.interval = 1000
+ds-remover.data-source = metabol-db
+
+# Performs automatic archiving of 'ACTIVE' data sets based on their properties
+auto-archiver.class = ch.systemsx.cisd.etlserver.plugins.AutoArchiverTask
+# The time between subsequent archiving runs (in seconds)
+auto-archiver.interval = 300
+# size of the disk free space in KB which must be available to unarchive one dataset
+auto-archiver.dataset-unarchiving-highwater-mark = ${dataset-unarchiving-highwater-mark}
+# Time of the first execution (HH:mm)
+#auto-archiver.start = 10:49
+# The following properties are optional:
+# only data sets of the specified type will be archived
+auto-archiver.data-set-type = EICML
+# only data sets that are older than the specified number of days will be archived (default = 0)
+auto-archiver.older-than = 777
+# fully qualified class name of a policy that additionally filters the data sets to be archived
+#auto-archiver.policy.class = ch.systemsx.cisd.etlserver.plugins.DummyAutoArchiverPolicy
+
+# --- ARCHIVER ------------------------------------------------------------------------
+
 archiver.class = ch.systemsx.cisd.yeastx.etl.MLArchiverTask
 archiver.unique-sample-name-property-code = ${sample-name-property-code}
 archiver.unique-experiment-name-property-code = ${experiment-name-property-code}
-archiver.data-source = metabol-db
\ No newline at end of file
+archiver.data-source = metabol-db
+# size of the disk free space in KB which must be available to unarchive one dataset
+archiver.dataset-unarchiving-highwater-mark = ${dataset-unarchiving-highwater-mark}
\ No newline at end of file
diff --git a/rtd_yeastx/source/java/ch/systemsx/cisd/yeastx/etl/MLArchiverTask.java b/rtd_yeastx/source/java/ch/systemsx/cisd/yeastx/etl/MLArchiverTask.java
index 7606271da6c..b1f6eaf86e0 100644
--- a/rtd_yeastx/source/java/ch/systemsx/cisd/yeastx/etl/MLArchiverTask.java
+++ b/rtd_yeastx/source/java/ch/systemsx/cisd/yeastx/etl/MLArchiverTask.java
@@ -17,12 +17,17 @@ package ch.systemsx.cisd.yeastx.etl;
 
 import java.io.File;
+import java.util.List;
 import java.util.Properties;
 
 import javax.sql.DataSource;
 
+import ch.systemsx.cisd.common.exceptions.Status;
 import ch.systemsx.cisd.common.exceptions.UserFailureException;
+import ch.systemsx.cisd.common.utilities.PropertyUtils;
 import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.AbstractArchiverProcessingPlugin;
+import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.HighWaterMarkChecker;
+import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.IStatusChecker;
 import ch.systemsx.cisd.openbis.dss.generic.shared.DataSourceProvider;
 import ch.systemsx.cisd.openbis.dss.generic.shared.ServiceProvider;
 import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Experiment;
@@ -44,25 +49,41 @@ public class MLArchiverTask extends AbstractArchiverProcessingPlugin
 
     private static final long serialVersionUID = 1L;
 
+    // size of the disk free space in KB which must be available to unarchive one dataset
+    private static final String HIGHWATER_MARK_PROPERTY_KEY = "dataset-unarchiving-highwater-mark";
+
     private final String dataSourceName;
 
     public MLArchiverTask(Properties properties, File storeRoot)
     {
-        super(properties, storeRoot, null, null);
+        super(properties, storeRoot, null, tryCreateUnarchivingStatusChecker(properties, storeRoot));
         dataSourceName = DataSourceProvider.extractDataSourceName(properties);
         // Check if given data source exists
         getDataSource(dataSourceName);
     }
 
+    private static IStatusChecker tryCreateUnarchivingStatusChecker(Properties properties,
+            File storeRoot)
+    {
+        long archiveHighWaterMark =
+                PropertyUtils.getLong(properties, HIGHWATER_MARK_PROPERTY_KEY, -1);
+        if (archiveHighWaterMark == -1)
+        {
+            return null;
+        }
+        // NOTE: we assume that the database (which grows when unarchiving is done) is stored on the
+        // same file system as the DSS store
+        return new HighWaterMarkChecker(archiveHighWaterMark, storeRoot);
+    }
+
     /**
      * Deletes data related to given data set from metabol database.
     */
-    @Override
-    protected void archive(DatasetDescription dataset) throws UserFailureException
+    private Status doArchive(DatasetDescription dataset, IDMGenericDAO dao)
+            throws UserFailureException
     {
-
-        DataSource dataSource = getDataSource(dataSourceName);
-        final IDMGenericDAO dao = DBUtils.getQuery(dataSource, IDMGenericDAO.class);
         try
         {
             dao.deleteDataSet(dataset.getDatasetCode());
@@ -70,11 +91,9 @@ public class MLArchiverTask extends AbstractArchiverProcessingPlugin
         } catch (Exception ex)
         {
             dao.rollback();
-            throw new UserFailureException(ex.getMessage());
-        } finally
-        {
-            dao.close();
+            return createErrorStatus(ex);
         }
+        return Status.OK;
     }
 
     private static DataSource getDataSource(String dataSourceName)
@@ -85,10 +104,9 @@ public class MLArchiverTask extends AbstractArchiverProcessingPlugin
     /**
     * Adds data related to given data set to metabol database.
     */
-    @Override
-    protected void unarchive(DatasetDescription dataset) throws UserFailureException
+    private Status doUnarchive(DatasetDescription dataset, ML2DatabaseUploader databaseUploader)
+            throws UserFailureException
     {
-        ML2DatabaseUploader databaseUploader = null;
         try
         {
             Sample sample = null;
@@ -97,7 +115,6 @@ public class MLArchiverTask extends AbstractArchiverProcessingPlugin
                 sample = fetchSample(dataset);
             }
             Experiment experiment = getOrFetchExperiment(dataset, sample);
-            databaseUploader = new ML2DatabaseUploader(properties);
             databaseUploader.upload(getDataFile(dataset), sample, experiment, dataset
                     .getDatasetCode());
             databaseUploader.commit();
@@ -107,8 +124,15 @@ public class MLArchiverTask extends AbstractArchiverProcessingPlugin
         {
             databaseUploader.rollback();
         }
-        throw new UserFailureException(ex.getMessage());
+        return createErrorStatus(ex);
     }
+        return Status.OK;
+    }
+
+    private Status createErrorStatus(Exception ex)
+    {
+        return Status.createError(ex.getMessage() == null ? "unknown reason "
+                + ex.getClass().getName() : ex.getMessage());
     }
 
     private Sample fetchSample(DatasetDescription dataset)
@@ -140,7 +164,7 @@ public class MLArchiverTask extends AbstractArchiverProcessingPlugin
     {
         File datasetDir = getDataSubDir(dataset);
         File[] files = datasetDir.listFiles();
-        if (files.length < 1)
+        if (files == null || files.length < 1)
         {
             throw new UserFailureException(
                     "Data set directory contains no files (exactly one expected)");
@@ -153,4 +177,53 @@ public class MLArchiverTask extends AbstractArchiverProcessingPlugin
         return files[0];
         }
     }
+
+    @Override
+    protected DatasetProcessingStatuses doArchive(List<DatasetDescription> datasets)
+            throws UserFailureException
+    {
+        DataSource dataSource = getDataSource(dataSourceName);
+        final IDMGenericDAO dao = DBUtils.getQuery(dataSource, IDMGenericDAO.class);
+
+        try
+        {
+            int counter = 0;
+            DatasetProcessingStatuses statuses = new DatasetProcessingStatuses();
+            for (DatasetDescription dataset : datasets)
+            {
+                Status status = doArchive(dataset, dao);
+                statuses.addResult(dataset.getDatasetCode(), status, true);
+                counter++;
+                if (counter % 100 == 0)
+                {
+                    operationLog.info("Archiving status: " + counter + "/" + datasets.size());
+                }
+            }
+            return statuses;
+        } finally
+        {
+            dao.close();
+        }
+    }
+
+    @Override
+    protected DatasetProcessingStatuses doUnarchive(List<DatasetDescription> datasets)
+            throws UserFailureException
+    {
+        ML2DatabaseUploader databaseUploader = new ML2DatabaseUploader(properties);
+
+        DatasetProcessingStatuses statuses = new DatasetProcessingStatuses();
+        int counter = 0;
+        for (DatasetDescription dataset : datasets)
+        {
+            Status status = doUnarchive(dataset, databaseUploader);
+            statuses.addResult(dataset.getDatasetCode(), status, false);
+            counter++;
+            if (counter % 100 == 0)
+            {
+                operationLog.info("Unarchiving status: " + counter + "/" + datasets.size());
+            }
+        }
+        return statuses;
+    }
 }
-- 
GitLab