diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java index b5c391877edd6214c2f030532b28cd430fd2f285..e8eecb0f0a9f3945ba51b1df7c768631f2cfdc45 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/EntitySynchronizer.java @@ -73,6 +73,7 @@ import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.config.Sync import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.datasourceconnector.DataSourceConnector; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.AttachmentSynchronizationSummary; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.AttachmentSynchronizationTaskExecutor; +import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.AttachmentsSynchronizer; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.DataSetRegistrationTaskExecutor; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor.DataSetSynchronizationSummary; import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.translator.INameTranslator; @@ -84,6 +85,7 @@ import ch.systemsx.cisd.common.concurrent.ParallelizedExecutor; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; import ch.systemsx.cisd.etlserver.registrator.api.v1.impl.ConversionUtils; +import ch.systemsx.cisd.openbis.dss.generic.server.EncapsulatedOpenBISService; 
import ch.systemsx.cisd.openbis.dss.generic.shared.DataSetDirectoryProvider; import ch.systemsx.cisd.openbis.dss.generic.shared.DataSetProcessingContext; import ch.systemsx.cisd.openbis.dss.generic.shared.IConfigProvider; @@ -183,7 +185,7 @@ public class EntitySynchronizer Document doc = getResourceList(); ResourceListParserData data = parseResourceList(doc); -// processDeletions(data); + processDeletions(data); registerMasterData(data.getMasterData()); MultiKeyMap<String, String> newEntities = registerEntities(data); List<String> notSyncedAttachmentsHolders = registerAttachments(data, newEntities); @@ -517,15 +519,39 @@ public class EntitySynchronizer ParallelizedExecutionPreferences preferences = config.getParallelizedExecutionPrefs(); V3Facade v3FacadeToDataSource = new V3Facade(config); monitor.log("Services for accessing data source established"); - ParallelizedExecutor.process(attachmentHoldersToProcess, new AttachmentSynchronizationTaskExecutor(synchronizationSummary, - service, v3FacadeToDataSource, - lastSyncTimestamp, config, monitor), + List<List<IncomingEntity<?>>> attachmentHoldersChunks = chunk(attachmentHoldersToProcess); + IApplicationServerApi v3apiDataSource = EncapsulatedOpenBISService.createOpenBisV3Service(config.getDataSourceOpenbisURL(), "60"); + String sessionTokenDataSource = v3apiDataSource.login(config.getUser(), config.getPassword()); + ParallelizedExecutor.process(attachmentHoldersChunks, + new AttachmentsSynchronizer(v3Api, service.getSessionToken(), v3apiDataSource, sessionTokenDataSource, + lastSyncTimestamp, synchronizationSummary), preferences.getMachineLoad(), preferences.getMaxThreads(), "process attachments", preferences.getRetriesOnFail(), preferences.isStopOnFailure()); +// ParallelizedExecutor.process(attachmentHoldersToProcess, new AttachmentSynchronizationTaskExecutor(synchronizationSummary, +// service, v3FacadeToDataSource, +// lastSyncTimestamp, config, monitor), +// preferences.getMachineLoad(), 
preferences.getMaxThreads(), "process attachments", preferences.getRetriesOnFail(), +// preferences.isStopOnFailure()); return synchronizationSummary; } + private List<List<IncomingEntity<?>>> chunk(List<IncomingEntity<?>> entities) + { + List<List<IncomingEntity<?>>> chunks = new ArrayList<>(); + List<IncomingEntity<?>> chunk = null; + for (IncomingEntity<?> incomingEntity : entities) + { + if (chunk == null || chunk.size() >= 1000) + { + chunk = new ArrayList<>(); + chunks.add(chunk); + } + chunk.add(incomingEntity); + } + return chunks; + } + // private void cleanup() // { // operationLog.info("Cleaning up unused master data"); diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/parallelizedExecutor/AttachmentsSynchronizer.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/parallelizedExecutor/AttachmentsSynchronizer.java new file mode 100644 index 0000000000000000000000000000000000000000..e124648ec564f89d6de60e8b377c9a5bea4f04da --- /dev/null +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/sync/harvester/synchronizer/parallelizedExecutor/AttachmentsSynchronizer.java @@ -0,0 +1,403 @@ +/* + * Copyright 2019 ETH Zuerich, SIS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.parallelizedExecutor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.log4j.Logger; + +import ch.ethz.sis.openbis.generic.asapi.v3.IApplicationServerApi; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.Attachment; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.create.AttachmentCreation; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.fetchoptions.AttachmentFetchOptions; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.attachment.update.AttachmentListUpdateValue; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.common.interfaces.IAttachmentsHolder; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.common.interfaces.IPermIdHolder; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.experiment.fetchoptions.ExperimentFetchOptions; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.experiment.id.ExperimentPermId; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.experiment.update.ExperimentUpdate; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.project.fetchoptions.ProjectFetchOptions; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.project.id.ProjectPermId; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.project.update.ProjectUpdate; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.sample.fetchoptions.SampleFetchOptions; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.sample.id.SamplePermId; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.sample.update.SampleUpdate; +import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.common.SyncEntityKind; +import ch.ethz.sis.openbis.generic.server.dss.plugins.sync.harvester.synchronizer.IncomingEntity; +import 
ch.systemsx.cisd.common.concurrent.ITaskExecutor;
import ch.systemsx.cisd.common.exceptions.Status;
import ch.systemsx.cisd.common.logging.LogCategory;
import ch.systemsx.cisd.common.logging.LogFactory;

/**
 * Task executor which synchronizes the attachments of a chunk of incoming entities (projects,
 * experiments and samples) from the data source openBIS instance to the harvester openBIS instance.
 * <p>
 * For every entity kind a dedicated handler fetches the attachments from both instances, compares
 * them by file name and version, and registers the attachment versions which are missing on the
 * harvester side. Replacing attachments, updating title/description and deleting attachments are
 * not implemented yet (see the TODOs in {@link AbstractHandler#handle(List)}).
 *
 * @author Franz-Josef Elmer
 */
public class AttachmentsSynchronizer implements ITaskExecutor<List<IncomingEntity<?>>>
{
    private static final Logger operationLog =
            LogFactory.getLogger(LogCategory.OPERATION, AttachmentsSynchronizer.class);

    /** The handler responsible for each supported entity kind. Populated once in the constructor. */
    private final Map<SyncEntityKind, AbstractHandler> handlersByEntityKind;

    /**
     * Creates an instance with handlers for projects, experiments and samples.
     *
     * @param v3api API of the openBIS instance to be updated.
     * @param sessionToken Session token used for all calls on <code>v3api</code>.
     * @param v3apiDataSource API of the data source openBIS instance attachments are read from.
     * @param sessionTokenDataSource Session token used for all calls on <code>v3apiDataSource</code>.
     * @param lastSyncTimestamp Attachments of unchanged version but registered after this time stamp
     *            are considered as replaced.
     * @param synchronizationSummary Summary whose counters are updated by the handlers.
     */
    public AttachmentsSynchronizer(IApplicationServerApi v3api, String sessionToken,
            IApplicationServerApi v3apiDataSource, String sessionTokenDataSource, Date lastSyncTimestamp,
            AttachmentSynchronizationSummary synchronizationSummary)
    {
        handlersByEntityKind = new HashMap<>();
        handlersByEntityKind.put(SyncEntityKind.PROJECT, new ProjectsHandler());
        handlersByEntityKind.put(SyncEntityKind.EXPERIMENT, new ExperimentsHandler());
        handlersByEntityKind.put(SyncEntityKind.SAMPLE, new SamplesHandler());
        for (AbstractHandler handler : handlersByEntityKind.values())
        {
            handler.setV3api(v3api);
            handler.setSessionToken(sessionToken);
            handler.setV3apiDataSource(v3apiDataSource);
            handler.setSessionTokenDataSource(sessionTokenDataSource);
            handler.setLastSyncTimestamp(lastSyncTimestamp);
            handler.setSynchronizationSummary(synchronizationSummary);
        }
    }

    /**
     * Synchronizes the attachments of the specified entities. The entities are grouped by entity kind
     * and each group is processed by the handler registered for that kind.
     *
     * @return {@link Status#OK} on success, otherwise an error status carrying the message of the
     *         exception which has been logged.
     */
    @Override
    public Status execute(List<IncomingEntity<?>> entities)
    {
        try
        {
            Map<SyncEntityKind, List<IncomingEntity<?>>> entitiesByEntityKind = segregateEntitiesByEntityKind(entities);
            Set<Entry<SyncEntityKind, List<IncomingEntity<?>>>> entrySet = entitiesByEntityKind.entrySet();
            for (Entry<SyncEntityKind, List<IncomingEntity<?>>> entry : entrySet)
            {
                AbstractHandler handler = handlersByEntityKind.get(entry.getKey());
                if (handler == null)
                {
                    // Fail with a telling message instead of a bare NullPointerException.
                    throw new IllegalStateException("No attachment handler for entity kind " + entry.getKey());
                }
                handler.handle(entry.getValue());
            }
        } catch (Exception e)
        {
            operationLog.error("Attachment synchronization failed", e);
            return Status.createError("Attachment synchronization failed: " + e.getMessage());
        }
        return Status.OK;
    }

    /**
     * Groups the specified entities by their entity kind.
     */
    private Map<SyncEntityKind, List<IncomingEntity<?>>> segregateEntitiesByEntityKind(List<IncomingEntity<?>> entities)
    {
        Map<SyncEntityKind, List<IncomingEntity<?>>> map = new HashMap<>();
        for (IncomingEntity<?> entity : entities)
        {
            map.computeIfAbsent(entity.getEntityKind(), kind -> new ArrayList<>()).add(entity);
        }
        return map;
    }

    /**
     * Entity-kind independent part of the attachment synchronization. Subclasses provide fetching of
     * the attachment holders and sending of the updates for one particular entity kind.
     */
    private static abstract class AbstractHandler
    {
        protected IApplicationServerApi v3api;

        protected String sessionToken;

        protected IApplicationServerApi v3apiDataSource;

        protected String sessionTokenDataSource;

        protected Date lastSyncTimestamp;

        protected AttachmentSynchronizationSummary synchronizationSummary;

        public void setV3api(IApplicationServerApi v3api)
        {
            this.v3api = v3api;
        }

        public void setSessionToken(String sessionToken)
        {
            this.sessionToken = sessionToken;
        }

        public void setV3apiDataSource(IApplicationServerApi v3apiDataSource)
        {
            this.v3apiDataSource = v3apiDataSource;
        }

        public void setSessionTokenDataSource(String sessionTokenDataSource)
        {
            this.sessionTokenDataSource = sessionTokenDataSource;
        }

        public void setLastSyncTimestamp(Date lastSyncTimestamp)
        {
            this.lastSyncTimestamp = lastSyncTimestamp;
        }

        public void setSynchronizationSummary(AttachmentSynchronizationSummary synchronizationSummary)
        {
            this.synchronizationSummary = synchronizationSummary;
        }

        /**
         * Synchronizes the attachments of the specified entities which are all of the entity kind of
         * this handler.
         * <p>
         * The attachments of an entity on the data source are compared by file name with the
         * attachments of the same entity (same perm id) on the instance to be updated:
         * <ul>
         * <li>Data source version is lower: recorded as a replacement.
         * <li>Same version: recorded as a replacement if registered on the data source after the last
         * synchronization; a changed title or description is not handled yet.
         * <li>Data source version is higher: the missing versions are added.
         * </ul>
         *
         * @throws RuntimeException if an entity delivered by the data source isn't found on the
         *             instance to be updated.
         */
        public void handle(List<IncomingEntity<?>> entities)
        {
            Map<String, Map<String, Attachment>> existingAttachments = retrieveAttachments(v3api, sessionToken, entities);
            // Only entities flagged as having attachments are requested from the data source.
            List<IncomingEntity<?>> filteredEntities = entities.stream().filter(e -> e.hasAttachments()).collect(Collectors.toList());
            Map<String, Map<String, Attachment>> attachmentsFromDataSource =
                    retrieveAttachments(v3apiDataSource, sessionTokenDataSource, filteredEntities);
            List<NewAttachment> newAttachments = new ArrayList<>();
            List<AttachmentReplacement> attachmentReplacements = new ArrayList<>();
            for (Entry<String, Map<String, Attachment>> entry : attachmentsFromDataSource.entrySet())
            {
                String permId = entry.getKey();
                Map<String, Attachment> existingAttachmentsByFile = existingAttachments.get(permId);
                if (existingAttachmentsByFile == null)
                {
                    throw new RuntimeException("Severe error: Entity with permId " + permId
                            + " should exist because it has been just registered on the harvester openBIS instance.");
                }
                Map<String, Attachment> attachmentsFromDataSourceByFile = entry.getValue();
                for (Attachment attachmentFromDataSource : attachmentsFromDataSourceByFile.values())
                {
                    String fileName = attachmentFromDataSource.getFileName();
                    Attachment existingAttachment = existingAttachmentsByFile.get(fileName);
                    // Version 0 stands for 'no such attachment yet'.
                    int version = existingAttachment == null ? 0 : existingAttachment.getVersion();
                    if (attachmentFromDataSource.getVersion() < version)
                    {
                        attachmentReplacements.add(new AttachmentReplacement(attachmentFromDataSource, permId));
                    } else if (attachmentFromDataSource.getVersion() == version)
                    {
                        if (attachmentFromDataSource.getRegistrationDate().after(lastSyncTimestamp))
                        {
                            attachmentReplacements.add(new AttachmentReplacement(attachmentFromDataSource, permId));
                        } else if (equals(attachmentFromDataSource.getTitle(), existingAttachment.getTitle()) == false
                                || equals(attachmentFromDataSource.getDescription(), existingAttachment.getDescription()) == false)
                        {
                            // TODO: update attachment
                        }
                    } else
                    {
                        newAttachments.add(new NewAttachment(attachmentFromDataSource, version, permId));
                    }
                }
                for (Attachment existingAttachment : existingAttachmentsByFile.values())
                {
                    if (attachmentsFromDataSourceByFile.get(existingAttachment.getFileName()) == null)
                    {
                        // TODO: delete attachment
                    }
                }
            }
            handleNewAttachments(newAttachments);
            // Currently a no-op because replacing attachments is not implemented yet.
            handleAttachmentReplacements(attachmentReplacements);
        }

        /**
         * Fetches the attachment holders of the specified entities via the specified API.
         *
         * @return the attachments of each returned holder mapped by file name, keyed by the string
         *         representation of the perm id of the holder.
         */
        private <AH extends IPermIdHolder & IAttachmentsHolder> Map<String, Map<String, Attachment>> retrieveAttachments(
                IApplicationServerApi v3api, String sessionToken, List<IncomingEntity<?>> entities)
        {
            List<String> permIds = entities.stream().map(IncomingEntity::getPermID).collect(Collectors.toList());
            Map<String, Map<String, Attachment>> attachmentsByPermId = new HashMap<>();
            Collection<AH> attachmentHolders = getAttachments(v3api, sessionToken, permIds);
            for (AH attachmentHolder : attachmentHolders)
            {
                attachmentsByPermId.put(attachmentHolder.getPermId().toString(),
                        attachmentHolder.getAttachments().stream()
                                .collect(Collectors.toMap(Attachment::getFileName, Function.identity())));
            }
            return attachmentsByPermId;
        }

        /**
         * Fetches the entities with the specified perm ids including their attachments. Implementations
         * are expected to apply {@link #specifyAttachmentFetchOptions(AttachmentFetchOptions)}.
         */
        protected abstract <AH extends IPermIdHolder & IAttachmentsHolder> Collection<AH> getAttachments(
                IApplicationServerApi v3api, String sessionToken, List<String> permIds);

        /**
         * Creates for each new attachment one attachment creation per missing version and sends them
         * grouped by perm id of the attachment holder.
         */
        private void handleNewAttachments(List<NewAttachment> newAttachments)
        {
            if (newAttachments.isEmpty())
            {
                // Nothing to add: avoid a needless update call.
                return;
            }
            // The creations are collected per entity. Several new attachments (different files) of the
            // same entity end up in the same list instead of overwriting each other.
            Map<String, List<AttachmentCreation>> creationsByPermId = new HashMap<>();
            for (NewAttachment newAttachment : newAttachments)
            {
                List<AttachmentCreation> attachmentCreations =
                        creationsByPermId.computeIfAbsent(newAttachment.getPermId(), permId -> new ArrayList<>());
                int fromVersion = newAttachment.getVersion();
                Attachment attachment = newAttachment.getAttachment();
                // Walks from the latest version back to the first version which is missing.
                // NOTE(review): the creations are added newest first. Confirm whether the server
                // assigns versions in list order; if so the order probably has to be reversed.
                for (int i = attachment.getVersion(); i > fromVersion && attachment != null; i--)
                {
                    AttachmentCreation attachmentCreation = new AttachmentCreation();
                    attachmentCreation.setFileName(attachment.getFileName());
                    attachmentCreation.setTitle(attachment.getTitle());
                    attachmentCreation.setDescription(attachment.getDescription());
                    attachmentCreation.setContent(attachment.getContent());
                    attachmentCreations.add(attachmentCreation);
                    if (attachment.getFetchOptions().hasPreviousVersion())
                    {
                        attachment = attachment.getPreviousVersion();
                    }
                }
            }
            Map<String, AttachmentListUpdateValue> attachmentCreationsByPermId = new HashMap<>();
            for (Entry<String, List<AttachmentCreation>> entry : creationsByPermId.entrySet())
            {
                AttachmentListUpdateValue attachmentListUpdate = new AttachmentListUpdateValue();
                attachmentListUpdate.add(entry.getValue().toArray(new AttachmentCreation[0]));
                attachmentCreationsByPermId.put(entry.getKey(), attachmentListUpdate);
            }
            handleAttachments(attachmentCreationsByPermId);
            // Counts the updated attachment holders, not the individual attachment versions.
            synchronizationSummary.addedCount.addAndGet(attachmentCreationsByPermId.size());
        }

        /**
         * Sends the specified attachment list updates to the entities with the specified perm ids.
         */
        protected abstract void handleAttachments(Map<String, AttachmentListUpdateValue> attachmentsByPermId);

        /**
         * Handles attachments to be replaced. Not implemented yet.
         */
        private void handleAttachmentReplacements(List<AttachmentReplacement> replacements)
        {

        }

        /**
         * Configures the specified fetch options such that the content and the previous versions
         * (with the same fetch options) are requested.
         */
        protected void specifyAttachmentFetchOptions(AttachmentFetchOptions attachmentFetchOptions)
        {
            attachmentFetchOptions.withContent();
            attachmentFetchOptions.withPreviousVersion().withPreviousVersionUsing(attachmentFetchOptions);
            attachmentFetchOptions.withPreviousVersion().withContentUsing(attachmentFetchOptions.withContent());
        }

        /** Null-safe equality check. */
        private boolean equals(Object o1, Object o2)
        {
            return o1 == null ? o1 == o2 : o1.equals(o2);
        }
    }

    /** Handler for attachments of projects. */
    private static final class ProjectsHandler extends AbstractHandler
    {
        // The cast relies on the fetched projects being both perm id holders and attachment holders.
        @SuppressWarnings("unchecked")
        @Override
        protected <AH extends IPermIdHolder & IAttachmentsHolder> Collection<AH> getAttachments(
                IApplicationServerApi v3api, String sessionToken, List<String> permIds)
        {
            List<ProjectPermId> ids = permIds.stream().map(ProjectPermId::new).collect(Collectors.toList());
            ProjectFetchOptions fetchOptions = new ProjectFetchOptions();
            specifyAttachmentFetchOptions(fetchOptions.withAttachments());
            return (Collection<AH>) v3api.getProjects(sessionToken, ids, fetchOptions).values();
        }

        @Override
        protected void handleAttachments(Map<String, AttachmentListUpdateValue> attachmentsByPermId)
        {
            List<ProjectUpdate> updates = new ArrayList<>();
            for (Entry<String, AttachmentListUpdateValue> entry : attachmentsByPermId.entrySet())
            {
                ProjectUpdate projectUpdate = new ProjectUpdate();
                projectUpdate.setProjectId(new ProjectPermId(entry.getKey()));
                projectUpdate.getAttachments().setActions(entry.getValue().getActions());
                updates.add(projectUpdate);
            }
            v3api.updateProjects(sessionToken, updates);
        }
    }

    /** Handler for attachments of experiments. */
    private static final class ExperimentsHandler extends AbstractHandler
    {
        // The cast relies on the fetched experiments being both perm id holders and attachment holders.
        @SuppressWarnings("unchecked")
        @Override
        protected <AH extends IPermIdHolder & IAttachmentsHolder> Collection<AH> getAttachments(
                IApplicationServerApi v3api, String sessionToken, List<String> permIds)
        {
            List<ExperimentPermId> ids = permIds.stream().map(ExperimentPermId::new).collect(Collectors.toList());
            ExperimentFetchOptions fetchOptions = new ExperimentFetchOptions();
            specifyAttachmentFetchOptions(fetchOptions.withAttachments());
            return (Collection<AH>) v3api.getExperiments(sessionToken, ids, fetchOptions).values();
        }

        @Override
        protected void handleAttachments(Map<String, AttachmentListUpdateValue> attachmentsByPermId)
        {
            List<ExperimentUpdate> updates = new ArrayList<>();
            for (Entry<String, AttachmentListUpdateValue> entry : attachmentsByPermId.entrySet())
            {
                ExperimentUpdate experimentUpdate = new ExperimentUpdate();
                experimentUpdate.setExperimentId(new ExperimentPermId(entry.getKey()));
                experimentUpdate.getAttachments().setActions(entry.getValue().getActions());
                updates.add(experimentUpdate);
            }
            v3api.updateExperiments(sessionToken, updates);
        }
    }

    /** Handler for attachments of samples. */
    private static final class SamplesHandler extends AbstractHandler
    {
        // The cast relies on the fetched samples being both perm id holders and attachment holders.
        @SuppressWarnings("unchecked")
        @Override
        protected <AH extends IPermIdHolder & IAttachmentsHolder> Collection<AH> getAttachments(
                IApplicationServerApi v3api, String sessionToken, List<String> permIds)
        {
            List<SamplePermId> ids = permIds.stream().map(SamplePermId::new).collect(Collectors.toList());
            SampleFetchOptions fetchOptions = new SampleFetchOptions();
            specifyAttachmentFetchOptions(fetchOptions.withAttachments());
            return (Collection<AH>) v3api.getSamples(sessionToken, ids, fetchOptions).values();
        }

        @Override
        protected void handleAttachments(Map<String, AttachmentListUpdateValue> attachmentsByPermId)
        {
            List<SampleUpdate> updates = new ArrayList<>();
            for (Entry<String, AttachmentListUpdateValue> entry : attachmentsByPermId.entrySet())
            {
                SampleUpdate sampleUpdate = new SampleUpdate();
                sampleUpdate.setSampleId(new SamplePermId(entry.getKey()));
                sampleUpdate.getAttachments().setActions(entry.getValue().getActions());
                updates.add(sampleUpdate);
            }
            v3api.updateSamples(sessionToken, updates);
        }
    }

    /**
     * An attachment from the data source for which versions are missing.
     */
    private static final class NewAttachment
    {
        private final Attachment attachment;

        /** Version of the already existing attachment or 0 if there is no such attachment. */
        private final int version;

        /** Perm id of the attachment holder. */
        private final String permId;

        NewAttachment(Attachment attachment, int version, String permId)
        {
            this.attachment = attachment;
            this.version = version;
            this.permId = permId;
        }

        public Attachment getAttachment()
        {
            return attachment;
        }

        public int getVersion()
        {
            return version;
        }

        public String getPermId()
        {
            return permId;
        }
    }

    /**
     * Placeholder for an attachment to be replaced. The constructor arguments are not stored yet
     * because replacing attachments isn't implemented.
     */
    private static final class AttachmentReplacement
    {
        AttachmentReplacement(Attachment attachment, String permId)
        {

        }
    }
}