diff --git a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java index 19c65eaf4296e90fea5e725c5226c1e14a0e33f3..297c4683c72bc4a549f45ad775129301fde21f5b 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java +++ b/datastore_server/source/java/ch/systemsx/cisd/etlserver/ETLDaemon.java @@ -61,6 +61,8 @@ import ch.systemsx.cisd.common.utilities.SystemExit; import ch.systemsx.cisd.etlserver.validation.DataSetValidator; import ch.systemsx.cisd.etlserver.validation.IDataSetValidator; import ch.systemsx.cisd.openbis.dss.BuildAndEnvironmentInfo; +import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.DataSetCodeWithStatus; +import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.QueueingDataSetStatusUpdaterService; import ch.systemsx.cisd.openbis.dss.generic.shared.IEncapsulatedOpenBISService; import ch.systemsx.cisd.openbis.dss.generic.shared.ServiceProvider; import ch.systemsx.cisd.openbis.dss.generic.shared.utils.PropertyParametersUtil; @@ -76,6 +78,8 @@ public final class ETLDaemon { public static final File shredderQueueFile = new File(".shredder"); + public static final File updaterQueueFile = new File(".updater"); + static final String STOREROOT_DIR_KEY = "storeroot-dir"; static final String NOTIFY_SUCCESSFUL_REGISTRATION = "notify-successful-registration"; @@ -116,6 +120,23 @@ public final class ETLDaemon } } + public static void listUpdaterQueue() + { + final List<DataSetCodeWithStatus> items = + QueueingDataSetStatusUpdaterService.listItems(updaterQueueFile); + if (items.isEmpty()) + { + System.out.println("Updater queue is empty."); + } else + { + System.out.println("Found " + items.size() + " items in updater:"); + for (final DataSetCodeWithStatus item : items) + { + System.out.println(item); + } + } + } + private static void selfTest(final File incomingDirectory, final IEncapsulatedOpenBISService service, final ISelfTestable... selfTestables) { @@ -253,8 +274,8 @@ public final class ETLDaemon private final static void createProcessingThread(final Parameters parameters, final ThreadParameters threadParameters, final IEncapsulatedOpenBISService authorizedLimsService, - final HighwaterMarkWatcher highwaterMarkWatcher, - IDataSetValidator dataSetValidator, final boolean notifySuccessfulRegistration) + final HighwaterMarkWatcher highwaterMarkWatcher, IDataSetValidator dataSetValidator, + final boolean notifySuccessfulRegistration) { final File incomingDataDirectory = threadParameters.getIncomingDataDirectory(); final TransferredDataSetHandler pathHandler = @@ -280,8 +301,8 @@ public final class ETLDaemon public static TransferredDataSetHandler createDataSetHandler(final Properties properties, final ThreadParameters threadParameters, - final IEncapsulatedOpenBISService openBISService, - IDataSetValidator dataSetValidator, final boolean notifySuccessfulRegistration) + final IEncapsulatedOpenBISService openBISService, IDataSetValidator dataSetValidator, + final boolean notifySuccessfulRegistration) { final IETLServerPlugin plugin = threadParameters.getPlugin(); final File storeRootDir = getStoreRootDir(properties); @@ -290,9 +311,9 @@ public final class ETLDaemon final Properties mailProperties = Parameters.createMailProperties(properties); String dssCode = PropertyParametersUtil.getDataStoreCode(properties); boolean deleteUnidentified = threadParameters.deleteUnidentified(); - return new TransferredDataSetHandler(dssCode, plugin, openBISService, - mailProperties, dataSetValidator, notifySuccessfulRegistration, - threadParameters.useIsFinishedMarkerFile(), deleteUnidentified); + return new TransferredDataSetHandler(dssCode, plugin, openBISService, mailProperties, + dataSetValidator, notifySuccessfulRegistration, threadParameters + .useIsFinishedMarkerFile(), deleteUnidentified); } private static FileFilter createFileFilter(File incomingDataDirectory, @@ -447,6 +468,10 @@ public final class ETLDaemon { QueueingPathRemoverService.start(shredderQueueFile); } + if (QueueingDataSetStatusUpdaterService.isRunning() == false) + { + QueueingDataSetStatusUpdaterService.start(updaterQueueFile); + } printInitialLogMessage(parameters); startupServer(parameters); startupMaintenancePlugins(parameters.getMaintenancePlugins()); diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/DataStoreServer.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/DataStoreServer.java index 25fd1403acd61f5a082986c79f579f2df98642db..2b9e794e379d1f8d9f646cdc35435e2722fd4916 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/DataStoreServer.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/DataStoreServer.java @@ -26,6 +26,7 @@ import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.etlserver.ETLDaemon; import ch.systemsx.cisd.openbis.dss.generic.server.CommandQueueLister; +import ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard.QueueingDataSetStatusUpdaterService; /** * Main class starting {@link ch.systemsx.cisd.openbis.dss.generic.server.DataStoreServer}, @@ -68,6 +69,13 @@ public class DataStoreServer ETLDaemon.listShredder(); System.exit(0); } + final boolean showUpdaterQueue = + (args.length > 0 && args[0].equals("--show-updater-queue")); + if (showUpdaterQueue) + { + ETLDaemon.listUpdaterQueue(); + System.exit(0); + } final boolean showCommandQueue = (args.length > 0 && args[0].equals("--show-command-queue")); if (showCommandQueue) @@ -75,8 +83,9 @@ public class DataStoreServer CommandQueueLister.listQueuedCommand(); System.exit(0); } - // Initialize the shredder _before_ the DataSetCommandExecutor which uses it. + // Initialize the shredder and updater _before_ the DataSetCommandExecutor which uses them. QueueingPathRemoverService.start(ETLDaemon.shredderQueueFile); + QueueingDataSetStatusUpdaterService.start(ETLDaemon.updaterQueueFile); ch.systemsx.cisd.openbis.dss.generic.server.DataStoreServer.main(args); ETLDaemon.main(args); } diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java index 26b9b1b41e350ef74c371a908a348cd4d6b87117..999ed84f97ef12f665d3c157b178f34fcb60baef 100644 --- a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/AbstractArchiverProcessingPlugin.java @@ -29,7 +29,6 @@ import ch.systemsx.cisd.common.logging.LogFactory; import ch.systemsx.cisd.openbis.dss.generic.server.ProcessDatasetsCommand; import ch.systemsx.cisd.openbis.dss.generic.server.plugins.tasks.IArchiverTask; import ch.systemsx.cisd.openbis.dss.generic.server.plugins.tasks.ProcessingStatus; -import ch.systemsx.cisd.openbis.dss.generic.shared.ServiceProvider; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetArchivizationStatus; import ch.systemsx.cisd.openbis.generic.shared.dto.DatasetDescription; @@ -130,21 +129,12 @@ public abstract class AbstractArchiverProcessingPlugin extends AbstractDatastore IDatasetDescriptionHandler handler) { final ProcessingStatus result = new ProcessingStatus(); - // FIXME 2010-03-22, Piotr Buczek: remove this workaround (solves StaleObjectStateException) - try - { - Thread.sleep(1000); - } catch (InterruptedException ex) - { - ex.printStackTrace(); - } for (DatasetDescription dataset : datasets) { Status status = handler.handle(dataset); DataSetArchivizationStatus newStatus = status.isError() ? failure : success; - operationLog.info(dataset + " changing status: " + newStatus); - ServiceProvider.getOpenBISService().updateDataSetStatus(dataset.getDatasetCode(), - newStatus); + QueueingDataSetStatusUpdaterService.update(new DataSetCodeWithStatus(dataset + .getDatasetCode(), newStatus)); result.addDatasetStatus(dataset, status); } return result; diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCodeWithStatus.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCodeWithStatus.java new file mode 100644 index 0000000000000000000000000000000000000000..d16e8138e4ee31e4920b892a0172cddf41d157b5 --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/DataSetCodeWithStatus.java @@ -0,0 +1,56 @@ +/* + * Copyright 2010 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard; + +import java.io.Serializable; + +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetArchivizationStatus; + +/** + * @author Piotr Buczek + */ +public class DataSetCodeWithStatus implements Serializable +{ + private static final long serialVersionUID = 1L; + + private String dataSetCode; + + private DataSetArchivizationStatus status; + + public DataSetCodeWithStatus(String dataSetCode, DataSetArchivizationStatus status) + { + this.dataSetCode = dataSetCode; + this.status = status; + } + + public String getDataSetCode() + { + return dataSetCode; + } + + public DataSetArchivizationStatus getStatus() + { + return status; + } + + @Override + public String toString() + { + return dataSetCode + " - " + getStatus(); + } + +} diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/IDataSetStatusUpdater.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/IDataSetStatusUpdater.java new file mode 100644 index 0000000000000000000000000000000000000000..0ef749a23fad2b70ea0867cd1de130190bd5826a --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/IDataSetStatusUpdater.java @@ -0,0 +1,35 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard; + +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetArchivizationStatus; + +/** + * A role that can update data set status. + * + * @author Piotr Buczek + */ +public interface IDataSetStatusUpdater +{ + /** + * Updates status of data set with given code. + * + * @param dataSetCode code of data set to be updated + * @param newStatus status to be set + */ + public void updateDataSetStatus(String dataSetCode, DataSetArchivizationStatus newStatus); +} \ No newline at end of file diff --git a/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/QueueingDataSetStatusUpdaterService.java b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/QueueingDataSetStatusUpdaterService.java new file mode 100644 index 0000000000000000000000000000000000000000..64fcd966ece7d33bdf883a60606cb6483ca11b0d --- /dev/null +++ b/datastore_server/source/java/ch/systemsx/cisd/openbis/dss/generic/server/plugins/standard/QueueingDataSetStatusUpdaterService.java @@ -0,0 +1,214 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.openbis.dss.generic.server.plugins.standard; + +import java.io.File; +import java.util.List; + +import org.apache.log4j.Logger; + +import ch.rinn.restrictions.Private; +import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; +import ch.systemsx.cisd.common.TimingParameters; +import ch.systemsx.cisd.common.collections.ExtendedBlockingQueueFactory; +import ch.systemsx.cisd.common.collections.IExtendedBlockingQueue; +import ch.systemsx.cisd.common.collections.PersistentExtendedBlockingQueueDecorator; +import ch.systemsx.cisd.common.collections.RecordBasedQueuePersister; +import ch.systemsx.cisd.common.filesystem.ICloseable; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.openbis.dss.generic.shared.ServiceProvider; +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetArchivizationStatus; + +/** + * A service for updating data set status in openBIS. It provides a method + * {@link #update(DataSetCodeWithStatus)} that queues updates using a separate thread to actually + * perform update. + * <p> + * Note that the service needs to be started via {@link #start(File, TimingParameters)}. + * <p> + * A file that keeps track of all the data sets that are to be updated needs to be specified in + * order to persist program restart. + * + * @author Piotr Buczek + */ +public class QueueingDataSetStatusUpdaterService +{ + + private final static Logger operationLog = + LogFactory.getLogger(LogCategory.OPERATION, QueueingDataSetStatusUpdaterService.class); + + private static final int INITIAL_RECORD_SIZE = 128; + + @Private + final static String UPDATER_PREFIX = ".UPDATER_"; + + private static IExtendedBlockingQueue<DataSetCodeWithStatus> queue = null; + + private static ICloseable queueCloseableOrNull = null; + + private static Thread thread = null; + + private static IDataSetStatusUpdater updater = null; + + /** + * Initializes the updater thread. <i>Needs to be called before this class is constructed for + * the first time.</i> + * + * @param queueFile the file that will be used to persist the items to be deleted over program + * restart. + */ + public static final void start(File queueFile) + { + start(queueFile, TimingParameters.getDefaultParameters()); + } + + /** + * Initializes the updater thread. <i>Needs to be called before this class is constructed for + * the first time.</i> + * + * @param queueFile the file that will be used to persist the items to be deleted over program + * restart. + */ + public static synchronized final void start(final File queueFile, TimingParameters parameters) + { + updater = new IDataSetStatusUpdater() + { + public void updateDataSetStatus(String dataSetCode, + DataSetArchivizationStatus newStatus) + { + ServiceProvider.getOpenBISService().updateDataSetStatus(dataSetCode, newStatus); + operationLog + .info("Data Set " + dataSetCode + " changed status to " + newStatus); + } + + }; + final PersistentExtendedBlockingQueueDecorator<DataSetCodeWithStatus> persistentQueue = + ExtendedBlockingQueueFactory.createPersistRecordBased(queueFile, + INITIAL_RECORD_SIZE); + queue = persistentQueue; + queueCloseableOrNull = persistentQueue; + thread = new Thread(new Runnable() + { + public void run() + { + try + { + while (true) + { + final DataSetCodeWithStatus dataSet = queue.peekWait(); + updater.updateDataSetStatus(dataSet.getDataSetCode(), dataSet + .getStatus()); + // Note: this is the only consumer of this queue. + queue.take(); + } + } catch (InterruptedException ex) + { + // Exit thread. + } catch (InterruptedExceptionUnchecked ex) + { + // Exit thread. + } + } + }, "Updater Queue"); + thread.setDaemon(true); + thread.start(); + } + + /** + * Schedules update of given data set. If operation fails the updating thread will exit. + */ + public static void update(DataSetCodeWithStatus dataSet) + { + queue.add(dataSet); + } + + private static final void close() + { + if (queueCloseableOrNull != null) + { + queueCloseableOrNull.close(); + } + } + + /** + * Stop the service. + */ + public static synchronized final void stop() + { + if (thread == null) + { + return; + } + thread.interrupt(); + close(); + thread = null; + queue = null; + queueCloseableOrNull = null; + updater = null; + } + + /** + * Stop the service and wait for it to finish, but at most <var>timeoutMillis</var> + * milli-seconds. + * + * @return <code>true</code>, if stopping was successful, <code>false</code> otherwise. + */ + public static synchronized final boolean stopAndWait(long timeoutMillis) + { + if (thread == null) + { + return true; + } + thread.interrupt(); + try + { + thread.join(timeoutMillis); + } catch (InterruptedException ex) + { + } + close(); + final boolean ok = (thread.isAlive() == false); + thread = null; + queue = null; + queueCloseableOrNull = null; + updater = null; + return ok; + } + + /** + * Returns <code>true</code>, if the service is currently running, <code>false</code> otherwise. + */ + public static synchronized final boolean isRunning() + { + return updater != null; + } + + /** + * Returns the list of currently queued up items. + */ + public static final List<DataSetCodeWithStatus> listItems(File queueFile) + { + return RecordBasedQueuePersister.list(DataSetCodeWithStatus.class, queueFile); + } + + private QueueingDataSetStatusUpdaterService() + { + // Cannot be instantiated. + } + +}