From c2891b26a6b6ecb9d2b26b9d97e3e19304c7edb5 Mon Sep 17 00:00:00 2001
From: gakin <gakin>
Date: Mon, 5 Dec 2016 15:22:41 +0000
Subject: [PATCH] SSDM-4402: Go back to establishing parent-child
 relationships post-registration, load the not-synced data sets and process
 them together with the data sets whose last modification dates are after the
 last sync time, and write a better summary log after data set registration

SVN: 37440
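
In outline, the synchronizer now runs in two passes. The sketch below is
condensed from the EntitySynchronizer changes in this patch (same method and
variable names); it is an outline of the call flow, not additional code:

    // Pass 1: register the physical data sets "flat", i.e. without parents or
    // containers, collecting the codes of the data sets that failed to register.
    List<String> notRegisteredDataSets = registerPhysicalDataSets(physicalDSMap);

    // Pass 2: establish container/component and parent/child relationships,
    // skipping every relationship that involves a data set that failed in pass 1.
    establishDataSetRelationships(data.getDataSetsToProcess(), notRegisteredDataSets, physicalDSMap);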
---
 .../DataSetRegistrationIngestionService.java  |   1 -
 .../synchronizer/EntitySynchronizer.java      | 143 +++++++++++-------
 .../synchronizer/ResourceListParserData.java  |   5 +-
 3 files changed, 88 insertions(+), 61 deletions(-)

diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/DataSetRegistrationIngestionService.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/DataSetRegistrationIngestionService.java
index c3daabb8cb9..42848742a35 100644
--- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/DataSetRegistrationIngestionService.java
+++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/DataSetRegistrationIngestionService.java
@@ -123,7 +123,6 @@ class DataSetRegistrationIngestionService extends IngestionService<DataSetInform
             IDataSet ds = transaction.createNewDataSet(dataSet.getDataSetType().getCode(), dataSet.getCode());
             ds.setSample(sample);
             ds.setExperiment(experiment);
-            ds.setParentDatasets(dataSet.getParentDataSetCodes());
             for (NewProperty newProperty : dataSetProperties)
             {
                 ds.setPropertyValue(newProperty.getPropertyCode(), newProperty.getValue());
diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/EntitySynchronizer.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/EntitySynchronizer.java
index b113cbb0af5..dc77eca7ef9 100644
--- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/EntitySynchronizer.java
+++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/EntitySynchronizer.java
@@ -127,9 +127,8 @@ import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.ProjectIdentifierF
 import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.SampleIdentifier;
 import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.SampleIdentifierFactory;
 import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.SpaceIdentifier;
+
 /**
- * 
- *
  * @author Ganime Betul Akin
  */
 public class EntitySynchronizer
@@ -207,56 +206,57 @@ public class EntitySynchronizer
         AtomicEntityOperationResult operationResult = service.performEntityOperations(builder.getDetails());
         operationLog.info("entity operation result: " + operationResult);
 
-        // register physical data sets
-        // first set the parent/child relationship
-        // this way if the parent has failed to register, child will fail too.
-        // Not that container component relationships are established post-reg.
-        setParentDataSetsOnTheChildren(data);
-        Map<String, DataSetWithConnections> physicalDSMap = data.filterPhysicalDataSetsByLastModificationDate(lastSyncTimestamp);
+        // register physical data sets without any hierarchy
+        // Note that container/component and parent/child relationships are established post-reg.
+        // setParentDataSetsOnTheChildren(data);
+        Map<String, DataSetWithConnections> physicalDSMap =
+                data.filterPhysicalDataSetsByLastModificationDate(lastSyncTimestamp, dataSetsCodesToRetry);
         operationLog.info("Registering data sets...");
-        registerPhysicalDataSets(physicalDSMap);
+        List<String> notRegisteredDataSets = registerPhysicalDataSets(physicalDSMap);
+        operationLog.info((physicalDSMap.keySet().size() - notRegisteredDataSets.size()) + " data set(s) have been successfully registered. "
+                + notRegisteredDataSets.size()
+                + " data set(s) FAILED to register ");
 
         // link physical data sets registered above to container data sets
+        // and set parent/child relationships
         operationLog.info("start linking/un-linking container and component data sets");
-        establishContainerComponentRelationships(data.getDataSetsToProcess(), physicalDSMap);
-    }
-
-    private void setParentDataSetsOnTheChildren(ResourceListParserData data)
-    {
-        for (DataSetWithConnections dsWithConn : data.getDataSetsToProcess().values())
-        {
-            for (Connection conn : dsWithConn.getConnections())
-            {
-                NewExternalData dataSet = dsWithConn.getDataSet();
-                if (data.getDataSetsToProcess().containsKey(conn.getToPermId()) && conn.getType().equals("Child"))
-                {
-                    NewExternalData childDataSet = data.getDataSetsToProcess().get(conn.getToPermId()).getDataSet();
-                    List<String> parentDataSetCodes = childDataSet.getParentDataSetCodes();
-                    parentDataSetCodes.add(dataSet.getCode());
-                }
-            }
-        }
+        establishDataSetRelationships(data.getDataSetsToProcess(), notRegisteredDataSets, physicalDSMap);
     }
 
-    private void establishContainerComponentRelationships(Map<String, DataSetWithConnections> dataSetsToProcess, Map<String, DataSetWithConnections> physicalDSMap)
+    private void establishDataSetRelationships(Map<String, DataSetWithConnections> dataSetsToProcess,
+            List<String> notRegisteredDataSets, Map<String, DataSetWithConnections> physicalDSMap)
     {
         // set parent and container data set codes before everything else
         // container and physical data sets can both be parents/children of each other
         AtomicEntityOperationDetailsBuilder builder = new AtomicEntityOperationDetailsBuilder();
         Map<String, NewExternalData> datasetsToUpdate = new HashMap<String, NewExternalData>();
+        Map<String, Set<String>> dsToParents = new HashMap<String, Set<String>>();
         Map<String, Set<String>> dsToContained = new HashMap<String, Set<String>>();
         for (DataSetWithConnections dsWithConn : dataSetsToProcess.values())
         {
             for (Connection conn : dsWithConn.getConnections())
             {
                 NewExternalData dataSet = dsWithConn.getDataSet();
-                if (dataSetsToProcess.containsKey(conn.getToPermId()) && conn.getType().equals("Component"))
+                if (dataSetsToProcess.containsKey(conn.getToPermId()) && conn.getType().equals("Child"))
+                {
+                    if (notRegisteredDataSets.contains(dataSet.getCode()) == false)
+                    {
+                        NewExternalData childDataSet = dataSetsToProcess.get(conn.getToPermId()).getDataSet();
+                        List<String> parentDataSetCodes = childDataSet.getParentDataSetCodes();
+                        parentDataSetCodes.add(dataSet.getCode());
+                        dsToParents.put(childDataSet.getCode(), new HashSet<String>(parentDataSetCodes));
+                    }
+                }
+                else if (dataSetsToProcess.containsKey(conn.getToPermId()) && conn.getType().equals("Component"))
                 {
                     NewExternalData componentDataSet = dataSetsToProcess.get(conn.getToPermId()).getDataSet();
-                    NewContainerDataSet containerDataSet = (NewContainerDataSet) dataSet;
-                    List<String> containedDataSetCodes = containerDataSet.getContainedDataSetCodes();
-                    containedDataSetCodes.add(componentDataSet.getCode());
-                    dsToContained.put(dataSet.getCode(), new HashSet<String>(containedDataSetCodes));
+                    if (notRegisteredDataSets.contains(componentDataSet.getCode()) == false)
+                    {
+                        NewContainerDataSet containerDataSet = (NewContainerDataSet) dataSet;
+                        List<String> containedDataSetCodes = containerDataSet.getContainedDataSetCodes();
+                        containedDataSetCodes.add(componentDataSet.getCode());
+                        dsToContained.put(dataSet.getCode(), new HashSet<String>(containedDataSetCodes));
+                    }
                 }
             }
         }
@@ -264,7 +264,10 @@ public class EntitySynchronizer
         for (DataSetWithConnections dsWithConn : dataSetsToProcess.values())
         {
             NewExternalData dataSet = (NewExternalData) dsWithConn.getDataSet();
-            if (dsWithConn.getLastModificationDate().after(lastSyncTimestamp))
+
+            if (dsWithConn.getLastModificationDate().after(lastSyncTimestamp)
+                    || dataSetsCodesToRetry.contains(dataSet.getCode()) == true
+                    || isParentModified(dsToParents, dataSet))
             {
                 if (physicalDSMap.containsKey(dataSet.getCode()) == false && service.tryGetDataSet(dataSet.getCode()) == null)
                 {
@@ -302,16 +305,16 @@ public class EntitySynchronizer
                 }
                 dsBatchUpdatesDTO.getDetails().setContainerUpdateRequested(true);
             }
-            // if (dsToParents.containsKey(dataSet.getCode()))
-            // {
-            // dsBatchUpdatesDTO.setModifiedParentDatasetCodesOrNull(dsToParents.get(dataSet.getCode()).toArray(
-            // new String[dataSet.getParentDataSetCodes().size()]));
-            // // TODO should this always be true or should we flag the ones that require parent update. Same for container
-            // }
-            // else
-            // {
+            if (dsToParents.containsKey(dataSet.getCode()))
+            {
+                dsBatchUpdatesDTO.setModifiedParentDatasetCodesOrNull(dsToParents.get(dataSet.getCode()).toArray(
+                        new String[dsToParents.get(dataSet.getCode()).size()]));
+                // TODO should this always be true or should we flag the ones that require parent update. Same for container
+            }
+            else
+            {
                 dsBatchUpdatesDTO.setModifiedParentDatasetCodesOrNull(new String[0]);
-            // }
+            }
             dsBatchUpdatesDTO.getDetails().setParentsUpdateRequested(true);
             SampleIdentifier sampleIdentifier = dataSet.getSampleIdentifierOrNull();
             if (sampleIdentifier != null)
@@ -344,27 +347,51 @@ public class EntitySynchronizer
         operationLog.info("entity operation result: " + operationResult);
     }
 
-    private void registerPhysicalDataSets(Map<String, DataSetWithConnections> physicalDSMap) throws IOException
+    private boolean isParentModified(Map<String, Set<String>> dsToParents, NewExternalData dataSet)
+    {
+        Set<String> parents = dsToParents.get(dataSet.getCode());
+        if (parents == null)
+        {
+            return false;
+        }
+        for (String parentDSCode : parents)
+        {
+            if (dataSetsCodesToRetry.contains(parentDSCode))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private List<String> registerPhysicalDataSets(Map<String, DataSetWithConnections> physicalDSMap) throws IOException
     {
         List<DataSetWithConnections> dsList = new ArrayList<DataSetWithConnections>(physicalDSMap.values());
-        List<String> notSyncedDataSetCodes = Collections.synchronizedList(new ArrayList<String>());
+        List<String> notRegisteredDataSetCodes = Collections.synchronizedList(new ArrayList<String>());
 
-        // TODO this parallelization needs to be revisited: In case of a data set appearing before DSs it is dependent on,
-        // the parallelization will result in an error, i.e. The graph needs to be topologically sorted before it can be
-        // parallelized
-        ParallelizedExecutor.process(dsList, new DataSetRegistrationTaskExecutor(notSyncedDataSetCodes), 0.5, 10, "register data sets", 0, false);
+        // This parallelization is possible because each DS is registered without dependencies
+        // and the dependencies are established later on in the sync process.
+        ParallelizedExecutor.process(dsList, new DataSetRegistrationTaskExecutor(notRegisteredDataSetCodes), 0.5, 10, "register data sets", 0, false);
 
         // backup the current not synced data set codes file, delete the original file
         File notSyncedDataSetsFile = new File(config.getNotSyncedDataSetsFileName());
-        File backupLastSyncTimeStampFile = new File(config.getNotSyncedDataSetsFileName() + ".bk");
-        FileUtils.copyFile(notSyncedDataSetsFile, backupLastSyncTimeStampFile);
-        FileUtilities.delete(notSyncedDataSetsFile);
+        if (notSyncedDataSetsFile.exists())
+        {
+            backupAndResetNotSyncedDataSetsFile(notSyncedDataSetsFile);
+        }
 
-        for (String dsCode : notSyncedDataSetCodes)
+        for (String dsCode : notRegisteredDataSetCodes)
         {
             FileUtilities.appendToFile(notSyncedDataSetsFile, dsCode, true);
         }
+        return notRegisteredDataSetCodes;
+    }
 
+    private void backupAndResetNotSyncedDataSetsFile(File notSyncedDataSetsFile) throws IOException
+    {
+        File backupNotSyncedDataSetsFile = new File(config.getNotSyncedDataSetsFileName() + ".bk");
+        FileUtils.copyFile(notSyncedDataSetsFile, backupNotSyncedDataSetsFile);
+        FileUtils.writeStringToFile(notSyncedDataSetsFile, "");
     }
 
     private void processMetaData(ResourceListParserData data, AtomicEntityOperationDetailsBuilder builder)
@@ -858,11 +885,11 @@ public class EntitySynchronizer
     private final class DataSetRegistrationTaskExecutor implements ITaskExecutor<DataSetWithConnections>
     {
 
-        private List<String> notSyncedDataSetCodes;
+        private List<String> notRegisteredDataSetCodes;
 
-        public DataSetRegistrationTaskExecutor(List<String> notSyncedDataSetCodes)
+        public DataSetRegistrationTaskExecutor(List<String> notRegisteredDataSetCodes)
         {
-            this.notSyncedDataSetCodes = notSyncedDataSetCodes;
+            this.notRegisteredDataSetCodes = notRegisteredDataSetCodes;
         }
 
         @Override
@@ -873,7 +900,7 @@ public class EntitySynchronizer
             Properties props = setProperties();
 
             DataSetRegistrationIngestionService ingestionService =
-                    new DataSetRegistrationIngestionService(props, storeRoot, notSyncedDataSetCodes, dataSet.getDataSet(), operationLog);
+                    new DataSetRegistrationIngestionService(props, storeRoot, notRegisteredDataSetCodes, dataSet.getDataSet(), operationLog);
             TableModel resultTable = ingestionService.createAggregationReport(new HashMap<String, Object>(), context);
             if (resultTable != null)
             {
@@ -884,7 +911,7 @@ public class EntitySynchronizer
                     if (headers.get(i).getTitle().startsWith("Error"))
                     {
                         String message = resultTable.getRows().get(0).getValues().toString();
-                        notSyncedDataSetCodes.add(dataSet.getDataSet().getCode());
+                        notRegisteredDataSetCodes.add(dataSet.getDataSet().getCode());
                         operationLog.error(message);
                         return Status.createError(message);
                     }
diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/ResourceListParserData.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/ResourceListParserData.java
index ff3c764d5ab..75df9cc294c 100644
--- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/ResourceListParserData.java
+++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/harvester/synchronizer/ResourceListParserData.java
@@ -80,13 +80,14 @@ public class ResourceListParserData
         return materialsToProcess;
     }
 
-    public Map<String, DataSetWithConnections> filterPhysicalDataSetsByLastModificationDate(Date lastSyncDate)
+    public Map<String, DataSetWithConnections> filterPhysicalDataSetsByLastModificationDate(Date lastSyncDate, Set<String> dataSetsCodesToRetry)
     {
         Map<String, DataSetWithConnections> dsMap = new HashMap<String, ResourceListParserData.DataSetWithConnections>();
         for (String permId : dataSetsToProcess.keySet())
         {
             DataSetWithConnections ds = dataSetsToProcess.get(permId);
-            if (ds.getKind() == DataSetKind.PHYSICAL && ds.lastModificationDate.after(lastSyncDate))
+            if (ds.getKind() == DataSetKind.PHYSICAL
+                    && (ds.lastModificationDate.after(lastSyncDate) || dataSetsCodesToRetry.contains(ds.getDataSet().getCode())))
             {
                 dsMap.put(permId, ds);
             }
-- 
GitLab
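
Note on the retry mechanism: this patch writes the codes of data sets that
failed to register into the not-synced-data-sets file, but the code that loads
that file into dataSetsCodesToRetry at the start of the next run is not part of
this diff. A minimal sketch of what that loading is assumed to look like (field
and config names as used above; the file-reading code is illustrative only):

    // Illustrative only, not part of this patch: load the codes of data sets
    // that failed in the previous run so that they are picked up again by
    // filterPhysicalDataSetsByLastModificationDate(...).
    Set<String> dataSetsCodesToRetry = new HashSet<String>();
    File notSyncedDataSetsFile = new File(config.getNotSyncedDataSetsFileName());
    if (notSyncedDataSetsFile.exists())
    {
        for (String code : java.nio.file.Files.readAllLines(notSyncedDataSetsFile.toPath(),
                java.nio.charset.StandardCharsets.UTF_8))
        {
            if (code.trim().isEmpty() == false)
            {
                dataSetsCodesToRetry.add(code.trim());
            }
        }
    }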