From 683136ad2607a9723300882ed571a87d034b82d3 Mon Sep 17 00:00:00 2001 From: gakin <gakin> Date: Mon, 13 Mar 2017 15:19:03 +0000 Subject: [PATCH] SSDM-4394 : Sync attachments, complete implementation for experiments SVN: 37888 --- ...AttachmentSynchronizationTaskExecutor.java | 189 ++++++++++++++++++ .../DataSetRegistrationIngestionService.java | 2 +- .../synchronizer/EntitySynchronizer.java | 45 +---- .../synchronizer/MasterDataSynchronizer.java | 13 +- .../{DSSFileUtils.java => V3Utils.java} | 13 +- 5 files changed, 202 insertions(+), 60 deletions(-) create mode 100644 datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/AttachmentSynchronizationTaskExecutor.java rename datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/{DSSFileUtils.java => V3Utils.java} (88%) diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/AttachmentSynchronizationTaskExecutor.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/AttachmentSynchronizationTaskExecutor.java new file mode 100644 index 00000000000..4adeaa805c7 --- /dev/null +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/AttachmentSynchronizationTaskExecutor.java @@ -0,0 +1,189 @@ +/* + * Copyright 2017 ETH Zuerich, SIS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Date; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.Attachment; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.experiment.id.ExperimentPermId; +import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.common.ServiceFinderUtils; +import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.config.SyncConfig; +import ch.systemsx.cisd.common.concurrent.ITaskExecutor; +import ch.systemsx.cisd.common.exceptions.Status; +import ch.systemsx.cisd.openbis.dss.generic.shared.IEncapsulatedOpenBISService; +import ch.systemsx.cisd.openbis.dss.generic.shared.ServiceProvider; +import ch.systemsx.cisd.openbis.generic.shared.ICommonServer; +import ch.systemsx.cisd.openbis.generic.shared.basic.TechId; +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.AttachmentHolderKind; +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Experiment; +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Identifier; +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.NewAttachment; +import ch.systemsx.cisd.openbis.generic.shared.basic.dto.NewExperiment; +import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.ExperimentIdentifierFactory; + +/** + * + * + * @author Ganime Betul Akin + */ +final class AttachmentSynchronizationTaskExecutor implements ITaskExecutor<Identifier<?>> +{ + private final Date lastSyncTimestamp; + + private final SyncConfig config; + + private final IEncapsulatedOpenBISService service; + + public AttachmentSynchronizationTaskExecutor(IEncapsulatedOpenBISService service, Date lastSyncTimestamp, SyncConfig config) + { + this.service = service; + this.lastSyncTimestamp = lastSyncTimestamp; + this.config = config; + } + + @Override + public Status execute(Identifier<?> item) + { + V3Utils dssFileUtils = V3Utils.create(config.getDataSourceOpenbisURL(), config.getDataSourceDSSURL()); + String sessionToken = dssFileUtils.login(config.getUser(), config.getPassword()); + if (item instanceof NewExperiment) + { + List<Attachment> incomingAttachments = dssFileUtils.getExperimentAttachments(sessionToken, new ExperimentPermId(item.getPermID())); + + // place the incoming attachments in a map + Map<String, Attachment> incomingAttachmentMap = new HashMap<String, Attachment>(); + for (Attachment incoming : incomingAttachments) + { + incomingAttachmentMap.put(incoming.getFileName(), incoming); + } + + Experiment experiment = service.tryGetExperiment(ExperimentIdentifierFactory.parse(item.getIdentifier())); + List<ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment> existingAttachments = + service.listAttachments(AttachmentHolderKind.EXPERIMENT, experiment.getId()); + Map<String, ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment> existingAttachmentMap = + new HashMap<String, ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment>(); + + // place the existing attachments in a map + for (ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment attachment : existingAttachments) + { + existingAttachmentMap.put(attachment.getFileName(), attachment); + } + + ICommonServer commonServer = ServiceFinderUtils.getCommonServer(ServiceProvider.getConfigProvider().getOpenBisServerUrl()); + String localSessionToken = ServiceFinderUtils.login(commonServer, config.getHarvesterUser(), config.getHarvesterPass()); + TechId experimentId = new TechId(experiment.getId()); + for (Attachment incoming : incomingAttachments) + { + ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment existingAttachment = + existingAttachmentMap.get(incoming.getFileName()); + if (existingAttachment == null) + { + addAttachments(incoming, 1, experimentId, commonServer, localSessionToken); + } + else + { + int version = existingAttachment.getVersion(); + if (incoming.getVersion() < version) + { + // Harvester has a later version of the attachment. Delete it from harvester + commonServer.deleteExperimentAttachments(localSessionToken, experimentId, + Arrays.asList(incoming.getFileName()), "Synchronization from data source " + config.getDataSourceAlias()); + addAttachments(incoming, 1, experimentId, commonServer, localSessionToken); + } + else if (incoming.getVersion() == version) + { + // check last sync date and meta data + if (incoming.getRegistrationDate().after(lastSyncTimestamp)) + { + // Data source has the same version number but with a later registration date than the last sync timestamp: + // This means, the attachment was probably deleted in the data source and re-registered. Delete it from harvester + // and re-register + commonServer.deleteExperimentAttachments(localSessionToken, experimentId, + Arrays.asList(incoming.getFileName()), "Synchronization from data source " + config.getDataSourceAlias()); + addAttachments(incoming, 1, experimentId, commonServer, localSessionToken); + } + else + { + // check if meta data changed + if (incoming.getTitle().equals(existingAttachment.getTitle()) == false + || incoming.getDescription().equals(existingAttachment.getDescription())) + { + ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment updateDTO = + new ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment(); + updateDTO.setFileName(existingAttachment.getFileName()); + updateDTO.setVersion(existingAttachment.getVersion()); + updateDTO.setTitle(incoming.getTitle()); + updateDTO.setDescription(incoming.getDescription()); + commonServer.updateExperimentAttachments(localSessionToken, experimentId, updateDTO); + } + } + } + else + { + // add all new versions from the incoming (do we need to check last sync date) + // Attachment attachmentVersion = getVersion(incoming, version); + addAttachments(incoming, version + 1, experimentId, commonServer, localSessionToken); + } + } + } + // loop through existing attachments and if they no longer exist in data source, delete them. + for (ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment existing : existingAttachments) + { + if (incomingAttachmentMap.get(existing.getFileName()) == null) + { + commonServer.deleteExperimentAttachments(localSessionToken, experimentId, + Arrays.asList(existing.getFileName()), "Synchronization from data source " + config.getDataSourceAlias() + + " Attachment no longer exists on data source."); + } + } + } + return null; + } + + private void addAttachments(Attachment attachment, int fromVersion, TechId techId, ICommonServer commonServer, String sessionToken) + { + + Integer version = attachment.getVersion(); + Deque<NewAttachment> versions = new ArrayDeque<NewAttachment>(); + for (int i = version; i >= fromVersion; i--) + { + NewAttachment newAttachment = new NewAttachment(); + newAttachment.setTitle(attachment.getTitle()); + newAttachment.setDescription(attachment.getDescription()); + newAttachment.setFilePath(attachment.getFileName()); + newAttachment.setContent(attachment.getContent()); + versions.add(newAttachment); + if (i == 1) + { + break; + } + attachment = attachment.getPreviousVersion(); + } + NewAttachment earliestVersion = versions.pollLast(); + while (earliestVersion != null) + { + commonServer.addExperimentAttachment(sessionToken, techId, earliestVersion); + earliestVersion = versions.pollLast(); + } + } +} diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/DataSetRegistrationIngestionService.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/DataSetRegistrationIngestionService.java index a15e7bc74ca..e94b1b60470 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/DataSetRegistrationIngestionService.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/DataSetRegistrationIngestionService.java @@ -200,7 +200,7 @@ class DataSetRegistrationIngestionService extends IngestionService<DataSetInform private void downloadDataSetFiles(File dir, String dataSetCode) throws Exception { - DSSFileUtils dssFileUtils = DSSFileUtils.create(asUrl, dssUrl); + V3Utils dssFileUtils = V3Utils.create(asUrl, dssUrl); String sessionToken = dssFileUtils.login(loginUser, loginPass); DataSetFileFetchOptions dsFileFetchOptions = new DataSetFileFetchOptions(); SearchResult<DataSetFile> result = dssFileUtils.searchWithDataSetCode(sessionToken, dataSetCode, dsFileFetchOptions); diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java index 376e45e4a1f..911498dccc7 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java @@ -41,7 +41,6 @@ import org.apache.log4j.Logger; import org.w3c.dom.Document; import ch.ethz.sis.openbis.generic.asapi.v3.IApplicationServerApi; -import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.Attachment; import ch.ethz.sis.openbis.generic.asapi.v3.dto.common.search.SearchResult; import ch.ethz.sis.openbis.generic.asapi.v3.dto.dataset.DataSetKind; import ch.ethz.sis.openbis.generic.asapi.v3.dto.dataset.delete.DataSetDeletionOptions; @@ -60,7 +59,6 @@ import ch.ethz.sis.openbis.generic.dssapi.v3.dto.datasetfile.DataSetFile; import ch.ethz.sis.openbis.generic.dssapi.v3.dto.datasetfile.fetchoptions.DataSetFileFetchOptions; import ch.ethz.sis.openbis.generic.dssapi.v3.dto.datasetfile.search.DataSetFileSearchCriteria; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.common.EntityRetriever; -import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.common.ServiceFinderUtils; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.config.SyncConfig; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.ResourceListParserData.Connection; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.ResourceListParserData.DataSetWithConnections; @@ -88,10 +86,8 @@ import ch.systemsx.cisd.openbis.dss.generic.shared.IEncapsulatedOpenBISService; import ch.systemsx.cisd.openbis.dss.generic.shared.IShareIdManager; import ch.systemsx.cisd.openbis.dss.generic.shared.ServiceProvider; import ch.systemsx.cisd.openbis.dss.generic.shared.utils.SegmentedStoreUtils; -import ch.systemsx.cisd.openbis.generic.shared.ICommonServer; import ch.systemsx.cisd.openbis.generic.shared.basic.TechId; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.AbstractExternalData; -import ch.systemsx.cisd.openbis.generic.shared.basic.dto.AttachmentHolderKind; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Experiment; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.GenericEntityProperty; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.IEntityProperty; @@ -122,7 +118,6 @@ import ch.systemsx.cisd.openbis.generic.shared.dto.NewExternalData; import ch.systemsx.cisd.openbis.generic.shared.dto.NewProperty; import ch.systemsx.cisd.openbis.generic.shared.dto.ProjectUpdatesDTO; import ch.systemsx.cisd.openbis.generic.shared.dto.SampleUpdatesDTO; -import ch.systemsx.cisd.openbis.generic.shared.dto.SessionContextDTO; import ch.systemsx.cisd.openbis.generic.shared.dto.builders.AtomicEntityOperationDetailsBuilder; import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.ExperimentIdentifier; import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.ExperimentIdentifierFactory; @@ -224,7 +219,7 @@ public class EntitySynchronizer operationLog.info("entity operation result: " + operationResult); operationLog.info("processing attachments..."); - // processAttachments(entitiesWithAttachments); + // processAttachments(entitiesWithAttachments, lastSyncTimestamp); // register physical data sets without any hierarchy // Note that container/component and parent/child relationships are established post-reg. @@ -252,7 +247,7 @@ public class EntitySynchronizer private void processAttachments(List<Identifier<?>> entities) { - ParallelizedExecutor.process(entities, new AttachmentSynchronizationTaskExecutor(), + ParallelizedExecutor.process(entities, new AttachmentSynchronizationTaskExecutor(service, lastSyncTimestamp, config), 0.5, 10, "process attachments", 0, false); } @@ -1037,40 +1032,6 @@ public class EntitySynchronizer return null; } - private final class AttachmentSynchronizationTaskExecutor implements ITaskExecutor<Identifier<?>> - { - public AttachmentSynchronizationTaskExecutor() - { - } - - @Override - public Status execute(Identifier<?> item) - { - DSSFileUtils dssFileUtils = DSSFileUtils.create(config.getDataSourceOpenbisURL(), config.getDataSourceDSSURL()); - String sessionToken = dssFileUtils.login(config.getUser(), config.getPassword()); - if(item instanceof NewExperiment) { - - // TODO complete this by comparing versions, getting any previous versions using v3 api and syncing - List<Attachment> incomingAttachments = dssFileUtils.getExperimentAttachments(sessionToken, new ExperimentPermId(item.getPermID())); - - Experiment experiment = service.tryGetExperiment(ExperimentIdentifierFactory.parse(item.getIdentifier())); - List<ch.systemsx.cisd.openbis.generic.shared.basic.dto.Attachment> listAttachments = - service.listAttachments(AttachmentHolderKind.EXPERIMENT, experiment.getId()); - ICommonServer commonServer = ServiceFinderUtils.getCommonServer(ServiceProvider.getConfigProvider().getOpenBisServerUrl()); - SessionContextDTO sessionDTO = commonServer.tryAuthenticate(config.getHarvesterUser(), config.getHarvesterPass()); - for (Attachment attachment : incomingAttachments) - { - NewAttachment newAttachment = new NewAttachment(); - newAttachment.setTitle(attachment.getTitle()); - newAttachment.setDescription(attachment.getDescription()); - newAttachment.setFilePath(attachment.getFileName()); - newAttachment.setContent(attachment.getContent()); - commonServer.addExperimentAttachment(sessionDTO.getSessionToken(), new TechId(experiment.getId()), newAttachment); - } - } - return null; - } - } private final class DataSetRegistrationTaskExecutor implements ITaskExecutor<DataSetWithConnections> { @@ -1135,7 +1096,7 @@ public class EntitySynchronizer String asUrl = config.getDataSourceOpenbisURL(); String dssUrl = config.getDataSourceDSSURL(); - DSSFileUtils dssFileUtils = DSSFileUtils.create(asUrl, dssUrl); + V3Utils dssFileUtils = V3Utils.create(asUrl, dssUrl); String sessionToken = dssFileUtils.login(config.getUser(), config.getPassword()); DataSetFileSearchCriteria criteria = new DataSetFileSearchCriteria(); diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/MasterDataSynchronizer.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/MasterDataSynchronizer.java index e63385b50b4..a417361f20a 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/MasterDataSynchronizer.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/MasterDataSynchronizer.java @@ -44,7 +44,6 @@ import ch.systemsx.cisd.openbis.generic.shared.basic.dto.SampleType; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Script; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.Vocabulary; import ch.systemsx.cisd.openbis.generic.shared.basic.dto.VocabularyTerm; -import ch.systemsx.cisd.openbis.generic.shared.dto.SessionContextDTO; /** * @@ -65,20 +64,10 @@ public class MasterDataSynchronizer { String openBisServerUrl = ServiceProvider.getConfigProvider().getOpenBisServerUrl(); this.commonServer = ServiceFinderUtils.getCommonServer(openBisServerUrl); - this.sessionToken = login(harvesterUser, harvesterPassword); + this.sessionToken = ServiceFinderUtils.login(commonServer, harvesterUser, harvesterPassword); this.masterData = masterData; vocabularyTermsToBeDeleted = new HashMap<TechId, List<VocabularyTerm>>(); } - - private String login(String harvesterUser, String harvesterPassword) - { - SessionContextDTO session = commonServer.tryAuthenticate(harvesterUser, harvesterPassword); - if (session == null) - { - throw UserFailureException.fromTemplate("Invalid username/password combination for user:" + harvesterUser); - } - return session.getSessionToken(); - } public void synchronizeMasterData() { MultiKeyMap<String, List<NewETPTAssignment>> propertyAssignmentsToProcess = masterData.getPropertyAssignmentsToProcess(); diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/DSSFileUtils.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/V3Utils.java similarity index 88% rename from datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/DSSFileUtils.java rename to datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/V3Utils.java index b0404a15351..f3c5e4573bf 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/DSSFileUtils.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/V3Utils.java @@ -23,6 +23,7 @@ import java.util.Map; import ch.ethz.sis.openbis.generic.asapi.v3.IApplicationServerApi; import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.Attachment; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.fetchoptions.AttachmentFetchOptions; import ch.ethz.sis.openbis.generic.asapi.v3.dto.common.search.SearchResult; import ch.ethz.sis.openbis.generic.asapi.v3.dto.experiment.Experiment; import ch.ethz.sis.openbis.generic.asapi.v3.dto.experiment.fetchoptions.ExperimentFetchOptions; @@ -41,19 +42,19 @@ import ch.systemsx.cisd.common.ssl.SslCertificateHelper; * * @author Ganime Betul Akin */ -public class DSSFileUtils +public class V3Utils { public static final int TIMEOUT = 100000; private final IDataStoreServerApi dss; private final IApplicationServerApi as; - public static DSSFileUtils create(String asUrl, String dssUrl) + public static V3Utils create(String asUrl, String dssUrl) { - return new DSSFileUtils(asUrl, dssUrl, TIMEOUT); + return new V3Utils(asUrl, dssUrl, TIMEOUT); } - private DSSFileUtils (String asUrl, String dssUrl, int timeout) + private V3Utils (String asUrl, String dssUrl, int timeout) { SslCertificateHelper.trustAnyCertificate(asUrl); SslCertificateHelper.trustAnyCertificate(dssUrl); @@ -77,7 +78,9 @@ public class DSSFileUtils public List<Attachment> getExperimentAttachments(String sessionToken, IExperimentId experimentId) { ExperimentFetchOptions fetchOptions = new ExperimentFetchOptions(); - fetchOptions.withAttachments().withContent(); + AttachmentFetchOptions attachmentFetchOptions = fetchOptions.withAttachments(); + attachmentFetchOptions.withContent(); + attachmentFetchOptions.withPreviousVersion().withContent(); Map<IExperimentId, Experiment> experiments = as.getExperiments(sessionToken, Arrays.asList(experimentId), fetchOptions); if(experiments.size() == 1) { Experiment experiment = experiments.get(experimentId); -- GitLab