From ea85e6e95fbab2d89a9be2258318b76eb09b8fb5 Mon Sep 17 00:00:00 2001 From: felmer <franz-josef.elmer@id.ethz.ch> Date: Wed, 16 Jan 2019 11:42:06 +0100 Subject: [PATCH] SSDM-7430: monitoring attachment synchronization. Small refactoring. --- .../synchronizer/EntitySynchronizer.java | 13 +--- .../AttachmentsSynchronizer.java | 70 +++++++++++++++---- .../synchronizer/util/ServiceUtils.java | 47 +++++++++++++ .../harvester/synchronizer/util/V3Facade.java | 13 +--- 4 files changed, 110 insertions(+), 33 deletions(-) create mode 100644 datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/util/ServiceUtils.java diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java index e8eecb0f0a9..c736537bf49 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java @@ -72,7 +72,6 @@ import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.config.Para import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.config.SyncConfig; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.datasourceconnector.DataSourceConnector; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.AttachmentSynchronizationSummary; -import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.AttachmentSynchronizationTaskExecutor; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.AttachmentsSynchronizer; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.DataSetRegistrationTaskExecutor; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.DataSetSynchronizationSummary; @@ -80,12 +79,12 @@ import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronize import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.translator.PrefixBasedNameTranslator; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.util.DSPropertyUtils; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.util.Monitor; +import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.util.ServiceUtils; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.util.V3Facade; import ch.systemsx.cisd.common.concurrent.ParallelizedExecutor; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.ConversionUtils; -import ch.systemsx.cisd.openbis.dss.generic.server.EncapsulatedOpenBISService; import ch.systemsx.cisd.openbis.dss.generic.shared.DataSetDirectoryProvider; import ch.systemsx.cisd.openbis.dss.generic.shared.DataSetProcessingContext; import ch.systemsx.cisd.openbis.dss.generic.shared.IConfigProvider; @@ -517,21 +516,15 @@ public class EntitySynchronizer AttachmentSynchronizationSummary synchronizationSummary = new AttachmentSynchronizationSummary(); ParallelizedExecutionPreferences preferences = config.getParallelizedExecutionPrefs(); - V3Facade v3FacadeToDataSource = new V3Facade(config); monitor.log("Services for accessing data source established"); List<List<IncomingEntity<?>>> attachmentHoldersChunks = chunk(attachmentHoldersToProcess); - IApplicationServerApi v3apiDataSource = EncapsulatedOpenBISService.createOpenBisV3Service(config.getDataSourceOpenbisURL(), "60"); + IApplicationServerApi v3apiDataSource = ServiceUtils.createAsV3Api(config.getDataSourceOpenbisURL()); String sessionTokenDataSource = v3apiDataSource.login(config.getUser(), config.getPassword()); ParallelizedExecutor.process(attachmentHoldersChunks, new AttachmentsSynchronizer(v3Api, service.getSessionToken(), v3apiDataSource, sessionTokenDataSource, - lastSyncTimestamp, synchronizationSummary), + lastSyncTimestamp, synchronizationSummary, monitor), preferences.getMachineLoad(), preferences.getMaxThreads(), "process attachments", preferences.getRetriesOnFail(), preferences.isStopOnFailure()); -// ParallelizedExecutor.process(attachmentHoldersToProcess, new AttachmentSynchronizationTaskExecutor(synchronizationSummary, -// service, v3FacadeToDataSource, -// lastSyncTimestamp, config, monitor), -// preferences.getMachineLoad(), preferences.getMaxThreads(), "process attachments", preferences.getRetriesOnFail(), -// preferences.isStopOnFailure()); return synchronizationSummary; } diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/parallelizedExecutor/AttachmentsSynchronizer.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/parallelizedExecutor/AttachmentsSynchronizer.java index 84bf2264b4b..3042c04af46 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/parallelizedExecutor/AttachmentsSynchronizer.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/parallelizedExecutor/AttachmentsSynchronizer.java @@ -49,6 +49,8 @@ import ch.ethz.sis.openbis.generic.asapi.v3.dto.sample.id.SamplePermId; import ch.ethz.sis.openbis.generic.asapi.v3.dto.sample.update.SampleUpdate; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.common.SyncEntityKind; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.IncomingEntity; +import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.util.Monitor; +import ch.systemsx.cisd.common.collection.CollectionUtils; import ch.systemsx.cisd.common.concurrent.ITaskExecutor; import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.logging.LogCategory; @@ -66,7 +68,7 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit public AttachmentsSynchronizer(IApplicationServerApi v3api, String sessionToken, IApplicationServerApi v3apiDataSource, String sessionTokenDataSource, Date lastSyncTimestamp, - AttachmentSynchronizationSummary synchronizationSummary) + AttachmentSynchronizationSummary synchronizationSummary, Monitor monitor) { handlersByEntityKind = new HashMap<>(); handlersByEntityKind.put(SyncEntityKind.PROJECT, new ProjectsHandler()); @@ -81,6 +83,7 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit handler.setSessionTokenDataSource(sessionTokenDataSource); handler.setLastSyncTimestamp(lastSyncTimestamp); handler.setSynchronizationSummary(synchronizationSummary); + handler.setMonitor(monitor); } } @@ -134,6 +137,16 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit protected AttachmentSynchronizationSummary synchronizationSummary; + protected Monitor monitor; + + protected SyncEntityKind entityKind; + + AbstractHandler(SyncEntityKind entityKind) + { + + this.entityKind = entityKind; + } + public void setV3api(IApplicationServerApi v3api) { this.v3api = v3api; @@ -164,10 +177,29 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit this.synchronizationSummary = synchronizationSummary; } + public void setMonitor(Monitor monitor) + { + this.monitor = monitor; + } + + private void log(List<IncomingEntity<?>> entities, int numberOfEntitiesWithAttachments, String description) + { + List<String> ids = entities.stream().map(IncomingEntity::getIdentifer).collect(Collectors.toList()); + String idsAsString = CollectionUtils.abbreviate(ids, 100); + monitor.log(String.format("%4d (of %4d) %ss %s. %s", + numberOfEntitiesWithAttachments, entities.size(), entityKind, description, idsAsString)); + } + public <AH extends IPermIdHolder & IAttachmentsHolder> void handle(List<IncomingEntity<?>> entities) { - Map<String, Map<String, Attachment>> existingAttachments = retrievAttachments(v3api, sessionToken, entities); List<IncomingEntity<?>> filteredEntities = entities.stream().filter(e -> e.hasAttachments()).collect(Collectors.toList()); + log(entities, filteredEntities.size(), "with attachments on data source"); + if (filteredEntities.isEmpty()) + { + return; + } + Map<String, Map<String, Attachment>> existingAttachments = retrievAttachments(v3api, sessionToken, entities); + log(entities, existingAttachments.size(), "on harvester"); Map<String, Map<String, Attachment>> attachmentsFromDataSource = retrievAttachments(v3apiDataSource, sessionTokenDataSource, filteredEntities); List<AttachmentChange> attachmentChanges = new ArrayList<>(); @@ -221,9 +253,9 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit private <AH extends IPermIdHolder & IAttachmentsHolder> Map<String, Map<String, Attachment>> retrievAttachments( IApplicationServerApi v3api, String sessionToken, List<IncomingEntity<?>> entities) { - List<String> entitiesWithAttachments = entities.stream().map(IncomingEntity::getPermID).collect(Collectors.toList()); + List<String> ids = entities.stream().map(IncomingEntity::getPermID).collect(Collectors.toList()); Map<String, Map<String, Attachment>> attachmentsByPermId = new HashMap<>(); - Collection<AH> attachmentHolders = getAttachments(v3api, sessionToken, entitiesWithAttachments); + Collection<AH> attachmentHolders = getAttachments(v3api, sessionToken, ids); for (AH attachmentHolder : attachmentHolders) { attachmentsByPermId.put(attachmentHolder.getPermId().toString(), @@ -239,30 +271,29 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit private void handleAttachmentChanges(List<AttachmentChange> attachmentChanges) { Map<String, AttachmentListUpdateValue> attachmentUpdatesByPermId = new HashMap<>(); - for (AttachmentChange attachmentChange: attachmentChanges) + for (AttachmentChange attachmentChange : attachmentChanges) { String permId = attachmentChange.getPermId(); - AttachmentListUpdateValue attachmentListUpdateValue = attachmentUpdatesByPermId.get(permId); - if (attachmentListUpdateValue == null) + AttachmentListUpdateValue attachmentListUpdate = attachmentUpdatesByPermId.get(permId); + if (attachmentListUpdate == null) { - attachmentListUpdateValue = new AttachmentListUpdateValue(); - attachmentUpdatesByPermId.put(permId, attachmentListUpdateValue); + attachmentListUpdate = new AttachmentListUpdateValue(); + attachmentUpdatesByPermId.put(permId, attachmentListUpdate); } Attachment attachment = attachmentChange.getAttachment(); if (attachmentChange.isRemove()) { - attachmentListUpdateValue.remove(new AttachmentFileName(attachment.getFileName())); + attachmentListUpdate.remove(new AttachmentFileName(attachment.getFileName())); } Integer version = attachmentChange.getVersion(); if (version != null) { - addAttachments(attachmentListUpdateValue, attachment, version); + addAttachments(attachmentListUpdate, attachment, version); } } handleAttachments(attachmentUpdatesByPermId); } - private void addAttachments(AttachmentListUpdateValue attachmentListUpdate, Attachment attachment, int fromVersion) { List<AttachmentCreation> attachmentCreations = new ArrayList<>(); @@ -300,6 +331,11 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit private static final class ProjectsHandler extends AbstractHandler { + public ProjectsHandler() + { + super(SyncEntityKind.PROJECT); + } + @SuppressWarnings("unchecked") @Override protected <AH extends IPermIdHolder & IAttachmentsHolder> Collection<AH> getAttachments( @@ -328,6 +364,11 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit private static final class ExperimentsHandler extends AbstractHandler { + public ExperimentsHandler() + { + super(SyncEntityKind.EXPERIMENT); + } + @SuppressWarnings("unchecked") @Override protected <AH extends IPermIdHolder & IAttachmentsHolder> Collection<AH> getAttachments( @@ -356,6 +397,11 @@ public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntit private static final class SamplesHandler extends AbstractHandler { + public SamplesHandler() + { + super(SyncEntityKind.SAMPLE); + } + @SuppressWarnings("unchecked") @Override protected <AH extends IPermIdHolder & IAttachmentsHolder> Collection<AH> getAttachments( diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/util/ServiceUtils.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/util/ServiceUtils.java new file mode 100644 index 00000000000..123d78a3318 --- /dev/null +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/util/ServiceUtils.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 ETH Zuerich, SIS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.util; + +import org.apache.commons.lang.time.DateUtils; + +import ch.ethz.sis.openbis.generic.asapi.v3.IApplicationServerApi; +import ch.ethz.sis.openbis.generic.dssapi.v3.IDataStoreServerApi; +import ch.systemsx.cisd.openbis.dss.generic.server.EncapsulatedOpenBISService; + +/** + * @author Franz-Josef Elmer + */ +public class ServiceUtils +{ + public static final long TIMEOUT = 6 * DateUtils.MILLIS_PER_HOUR; + + public static IApplicationServerApi createAsV3Api(String url) + { + return EncapsulatedOpenBISService.createOpenBisV3Service(url, getTimeOutInMinutes()); + } + + public static IDataStoreServerApi createDssV3Api(String url) + { + return EncapsulatedOpenBISService.createDataStoreV3Service(url, getTimeOutInMinutes()); + } + + private static String getTimeOutInMinutes() + { + return Long.toString(TIMEOUT / DateUtils.MILLIS_PER_MINUTE); + + } +} diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/util/V3Facade.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/util/V3Facade.java index 6c5738b3b10..e2b16b0640c 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/util/V3Facade.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/util/V3Facade.java @@ -21,8 +21,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.commons.lang.time.DateUtils; - import ch.ethz.sis.openbis.generic.asapi.v3.IApplicationServerApi; import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.Attachment; import ch.ethz.sis.openbis.generic.asapi.v3.dto.common.search.SearchResult; @@ -42,7 +40,6 @@ import ch.ethz.sis.openbis.generic.dssapi.v3.dto.datasetfile.fetchoptions.DataSe import ch.ethz.sis.openbis.generic.dssapi.v3.dto.datasetfile.id.IDataSetFileId; import ch.ethz.sis.openbis.generic.dssapi.v3.dto.datasetfile.search.DataSetFileSearchCriteria; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.config.SyncConfig; -import ch.systemsx.cisd.openbis.dss.generic.server.EncapsulatedOpenBISService; /** * @@ -51,8 +48,6 @@ import ch.systemsx.cisd.openbis.dss.generic.server.EncapsulatedOpenBISService; */ public class V3Facade { - public static final long TIMEOUT = 6 * DateUtils.MILLIS_PER_HOUR; - private final IDataStoreServerApi dss; private final IApplicationServerApi as; @@ -61,12 +56,8 @@ public class V3Facade public V3Facade(SyncConfig config) { - String asUrl = config.getDataSourceOpenbisURL(); - String dssUrl = config.getDataSourceDSSURL(); - long timeout = TIMEOUT; - String timeoutInMinutes = Long.toString(timeout / DateUtils.MILLIS_PER_MINUTE); - as = EncapsulatedOpenBISService.createOpenBisV3Service(asUrl, timeoutInMinutes); - dss = EncapsulatedOpenBISService.createDataStoreV3Service(dssUrl, timeoutInMinutes); + as = ServiceUtils.createAsV3Api(config.getDataSourceOpenbisURL()); + dss = ServiceUtils.createDssV3Api(config.getDataSourceDSSURL()); sessionToken = as.login(config.getUser(), config.getPassword()); } -- GitLab