diff --git a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/DataSetRegistrationTask.java b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/DataSetRegistrationTask.java index 1ad30eda3d650865a0e0c72a8e044f629dd50a12..fb5784acd2c0b78c9f60d5965eee390a92b8ae34 100644 --- a/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/DataSetRegistrationTask.java +++ b/datastore_server/source/java/ch/ethz/sis/openbis/generic/server/dss/plugins/DataSetRegistrationTask.java @@ -29,17 +29,21 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -61,6 +65,7 @@ import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import javax.xml.xpath.XPathExpressionException; +import org.apache.commons.codec.binary.Hex; import org.apache.log4j.DailyRollingFileAppender; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; @@ -75,6 +80,7 @@ import org.xml.sax.SAXException; import ch.ethz.sis.openbis.generic.asapi.v3.IApplicationServerApi; import ch.ethz.sis.openbis.generic.asapi.v3.dto.common.search.SearchResult; +import ch.ethz.sis.openbis.generic.asapi.v3.dto.dataset.DataSetKind; import ch.ethz.sis.openbis.generic.asapi.v3.dto.dataset.delete.DataSetDeletionOptions; import ch.ethz.sis.openbis.generic.asapi.v3.dto.dataset.id.DataSetPermId; import ch.ethz.sis.openbis.generic.asapi.v3.dto.deletion.id.IDeletionId; @@ -192,7 +198,7 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM private static final String DATA_SOURCE_SPACES_PROPERTY_NAME = "data-source-spaces"; - private static final String DATA_SOURCE_PREFIX_PROPERTY_NAME = "data-souce-prefix"; + private static final String DATA_SOURCE_PREFIX_PROPERTY_NAME = "data-source-prefix"; private static final String DATA_SOURCE_AUTH_REALM_PROPERTY_NAME = "data-source-auth-realm"; @@ -475,12 +481,12 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM File storeRoot = transaction.getGlobalState().getStoreRootDir(); File temp = new File(storeRoot, this.harvesterTempDir); temp.mkdirs(); - File file = new File(temp, ds.getDataSetCode()); - file.mkdirs(); + File dir = new File(temp, ds.getDataSetCode()); + dir.mkdirs(); - downloadDataSetFiles(file, ds.getDataSetCode()); + downloadDataSetFiles(dir, ds.getDataSetCode()); - File dsPath = new File(file, "original"); + File dsPath = new File(dir, "original"); for (File f : dsPath.listFiles()) { transaction.moveFile(f.getAbsolutePath(), ds); @@ -500,7 +506,7 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM return null; } - private void downloadDataSetFiles(File file, String dataSetCode) + private void downloadDataSetFiles(File dir, String dataSetCode) { SslCertificateHelper.trustAnyCertificate(asUrl); SslCertificateHelper.trustAnyCertificate(dssUrl); @@ -537,7 +543,7 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM // if (dsFile.getPath().equals("original")) // continue; String filePath = dsFile.getPath();// .substring("original/".length()); - File output = new File(file.getAbsolutePath(), filePath); + File output = new File(dir.getAbsolutePath(), filePath); if (dsFile.isDirectory()) { output.mkdirs(); @@ -546,7 +552,7 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM { System.err.println("Downloaded " + dsFile.getPath() + " " + MemorySizeFormatter.format(dsFile.getFileLength())); - Path path = Paths.get(file.getAbsolutePath(), filePath); + Path path = Paths.get(dir.getAbsolutePath(), filePath); try { Files.copy(fileDownload.getInputStream(), path, java.nio.file.StandardCopyOption.REPLACE_EXISTING); @@ -650,6 +656,9 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM Map<String, DataSetWithConnections> dataSetsToProcess = data.datasetsToProcess; Map<String, MaterialWithLastModificationDate> materialsToProcess = data.materialsToProcess; + operationLog.info("Processing deletions"); + processDeletions(data); + AtomicEntityOperationDetailsBuilder builder = new AtomicEntityOperationDetailsBuilder(); processProjects(projectsToProcess, experimentsToProcess, builder); @@ -790,9 +799,6 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM System.err.println("entity operation result: " + operationResult); operationLog.info("entity operation result: " + operationResult); - operationLog.info("Processing deletions"); - processDeletions(data); - operationLog.info("Saving the timestamp of sync start to file"); saveSyncTimestamp(); @@ -802,12 +808,11 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM } catch (Exception e) { operationLog.error("Sync failed: " + e.getMessage()); - - sendInfoEmail(); + sendErrorEmail("Synchronization failed"); } } - private void sendInfoEmail() + private void sendErrorEmail(String subject) { if (config.getLogFilePath() != null) { @@ -815,8 +820,8 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM DataSource dataSource = createDataSource(config.getLogFilePath()); // /Users/gakin/Documents/sync.log for (EMailAddress recipient : config.getEmailAddresses()) { - mailClient.sendEmailMessageWithAttachment("Synchronization failed", - "Hi, the syncronization failed. See the attached file for details.", + mailClient.sendEmailMessageWithAttachment(subject, + "See the attached file for details.", "", new DataHandler( dataSource), null, null, recipient); } @@ -825,8 +830,8 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM { for (EMailAddress recipient : config.getEmailAddresses()) { - mailClient.sendEmailMessage("Synchronization failed", - "Hi, the syncronization failed. See the data store server log for details.", null, null, recipient); + mailClient.sendEmailMessage(subject, + "See the data store server log for details.", null, null, recipient); } } } @@ -856,7 +861,7 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM config.setLogFilePath(reader.getString(DEFAULT_DATA_SOURCE_SECTION, LOG_FILE_PROPERTY_NAME, DEFAULT_LOG_FILE_NAME, false)); if (config.getLogFilePath() != null) { - configureFileAppender(); + // configureFileAppender(); } config.setDataSourceURI(reader.getString(DEFAULT_DATA_SOURCE_SECTION, DATA_SOURCE_URL_PROPERTY_NAME, null, true)); @@ -879,7 +884,7 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM config.printConfig(); } - private void processDeletions(ResourceListParserData data) + private void processDeletions(ResourceListParserData data) throws NoSuchAlgorithmException, UnsupportedEncodingException { String sessionToken = ServiceProvider.getOpenBISService().getSessionToken(); EntityRetriever entityRetriever = @@ -918,6 +923,14 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM { experimentPermIds.add(new ExperimentPermId(entity.getPermId())); } + else { + String typeCodeOrNull = entity.getTypeCodeOrNull(); + NewExperiment exp = data.experimentsToProcess.get(entity.getPermId()).getExperiment(); + if (typeCodeOrNull.equals(exp.getExperimentTypeCode()) == false) + { + experimentPermIds.add(new ExperimentPermId(entity.getPermId())); + } + } } else if (entity.getEntityKind().equals("SAMPLE")) { @@ -925,6 +938,15 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM { samplePermIds.add(new SamplePermId(entity.getPermId())); } + else + { + String typeCodeOrNull = entity.getTypeCodeOrNull(); + NewSample smp = data.samplesToProcess.get(entity.getPermId()).getSample(); + if (typeCodeOrNull.equals(smp.getSampleType().getCode()) == false) + { + samplePermIds.add(new SamplePermId(entity.getPermId())); + } + } } else if (entity.getEntityKind().equals("DATA_SET")) { @@ -932,6 +954,29 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM { dsPermIds.add(new DataSetPermId(entity.getPermId())); } + else + { + boolean sameDS = true; + // if (ds.getKind() == DataSetKind.PHYSICAL && ds.lastModificationDate.after(lastSyncDate)) + String typeCodeOrNull = entity.getTypeCodeOrNull(); + DataSetWithConnections dsWithConns = data.datasetsToProcess.get(entity.getPermId()); + NewExternalData ds = dsWithConns.getDataSet(); + if (typeCodeOrNull.equals(ds.getDataSetType().getCode()) == false) + { + sameDS = false; + } + else + { + if (dsWithConns.getKind() == DataSetKind.PHYSICAL && dsWithConns.getLastModificationDate().after(lastSyncTimestamp)) + { + sameDS = deepCompareDataSets(entity.getPermId()); + } + } + if (sameDS == false) + { + dsPermIds.add(new DataSetPermId(entity.getPermId())); + } + } } } } @@ -980,6 +1025,69 @@ public class DataSetRegistrationTask<T extends DataSetInformation> implements IM v3Api.confirmDeletions(sessionToken, Arrays.asList(expDeletionId, dsDeletionId, smpDeletionId)); } + private boolean deepCompareDataSets(String dataSetCode) + throws NoSuchAlgorithmException, UnsupportedEncodingException + { + // get the file nodes in the incoming DS by querying the data source openbis + String asUrl = config.getDataSourceOpenbisURL(); + String dssUrl = config.getDataSourceDSSURL(); + + SslCertificateHelper.trustAnyCertificate(dssUrl); + SslCertificateHelper.trustAnyCertificate(asUrl); + + IDataStoreServerApi dss = + HttpInvokerUtils.createStreamSupportingServiceStub(IDataStoreServerApi.class, + dssUrl + IDataStoreServerApi.SERVICE_URL, 10000); + IApplicationServerApi as = HttpInvokerUtils + .createServiceStub(IApplicationServerApi.class, asUrl + + IApplicationServerApi.SERVICE_URL, 10000); + String sessionToken = as.login(config.getUser(), config.getPass()); + + DataSetFileSearchCriteria criteria = new DataSetFileSearchCriteria(); + criteria.withDataSet().withCode().thatEquals(dataSetCode); + SearchResult<DataSetFile> result = dss.searchFiles(sessionToken, criteria, new DataSetFileFetchOptions()); + + IDataStoreServerApi dssharvester = (IDataStoreServerApi) ServiceProvider.getDssServiceV3().getService(); + SearchResult<DataSetFile> resultHarvester = + dssharvester.searchFiles(ServiceProvider.getOpenBISService().getSessionToken(), criteria, new DataSetFileFetchOptions()); + if (result.getTotalCount() != resultHarvester.getTotalCount()) + { + return false; + } + + List<DataSetFile> dsNodes = result.getObjects(); + List<DataSetFile> harvesterNodes = resultHarvester.getObjects(); + sortFileNodes(dsNodes); + sortFileNodes(harvesterNodes); + return calculateHash(dsNodes).equals(calculateHash(harvesterNodes)); + } + + private void sortFileNodes(List<DataSetFile> nodes) + { + Collections.sort(nodes, new Comparator<DataSetFile>() + { + + @Override + public int compare(DataSetFile dsFile1, DataSetFile dsFile2) + { + return dsFile1.getPath().compareTo(dsFile2.getPath()); + } + }); + } + + private String calculateHash(List<DataSetFile> nodes) throws NoSuchAlgorithmException, UnsupportedEncodingException + { + StringBuffer sb = new StringBuffer(); + for (DataSetFile dataSetFile : nodes) + { + sb.append(dataSetFile.getPath()); + sb.append(dataSetFile.getChecksumCRC32()); + sb.append(dataSetFile.getFileLength()); + } + byte[] digest = MessageDigest.getInstance("MD5").digest(new String(sb).getBytes("UTF-8")); + return new String(Hex.encodeHex(digest)); + } + private void saveSyncTimestamp() { newLastSyncTimeStampFile.renameTo(lastSyncTimestampFile);