Skip to content
Snippets Groups Projects
Commit c2891b26 authored by gakin's avatar gakin
Browse files

SSDM-4402 : Go back to establishing parent-child relationships post-reg, load...

SSDM-4402 : Go back to establishing parent-child relationships post-reg, load the not-synced-data-sets, process them along with the ones with last mod. dates after last sync time plus better summary log after ds registration

SVN: 37440
parent cf810609
No related branches found
No related tags found
No related merge requests found
......@@ -123,7 +123,6 @@ class DataSetRegistrationIngestionService extends IngestionService<DataSetInform
IDataSet ds = transaction.createNewDataSet(dataSet.getDataSetType().getCode(), dataSet.getCode());
ds.setSample(sample);
ds.setExperiment(experiment);
ds.setParentDatasets(dataSet.getParentDataSetCodes());
for (NewProperty newProperty : dataSetProperties)
{
ds.setPropertyValue(newProperty.getPropertyCode(), newProperty.getValue());
......
......@@ -127,9 +127,8 @@ import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.ProjectIdentifierF
import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.SampleIdentifier;
import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.SampleIdentifierFactory;
import ch.systemsx.cisd.openbis.generic.shared.dto.identifier.SpaceIdentifier;
/**
*
*
* @author Ganime Betul Akin
*/
public class EntitySynchronizer
......@@ -207,56 +206,57 @@ public class EntitySynchronizer
AtomicEntityOperationResult operationResult = service.performEntityOperations(builder.getDetails());
operationLog.info("entity operation result: " + operationResult);
// register physical data sets
// first set the parent/child relationship
// this way if the parent has failed to register, child will fail too.
// Not that container component relationships are established post-reg.
setParentDataSetsOnTheChildren(data);
Map<String, DataSetWithConnections> physicalDSMap = data.filterPhysicalDataSetsByLastModificationDate(lastSyncTimestamp);
// register physical data sets without any hierarchy
// Note that container/component and parent/child relationships are established post-reg.
// setParentDataSetsOnTheChildren(data);
Map<String, DataSetWithConnections> physicalDSMap =
data.filterPhysicalDataSetsByLastModificationDate(lastSyncTimestamp, dataSetsCodesToRetry);
operationLog.info("Registering data sets...");
registerPhysicalDataSets(physicalDSMap);
List<String> notRegisteredDataSets = registerPhysicalDataSets(physicalDSMap);
operationLog.info((physicalDSMap.keySet().size() - notRegisteredDataSets.size()) + " data set(s) have been successfully registered. "
+ notRegisteredDataSets.size()
+ " data set(s) FAILED to register ");
// link physical data sets registered above to container data sets
// and set parent/child relationships
operationLog.info("start linking/un-linking container and component data sets");
establishContainerComponentRelationships(data.getDataSetsToProcess(), physicalDSMap);
}
private void setParentDataSetsOnTheChildren(ResourceListParserData data)
{
for (DataSetWithConnections dsWithConn : data.getDataSetsToProcess().values())
{
for (Connection conn : dsWithConn.getConnections())
{
NewExternalData dataSet = dsWithConn.getDataSet();
if (data.getDataSetsToProcess().containsKey(conn.getToPermId()) && conn.getType().equals("Child"))
{
NewExternalData childDataSet = data.getDataSetsToProcess().get(conn.getToPermId()).getDataSet();
List<String> parentDataSetCodes = childDataSet.getParentDataSetCodes();
parentDataSetCodes.add(dataSet.getCode());
}
}
}
establishDataSetRelationships(data.getDataSetsToProcess(), notRegisteredDataSets, physicalDSMap);
}
private void establishContainerComponentRelationships(Map<String, DataSetWithConnections> dataSetsToProcess, Map<String, DataSetWithConnections> physicalDSMap)
private void establishDataSetRelationships(Map<String, DataSetWithConnections> dataSetsToProcess,
List<String> notRegisteredDataSets, Map<String, DataSetWithConnections> physicalDSMap)
{
// set parent and container data set codes before everything else
// container and physical data sets can both be parents/children of each other
AtomicEntityOperationDetailsBuilder builder = new AtomicEntityOperationDetailsBuilder();
Map<String, NewExternalData> datasetsToUpdate = new HashMap<String, NewExternalData>();
Map<String, Set<String>> dsToParents = new HashMap<String, Set<String>>();
Map<String, Set<String>> dsToContained = new HashMap<String, Set<String>>();
for (DataSetWithConnections dsWithConn : dataSetsToProcess.values())
{
for (Connection conn : dsWithConn.getConnections())
{
NewExternalData dataSet = dsWithConn.getDataSet();
if (dataSetsToProcess.containsKey(conn.getToPermId()) && conn.getType().equals("Component"))
if (dataSetsToProcess.containsKey(conn.getToPermId()) && conn.getType().equals("Child"))
{
if (notRegisteredDataSets.contains(dataSet.getCode()) == false)
{
NewExternalData childDataSet = dataSetsToProcess.get(conn.getToPermId()).getDataSet();
List<String> parentDataSetCodes = childDataSet.getParentDataSetCodes();
parentDataSetCodes.add(dataSet.getCode());
dsToParents.put(childDataSet.getCode(), new HashSet<String>(parentDataSetCodes));
}
}
else if (dataSetsToProcess.containsKey(conn.getToPermId()) && conn.getType().equals("Component"))
{
NewExternalData componentDataSet = dataSetsToProcess.get(conn.getToPermId()).getDataSet();
NewContainerDataSet containerDataSet = (NewContainerDataSet) dataSet;
List<String> containedDataSetCodes = containerDataSet.getContainedDataSetCodes();
containedDataSetCodes.add(componentDataSet.getCode());
dsToContained.put(dataSet.getCode(), new HashSet<String>(containedDataSetCodes));
if (notRegisteredDataSets.contains(componentDataSet.getCode()) == false)
{
NewContainerDataSet containerDataSet = (NewContainerDataSet) dataSet;
List<String> containedDataSetCodes = containerDataSet.getContainedDataSetCodes();
containedDataSetCodes.add(componentDataSet.getCode());
dsToContained.put(dataSet.getCode(), new HashSet<String>(containedDataSetCodes));
}
}
}
}
......@@ -264,7 +264,10 @@ public class EntitySynchronizer
for (DataSetWithConnections dsWithConn : dataSetsToProcess.values())
{
NewExternalData dataSet = (NewExternalData) dsWithConn.getDataSet();
if (dsWithConn.getLastModificationDate().after(lastSyncTimestamp))
if (dsWithConn.getLastModificationDate().after(lastSyncTimestamp)
|| dataSetsCodesToRetry.contains(dataSet.getCode()) == true
|| isParentModified(dsToParents, dataSet))
{
if (physicalDSMap.containsKey(dataSet.getCode()) == false && service.tryGetDataSet(dataSet.getCode()) == null)
{
......@@ -302,16 +305,16 @@ public class EntitySynchronizer
}
dsBatchUpdatesDTO.getDetails().setContainerUpdateRequested(true);
}
// if (dsToParents.containsKey(dataSet.getCode()))
// {
// dsBatchUpdatesDTO.setModifiedParentDatasetCodesOrNull(dsToParents.get(dataSet.getCode()).toArray(
// new String[dataSet.getParentDataSetCodes().size()]));
// // TODO should this always be true or should we flag the ones that require parent update. Same for container
// }
// else
// {
if (dsToParents.containsKey(dataSet.getCode()))
{
dsBatchUpdatesDTO.setModifiedParentDatasetCodesOrNull(dsToParents.get(dataSet.getCode()).toArray(
new String[dataSet.getParentDataSetCodes().size()]));
// TODO should this always be true or should we flag the ones that require parent update. Same for container
}
else
{
dsBatchUpdatesDTO.setModifiedParentDatasetCodesOrNull(new String[0]);
// }
}
dsBatchUpdatesDTO.getDetails().setParentsUpdateRequested(true);
SampleIdentifier sampleIdentifier = dataSet.getSampleIdentifierOrNull();
if (sampleIdentifier != null)
......@@ -344,27 +347,51 @@ public class EntitySynchronizer
operationLog.info("entity operation result: " + operationResult);
}
private void registerPhysicalDataSets(Map<String, DataSetWithConnections> physicalDSMap) throws IOException
private boolean isParentModified(Map<String, Set<String>> dsToParents, NewExternalData dataSet)
{
Set<String> parents = dsToParents.get(dataSet.getCode());
if (parents == null)
{
return false;
}
for (String parentDSCode : parents)
{
if (dataSetsCodesToRetry.contains(parentDSCode))
{
return true;
}
}
return false;
}
private List<String> registerPhysicalDataSets(Map<String, DataSetWithConnections> physicalDSMap) throws IOException
{
List<DataSetWithConnections> dsList = new ArrayList<DataSetWithConnections>(physicalDSMap.values());
List<String> notSyncedDataSetCodes = Collections.synchronizedList(new ArrayList<String>());
List<String> notRegisteredDataSetCodes = Collections.synchronizedList(new ArrayList<String>());
// TODO this parallelization needs to be revisited: In case of a data set appearing before DSs it is dependent on,
// the parallelization will result in an error, i.e. The graph needs to be topologically sorted before it can be
// parallelized
ParallelizedExecutor.process(dsList, new DataSetRegistrationTaskExecutor(notSyncedDataSetCodes), 0.5, 10, "register data sets", 0, false);
// This parallelization is possible because each DS is registered without dependencies
// and the dependencies are established later on in the sync process.
ParallelizedExecutor.process(dsList, new DataSetRegistrationTaskExecutor(notRegisteredDataSetCodes), 0.5, 10, "register data sets", 0, false);
// backup the current not synced data set codes file, delete the original file
File notSyncedDataSetsFile = new File(config.getNotSyncedDataSetsFileName());
File backupLastSyncTimeStampFile = new File(config.getNotSyncedDataSetsFileName() + ".bk");
FileUtils.copyFile(notSyncedDataSetsFile, backupLastSyncTimeStampFile);
FileUtilities.delete(notSyncedDataSetsFile);
if (notSyncedDataSetsFile.exists())
{
backupAndResetNotSyncedDataSetsFile(notSyncedDataSetsFile);
}
for (String dsCode : notSyncedDataSetCodes)
for (String dsCode : notRegisteredDataSetCodes)
{
FileUtilities.appendToFile(notSyncedDataSetsFile, dsCode, true);
}
return notRegisteredDataSetCodes;
}
private void backupAndResetNotSyncedDataSetsFile(File notSyncedDataSetsFile) throws IOException
{
File backupLastSyncTimeStampFile = new File(config.getNotSyncedDataSetsFileName() + ".bk");
FileUtils.copyFile(notSyncedDataSetsFile, backupLastSyncTimeStampFile);
FileUtils.writeStringToFile(notSyncedDataSetsFile, "");
}
private void processMetaData(ResourceListParserData data, AtomicEntityOperationDetailsBuilder builder)
......@@ -858,11 +885,11 @@ public class EntitySynchronizer
private final class DataSetRegistrationTaskExecutor implements ITaskExecutor<DataSetWithConnections>
{
private List<String> notSyncedDataSetCodes;
private List<String> notRegisteredDataSetCodes;
public DataSetRegistrationTaskExecutor(List<String> notSyncedDataSetCodes)
public DataSetRegistrationTaskExecutor(List<String> notRegisteredDataSetCodes)
{
this.notSyncedDataSetCodes = notSyncedDataSetCodes;
this.notRegisteredDataSetCodes = notRegisteredDataSetCodes;
}
@Override
......@@ -873,7 +900,7 @@ public class EntitySynchronizer
Properties props = setProperties();
DataSetRegistrationIngestionService ingestionService =
new DataSetRegistrationIngestionService(props, storeRoot, notSyncedDataSetCodes, dataSet.getDataSet(), operationLog);
new DataSetRegistrationIngestionService(props, storeRoot, notRegisteredDataSetCodes, dataSet.getDataSet(), operationLog);
TableModel resultTable = ingestionService.createAggregationReport(new HashMap<String, Object>(), context);
if (resultTable != null)
{
......@@ -884,7 +911,7 @@ public class EntitySynchronizer
if (headers.get(i).getTitle().startsWith("Error"))
{
String message = resultTable.getRows().get(0).getValues().toString();
notSyncedDataSetCodes.add(dataSet.getDataSet().getCode());
notRegisteredDataSetCodes.add(dataSet.getDataSet().getCode());
operationLog.error(message);
return Status.createError(message);
}
......
......@@ -80,13 +80,14 @@ public class ResourceListParserData
return materialsToProcess;
}
public Map<String, DataSetWithConnections> filterPhysicalDataSetsByLastModificationDate(Date lastSyncDate)
public Map<String, DataSetWithConnections> filterPhysicalDataSetsByLastModificationDate(Date lastSyncDate, Set<String> dataSetsCodesToRetry)
{
Map<String, DataSetWithConnections> dsMap = new HashMap<String, ResourceListParserData.DataSetWithConnections>();
for (String permId : dataSetsToProcess.keySet())
{
DataSetWithConnections ds = dataSetsToProcess.get(permId);
if (ds.getKind() == DataSetKind.PHYSICAL && ds.lastModificationDate.after(lastSyncDate))
if (ds.getKind() == DataSetKind.PHYSICAL
&& (ds.lastModificationDate.after(lastSyncDate) || dataSetsCodesToRetry.contains(ds.getDataSet().getCode())))
{
dsMap.put(permId, ds);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment