Commit a72d2bd2 authored by tpylak

LMS-1548 configure high-watermark checker, improve performance

SVN: 16151
parent 097c41d7
Showing changed files with 378 additions and 154 deletions
@@ -17,6 +17,7 @@
package ch.systemsx.cisd.openbis.dss.generic.server;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -51,7 +52,7 @@ public class ProcessDatasetsCommand implements IDataSetCommand
private final List<DatasetDescription> datasets;
private final Map<String, String> parameterBindings;
private final String userEmailOrNull;
private final DatastoreServiceDescription serviceDescription;
@@ -149,7 +150,18 @@ public class ProcessDatasetsCommand implements IDataSetCommand
private String getDescription(String prefix)
{
return String.format("%s on data set(s): \n%s", prefix, getDataSetCodes(datasets));
return String.format("%s on %d data set(s): \n%s", prefix, datasets.size(),
getDataSetCodes(asCodes(datasets)));
}
private static List<String> asCodes(List<DatasetDescription> datasets)
{
List<String> codes = new ArrayList<String>();
for (DatasetDescription dataset : datasets)
{
codes.add(dataset.getDatasetCode());
}
return codes;
}
public String getDescription()
@@ -157,7 +169,7 @@ public class ProcessDatasetsCommand implements IDataSetCommand
return getDescription(getShortDescription(""));
}
private static String getDataSetCodes(List<DatasetDescription> datasets)
private static String getDataSetCodes(List<String> datasets)
{
if (datasets.isEmpty())
{
@@ -165,9 +177,9 @@ public class ProcessDatasetsCommand implements IDataSetCommand
} else
{
final StringBuilder sb = new StringBuilder();
for (DatasetDescription dataset : datasets)
for (String dataset : datasets)
{
sb.append(dataset.getDatasetCode());
sb.append(dataset);
sb.append(',');
}
sb.setLength(sb.length() - 1);
@@ -178,20 +190,27 @@ public class ProcessDatasetsCommand implements IDataSetCommand
private static String generateDescription(ProcessingStatus processingStatus)
{
StringBuilder sb = new StringBuilder();
sb.append("Data sets:\n");
List<DatasetDescription> successfullyProcessed =
processingStatus.getDatasetsByStatus(Status.OK);
sb
.append("This is an automatically generated report from the completed processing of data sets in openBIS.\n");
List<String> successfullyProcessed = processingStatus.getDatasetsByStatus(Status.OK);
if (successfullyProcessed != null && successfullyProcessed.isEmpty() == false)
{
sb.append("- successfully processed: ");
sb.append("- number of successfully processed data sets: ");
sb.append(successfullyProcessed.size());
sb.append(". Datasets: ");
sb.append(getDataSetCodes(successfullyProcessed));
sb.append("\n");
}
List<Status> errorStatuses = processingStatus.getErrorStatuses();
for (Status errorStatus : errorStatuses)
{
sb.append("- " + errorStatus.tryGetErrorMessage() + ": ");
sb.append(getDataSetCodes(processingStatus.getDatasetsByStatus(errorStatus)));
List<String> unsuccessfullyProcessed =
processingStatus.getDatasetsByStatus(errorStatus);
sb.append("- processing of ");
sb.append(unsuccessfullyProcessed.size());
sb.append(" data set(s) failed because: ");
sb.append(" " + errorStatus.tryGetErrorMessage() + ". Datasets: ");
sb.append(getDataSetCodes(unsuccessfullyProcessed));
sb.append("\n");
}
return sb.toString();
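For illustration, the report produced by the new generateDescription for two successful data sets and one failed one might read as follows (the data set codes and the error message are hypothetical):

    This is an automatically generated report from the completed processing of data sets in openBIS.
    - number of successfully processed data sets: 2. Datasets: DS-1,DS-2
    - processing of 1 data set(s) failed because: disk quota exceeded. Datasets: DS-3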
@@ -17,8 +17,10 @@
package ch.systemsx.cisd.openbis.dss.generic.server.plugins.demo;
import java.io.File;
import java.util.List;
import java.util.Properties;
import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.exceptions.UserFailureException;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.AbstractArchiverProcessingPlugin;
import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription;
@@ -33,20 +35,22 @@ public class DemoArchiver extends AbstractArchiverProcessingPlugin
public DemoArchiver(Properties properties, File storeRoot)
{
super(properties, storeRoot, null, null);
// NOTE: using HighWaterMarkChecker before archiving every dataset degrades performance
// (a batch-aware wiring sketch follows after this class):
// super(properties, storeRoot, new HighWaterMarkChecker(storeRoot), new HighWaterMarkChecker(storeRoot));
}
@Override
protected void archive(DatasetDescription dataset) throws UserFailureException
protected DatasetProcessingStatuses doArchive(List<DatasetDescription> datasets)
throws UserFailureException
{
System.out.println("DemoArchiver - Archived: " + dataset);
System.out.println("DemoArchiver - Archived: " + datasets);
return createStatusesFrom(Status.OK, datasets, true);
}
@Override
protected void unarchive(DatasetDescription dataset) throws UserFailureException
protected DatasetProcessingStatuses doUnarchive(List<DatasetDescription> datasets)
throws UserFailureException
{
System.out.println("DemoArchiver - Unarchived: " + dataset);
System.out.println("DemoArchiver - Unarchived: " + datasets);
return createStatusesFrom(Status.OK, datasets, false);
}
}
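If the checkers were re-enabled, the batch-aware wiring would use the two-argument HighWaterMarkChecker constructor, so that free space is verified once per batch instead of once per data set. A minimal sketch, assuming a per-item watermark of 2000 KB (the value used in the service.properties example further down):

    // Hypothetical wiring sketch; 2000L KB per item is an assumed value.
    public DemoArchiver(Properties properties, File storeRoot)
    {
        super(properties, storeRoot,
                new HighWaterMarkChecker(2000L, storeRoot),  // archive prerequisite
                new HighWaterMarkChecker(2000L, storeRoot)); // unarchive prerequisite
    }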
@@ -28,7 +28,6 @@ import ch.systemsx.cisd.common.exceptions.Status
import ch.systemsx.cisd.common.exceptions.UserFailureException;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;
import ch.systemsx.cisd.openbis.dss.generic.server.ProcessDatasetsCommand;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.tasks.IArchiverTask;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.tasks.ProcessingStatus;
import ch.systemsx.cisd.openbis.dss.generic.shared.QueueingDataSetStatusUpdaterService;
@@ -45,8 +44,8 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
IArchiverTask
{
private static final Logger operationLog =
LogFactory.getLogger(LogCategory.OPERATION, ProcessDatasetsCommand.class);
protected static final Logger operationLog =
LogFactory.getLogger(LogCategory.OPERATION, AbstractArchiverProcessingPlugin.class);
private static final long serialVersionUID = 1L;
@@ -62,9 +61,11 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
this.unarchivePrerequisiteOrNull = unarchivePrerequisiteOrNull;
}
abstract protected void archive(DatasetDescription dataset) throws UserFailureException;
abstract protected DatasetProcessingStatuses doArchive(List<DatasetDescription> datasets)
throws UserFailureException;
abstract protected void unarchive(DatasetDescription dataset) throws UserFailureException;
abstract protected DatasetProcessingStatuses doUnarchive(List<DatasetDescription> datasets)
throws UserFailureException;
public ProcessingStatus archive(List<DatasetDescription> datasets)
{
@@ -73,27 +74,17 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
return handleDatasets(datasets, DataSetArchivingStatus.ARCHIVED,
DataSetArchivingStatus.AVAILABLE, new IDatasetDescriptionHandler()
{
public Status handle(DatasetDescription dataset)
public DatasetProcessingStatuses handle(List<DatasetDescription> allDatasets)
{
if (archivePrerequisiteOrNull != null)
{
Status status = archivePrerequisiteOrNull.check();
if (status.isError())
{
return status;
}
}
try
Status prerequisiteStatus = checkArchivePrerequisite(allDatasets);
if (prerequisiteStatus.isError())
{
archive(dataset);
} catch (UserFailureException ex)
return createStatusesFrom(prerequisiteStatus, allDatasets, true);
} else
{
return Status.createError(ex.getMessage());
return doArchive(allDatasets);
}
return Status.OK;
}
});
}
@@ -104,49 +95,122 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
return handleDatasets(datasets, DataSetArchivingStatus.AVAILABLE,
DataSetArchivingStatus.ARCHIVED, new IDatasetDescriptionHandler()
{
public Status handle(DatasetDescription dataset)
public DatasetProcessingStatuses handle(List<DatasetDescription> allDatasets)
{
if (unarchivePrerequisiteOrNull != null)
{
Status status = unarchivePrerequisiteOrNull.check();
if (status.isError())
{
return status;
}
}
try
Status prerequisiteStatus = checkUnarchivePrerequisite(allDatasets);
if (prerequisiteStatus.isError())
{
unarchive(dataset);
} catch (UserFailureException ex)
return createStatusesFrom(prerequisiteStatus, allDatasets, false);
} else
{
return Status
.createError(ex.getMessage() == null ? "unknown reason"
: ex.getMessage());
return doUnarchive(allDatasets);
}
return Status.OK;
}
});
}
protected final Status checkUnarchivePrerequisite(List<DatasetDescription> datasets)
{
if (unarchivePrerequisiteOrNull != null)
{
return unarchivePrerequisiteOrNull.check(datasets.size());
} else
{
return Status.OK;
}
}
protected final Status checkArchivePrerequisite(List<DatasetDescription> datasets)
{
if (archivePrerequisiteOrNull != null)
{
return archivePrerequisiteOrNull.check(datasets.size());
} else
{
return Status.OK;
}
}
protected static class DatasetProcessingStatuses
{
private final List<String> successfulDatasetCodes;
private final List<String> failedDatasetCodes;
private final ProcessingStatus processingStatus;
public DatasetProcessingStatuses()
{
this.successfulDatasetCodes = new ArrayList<String>();
this.failedDatasetCodes = new ArrayList<String>();
this.processingStatus = new ProcessingStatus();
}
public void addResult(String datasetCode, Status status, boolean isArchiving)
{
String logMessage = createLogMessage(datasetCode, status, isArchiving);
if (status.isError())
{
operationLog.error(logMessage);
failedDatasetCodes.add(datasetCode);
} else
{
operationLog.debug(logMessage);
successfulDatasetCodes.add(datasetCode);
}
processingStatus.addDatasetStatus(datasetCode, status);
}
private String createLogMessage(String datasetCode, Status status, boolean isArchiving)
{
String operationDescription = isArchiving ? "Archiving" : "Unarchiving";
return String.format("%s for dataset %s finished with the status: %s.",
operationDescription, datasetCode, status);
}
public List<String> getSuccessfulDatasetCodes()
{
return successfulDatasetCodes;
}
public List<String> getFailedDatasetCodes()
{
return failedDatasetCodes;
}
public ProcessingStatus getProcessingStatus()
{
return processingStatus;
}
}
private ProcessingStatus handleDatasets(List<DatasetDescription> datasets,
DataSetArchivingStatus success, DataSetArchivingStatus failure,
IDatasetDescriptionHandler handler)
{
final ProcessingStatus result = new ProcessingStatus();
List<String> successful = new ArrayList<String>();
List<String> failed = new ArrayList<String>();
DatasetProcessingStatuses statuses = handler.handle(datasets);
asyncUpdateStatuses(statuses, success, failure);
return statuses.getProcessingStatus();
}
// creates the same status for all datasets
protected final static DatasetProcessingStatuses createStatusesFrom(Status status,
List<DatasetDescription> datasets, boolean isArchiving)
{
DatasetProcessingStatuses statuses = new DatasetProcessingStatuses();
for (DatasetDescription dataset : datasets)
{
Status status = handler.handle(dataset);
List<String> codes = status.isError() ? failed : successful;
codes.add(dataset.getDatasetCode());
result.addDatasetStatus(dataset, status);
statuses.addResult(dataset.getDatasetCode(), status, isArchiving);
}
asyncUpdateStatuses(successful, success);
asyncUpdateStatuses(failed, failure);
return result;
return statuses;
}
private void asyncUpdateStatuses(DatasetProcessingStatuses statuses,
DataSetArchivingStatus success, DataSetArchivingStatus failure)
{
asyncUpdateStatuses(statuses.getSuccessfulDatasetCodes(), success);
asyncUpdateStatuses(statuses.getFailedDatasetCodes(), failure);
}
private static void asyncUpdateStatuses(List<String> dataSetCodes,
@@ -158,7 +222,7 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore
private interface IDatasetDescriptionHandler
{
public Status handle(DatasetDescription dataset);
public DatasetProcessingStatuses handle(List<DatasetDescription> datasets);
}
}
@@ -6,10 +6,7 @@ import java.io.Serializable;
import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher;
import ch.systemsx.cisd.common.highwatermark.HostAwareFile;
import ch.systemsx.cisd.common.highwatermark.HostAwareFileWithHighwaterMark;
import ch.systemsx.cisd.common.highwatermark.HighwaterMarkWatcher.HighwaterMarkState;
import ch.systemsx.cisd.common.utilities.PropertyUtils;
import ch.systemsx.cisd.openbis.dss.generic.shared.utils.DssPropertyParametersUtil;
/**
* Checks if the available space is larger than the specified value.
@@ -20,29 +17,19 @@ public class HighWaterMarkChecker implements IStatusChecker, Serializable
{
private static final long serialVersionUID = 1L;
private final long highWaterMark;
private final long highWaterMark; // amount of free space per item in KB
private final File highWaterMarkPath;
/**
* Loads the high water mark value from global properties -
* {@link HostAwareFileWithHighwaterMark#HIGHWATER_MARK_PROPERTY_KEY}.
*/
public HighWaterMarkChecker(File path)
{
this(PropertyUtils.getLong(DssPropertyParametersUtil.loadServiceProperties(),
HostAwareFileWithHighwaterMark.HIGHWATER_MARK_PROPERTY_KEY, -1L), path);
}
public HighWaterMarkChecker(long archiveHighWaterMark, File archiveHighWaterMarkPath)
{
this.highWaterMark = archiveHighWaterMark;
this.highWaterMarkPath = archiveHighWaterMarkPath;
}
public Status check()
public Status check(int numberOfItems)
{
HighwaterMarkWatcher w = new HighwaterMarkWatcher(highWaterMark);
HighwaterMarkWatcher w = new HighwaterMarkWatcher(highWaterMark * numberOfItems);
HighwaterMarkState state = w.getHighwaterMarkState(new HostAwareFile(highWaterMarkPath));
if (HighwaterMarkWatcher.isBelow(state))
{
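The checker is now batch-aware: check(int) multiplies the per-item watermark by the number of items, so a single free-space test covers the whole batch. A minimal usage sketch with assumed values (2000 KB per item, a hypothetical store path):

    // Hypothetical values: 2000 KB required per data set, batch of 5 data sets.
    IStatusChecker checker = new HighWaterMarkChecker(2000L, new File("/dss/store"));
    // Internally the watcher is created with a 5 * 2000 = 10000 KB threshold.
    Status status = checker.check(5);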
/*
* Copyright 2010 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard;
import ch.systemsx.cisd.common.exceptions.Status;
/**
* Checks the status.
*
* @author Izabela Adamczyk
*/
public interface IStatusChecker
{
Status check();
}
\ No newline at end of file
Status check(int size);
}
@@ -32,18 +32,18 @@ import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription;
public class ProcessingStatus
{
private Map<Status, List<DatasetDescription>> datasetByStatus =
new LinkedHashMap<Status, List<DatasetDescription>>();
private Map<Status, List<String/* dataset code */>> datasetByStatus =
new LinkedHashMap<Status, List<String>>();
public void addDatasetStatus(DatasetDescription dataset, Status status)
public void addDatasetStatus(String datasetCode, Status status)
{
List<DatasetDescription> datasets = datasetByStatus.get(status);
List<String> datasets = datasetByStatus.get(status);
if (datasets == null)
{
datasets = new ArrayList<DatasetDescription>();
datasets = new ArrayList<String>();
datasetByStatus.put(status, datasets);
}
datasets.add(dataset);
datasets.add(datasetCode);
}
public List<Status> getErrorStatuses()
@@ -53,9 +53,14 @@ public class ProcessingStatus
return result;
}
public List<DatasetDescription> getDatasetsByStatus(Status status)
public List<String/* dataset code */> getDatasetsByStatus(Status status)
{
return datasetByStatus.get(status);
}
public void addDatasetStatus(DatasetDescription dataset, Status status)
{
addDatasetStatus(dataset.getDatasetCode(), status);
}
}
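With ProcessingStatus now keyed by data set code rather than by DatasetDescription, callers add and query plain codes. A minimal sketch of the new API (codes and error text are hypothetical):

    ProcessingStatus processingStatus = new ProcessingStatus();
    processingStatus.addDatasetStatus("DS-1", Status.OK);
    processingStatus.addDatasetStatus("DS-2", Status.createError("disk quota exceeded"));
    List<String> succeeded = processingStatus.getDatasetsByStatus(Status.OK); // [DS-1]
    for (Status errorStatus : processingStatus.getErrorStatuses())
    {
        // [DS-2] for the "disk quota exceeded" status
        List<String> failed = processingStatus.getDatasetsByStatus(errorStatus);
    }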
@@ -20,6 +20,7 @@ import static ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataS
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -37,17 +38,11 @@ import ch.systemsx.cisd.base.tests.AbstractFileSystemTestCase;
import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.filesystem.IPathCopier;
import ch.systemsx.cisd.common.filesystem.ssh.ISshCommandExecutor;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataSetCopier;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataSetCopierForUsers;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.IPathCopierFactory;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.ISshCommandExecutorFactory;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.tasks.ProcessingStatus;
import ch.systemsx.cisd.openbis.generic.shared.Constants;
import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription;
/**
*
*
* @author Franz-Josef Elmer
*/
@Friend(toClasses = DataSetCopier.class)
@@ -138,7 +133,8 @@ public class DataSetCopierForUsersTest extends AbstractFileSystemTestCase
DataSetCopier dataSetCopier =
new DataSetCopierForUsers(properties, storeRoot, pathFactory, sshFactory);
ProcessingStatus processingStatus = dataSetCopier.process(Arrays.asList(ds), parameterBindings);
ProcessingStatus processingStatus =
dataSetCopier.process(Arrays.asList(ds), parameterBindings);
assertNoErrors(processingStatus);
assertSuccessful(processingStatus, ds);
@@ -151,23 +147,24 @@ public class DataSetCopierForUsersTest extends AbstractFileSystemTestCase
properties.setProperty(DESTINATION_KEY, "tmp/test");
prepareCreateAndCheckCopier();
context.checking(new Expectations()
{
{
File canonicalFile = getCanonicalFile("tmp/test");
one(copier).copyToRemote(dsData, canonicalFile, null, null, null);
will(returnValue(Status.OK));
}
});
{
File canonicalFile = getCanonicalFile("tmp/test");
one(copier).copyToRemote(dsData, canonicalFile, null, null, null);
will(returnValue(Status.OK));
}
});
DataSetCopier dataSetCopier =
new DataSetCopierForUsers(properties, storeRoot, pathFactory, sshFactory);
ProcessingStatus processingStatus = dataSetCopier.process(Arrays.asList(ds), parameterBindings);
new DataSetCopierForUsers(properties, storeRoot, pathFactory, sshFactory);
ProcessingStatus processingStatus =
dataSetCopier.process(Arrays.asList(ds), parameterBindings);
assertNoErrors(processingStatus);
assertSuccessful(processingStatus, ds);
context.assertIsSatisfied();
}
private void prepareCreateAndCheckCopier()
{
context.checking(new Expectations()
@@ -199,10 +196,19 @@ public class DataSetCopierForUsersTest extends AbstractFileSystemTestCase
private void checkStatus(ProcessingStatus processingStatus, Status status,
DatasetDescription... expectedDatasets)
{
final List<DatasetDescription> actualDatasets =
processingStatus.getDatasetsByStatus(status);
final List<String> actualDatasets = processingStatus.getDatasetsByStatus(status);
assertEquals(expectedDatasets.length, actualDatasets.size());
assertTrue(actualDatasets.containsAll(Arrays.asList(expectedDatasets)));
assertTrue(actualDatasets.containsAll(asCodes(Arrays.asList(expectedDatasets))));
}
private static List<String> asCodes(List<DatasetDescription> datasets)
{
List<String> codes = new ArrayList<String>();
for (DatasetDescription dataset : datasets)
{
codes.add(dataset.getDatasetCode());
}
return codes;
}
private File getCanonicalFile(String fileName)
@@ -21,6 +21,7 @@ import static ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataS
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@@ -128,7 +129,7 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
ds4Data = new File(ds4Folder, "existing");
ds4Data.mkdirs();
}
private DatasetDescription createDataSetDescription(String dataSetCode, String location)
{
DatasetDescription description = new DatasetDescription();
@@ -205,8 +206,9 @@
{
properties.setProperty(DESTINATION_KEY, "host:tmp/test");
prepareCreateAndCheckCopier("host", null, 1, false);
DataSetCopier dataSetCopier = new DataSetCopier(properties, storeRoot, pathFactory, sshFactory);
DataSetCopier dataSetCopier =
new DataSetCopier(properties, storeRoot, pathFactory, sshFactory);
try
{
dataSetCopier.process(Arrays.asList(ds1), null);
@@ -225,8 +227,9 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
properties.setProperty(DESTINATION_KEY, "host:abc:tmp/test");
properties.setProperty(RSYNC_PASSWORD_FILE_KEY, "abc-password");
prepareCreateAndCheckCopier("host", "abc", 1, false);
DataSetCopier dataSetCopier = new DataSetCopier(properties, storeRoot, pathFactory, sshFactory);
DataSetCopier dataSetCopier =
new DataSetCopier(properties, storeRoot, pathFactory, sshFactory);
try
{
dataSetCopier.process(Arrays.asList(ds1), null);
@@ -441,15 +444,18 @@
}
private void prepareCreateAndCheckCopier(final String hostOrNull,
final String rsyncModuleOrNull, final int numberOfExpectedCreations, final boolean checkingResult)
final String rsyncModuleOrNull, final int numberOfExpectedCreations,
final boolean checkingResult)
{
context.checking(new Expectations()
{
{
exactly(numberOfExpectedCreations).of(pathFactory).create(rsyncExecutableDummy, sshExecutableDummy);
exactly(numberOfExpectedCreations).of(pathFactory).create(rsyncExecutableDummy,
sshExecutableDummy);
will(returnValue(copier));
exactly(numberOfExpectedCreations).of(sshFactory).create(sshExecutableDummy, hostOrNull);
exactly(numberOfExpectedCreations).of(sshFactory).create(sshExecutableDummy,
hostOrNull);
will(returnValue(sshCommandExecutor));
exactly(numberOfExpectedCreations).of(copier).check();
@@ -457,11 +463,13 @@
{
if (rsyncModuleOrNull != null)
{
exactly(numberOfExpectedCreations).of(copier).checkRsyncConnectionViaRsyncServer(hostOrNull,
rsyncModuleOrNull, rsyncModuleOrNull + "-password");
exactly(numberOfExpectedCreations).of(copier)
.checkRsyncConnectionViaRsyncServer(hostOrNull,
rsyncModuleOrNull, rsyncModuleOrNull + "-password");
} else
{
exactly(numberOfExpectedCreations).of(copier).checkRsyncConnectionViaSsh(hostOrNull, null);
exactly(numberOfExpectedCreations).of(copier)
.checkRsyncConnectionViaSsh(hostOrNull, null);
}
will(returnValue(checkingResult));
}
@@ -491,10 +499,19 @@ public class DataSetCopierTest extends AbstractFileSystemTestCase
private void checkStatus(ProcessingStatus processingStatus, Status status,
DatasetDescription... expectedDatasets)
{
final List<DatasetDescription> actualDatasets =
processingStatus.getDatasetsByStatus(status);
final List<String> actualDatasets = processingStatus.getDatasetsByStatus(status);
assertEquals(expectedDatasets.length, actualDatasets.size());
assertTrue(actualDatasets.containsAll(Arrays.asList(expectedDatasets)));
assertTrue(actualDatasets.containsAll(asCodes(Arrays.asList(expectedDatasets))));
}
private static List<String> asCodes(List<DatasetDescription> datasets)
{
List<String> codes = new ArrayList<String>();
for (DatasetDescription dataset : datasets)
{
codes.add(dataset.getDatasetCode());
}
return codes;
}
private File getCanonicalFile(String fileName)
@@ -65,14 +65,8 @@ metabol-db.databaseKind = dev
metabol-db.readOnlyGroup = metabol_readonly
metabol-db.readWriteGroup = metabol_readwrite
metabol-db.scriptFolder = sql
# ---------------------------------------------------------------------------
#maintenance-plugins=ds-remover
ds-remover.class = ch.systemsx.cisd.yeastx.etl.MetabolDatabaseUpdater
ds-remover.interval = 10
ds-remover.data-source = metabol-db
metabol-db.maxActive = 15
metabol-db.maxIdle = 15
# ---------------------------------------------------------------------------
@@ -277,7 +271,47 @@ quantml-uploader.storage-processor.unique-experiment-name-property-code = ${expe
quantml-uploader.storage-processor.data-source = metabol-db
quantml-uploader.storage-processor.processor = ch.systemsx.cisd.etlserver.DefaultStorageProcessor
# ---------------------------------------------------------------------------
# maintenance plugins configuration
# ---------------------------------------------------------------------------
dataset-unarchiving-highwater-mark = 2000
# Comma-separated names of maintenance plugins.
# Each plugin should have configuration properties prefixed with its name.
# Mandatory properties for each <plugin> include:
# <plugin>.class - Fully qualified plugin class name
# <plugin>.interval - The time between plugin executions (in seconds)
# Optional properties for each <plugin> include:
# <plugin>.start - Time of the first execution (HH:mm)
maintenance-plugins=ds-remover, auto-archiver
ds-remover.class = ch.systemsx.cisd.yeastx.etl.MetabolDatabaseUpdater
ds-remover.interval = 1000
ds-remover.data-source = metabol-db
# Performs automatic archiving of 'ACTIVE' data sets based on their properties
auto-archiver.class = ch.systemsx.cisd.etlserver.plugins.AutoArchiverTask
# The time between subsequent archiving runs (in seconds)
auto-archiver.interval = 300
# amount of free disk space in KB which must be available to unarchive one dataset
auto-archiver.dataset-unarchiving-highwater-mark = ${dataset-unarchiving-highwater-mark}
# Time of the first execution (HH:mm)
#auto-archiver.start = 10:49
# following properties are optional
# only data sets of specified type will be archived
auto-archiver.data-set-type = EICML
# only data sets that are older than specified number of days will be archived (default = 0)
auto-archiver.older-than = 777
# fully qualified class name of a policy that additionally filters data sets to be archived
#auto-archiver.policy.class = ch.systemsx.cisd.etlserver.plugins.DummyAutoArchiverPolicy
# --- ARCHIVER ------------------------------------------------------------------------
archiver.class = ch.systemsx.cisd.yeastx.etl.MLArchiverTask
archiver.unique-sample-name-property-code = ${sample-name-property-code}
archiver.unique-experiment-name-property-code = ${experiment-name-property-code}
archiver.data-source = metabol-db
\ No newline at end of file
archiver.data-source = metabol-db
# amount of free disk space in KB which must be available to unarchive one dataset
archiver.dataset-unarchiving-highwater-mark = ${dataset-unarchiving-highwater-mark}
\ No newline at end of file
@@ -17,12 +17,17 @@
package ch.systemsx.cisd.yeastx.etl;
import java.io.File;
import java.util.List;
import java.util.Properties;
import javax.sql.DataSource;
import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.exceptions.UserFailureException;
import ch.systemsx.cisd.common.utilities.PropertyUtils;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.AbstractArchiverProcessingPlugin;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.HighWaterMarkChecker;
import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.IStatusChecker;
import ch.systemsx.cisd.openbis.dss.generic.shared.DataSourceProvider;
import ch.systemsx.cisd.openbis.dss.generic.shared.ServiceProvider;
import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Experiment;
@@ -44,25 +49,41 @@ public class MLArchiverTask extends AbstractArchiverProcessingPlugin
private static final long serialVersionUID = 1L;
// the amount of free disk space in KB which must be available to unarchive one dataset
private static final String HIGHWATER_MARK_PROPERTY_KEY = "dataset-unarchiving-highwater-mark";
private final String dataSourceName;
public MLArchiverTask(Properties properties, File storeRoot)
{
super(properties, storeRoot, null, null);
super(properties, storeRoot, null, tryCreateUnarchivingStatusChecker(properties, storeRoot));
dataSourceName = DataSourceProvider.extractDataSourceName(properties);
// Check if given data source exists
getDataSource(dataSourceName);
}
private static IStatusChecker tryCreateUnarchivingStatusChecker(Properties properties,
File storeRoot)
{
long archiveHighWaterMark =
PropertyUtils.getLong(properties, HIGHWATER_MARK_PROPERTY_KEY, -1);
if (archiveHighWaterMark == -1)
{
return null;
}
// NOTE: we assume that the database (which grows when unarchiving is done) is stored on the
// same file system as the DSS store
return new HighWaterMarkChecker(archiveHighWaterMark, storeRoot);
}
/**
* Deletes data related to given data set from metabol database.
*/
@Override
protected void archive(DatasetDescription dataset) throws UserFailureException
private Status doArchive(DatasetDescription dataset, IDMGenericDAO dao)
throws UserFailureException
{
DataSource dataSource = getDataSource(dataSourceName);
final IDMGenericDAO dao = DBUtils.getQuery(dataSource, IDMGenericDAO.class);
try
{
dao.deleteDataSet(dataset.getDatasetCode());
@@ -70,11 +91,9 @@
} catch (Exception ex)
{
dao.rollback();
throw new UserFailureException(ex.getMessage());
} finally
{
dao.close();
return createErrorStatus(ex);
}
return Status.OK;
}
private static DataSource getDataSource(String dataSourceName)
@@ -85,10 +104,9 @@
/**
* Adds data related to given data set to metabol database.
*/
@Override
protected void unarchive(DatasetDescription dataset) throws UserFailureException
private Status doUnarchive(DatasetDescription dataset, ML2DatabaseUploader databaseUploader)
throws UserFailureException
{
ML2DatabaseUploader databaseUploader = null;
try
{
Sample sample = null;
@@ -97,7 +115,6 @@
sample = fetchSample(dataset);
}
Experiment experiment = getOrFetchExperiment(dataset, sample);
databaseUploader = new ML2DatabaseUploader(properties);
databaseUploader.upload(getDataFile(dataset), sample, experiment, dataset
.getDatasetCode());
databaseUploader.commit();
@@ -107,8 +124,15 @@
{
databaseUploader.rollback();
}
throw new UserFailureException(ex.getMessage());
return createErrorStatus(ex);
}
return Status.OK;
}
private Status createErrorStatus(Exception ex)
{
return Status.createError(ex.getMessage() == null ? "unknown reason "
+ ex.getClass().getName() : ex.getMessage());
}
private Sample fetchSample(DatasetDescription dataset)
@@ -140,7 +164,7 @@
{
File datasetDir = getDataSubDir(dataset);
File[] files = datasetDir.listFiles();
if (files.length < 1)
if (files == null || files.length < 1)
{
throw new UserFailureException(
"Data set directory contains no files (exactly one expected)");
@@ -153,4 +177,53 @@
return files[0];
}
}
@Override
protected DatasetProcessingStatuses doArchive(List<DatasetDescription> datasets)
throws UserFailureException
{
DataSource dataSource = getDataSource(dataSourceName);
final IDMGenericDAO dao = DBUtils.getQuery(dataSource, IDMGenericDAO.class);
try
{
int counter = 0;
DatasetProcessingStatuses statuses = new DatasetProcessingStatuses();
for (DatasetDescription dataset : datasets)
{
Status status = doArchive(dataset, dao);
statuses.addResult(dataset.getDatasetCode(), status, true);
counter++;
if (counter % 100 == 0)
{
operationLog.info("Archiving status: " + counter + "/" + datasets.size());
}
}
return statuses;
} finally
{
dao.close();
}
}
@Override
protected DatasetProcessingStatuses doUnarchive(List<DatasetDescription> datasets)
throws UserFailureException
{
ML2DatabaseUploader databaseUploader = new ML2DatabaseUploader(properties);
DatasetProcessingStatuses statuses = new DatasetProcessingStatuses();
int counter = 0;
for (DatasetDescription dataset : datasets)
{
Status status = doUnarchive(dataset, databaseUploader);
statuses.addResult(dataset.getDatasetCode(), status, false);
counter++;
if (counter % 100 == 0)
{
operationLog.info("Unarchiving status: " + counter + "/" + datasets.size());
}
}
return statuses;
}
}