From d0ff3433a1ce731f6343da1b995cdd37610b4811 Mon Sep 17 00:00:00 2001 From: pkupczyk <pkupczyk> Date: Thu, 30 May 2013 11:55:06 +0000 Subject: [PATCH] SP-652 / BIS-402: KNIME UI problems with 2 data store servers: - universal multiplexer - create report method that uses multiple DSSes SVN: 29249 --- .../cisd/common/multiplexer/Batch.java | 49 ++++++ .../common/multiplexer/BatchesResults.java | 35 ++-- .../cisd/common/multiplexer/IBatch.java | 30 ++++ .../common/multiplexer/IBatchHandler.java | 27 +++ .../common/multiplexer/IBatchIdProvider.java | 27 +++ .../common/multiplexer/IBatchesResults.java | 4 +- .../cisd/common/multiplexer/IMultiplexer.java | 30 ++++ .../multiplexer/ThreadPoolMultiplexer.java | 154 +++++++++++++++++ .../server/CommonBusinessObjectFactory.java | 8 +- .../openbis/generic/server/CommonServer.java | 11 ++ .../generic/server/CommonServerLogger.java | 9 + .../bo/AbstractBusinessObjectFactory.java | 12 +- .../server/business/bo/DataSetTable.java | 60 ++++++- .../server/business/bo/IDataSetTable.java | 8 + .../openbis/generic/shared/ICommonServer.java | 4 + .../openbis/generic/shared/ResourceNames.java | 2 + .../query/client/api/v1/IQueryApiFacade.java | 9 + .../query/client/api/v1/QueryApiFacade.java | 13 +- .../query/server/api/v1/QueryApiLogger.java | 12 +- .../query/server/api/v1/QueryApiServer.java | 10 +- .../query/shared/api/v1/IQueryApiServer.java | 13 ++ .../source/java/genericApplicationContext.xml | 9 + .../server/business/bo/DataSetTableTest.java | 12 +- .../DssServiceRpcScreeningHolder.java | 2 +- .../DssServiceRpcScreeningMultiplexer.java | 159 +++++------------- .../IDssServiceRpcScreeningMultiplexer.java | 8 +- .../api/v1/ScreeningOpenbisServiceFacade.java | 5 +- .../screening/server/ScreeningServer.java | 61 ++++--- ...ssServiceRpcScreeningBatchResultsTest.java | 23 ++- ...DssServiceRpcScreeningMultiplexerTest.java | 16 +- 30 files changed, 625 insertions(+), 197 deletions(-) create mode 100644 common/source/java/ch/systemsx/cisd/common/multiplexer/Batch.java rename screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningBatchResults.java => common/source/java/ch/systemsx/cisd/common/multiplexer/BatchesResults.java (59%) create mode 100644 common/source/java/ch/systemsx/cisd/common/multiplexer/IBatch.java create mode 100644 common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchHandler.java create mode 100644 common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchIdProvider.java rename screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/IDssServiceRpcScreeningBatchResults.java => common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchesResults.java (86%) create mode 100644 common/source/java/ch/systemsx/cisd/common/multiplexer/IMultiplexer.java create mode 100644 common/source/java/ch/systemsx/cisd/common/multiplexer/ThreadPoolMultiplexer.java diff --git a/common/source/java/ch/systemsx/cisd/common/multiplexer/Batch.java b/common/source/java/ch/systemsx/cisd/common/multiplexer/Batch.java new file mode 100644 index 00000000000..278dce7a913 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/multiplexer/Batch.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.multiplexer; + +import java.util.List; + +/** + * @author pkupczyk + */ +public class Batch<O, I> implements IBatch<O, I> +{ + + private I id; + + private List<O> objects; + + public Batch(I id, List<O> objects) + { + this.id = id; + this.objects = objects; + } + + @Override + public I getId() + { + return id; + } + + @Override + public List<O> getObjects() + { + return objects; + } + +} diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningBatchResults.java b/common/source/java/ch/systemsx/cisd/common/multiplexer/BatchesResults.java similarity index 59% rename from screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningBatchResults.java rename to common/source/java/ch/systemsx/cisd/common/multiplexer/BatchesResults.java index 7b420bde1b7..647d4a87731 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningBatchResults.java +++ b/common/source/java/ch/systemsx/cisd/common/multiplexer/BatchesResults.java @@ -14,27 +14,24 @@ * limitations under the License. */ -package ch.systemsx.cisd.openbis.dss.screening.shared.api.internal; +package ch.systemsx.cisd.common.multiplexer; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Map; import java.util.Set; /** * @author pkupczyk */ -public class DssServiceRpcScreeningBatchResults<T> implements - IDssServiceRpcScreeningBatchResults<T> +public class BatchesResults<T> implements IBatchesResults<T> { - private Map<String, List<T>> dssResultsMap = new LinkedHashMap<String, List<T>>(); + private List<List<T>> batchesResults = new ArrayList<List<T>>(); - public void addDataStoreResults(String dssUrl, List<T> dssResults) + public void addBatchResults(List<T> results) { - dssResultsMap.put(dssUrl, dssResults); + batchesResults.add(results); } @Override @@ -42,11 +39,11 @@ public class DssServiceRpcScreeningBatchResults<T> implements { List<T> results = new ArrayList<T>(); - for (List<T> dssResults : dssResultsMap.values()) + for (List<T> batchResults : batchesResults) { - if (dssResults != null) + if (batchResults != null) { - results.addAll(dssResults); + results.addAll(batchResults); } } @@ -58,11 +55,11 @@ public class DssServiceRpcScreeningBatchResults<T> implements { Set<T> results = new LinkedHashSet<T>(); - for (List<T> dssResults : dssResultsMap.values()) + for (List<T> batchResults : batchesResults) { - if (dssResults != null) + if (batchResults != null) { - results.addAll(dssResults); + results.addAll(batchResults); } } @@ -74,15 +71,15 @@ public class DssServiceRpcScreeningBatchResults<T> implements { List<T> results = new ArrayList<T>(); - for (List<T> dssResults : dssResultsMap.values()) + for (List<T> batchResults : batchesResults) { - if (dssResults != null) + if (batchResults != null) { - for (T result : dssResults) + for (T batchResult : batchResults) { - if (results.contains(result) == false) + if (results.contains(batchResult) == false) { - results.add(result); + results.add(batchResult); } } } diff --git a/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatch.java b/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatch.java new file mode 100644 index 00000000000..4a80b752d9a --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatch.java @@ -0,0 +1,30 @@ +/* + * Copyright 2013 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.multiplexer; + +import java.util.List; + +/** + * @author pkupczyk + */ +public interface IBatch<O, I> +{ + public I getId(); + + public List<O> getObjects(); + +} diff --git a/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchHandler.java b/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchHandler.java new file mode 100644 index 00000000000..feaa39754ae --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchHandler.java @@ -0,0 +1,27 @@ +/* + * Copyright 2013 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.multiplexer; + +import java.util.List; + +/** + * @author pkupczyk + */ +public interface IBatchHandler<O, I, R> +{ + public List<R> handleBatch(IBatch<O, I> batch); +} diff --git a/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchIdProvider.java b/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchIdProvider.java new file mode 100644 index 00000000000..5f319068099 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchIdProvider.java @@ -0,0 +1,27 @@ +/* + * Copyright 2013 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.multiplexer; + +/** + * @author pkupczyk + */ +public interface IBatchIdProvider<O, I> +{ + + public I getBatchId(O object); + +} diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/IDssServiceRpcScreeningBatchResults.java b/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchesResults.java similarity index 86% rename from screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/IDssServiceRpcScreeningBatchResults.java rename to common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchesResults.java index 0fdbc1ff7b2..dcae3132974 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/IDssServiceRpcScreeningBatchResults.java +++ b/common/source/java/ch/systemsx/cisd/common/multiplexer/IBatchesResults.java @@ -14,14 +14,14 @@ * limitations under the License. */ -package ch.systemsx.cisd.openbis.dss.screening.shared.api.internal; +package ch.systemsx.cisd.common.multiplexer; import java.util.List; /** * @author pkupczyk */ -public interface IDssServiceRpcScreeningBatchResults<T> +public interface IBatchesResults<T> { public List<T> withDuplicates(); diff --git a/common/source/java/ch/systemsx/cisd/common/multiplexer/IMultiplexer.java b/common/source/java/ch/systemsx/cisd/common/multiplexer/IMultiplexer.java new file mode 100644 index 00000000000..f0c95f5e89f --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/multiplexer/IMultiplexer.java @@ -0,0 +1,30 @@ +/* + * Copyright 2013 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.multiplexer; + +import java.util.List; + +/** + * @author pkupczyk + */ +public interface IMultiplexer +{ + + public <O, I, R> BatchesResults<R> process(final List<? extends O> objects, + final IBatchIdProvider<O, I> batchIdProvider, final IBatchHandler<O, I, R> batchHandler); + +} diff --git a/common/source/java/ch/systemsx/cisd/common/multiplexer/ThreadPoolMultiplexer.java b/common/source/java/ch/systemsx/cisd/common/multiplexer/ThreadPoolMultiplexer.java new file mode 100644 index 00000000000..c4d9fe13fd0 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/multiplexer/ThreadPoolMultiplexer.java @@ -0,0 +1,154 @@ +/* + * Copyright 2013 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.multiplexer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor; +import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; +import ch.systemsx.cisd.common.concurrent.ITerminableFuture; +import ch.systemsx.cisd.common.concurrent.TerminableCallable.INamedCallable; +import ch.systemsx.cisd.common.concurrent.TerminableCallable.IStoppableExecutor; + +/** + * @author pkupczyk + */ +public class ThreadPoolMultiplexer implements IMultiplexer +{ + private final NamingThreadPoolExecutor executor; + + public ThreadPoolMultiplexer(String threadPoolName) + { + this.executor = new NamingThreadPoolExecutor(threadPoolName).daemonize(); + } + + @Override + public <O, I, R> BatchesResults<R> process(final List<? extends O> objects, + final IBatchIdProvider<O, I> batchIdProvider, final IBatchHandler<O, I, R> batchHandler) + { + List<IBatch<O, I>> batches = createBatches(objects, batchIdProvider); + List<ITerminableFuture<List<R>>> futures = handleBatches(batches, batchHandler); + return gatherResults(futures); + } + + public static <O, I, R> List<IBatch<O, I>> createBatches(final List<? extends O> objects, + final IBatchIdProvider<O, I> batchIdProvider) + { + Map<I, List<O>> batchIdToObjectsMap = createBatchIdToObjectsMap(objects, batchIdProvider); + List<IBatch<O, I>> batches = new ArrayList<IBatch<O, I>>(); + + for (Map.Entry<I, List<O>> batchIdToObjectsMapEntry : batchIdToObjectsMap.entrySet()) + { + IBatch<O, I> batch = + new Batch<O, I>(batchIdToObjectsMapEntry.getKey(), + batchIdToObjectsMapEntry.getValue()); + batches.add(batch); + } + + return batches; + } + + public static <O, I, R> Map<I, List<O>> createBatchIdToObjectsMap( + final List<? extends O> objects, final IBatchIdProvider<O, I> batchIdProvider) + { + Map<I, List<O>> batchIdToObjectsMap = new HashMap<I, List<O>>(); + + if (objects != null) + { + for (O object : objects) + { + if (object != null) + { + I batchId = batchIdProvider.getBatchId(object); + if (batchId != null) + { + List<O> objectsForBatchId = batchIdToObjectsMap.get(batchId); + if (objectsForBatchId == null) + { + objectsForBatchId = new ArrayList<O>(); + batchIdToObjectsMap.put(batchId, objectsForBatchId); + } + objectsForBatchId.add(object); + } + } + } + } + + return batchIdToObjectsMap; + } + + private <O, I, R> List<ITerminableFuture<List<R>>> handleBatches( + final List<IBatch<O, I>> batches, final IBatchHandler<O, I, R> batchHandler) + { + List<ITerminableFuture<List<R>>> futures = new ArrayList<ITerminableFuture<List<R>>>(); + + final long startTime = System.currentTimeMillis(); + + for (final IBatch<O, I> batch : batches) + { + ITerminableFuture<List<R>> future = + ConcurrencyUtilities.submit(executor, new INamedCallable<List<R>>() + { + @Override + public List<R> call(IStoppableExecutor<List<R>> stoppableExecutor) + throws Exception + { + return batchHandler.handleBatch(batch); + } + + @Override + public String getCallableName() + { + return batch.getId() + "(" + startTime + ")"; + } + }); + futures.add(future); + } + + return futures; + } + + private <R> BatchesResults<R> gatherResults(final List<ITerminableFuture<List<R>>> futures) + { + BatchesResults<R> batchesResults = new BatchesResults<R>(); + + try + { + for (ITerminableFuture<List<R>> future : futures) + { + List<R> results = ConcurrencyUtilities.tryGetResult(future, -1); + if (results != null) + { + batchesResults.addBatchResults(results); + } + } + } catch (RuntimeException e) + { + for (ITerminableFuture<List<R>> future : futures) + { + future.cancel(true); + } + throw e; + } + + return batchesResults; + } + +} diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonBusinessObjectFactory.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonBusinessObjectFactory.java index 56f73e4fc13..c53b6bd44f8 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonBusinessObjectFactory.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonBusinessObjectFactory.java @@ -16,6 +16,7 @@ package ch.systemsx.cisd.openbis.generic.server; +import ch.systemsx.cisd.common.multiplexer.IMultiplexer; import ch.systemsx.cisd.openbis.generic.server.business.IDataStoreServiceFactory; import ch.systemsx.cisd.openbis.generic.server.business.IEntityOperationChecker; import ch.systemsx.cisd.openbis.generic.server.business.IRelationshipService; @@ -102,10 +103,11 @@ public final class CommonBusinessObjectFactory extends AbstractBusinessObjectFac IRelationshipService relationshipService, IEntityOperationChecker entityOperationChecker, IServiceConversationClientManagerLocal conversationClient, - IManagedPropertyEvaluatorFactory managedPropertyEvaluatorFactory) + IManagedPropertyEvaluatorFactory managedPropertyEvaluatorFactory, + IMultiplexer multiplexer) { super(daoFactory, dssFactory, relationshipService, entityOperationChecker, - conversationClient, managedPropertyEvaluatorFactory); + conversationClient, managedPropertyEvaluatorFactory, multiplexer); } @Override @@ -192,7 +194,7 @@ public final class CommonBusinessObjectFactory extends AbstractBusinessObjectFac { return new DataSetTable(getDaoFactory(), getDSSFactory(), session, getRelationshipService(), getConversationClient(), - getManagedPropertyEvaluatorFactory()); + getManagedPropertyEvaluatorFactory(), getMultiplexer()); } @Override diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonServer.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonServer.java index 6b372ba898c..206aa4e3de9 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonServer.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonServer.java @@ -3157,6 +3157,17 @@ public final class CommonServer extends AbstractCommonServer<ICommonServerForInt serviceDescription.getDatastoreCode(), datasetCodes); } + @Override + @RolesAllowed(RoleWithHierarchy.SPACE_OBSERVER) + public TableModel createReportFromDatasets(String sessionToken, String serviceKey, + @AuthorizationGuard(guardClass = DataSetCodeCollectionPredicate.class) + List<String> datasetCodes) + { + Session session = getSession(sessionToken); + IDataSetTable dataSetTable = businessObjectFactory.createDataSetTable(session); + return dataSetTable.createReportFromDatasets(serviceKey, datasetCodes); + } + @Override @RolesAllowed(RoleWithHierarchy.SPACE_OBSERVER) public TableModel createReportFromAggregationService(String sessionToken, diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonServerLogger.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonServerLogger.java index 1f9579ad80c..97874c2e504 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonServerLogger.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/CommonServerLogger.java @@ -1221,6 +1221,15 @@ final class CommonServerLogger extends AbstractServerLogger implements ICommonSe return null; } + @Override + public TableModel createReportFromDatasets(String sessionToken, String serviceKey, + List<String> datasetCodes) + { + logAccess(sessionToken, "createReportFromDatasets", "SERVICE(%s), DATASETS(%s)", + serviceKey, abbreviate(datasetCodes)); + return null; + } + @Override public void processDatasets(String sessionToken, DatastoreServiceDescription serviceDescription, List<String> datasetCodes) diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/AbstractBusinessObjectFactory.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/AbstractBusinessObjectFactory.java index 29456d040b9..5efe0c2d639 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/AbstractBusinessObjectFactory.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/AbstractBusinessObjectFactory.java @@ -18,6 +18,7 @@ package ch.systemsx.cisd.openbis.generic.server.business.bo; import javax.annotation.Resource; +import ch.systemsx.cisd.common.multiplexer.IMultiplexer; import ch.systemsx.cisd.openbis.generic.server.ComponentNames; import ch.systemsx.cisd.openbis.generic.server.business.IDataStoreServiceFactory; import ch.systemsx.cisd.openbis.generic.server.business.IEntityOperationChecker; @@ -52,6 +53,8 @@ public abstract class AbstractBusinessObjectFactory private final IEntityResolverQuery entityResolver; + private IMultiplexer multiplexer; + protected AbstractBusinessObjectFactory() { this.entityResolver = EntityResolverQueryFactory.create(); @@ -61,7 +64,8 @@ public abstract class AbstractBusinessObjectFactory IDataStoreServiceFactory dssFactory, IRelationshipService relationshipService, IEntityOperationChecker entityOperationChecker, IServiceConversationClientManagerLocal conversationClient, - IManagedPropertyEvaluatorFactory managedPropertyEvaluatorFactory) + IManagedPropertyEvaluatorFactory managedPropertyEvaluatorFactory, + IMultiplexer multiplexer) { this(); this.daoFactory = daoFactory; @@ -70,6 +74,7 @@ public abstract class AbstractBusinessObjectFactory this.entityOperationChecker = entityOperationChecker; this.conversationClient = conversationClient; this.managedPropertyEvaluatorFactory = managedPropertyEvaluatorFactory; + this.multiplexer = multiplexer; } protected final IDAOFactory getDaoFactory() @@ -110,4 +115,9 @@ public abstract class AbstractBusinessObjectFactory return entityResolver; } + protected IMultiplexer getMultiplexer() + { + return multiplexer; + } + } diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/DataSetTable.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/DataSetTable.java index deaba0aa3ac..70a5379c2c3 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/DataSetTable.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/DataSetTable.java @@ -41,6 +41,11 @@ import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.UserFailureException; import ch.systemsx.cisd.common.logging.LogCategory; import ch.systemsx.cisd.common.logging.LogFactory; +import ch.systemsx.cisd.common.multiplexer.BatchesResults; +import ch.systemsx.cisd.common.multiplexer.IBatch; +import ch.systemsx.cisd.common.multiplexer.IBatchHandler; +import ch.systemsx.cisd.common.multiplexer.IBatchIdProvider; +import ch.systemsx.cisd.common.multiplexer.IMultiplexer; import ch.systemsx.cisd.openbis.generic.server.business.IDataStoreServiceFactory; import ch.systemsx.cisd.openbis.generic.server.business.IRelationshipService; import ch.systemsx.cisd.openbis.generic.server.business.IServiceConversationClientManagerLocal; @@ -52,10 +57,10 @@ import ch.systemsx.cisd.openbis.generic.shared.Constants; import ch.systemsx.cisd.openbis.generic.shared.IDataStoreService; import ch.systemsx.cisd.openbis.generic.shared.basic.BasicConstant; import ch.systemsx.cisd.openbis.generic.shared.basic.TechId; +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.AbstractExternalData; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Code; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetArchivingStatus; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetBatchUpdateDetails; -import ch.systemsx.cisd.openbis.generic.shared.basic.dto.AbstractExternalData; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.LinkModel; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Metaproject; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.TableModel; @@ -195,16 +200,20 @@ public final class DataSetTable extends AbstractDataSetBusinessObject implements private final IDataStoreServiceFactory dssFactory; + private final IMultiplexer multiplexer; + private List<DataPE> dataSets; public DataSetTable(IDAOFactory daoFactory, IDataStoreServiceFactory dssFactory, Session session, IRelationshipService relationshipService, IServiceConversationClientManagerLocal conversationClient, - IManagedPropertyEvaluatorFactory managedPropertyEvaluatorFactory) + IManagedPropertyEvaluatorFactory managedPropertyEvaluatorFactory, + IMultiplexer multiplexer) { super(daoFactory, session, relationshipService, conversationClient, managedPropertyEvaluatorFactory); this.dssFactory = dssFactory; + this.multiplexer = multiplexer; } // @@ -533,6 +542,53 @@ public final class DataSetTable extends AbstractDataSetBusinessObject implements datastoreServiceKey, locations, tryGetLoggedUserId(), tryGetLoggedUserEmail()); } + @Override + public TableModel createReportFromDatasets(final String datastoreServiceKey, + final List<String> datasetCodes) + { + List<DatasetDescription> locations = loadAvailableDatasetDescriptions(datasetCodes); + + IBatchIdProvider<DatasetDescription, String> batchIdProvider = + new IBatchIdProvider<DatasetDescription, String>() + { + @Override + public String getBatchId(DatasetDescription object) + { + return object.getDataStoreCode(); + } + }; + + IBatchHandler<DatasetDescription, String, TableModel> batchHandler = + new IBatchHandler<DatasetDescription, String, TableModel>() + { + @Override + public List<TableModel> handleBatch(IBatch<DatasetDescription, String> batch) + { + DataStorePE dataStore = findDataStore(batch.getId()); + String sessionToken = session.getSessionToken(); + String storeSessionToken = dataStore.getSessionToken(); + + IDataStoreService service = + getConversationClient().getDataStoreService( + dataStore.getRemoteUrl(), sessionToken); + + TableModel tableModel = + service.createReportFromDatasets(storeSessionToken, + sessionToken, datastoreServiceKey, batch.getObjects(), + tryGetLoggedUserId(), tryGetLoggedUserEmail()); + + return Collections.singletonList(tableModel); + } + }; + + BatchesResults<TableModel> batchesResults = + multiplexer.process(locations, batchIdProvider, batchHandler); + List<TableModel> tableModels = batchesResults.withDuplicates(); + + // TODO combine results + return tableModels.get(0); + } + private List<DatasetDescription> loadAvailableDatasetDescriptions(List<String> dataSetCodes) { IDataDAO dataDAO = getDataDAO(); diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/IDataSetTable.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/IDataSetTable.java index f61470a941a..3cd202b751e 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/IDataSetTable.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/server/business/bo/IDataSetTable.java @@ -131,6 +131,14 @@ public interface IDataSetTable TableModel createReportFromDatasets(String datastoreServiceKey, String datastoreCode, List<String> datasetCodes); + /** + * Creates a report from specified datasets using the specified datastore service. It groups the + * data sets by a data store and creates a report for each group of objects on appropriate data + * store server. Results from the data stores are combined and returned as a result of this + * method. + */ + TableModel createReportFromDatasets(String datastoreServiceKey, List<String> datasetCodes); + /** * Schedules processing of specified datasets with specified parameter bindings using the * specified datastore service. diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/shared/ICommonServer.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/shared/ICommonServer.java index c8a1e4e1042..90b1c4224db 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/shared/ICommonServer.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/shared/ICommonServer.java @@ -1127,6 +1127,10 @@ public interface ICommonServer extends IServer public TableModel createReportFromDatasets(String sessionToken, DatastoreServiceDescription serviceDescription, List<String> datasetCodes); + @Transactional(readOnly = true) + public TableModel createReportFromDatasets(String sessionToken, String serviceKey, + List<String> datasetCodes); + @Transactional(readOnly = true) public TableModel createReportFromAggregationService(String sessionToken, DatastoreServiceDescription serviceDescription, Map<String, Object> parameters); diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/generic/shared/ResourceNames.java b/openbis/source/java/ch/systemsx/cisd/openbis/generic/shared/ResourceNames.java index d56611f8127..0b7d5e1967e 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/generic/shared/ResourceNames.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/generic/shared/ResourceNames.java @@ -81,4 +81,6 @@ public final class ResourceNames public final static String SERVICE_CONVERSATION_SERVER_MANAGER = "service-conversation-server-manager"; + public final static String MULTIPLEXER = "multiplexer"; + } diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/client/api/v1/IQueryApiFacade.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/client/api/v1/IQueryApiFacade.java index 51f2896cbe2..079dd9726c4 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/client/api/v1/IQueryApiFacade.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/client/api/v1/IQueryApiFacade.java @@ -65,6 +65,15 @@ public interface IQueryApiFacade public QueryTableModel createReportFromDataSets(ReportDescription reportDescription, List<String> dataSetCodes); + /** + * Creates for the specified data sets and specified report key a report. It groups the data + * sets by a data store and creates a report for each group of objects on appropriate data store + * server. Results from the data stores are combined and returned as a result of this method. + * Available report keys can be obtained by {@link #listTableReportDescriptions()}. + */ + @Retry + public QueryTableModel createReportFromDataSets(String reportKey, List<String> dataSetCodes); + /** * Returns a remote access to the {@link IGeneralInformationService}. */ diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/client/api/v1/QueryApiFacade.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/client/api/v1/QueryApiFacade.java index 74040be3a3e..97ccf6bcdcd 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/client/api/v1/QueryApiFacade.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/client/api/v1/QueryApiFacade.java @@ -97,6 +97,12 @@ class QueryApiFacade implements IQueryApiFacade reportDescription.getKey(), dataSetCodes); } + @Override + public QueryTableModel createReportFromDataSets(String reportKey, List<String> dataSetCodes) + { + return service.createReportFromDataSets(sessionToken, reportKey, dataSetCodes); + } + @Override public List<AggregationServiceDescription> listAggregationServices() { @@ -105,10 +111,13 @@ class QueryApiFacade implements IQueryApiFacade } @Override - public QueryTableModel createReportFromAggregationService(AggregationServiceDescription serviceDescription, Map<String, Object> parameters) + public QueryTableModel createReportFromAggregationService( + AggregationServiceDescription serviceDescription, Map<String, Object> parameters) { checkMinimalServerVersion(1, 3); - return service.createReportFromAggregationService(sessionToken, serviceDescription.getDataStoreCode(), serviceDescription.getServiceKey(), parameters); + return service.createReportFromAggregationService(sessionToken, + serviceDescription.getDataStoreCode(), serviceDescription.getServiceKey(), + parameters); } /** diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/server/api/v1/QueryApiLogger.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/server/api/v1/QueryApiLogger.java index ce21f992ac3..a03daa6f49a 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/server/api/v1/QueryApiLogger.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/server/api/v1/QueryApiLogger.java @@ -78,6 +78,15 @@ class QueryApiLogger extends AbstractServerLogger implements IQueryApiServer return null; } + @Override + public QueryTableModel createReportFromDataSets(String sessionToken, String serviceKey, + List<String> dataSetCodes) + { + logAccess(sessionToken, "create_report_from_data_sets", "SERVICE(%s) DATA_SETS(%s)", + serviceKey, dataSetCodes); + return null; + } + @Override public int getMajorVersion() { @@ -98,7 +107,8 @@ class QueryApiLogger extends AbstractServerLogger implements IQueryApiServer } @Override - public QueryTableModel createReportFromAggregationService(String sessionToken, String dataStoreCode, String serviceKey, Map<String, Object> parameters) + public QueryTableModel createReportFromAggregationService(String sessionToken, + String dataStoreCode, String serviceKey, Map<String, Object> parameters) { logAccess(sessionToken, "create_report_from_aggregation_service", "DATA_STORE(%s) SERVICE(%s) PARAMETERS(%s)", dataStoreCode, serviceKey, parameters); diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/server/api/v1/QueryApiServer.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/server/api/v1/QueryApiServer.java index 6be69f5b17a..02143a10a0e 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/server/api/v1/QueryApiServer.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/server/api/v1/QueryApiServer.java @@ -182,6 +182,14 @@ public class QueryApiServer extends AbstractServer<IQueryApiServer> implements I dataSetCodes)); } + @Override + public QueryTableModel createReportFromDataSets(String sessionToken, String serviceKey, + List<String> dataSetCodes) + { + return translate(commonServer.createReportFromDatasets(sessionToken, serviceKey, + dataSetCodes)); + } + @Override public List<AggregationServiceDescription> listAggregationServices(String sessionToken) { @@ -235,7 +243,7 @@ public class QueryApiServer extends AbstractServer<IQueryApiServer> implements I @Override public int getMinorVersion() { - return 5; + return 6; } private QueryTableModel translate(TableModel result) diff --git a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/shared/api/v1/IQueryApiServer.java b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/shared/api/v1/IQueryApiServer.java index 4fa60fa73a4..d4b3a367532 100644 --- a/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/shared/api/v1/IQueryApiServer.java +++ b/openbis/source/java/ch/systemsx/cisd/openbis/plugin/query/shared/api/v1/IQueryApiServer.java @@ -94,6 +94,19 @@ public interface IQueryApiServer extends IRpcService public QueryTableModel createReportFromDataSets(String sessionToken, String dataStoreCode, String serviceKey, List<String> dataSetCodes); + /** + * Creates for the specified data sets a report. It groups the data sets by a data store and + * creates a report for each group of objects on appropriate data store server. Results from the + * data stores are combined and returned as a result of this method. Available report keys can + * be obtained by {@link #listTableReportDescriptions(String)}. + * + * @param serviceKey Key of the data store service. + * @since 1.6 + */ + @Transactional(readOnly = true) + public QueryTableModel createReportFromDataSets(String sessionToken, String serviceKey, + List<String> dataSetCodes); + /** * Returns metadata for all aggregation and ingestion services. * diff --git a/openbis/source/java/genericApplicationContext.xml b/openbis/source/java/genericApplicationContext.xml index 6e258a94261..027ce75123f 100644 --- a/openbis/source/java/genericApplicationContext.xml +++ b/openbis/source/java/genericApplicationContext.xml @@ -139,6 +139,7 @@ <constructor-arg ref="entity-operation-checker" /> <constructor-arg ref="service-conversation-client-manager" /> <constructor-arg ref="managed-property-evaluator-factory" /> + <constructor-arg ref="multiplexer" /> </bean> <bean id="last-modification-state" @@ -375,4 +376,12 @@ <plugins:component-scan base-package="ch.systemsx.cisd.openbis" annotation-config="false"> <plugins:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller" /> </plugins:component-scan> + + + <!-- Mutiplexer --> + + <bean id="multiplexer" class="ch.systemsx.cisd.common.multiplexer.ThreadPoolMultiplexer"> + <constructor-arg value="mutiplexer-thread-pool" /> + </bean> + </beans> diff --git a/openbis/sourceTest/java/ch/systemsx/cisd/openbis/generic/server/business/bo/DataSetTableTest.java b/openbis/sourceTest/java/ch/systemsx/cisd/openbis/generic/server/business/bo/DataSetTableTest.java index 3d35ef45ccd..3bd570446a1 100644 --- a/openbis/sourceTest/java/ch/systemsx/cisd/openbis/generic/server/business/bo/DataSetTableTest.java +++ b/openbis/sourceTest/java/ch/systemsx/cisd/openbis/generic/server/business/bo/DataSetTableTest.java @@ -39,6 +39,7 @@ import org.testng.annotations.Test; import ch.rinn.restrictions.Friend; import ch.systemsx.cisd.common.exceptions.UserFailureException; +import ch.systemsx.cisd.common.multiplexer.IMultiplexer; import ch.systemsx.cisd.openbis.generic.server.business.IDataStoreServiceFactory; import ch.systemsx.cisd.openbis.generic.server.business.ManagerTestTool; import ch.systemsx.cisd.openbis.generic.server.dataaccess.event.DeleteDataSetEventBuilder; @@ -46,10 +47,10 @@ import ch.systemsx.cisd.openbis.generic.shared.CommonTestUtils; import ch.systemsx.cisd.openbis.generic.shared.IDataStoreService; import ch.systemsx.cisd.openbis.generic.shared.basic.BasicConstant; import ch.systemsx.cisd.openbis.generic.shared.basic.TechId; +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.AbstractExternalData; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Code; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetArchivingStatus; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.DataSetKind; -import ch.systemsx.cisd.openbis.generic.shared.basic.dto.AbstractExternalData; import ch.systemsx.cisd.openbis.generic.shared.dto.DataSetTypePE; import ch.systemsx.cisd.openbis.generic.shared.dto.DataSetUploadContext; import ch.systemsx.cisd.openbis.generic.shared.dto.DataStorePE; @@ -96,10 +97,13 @@ public final class DataSetTableTest extends AbstractBOTest private IDataStoreService dataStoreServiceConversational3; + private IMultiplexer multiplexer; + private final DataSetTable createDataSetTable() { return new DataSetTable(daoFactory, dssFactory, ManagerTestTool.EXAMPLE_SESSION, - relationshipService, conversationClient, managedPropertyEvaluatorFactory); + relationshipService, conversationClient, managedPropertyEvaluatorFactory, + multiplexer); } @BeforeMethod @@ -114,6 +118,7 @@ public final class DataSetTableTest extends AbstractBOTest dataStoreService1 = context.mock(IDataStoreService.class, "dataStoreService1"); dataStoreService2 = context.mock(IDataStoreService.class, "dataStoreService2"); dataStoreService3 = context.mock(IDataStoreService.class, "dataStoreService3"); + multiplexer = context.mock(IMultiplexer.class); dataStoreServiceConversational1 = context.mock(IDataStoreService.class, "dataStoreServiceConversational1"); @@ -380,7 +385,8 @@ public final class DataSetTableTest extends AbstractBOTest @Override public boolean matches(Object item) { - List<AbstractExternalData> list = (List<AbstractExternalData>) item; + List<AbstractExternalData> list = + (List<AbstractExternalData>) item; if (list.size() != 1) { return false; diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningHolder.java b/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningHolder.java index 11d228afb55..c9bb635c9f6 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningHolder.java +++ b/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningHolder.java @@ -16,7 +16,7 @@ package ch.systemsx.cisd.openbis.dss.screening.shared.api.internal; -import ch.rinn.restrictions.Private; +import ch.rinn.restrictions.Private; import ch.systemsx.cisd.common.spring.HttpInvokerUtils; import ch.systemsx.cisd.openbis.dss.screening.shared.api.v1.IDssServiceRpcScreening; diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningMultiplexer.java b/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningMultiplexer.java index fca6b5a2c0f..9b38a9232ba 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningMultiplexer.java +++ b/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/DssServiceRpcScreeningMultiplexer.java @@ -16,18 +16,15 @@ package ch.systemsx.cisd.openbis.dss.screening.shared.api.internal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor; -import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; -import ch.systemsx.cisd.common.concurrent.ITerminableFuture; -import ch.systemsx.cisd.common.concurrent.TerminableCallable.INamedCallable; -import ch.systemsx.cisd.common.concurrent.TerminableCallable.IStoppableExecutor; +import ch.systemsx.cisd.common.multiplexer.BatchesResults; +import ch.systemsx.cisd.common.multiplexer.IBatch; +import ch.systemsx.cisd.common.multiplexer.IBatchHandler; +import ch.systemsx.cisd.common.multiplexer.IBatchIdProvider; +import ch.systemsx.cisd.common.multiplexer.IMultiplexer; +import ch.systemsx.cisd.common.multiplexer.ThreadPoolMultiplexer; import ch.systemsx.cisd.openbis.plugin.screening.shared.api.v1.dto.IDatasetIdentifier; /** @@ -35,135 +32,67 @@ import ch.systemsx.cisd.openbis.plugin.screening.shared.api.v1.dto.IDatasetIdent */ public class DssServiceRpcScreeningMultiplexer implements IDssServiceRpcScreeningMultiplexer { - private final IDssServiceRpcScreeningFactory dssServiceFactory; + private final IMultiplexer multiplexer; - private final NamingThreadPoolExecutor executor; + private final IDssServiceRpcScreeningFactory dssServiceFactory; - public DssServiceRpcScreeningMultiplexer(IDssServiceRpcScreeningFactory dssServiceFactory) + public DssServiceRpcScreeningMultiplexer(IMultiplexer multiplexer, + IDssServiceRpcScreeningFactory dssServiceFactory) { + if (multiplexer == null) + { + throw new IllegalArgumentException("Multiplexer cannot be null"); + } if (dssServiceFactory == null) { throw new IllegalArgumentException("Dss service factory cannot be null"); } + this.multiplexer = multiplexer; this.dssServiceFactory = dssServiceFactory; - this.executor = new NamingThreadPoolExecutor("Dss service screening multiplexer").daemonize(); } @Override - public <R extends IDatasetIdentifier, V> DssServiceRpcScreeningBatchResults<V> process( - final List<? extends R> references, - final IDssServiceRpcScreeningBatchHandler<R, V> batchHandler) - { - Map<String, List<R>> referencesPerDataStore = getReferencesPerDataStore(cast(references)); - - Map<String, ITerminableFuture<List<V>>> futuresPerDataStore = - submitReferencesToDataStores(referencesPerDataStore, batchHandler); - - return gatherResultsFromDataStores(futuresPerDataStore); - } - - public static <R extends IDatasetIdentifier> Map<String, List<R>> getReferencesPerDataStore( - final List<R> references) + public <O extends IDatasetIdentifier, R> BatchesResults<R> process( + final List<? extends O> objects, + final IDssServiceRpcScreeningBatchHandler<O, R> screeningBatchHandler) { - HashMap<String, List<R>> referencesPerDataStore = new HashMap<String, List<R>>(); - - if (references != null) - { - for (R reference : references) + IBatchIdProvider<O, String> batchIdProvider = new IBatchIdProvider<O, String>() { - if (reference != null) + @Override + public String getBatchId(O object) { - String dataStoreUrl = reference.getDatastoreServerUrl(); - if (dataStoreUrl != null) - { - List<R> dataStoreReferences = referencesPerDataStore.get(dataStoreUrl); - if (dataStoreReferences == null) - { - dataStoreReferences = new ArrayList<R>(); - referencesPerDataStore.put(dataStoreUrl, dataStoreReferences); - } - dataStoreReferences.add(reference); - } + return object.getDatastoreServerUrl(); } - } - } - - return referencesPerDataStore; - } - - private <R extends IDatasetIdentifier, V> Map<String, ITerminableFuture<List<V>>> submitReferencesToDataStores( - final Map<String, List<R>> referencesPerDataStore, - final IDssServiceRpcScreeningBatchHandler<R, V> batchHandler) - { - Map<String, ITerminableFuture<List<V>>> futuresPerDataStore = - new LinkedHashMap<String, ITerminableFuture<List<V>>>(); - final long submitTime = System.currentTimeMillis(); + }; - for (Entry<String, List<R>> referencePerDataStore : referencesPerDataStore.entrySet()) - { - final String dataStoreUrl = referencePerDataStore.getKey(); - final List<R> dataStoreReferences = referencePerDataStore.getValue(); - - ITerminableFuture<List<V>> dataStoreFuture = - ConcurrencyUtilities.submit(executor, new INamedCallable<List<V>>() - { - @Override - public List<V> call(IStoppableExecutor<List<V>> stoppableExecutor) - throws Exception - { - final DssServiceRpcScreeningHolder dataStoreServiceHolder = - dssServiceFactory.createDssService(dataStoreUrl); - return batchHandler.handle(dataStoreServiceHolder, - dataStoreReferences); - } - - @Override - public String getCallableName() - { - return dataStoreUrl + "(" + submitTime + ")"; - } - }); - futuresPerDataStore.put(dataStoreUrl, dataStoreFuture); - } - - return futuresPerDataStore; - } - - private <V> DssServiceRpcScreeningBatchResults<V> gatherResultsFromDataStores( - final Map<String, ITerminableFuture<List<V>>> futuresPerDataStore) - { - DssServiceRpcScreeningBatchResults<V> results = new DssServiceRpcScreeningBatchResults<V>(); - - try - { - for (Map.Entry<String, ITerminableFuture<List<V>>> futurePerDataStore : futuresPerDataStore - .entrySet()) + IBatchHandler<O, String, R> batchHandler = new IBatchHandler<O, String, R>() { - String dataStoreUrl = futurePerDataStore.getKey(); - ITerminableFuture<List<V>> dataStoreFuture = futurePerDataStore.getValue(); - - List<V> dataStoreResults = ConcurrencyUtilities.tryGetResult(dataStoreFuture, -1); - if (dataStoreResults != null) + @Override + public List<R> handleBatch(IBatch<O, String> batch) { - results.addDataStoreResults(dataStoreUrl, dataStoreResults); + DssServiceRpcScreeningHolder dssService = + dssServiceFactory.createDssService(batch.getId()); + return screeningBatchHandler.handle(dssService, batch.getObjects()); } - } - } catch (RuntimeException e) - { - for (ITerminableFuture<List<V>> dataStoreFuture : futuresPerDataStore.values()) - { - dataStoreFuture.cancel(true); - } - throw e; - } + }; - return results; + return multiplexer.process(objects, batchIdProvider, batchHandler); } - @SuppressWarnings("unchecked") - private <R extends IDatasetIdentifier> List<R> cast(List<? extends R> references) + public static Map<String, List<IDatasetIdentifier>> getReferencesPerDataStore( + List<IDatasetIdentifier> dataSetIdentifiers) { - return (List<R>) references; + IBatchIdProvider<IDatasetIdentifier, String> batchIdProvider = + new IBatchIdProvider<IDatasetIdentifier, String>() + { + @Override + public String getBatchId(IDatasetIdentifier object) + { + return object.getDatastoreServerUrl(); + } + }; + + return ThreadPoolMultiplexer.createBatchIdToObjectsMap(dataSetIdentifiers, batchIdProvider); } } diff --git a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/IDssServiceRpcScreeningMultiplexer.java b/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/IDssServiceRpcScreeningMultiplexer.java index dbc5962b64c..08ba109614a 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/IDssServiceRpcScreeningMultiplexer.java +++ b/screening/source/java/ch/systemsx/cisd/openbis/dss/screening/shared/api/internal/IDssServiceRpcScreeningMultiplexer.java @@ -18,6 +18,7 @@ package ch.systemsx.cisd.openbis.dss.screening.shared.api.internal; import java.util.List; +import ch.systemsx.cisd.common.multiplexer.BatchesResults; import ch.systemsx.cisd.openbis.plugin.screening.shared.api.v1.dto.IDatasetIdentifier; /** @@ -26,8 +27,7 @@ import ch.systemsx.cisd.openbis.plugin.screening.shared.api.v1.dto.IDatasetIdent public interface IDssServiceRpcScreeningMultiplexer { - public <R extends IDatasetIdentifier, V> DssServiceRpcScreeningBatchResults<V> process( - final List<? extends R> references, - final IDssServiceRpcScreeningBatchHandler<R, V> batchHandler); - + public <O extends IDatasetIdentifier, R> BatchesResults<R> process( + final List<? extends O> objects, + final IDssServiceRpcScreeningBatchHandler<O, R> batchHandler); } diff --git a/screening/source/java/ch/systemsx/cisd/openbis/plugin/screening/client/api/v1/ScreeningOpenbisServiceFacade.java b/screening/source/java/ch/systemsx/cisd/openbis/plugin/screening/client/api/v1/ScreeningOpenbisServiceFacade.java index 320fcf700e2..bdb71036e39 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/plugin/screening/client/api/v1/ScreeningOpenbisServiceFacade.java +++ b/screening/source/java/ch/systemsx/cisd/openbis/plugin/screening/client/api/v1/ScreeningOpenbisServiceFacade.java @@ -28,6 +28,8 @@ import ch.systemsx.cisd.common.api.retry.RetryProxyFactory; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.UserFailureException; import ch.systemsx.cisd.common.io.ConcatenatedFileOutputStreamWriter; +import ch.systemsx.cisd.common.multiplexer.IMultiplexer; +import ch.systemsx.cisd.common.multiplexer.ThreadPoolMultiplexer; import ch.systemsx.cisd.openbis.common.api.client.ServiceFinder; import ch.systemsx.cisd.openbis.dss.client.api.v1.DssComponentFactory; import ch.systemsx.cisd.openbis.dss.client.api.v1.IDataSetDss; @@ -280,7 +282,8 @@ public class ScreeningOpenbisServiceFacade implements IScreeningOpenbisServiceFa } }; - dssMultiplexer = new DssServiceRpcScreeningMultiplexer(dssServiceCache); + IMultiplexer multiplexer = new ThreadPoolMultiplexer("screening-facade-multiplexer"); + dssMultiplexer = new DssServiceRpcScreeningMultiplexer(multiplexer, dssServiceCache); } /** diff --git a/screening/source/java/ch/systemsx/cisd/openbis/plugin/screening/server/ScreeningServer.java b/screening/source/java/ch/systemsx/cisd/openbis/plugin/screening/server/ScreeningServer.java index db92ba88f97..9a54e3afa4e 100644 --- a/screening/source/java/ch/systemsx/cisd/openbis/plugin/screening/server/ScreeningServer.java +++ b/screening/source/java/ch/systemsx/cisd/openbis/plugin/screening/server/ScreeningServer.java @@ -32,6 +32,7 @@ import org.springframework.stereotype.Component; import ch.rinn.restrictions.Private; import ch.systemsx.cisd.authentication.ISessionManager; import ch.systemsx.cisd.common.exceptions.UserFailureException; +import ch.systemsx.cisd.common.multiplexer.IMultiplexer; import ch.systemsx.cisd.openbis.common.spring.IInvocationLoggerContext; import ch.systemsx.cisd.openbis.dss.screening.shared.api.internal.DssServiceRpcScreeningHolder; import ch.systemsx.cisd.openbis.dss.screening.shared.api.internal.DssServiceRpcScreeningMultiplexer; @@ -191,19 +192,13 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl private AnalysisSettings analysisSettings; + @Resource(name = ch.systemsx.cisd.openbis.generic.shared.ResourceNames.MULTIPLEXER) + private IMultiplexer multiplexer; + private IDssServiceRpcScreeningMultiplexer dssMultiplexer; public ScreeningServer() { - dssMultiplexer = new DssServiceRpcScreeningMultiplexer(new IDssServiceRpcScreeningFactory() - { - @Override - public DssServiceRpcScreeningHolder createDssService(String serverUrl) - { - return new DssServiceRpcScreeningHolder(serverUrl, getMajorVersion(), - 5 * DateUtils.MILLIS_PER_MINUTE); - } - }); } @Private @@ -769,7 +764,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(featureDatasets, handler).withoutDuplicates(); + return getDssMultiplexer().process(featureDatasets, handler).withoutDuplicates(); } @Override @@ -792,7 +787,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(featureDatasets, handler).withoutDuplicates(); + return getDssMultiplexer().process(featureDatasets, handler).withoutDuplicates(); } @Override @@ -816,7 +811,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(featureDatasets, handler).withDuplicates(); + return getDssMultiplexer().process(featureDatasets, handler).withDuplicates(); } @Override @@ -841,7 +836,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(datasetWellReferences, handler).withDuplicates(); + return getDssMultiplexer().process(datasetWellReferences, handler).withDuplicates(); } @Override @@ -863,7 +858,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageReferences, handler).withDuplicates(); + return getDssMultiplexer().process(imageReferences, handler).withDuplicates(); } @Override @@ -885,7 +880,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageReferences, handler).withDuplicates(); + return getDssMultiplexer().process(imageReferences, handler).withDuplicates(); } @Override @@ -907,7 +902,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageReferences, handler).withDuplicates(); + return getDssMultiplexer().process(imageReferences, handler).withDuplicates(); } @Override @@ -929,7 +924,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageReferences, handler).withDuplicates(); + return getDssMultiplexer().process(imageReferences, handler).withDuplicates(); } @Override @@ -949,7 +944,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageReferences, handler).withDuplicates(); + return getDssMultiplexer().process(imageReferences, handler).withDuplicates(); } @Override @@ -969,7 +964,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageReferences, handler).withDuplicates(); + return getDssMultiplexer().process(imageReferences, handler).withDuplicates(); } @Override @@ -990,7 +985,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageReferences, handler).withDuplicates(); + return getDssMultiplexer().process(imageReferences, handler).withDuplicates(); } @Override @@ -1011,7 +1006,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageDatasets, handler).withDuplicates(); + return getDssMultiplexer().process(imageDatasets, handler).withDuplicates(); } @Override @@ -1032,7 +1027,7 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageDatasets, handler).withDuplicates(); + return getDssMultiplexer().process(imageDatasets, handler).withDuplicates(); } @Override @@ -1052,7 +1047,27 @@ public final class ScreeningServer extends AbstractServer<IScreeningServer> impl } }; - return dssMultiplexer.process(imageReferences, handler).withDuplicates(); + return getDssMultiplexer().process(imageReferences, handler).withDuplicates(); + } + + private IDssServiceRpcScreeningMultiplexer getDssMultiplexer() + { + if (dssMultiplexer == null) + { + dssMultiplexer = + new DssServiceRpcScreeningMultiplexer(multiplexer, + new IDssServiceRpcScreeningFactory() + { + @Override + public DssServiceRpcScreeningHolder createDssService( + String serverUrl) + { + return new DssServiceRpcScreeningHolder(serverUrl, + getMajorVersion(), 5 * DateUtils.MILLIS_PER_MINUTE); + } + }); + } + return dssMultiplexer; } } diff --git a/screening/sourceTest/java/ch/systemsx/cisd/openbis/plugin/screening/shared/api/v1/DssServiceRpcScreeningBatchResultsTest.java b/screening/sourceTest/java/ch/systemsx/cisd/openbis/plugin/screening/shared/api/v1/DssServiceRpcScreeningBatchResultsTest.java index e0db9ee097e..f7cb7bb2872 100644 --- a/screening/sourceTest/java/ch/systemsx/cisd/openbis/plugin/screening/shared/api/v1/DssServiceRpcScreeningBatchResultsTest.java +++ b/screening/sourceTest/java/ch/systemsx/cisd/openbis/plugin/screening/shared/api/v1/DssServiceRpcScreeningBatchResultsTest.java @@ -22,7 +22,7 @@ import java.util.Arrays; import org.testng.AssertJUnit; import org.testng.annotations.Test; -import ch.systemsx.cisd.openbis.dss.screening.shared.api.internal.DssServiceRpcScreeningBatchResults; +import ch.systemsx.cisd.common.multiplexer.BatchesResults; /** * @author pkupczyk @@ -33,10 +33,9 @@ public class DssServiceRpcScreeningBatchResultsTest extends AssertJUnit @Test public void testNullResults() { - DssServiceRpcScreeningBatchResults<String> results = - new DssServiceRpcScreeningBatchResults<String>(); - results.addDataStoreResults("dss1", null); - results.addDataStoreResults("dss2", null); + BatchesResults<String> results = new BatchesResults<String>(); + results.addBatchResults(null); + results.addBatchResults(null); assertTrue(results.withDuplicates().isEmpty()); assertTrue(results.withoutDuplicates().isEmpty()); @@ -45,10 +44,9 @@ public class DssServiceRpcScreeningBatchResultsTest extends AssertJUnit @Test public void testEmptyResults() { - DssServiceRpcScreeningBatchResults<String> results = - new DssServiceRpcScreeningBatchResults<String>(); - results.addDataStoreResults("dss1", new ArrayList<String>()); - results.addDataStoreResults("dss2", new ArrayList<String>()); + BatchesResults<String> results = new BatchesResults<String>(); + results.addBatchResults(new ArrayList<String>()); + results.addBatchResults(new ArrayList<String>()); assertTrue(results.withDuplicates().isEmpty()); assertTrue(results.withoutDuplicates().isEmpty()); @@ -57,10 +55,9 @@ public class DssServiceRpcScreeningBatchResultsTest extends AssertJUnit @Test public void testNotEmptyResults() { - DssServiceRpcScreeningBatchResults<String> results = - new DssServiceRpcScreeningBatchResults<String>(); - results.addDataStoreResults("dss1", Arrays.asList("a", "c", "e")); - results.addDataStoreResults("dss2", Arrays.asList("c", "d")); + BatchesResults<String> results = new BatchesResults<String>(); + results.addBatchResults(Arrays.asList("a", "c", "e")); + results.addBatchResults(Arrays.asList("c", "d")); assertEquals(Arrays.asList("a", "c", "e", "c", "d"), results.withDuplicates()); assertEquals(Arrays.asList("a", "c", "e", "d"), results.withoutDuplicates()); diff --git a/screening/sourceTest/java/ch/systemsx/cisd/openbis/plugin/screening/shared/api/v1/DssServiceRpcScreeningMultiplexerTest.java b/screening/sourceTest/java/ch/systemsx/cisd/openbis/plugin/screening/shared/api/v1/DssServiceRpcScreeningMultiplexerTest.java index 6256fdcb17e..58ee7b1ede2 100644 --- a/screening/sourceTest/java/ch/systemsx/cisd/openbis/plugin/screening/shared/api/v1/DssServiceRpcScreeningMultiplexerTest.java +++ b/screening/sourceTest/java/ch/systemsx/cisd/openbis/plugin/screening/shared/api/v1/DssServiceRpcScreeningMultiplexerTest.java @@ -33,11 +33,12 @@ import org.testng.annotations.Test; import ch.systemsx.cisd.common.concurrent.MessageChannel; import ch.systemsx.cisd.common.concurrent.MessageChannelBuilder; +import ch.systemsx.cisd.common.multiplexer.IMultiplexer; +import ch.systemsx.cisd.common.multiplexer.ThreadPoolMultiplexer; import ch.systemsx.cisd.openbis.dss.screening.shared.api.internal.DssServiceRpcScreeningHolder; import ch.systemsx.cisd.openbis.dss.screening.shared.api.internal.DssServiceRpcScreeningMultiplexer; import ch.systemsx.cisd.openbis.dss.screening.shared.api.internal.IDssServiceRpcScreeningBatchHandler; import ch.systemsx.cisd.openbis.dss.screening.shared.api.internal.IDssServiceRpcScreeningFactory; -import ch.systemsx.cisd.openbis.dss.screening.shared.api.internal.IDssServiceRpcScreeningMultiplexer; import ch.systemsx.cisd.openbis.dss.screening.shared.api.v1.IDssServiceRpcScreening; import ch.systemsx.cisd.openbis.plugin.screening.shared.api.v1.dto.DatasetIdentifier; import ch.systemsx.cisd.openbis.plugin.screening.shared.api.v1.dto.IDatasetIdentifier; @@ -76,7 +77,9 @@ public class DssServiceRpcScreeningMultiplexerTest extends AssertJUnit private MessageChannel channel2; - private IDssServiceRpcScreeningMultiplexer multiplexer; + private IMultiplexer multiplexer; + + private DssServiceRpcScreeningMultiplexer dssMultiplexer; @SuppressWarnings("unchecked") @BeforeMethod @@ -103,7 +106,8 @@ public class DssServiceRpcScreeningMultiplexerTest extends AssertJUnit channel1 = new MessageChannelBuilder(1000).getChannel(); channel2 = new MessageChannelBuilder(1000).getChannel(); - multiplexer = new DssServiceRpcScreeningMultiplexer(serviceFactory); + multiplexer = new ThreadPoolMultiplexer("dss-screening-multiplexer-test"); + dssMultiplexer = new DssServiceRpcScreeningMultiplexer(multiplexer, serviceFactory); } @AfterMethod @@ -216,7 +220,7 @@ public class DssServiceRpcScreeningMultiplexerTest extends AssertJUnit @Test public void testWithNullReferenceLists() { - List<String> results = multiplexer.process(null, batchHandler).withDuplicates(); + List<String> results = dssMultiplexer.process(null, batchHandler).withDuplicates(); assertTrue(results.isEmpty()); } @@ -224,7 +228,7 @@ public class DssServiceRpcScreeningMultiplexerTest extends AssertJUnit public void testWithEmptyReferenceLists() { List<String> results = - multiplexer.process(new ArrayList<IDatasetIdentifier>(), batchHandler) + dssMultiplexer.process(new ArrayList<IDatasetIdentifier>(), batchHandler) .withDuplicates(); assertTrue(results.isEmpty()); } @@ -331,7 +335,7 @@ public class DssServiceRpcScreeningMultiplexerTest extends AssertJUnit } }); - return multiplexer.process(allDatasets, batchHandler).withDuplicates(); + return dssMultiplexer.process(allDatasets, batchHandler).withDuplicates(); } } -- GitLab