diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java index d10a833cb1c8bdfe817d52024bf07064ef4c52f0..b25bc6dd3f82a98b8e2a123cb654d2488c9bcd32 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/Parameters.java @@ -63,6 +63,14 @@ public class Parameters implements ITimingParameters, IFileSysParameters + "copy operations.") private String rsyncExecutable = null; + /** + * If set to <code>true</code>, rsync is called in such a way to files that already exist are overwritten rather + * than appended to. + */ + @Option(longName = "rsync-overwrite", usage = "If true, files that already exist on the remote side are always " + + "overwritten rather than appended.") + private boolean rsyncOverwrite = false; + /** * The name of the <code>ssh</code> executable to use for creating tunnels. */ @@ -369,6 +377,7 @@ public class Parameters implements ITimingParameters, IFileSysParameters { final Properties serviceProperties = loadServiceProperties(); rsyncExecutable = serviceProperties.getProperty("rsync-executable"); + rsyncOverwrite = Boolean.parseBoolean(serviceProperties.getProperty("rsync-overwrite", "false")); sshExecutable = serviceProperties.getProperty("ssh-executable"); hardLinkExecutable = serviceProperties.getProperty("hard-link-executable"); checkIntervalMillis = @@ -459,6 +468,15 @@ public class Parameters implements ITimingParameters, IFileSysParameters return rsyncExecutable; } + /** + * @return <code>true</code>, if rsync is called in such a way to files that already exist are overwritten rather + * than appended to. + */ + public boolean isRsyncOverwrite() + { + return rsyncOverwrite; + } + /** * @return The name of the <code>ssh</code> executable to use for creating tunnels. */ diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/FileSysOperationsFactory.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/FileSysOperationsFactory.java index 4186820c0d35e6292b5db72a049c4e5eb616338a..cd51c56a8501d7f6cfc8f4361296d17f8e98b8f8 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/FileSysOperationsFactory.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/FileSysOperationsFactory.java @@ -61,7 +61,7 @@ public class FileSysOperationsFactory implements IFileSysOperationsFactory public FileSysOperationsFactory(IFileSysParameters parameters) { assert parameters != null; - + this.parameters = parameters; } @@ -73,28 +73,28 @@ public class FileSysOperationsFactory implements IFileSysOperationsFactory public IReadPathOperations getReadPathOperations() { return new IReadPathOperations() - { - - public boolean exists(File file) { - return file.exists(); - } - public long lastChanged(File path) - { - return FileUtilities.lastChanged(path); - } + public boolean exists(File file) + { + return file.exists(); + } - public File[] tryListFiles(File directory, FileFilter filter, ISimpleLogger loggerOrNull) - { - return FileUtilities.tryListFiles(directory, filter, loggerOrNull); - } + public long lastChanged(File path) + { + return FileUtilities.lastChanged(path); + } - public File[] tryListFiles(File directory, ISimpleLogger loggerOrNull) - { - return FileUtilities.tryListFiles(directory, FileUtilities.ACCEPT_ALL_FILTER, loggerOrNull); - } - }; + public File[] tryListFiles(File directory, FileFilter filter, ISimpleLogger loggerOrNull) + { + return FileUtilities.tryListFiles(directory, filter, loggerOrNull); + } + + public File[] tryListFiles(File directory, ISimpleLogger loggerOrNull) + { + return FileUtilities.tryListFiles(directory, FileUtilities.ACCEPT_ALL_FILTER, loggerOrNull); + } + }; } public IPathImmutableCopier getImmutableCopier() @@ -144,7 +144,8 @@ public class FileSysOperationsFactory implements IFileSysOperationsFactory final File sshExecutable = findSshExecutable(parameters.getSshExecutable()); if (rsyncExecutable != null) { - return new RsyncCopier(rsyncExecutable, sshExecutable, requiresDeletionBeforeCreation); + return new RsyncCopier(rsyncExecutable, sshExecutable, requiresDeletionBeforeCreation, parameters + .isRsyncOverwrite()); } else { throw new ConfigurationFailureException("Unable to find a copy engine."); diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopier.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopier.java index 826e9cda6ded5254653b1d2741056b32857a1ad8..d736155cbdd79e8e1232eefd85c341048e20a9ac 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopier.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopier.java @@ -16,8 +16,6 @@ package ch.systemsx.cisd.datamover.filesystem.remote.rsync; -import static ch.systemsx.cisd.datamover.filesystem.remote.rsync.RsyncVersionChecker.getVersion; - import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -64,9 +62,13 @@ public class RsyncCopier implements IPathCopier private final String rsyncExecutable; + private final RsyncVersion rsyncVersion; + private final String sshExecutable; private final List<String> additionalCmdLineFlags; + + private final boolean overwrite; /** * If <code>true</code>, the file system of the destination directory requires that already existing files and @@ -91,15 +93,17 @@ public class RsyncCopier implements IPathCopier * paths). */ public RsyncCopier(File rsyncExecutable, File sshExecutable, - boolean destinationDirectoryRequiresDeletionBeforeCreation, String... cmdLineFlags) + boolean destinationDirectoryRequiresDeletionBeforeCreation, boolean overwrite, String... cmdLineFlags) { assert rsyncExecutable != null && rsyncExecutable.exists(); assert sshExecutable == null || rsyncExecutable.exists(); this.rsyncExecutable = rsyncExecutable.getAbsolutePath(); + this.rsyncVersion = RsyncVersionChecker.getVersion(rsyncExecutable.getAbsolutePath()); this.sshExecutable = (sshExecutable != null) ? sshExecutable.getPath() : null; this.destinationDirectoryRequiresDeletionBeforeCreation = destinationDirectoryRequiresDeletionBeforeCreation; this.copyProcessReference = new AtomicReference<Process>(null); + this.overwrite = overwrite; if (cmdLineFlags.length > 0) { this.additionalCmdLineFlags = Arrays.asList(cmdLineFlags); @@ -108,6 +112,18 @@ public class RsyncCopier implements IPathCopier this.additionalCmdLineFlags = null; } } + + private boolean rsyncSupportsAppend() + { + assert rsyncVersion != null; + + return rsyncVersion.isNewerOrEqual(2, 6, 7); + } + + private boolean isOverwriteMode() + { + return overwrite || destinationDirectoryRequiresDeletionBeforeCreation || (rsyncSupportsAppend() == false); + } public Status copy(File sourcePath, File destinationDirectory) { @@ -182,10 +198,17 @@ public class RsyncCopier implements IPathCopier assert (destinationHost != null && sshExecutable != null) || (destinationHost == null); assert (sourceHost != null && sshExecutable != null) || (sourceHost == null); - final List<String> standardParameters = Arrays.asList("--archive", "--delete", "--inplace", "--whole-file"); + final List<String> standardParameters = Arrays.asList("--archive", "--delete", "--inplace"); final List<String> commandLineList = new ArrayList<String>(); commandLineList.add(rsyncExecutable); commandLineList.addAll(standardParameters); + if (isOverwriteMode()) + { + commandLineList.add("--whole-file"); + } else + { + commandLineList.add("--append"); + } if (sshExecutable != null && destinationHost != null) { commandLineList.add("--rsh"); @@ -264,8 +287,7 @@ public class RsyncCopier implements IPathCopier private static void logRsyncExitValue(final Process copyProcess) { final int exitValue = copyProcess.exitValue(); - final List<String> processOutput = - ProcessExecutionHelper.readProcessOutputLines(copyProcess, machineLog); + final List<String> processOutput = ProcessExecutionHelper.readProcessOutputLines(copyProcess, machineLog); ProcessExecutionHelper.logProcessExecution("rsync", exitValue, processOutput, operationLog, machineLog); } @@ -297,8 +319,7 @@ public class RsyncCopier implements IPathCopier { machineLog.debug(String.format("Testing rsync executable '%s'", rsyncExecutable)); } - final RsyncVersion version = getVersion(rsyncExecutable); - if (version == null) + if (rsyncVersion == null) { if (OSUtilities.executableExists(rsyncExecutable)) { @@ -310,16 +331,16 @@ public class RsyncCopier implements IPathCopier rsyncExecutable)); } } - if (version.getMajorVersion() < 2 || (version.getMajorVersion() == 2 && version.getMinorVersion() < 6)) + if (rsyncVersion.isNewerOrEqual(2, 6, 0) == false) { throw new ConfigurationFailureException(String.format( - "Rsync executable '%s' is too old (required: 2.6.0, found: %s)", rsyncExecutable, version + "Rsync executable '%s' is too old (required: 2.6.0, found: %s)", rsyncExecutable, rsyncVersion .getVersionString())); } if (machineLog.isInfoEnabled()) { - machineLog.info(String.format("Using rsync executable '%s', version %s", rsyncExecutable, version - .getVersionString())); + machineLog.info(String.format("Using rsync executable '%s', version %s, mode: %s", rsyncExecutable, rsyncVersion + .getVersionString(), (isOverwriteMode() ? "overwrite" : "append"))); } } diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncVersionChecker.java b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncVersionChecker.java index dcc56a153cd6cde317f3f3c2d9c90d18637fdce5..666e17c4bf434eeb9bb2e952ab5abd8bc1ed60e2 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncVersionChecker.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncVersionChecker.java @@ -17,6 +17,7 @@ package ch.systemsx.cisd.datamover.filesystem.remote.rsync; import java.util.Arrays; +import java.util.List; import org.apache.log4j.Logger; @@ -101,6 +102,32 @@ final class RsyncVersionChecker { return rsyncPatchVersion; } + + /** + * @return <code>true</code>, if this version is newer or as new the minimal version specified. + */ + public boolean isNewerOrEqual(int minMajorVerson, int minMinorVersion, int minPatchLevel) + { + if (rsyncMajorVersion < minMajorVerson) + { + return false; + } else if (rsyncMajorVersion > minMajorVerson) + { + return true; + } else + { + if (rsyncMinorVersion < minMinorVersion) + { + return false; + } else if (rsyncMinorVersion > minMinorVersion) + { + return true; + } else + { + return (rsyncPatchVersion >= minPatchLevel); + } + } + } } @@ -115,7 +142,7 @@ final class RsyncVersionChecker { assert rsyncExecutable != null; - String rsyncVersion = getRsyncVersion(rsyncExecutable); + String rsyncVersion = tryGetRsyncVersion(rsyncExecutable); if (rsyncVersion == null) { return null; @@ -132,18 +159,23 @@ final class RsyncVersionChecker return new RsyncVersion(rsyncVersion, rsyncMajorVersion, rsyncMinorVersion, rsyncPatchVersion); } - private static String getRsyncVersion(String rsyncExecutableToCheck) + private static String tryGetRsyncVersion(String rsyncExecutableToCheck) { final long TIME_TO_WAIT_FOR_COMPLETION = 2 * 1000; final ProcessExecutionHelper.ProcessResult result = ProcessExecutionHelper.run(Arrays.asList(rsyncExecutableToCheck, "--version"), TIME_TO_WAIT_FOR_COMPLETION, operationLog, machineLog); result.log(); - final String versionString = extractRsyncVersion(result.getProcessOutput().get(0)); + final List<String> processOutput = result.getProcessOutput(); + if (processOutput.size() == 0) + { + return null; + } + final String versionString = tryExtractRsyncVersion(processOutput.get(0)); return versionString; } - private static String extractRsyncVersion(String rsyncVersionLine) + private static String tryExtractRsyncVersion(String rsyncVersionLine) { if (rsyncVersionLine.startsWith("rsync version") == false) { diff --git a/datamover/source/java/ch/systemsx/cisd/datamover/intf/IFileSysParameters.java b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IFileSysParameters.java index d439e05b3f413ec8b157cd17b11d5d3d39a88260..d0d5d9442e375a27756d6d2f8681d2f5a0c165ec 100644 --- a/datamover/source/java/ch/systemsx/cisd/datamover/intf/IFileSysParameters.java +++ b/datamover/source/java/ch/systemsx/cisd/datamover/intf/IFileSysParameters.java @@ -31,6 +31,13 @@ public interface IFileSysParameters */ String getRsyncExecutable(); + /** + * @return <code>true</code>, if rsync is called in such a way to files that already exist are overwritten rather + * than appended to. + */ + + boolean isRsyncOverwrite(); + /** * The name of the <code>ssh</code> executable to use for creating tunnels. */ diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java index 12923a4b596f5baa72ff58bcdaab67ec9b9b5497..04e5deda98f8f6b9d93072dda31f84174cced3a1 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/ParametersTest.java @@ -17,6 +17,8 @@ package ch.systemsx.cisd.datamover; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; import java.io.File; @@ -219,6 +221,20 @@ public class ParametersTest assertEquals(1000 * QUIET_PERIOD, parameters.getQuietPeriodMillis()); } + @Test + public void testDefaultRsyncOverwrite() + { + final Parameters parameters = parse(); + assertFalse(parameters.isRsyncOverwrite()); + } + + @Test + public void testSetRsyncOverwrite() + { + final Parameters parameters = parse("--rsync-overwrite"); + assertTrue(parameters.isRsyncOverwrite()); + } + @Test public void testSetMandatoryOptions() throws Exception { @@ -248,7 +264,7 @@ public class ParametersTest parse("--incoming-dir", LOCAL_DATADIR, "--buffer-dir", LOCAL_TEMPDIR, "--outgoing-dir", REMOTE_DATADIR, "--outgoing-host", REMOTE_HOST, "--check-interval", Integer.toString(CHECK_INTERVAL), "--quiet-period", Integer.toString(QUIET_PERIOD), "--treat-incoming-as-remote", - "--incoming-host", REMOTE_INCOMING_HOST, "--extra-copy-dir", EXTRA_COPY_DIR); + "--incoming-host", REMOTE_INCOMING_HOST, "--extra-copy-dir", EXTRA_COPY_DIR, "--rsync-overwrite"); assertEquals(LOCAL_DATADIR, parameters.getIncomingStore().getPath().getPath()); assertEquals(REMOTE_INCOMING_HOST, parameters.getIncomingStore().getHost()); assertEquals(LOCAL_TEMPDIR, parameters.getBufferStore().getPath().getPath()); @@ -258,6 +274,7 @@ public class ParametersTest assertEquals(1000 * CHECK_INTERVAL, parameters.getCheckIntervalMillis()); assertEquals(1000 * QUIET_PERIOD, parameters.getQuietPeriodMillis()); assertEquals(true, parameters.getTreatIncomingAsRemote()); + assertTrue(parameters.isRsyncOverwrite()); } } diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java index adb166e9d02b0e706ed0973593ac77e306efe913..bf49a0061cdb2a089edb51b45b5a0d7cec7fefe6 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/CopyActivityMonitorTest.java @@ -106,6 +106,11 @@ public class CopyActivityMonitorTest { return null; } + + public boolean isRsyncOverwrite() + { + return false; + } }; this.impl = new FileSysOperationsFactory(dummyFileSysParameters).getReadPathOperations(); diff --git a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopierTest.java b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopierTest.java index d36522330ad3345e146dae24fc37eeab975337fd..0f341df87852c677961dd662a392e1ee5695d7b8 100644 --- a/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopierTest.java +++ b/datamover/sourceTest/java/ch/systemsx/cisd/datamover/filesystem/remote/rsync/RsyncCopierTest.java @@ -17,10 +17,14 @@ package ch.systemsx.cisd.datamover.filesystem.remote.rsync; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -30,6 +34,7 @@ import ch.systemsx.cisd.common.exceptions.Status; import ch.systemsx.cisd.common.exceptions.StatusFlag; import ch.systemsx.cisd.common.logging.LogInitializer; import ch.systemsx.cisd.common.utilities.CollectionIO; +import ch.systemsx.cisd.common.utilities.FileUtilities; import ch.systemsx.cisd.common.utilities.StoringUncaughtExceptionHandler; import ch.systemsx.cisd.datamover.filesystem.remote.rsync.RsyncCopier; @@ -75,11 +80,15 @@ public class RsyncCopierTest destinationDirectory.deleteOnExit(); } - private File createRsync(String... lines) throws IOException, InterruptedException + private File createRsync(String rsyncVersion, String... additionalLines) throws IOException, InterruptedException { final File rsyncBinary = new File(workingDirectory, "rsync"); rsyncBinary.delete(); - CollectionIO.writeIterable(rsyncBinary, Arrays.asList(lines)); + final List<String> lines = new ArrayList<String>(); + lines.addAll(Arrays.asList("#! /bin/sh", "if [ \"$1\" = \"--version\" ]; then ", String.format( + " echo \"rsync version %s\"", rsyncVersion), "exit 0", "fi")); + lines.addAll(Arrays.asList(additionalLines)); + CollectionIO.writeIterable(rsyncBinary, lines); Runtime.getRuntime().exec(String.format("/bin/chmod +x %s", rsyncBinary.getPath())).waitFor(); rsyncBinary.deleteOnExit(); return rsyncBinary; @@ -87,7 +96,7 @@ public class RsyncCopierTest private File createRsync(int exitValue) throws IOException, InterruptedException { - return createRsync("#! /bin/sh", "exit " + exitValue); + return createRsync("2.6.9", "exit " + exitValue); } @Test(groups = @@ -95,7 +104,7 @@ public class RsyncCopierTest public void testRsyncOK() throws IOException, InterruptedException { final File dummyRsyncBinary = createRsync(0); - final RsyncCopier copier = new RsyncCopier(dummyRsyncBinary, null, false); + final RsyncCopier copier = new RsyncCopier(dummyRsyncBinary, null, false, false); Status status = copier.copy(sourceFile, destinationDirectory); assert Status.OK == status; } @@ -106,7 +115,7 @@ public class RsyncCopierTest { final int exitValue = 11; final File dummyRsyncBinary = createRsync(exitValue); - final RsyncCopier copier = new RsyncCopier(dummyRsyncBinary, null, false); + final RsyncCopier copier = new RsyncCopier(dummyRsyncBinary, null, false, false); Status status = copier.copy(sourceFile, destinationDirectory); assertEquals(StatusFlag.RETRIABLE_ERROR, status.getFlag()); assertEquals(RsyncExitValueTranslator.getMessage(exitValue), status.getMessage()); @@ -118,18 +127,60 @@ public class RsyncCopierTest { final int exitValue = 1; final File dummyRsyncBinary = createRsync(exitValue); - final RsyncCopier copier = new RsyncCopier(dummyRsyncBinary, null, false); + final RsyncCopier copier = new RsyncCopier(dummyRsyncBinary, null, false, false); Status status = copier.copy(sourceFile, destinationDirectory); assertEquals(StatusFlag.FATAL_ERROR, status.getFlag()); assertEquals(RsyncExitValueTranslator.getMessage(exitValue), status.getMessage()); } + @Test(groups = + { "requires_unix" }) + public void testRsyncAppendMode() throws IOException, InterruptedException + { + final File parametersLogFile = new File(workingDirectory, "parameters.log"); + final File loggingRsyncBinary = + createRsync("2.6.7", String.format("echo \"$@\" > %s", parametersLogFile.getAbsolutePath())); + final RsyncCopier copier = new RsyncCopier(loggingRsyncBinary, null, false, false); + copier.copy(sourceFile, destinationDirectory); + String rsyncParameters = FileUtilities.loadToString(parametersLogFile); + assertFalse(rsyncParameters.indexOf("--whole-file") >= 0); + assertTrue(rsyncParameters.indexOf("--append") >= 0); + } + + @Test(groups = + { "requires_unix" }) + public void testRsyncAppendModeWhenNotSupported() throws IOException, InterruptedException + { + final File parametersLogFile = new File(workingDirectory, "parameters.log"); + final File loggingRsyncBinary = + createRsync("2.6.6", String.format("echo \"$@\" > %s", parametersLogFile.getAbsolutePath())); + final RsyncCopier copier = new RsyncCopier(loggingRsyncBinary, null, false, false); + copier.copy(sourceFile, destinationDirectory); + String rsyncParameters = FileUtilities.loadToString(parametersLogFile); + assertTrue(rsyncParameters.indexOf("--whole-file") >= 0); + assertFalse(rsyncParameters.indexOf("--append") >= 0); + } + + @Test(groups = + { "requires_unix" }) + public void testRsyncOverwriteMode() throws IOException, InterruptedException + { + final File parametersLogFile = new File(workingDirectory, "parameters.log"); + final File loggingRsyncBinary = + createRsync("2.6.7", String.format("echo \"$@\" > %s", parametersLogFile.getAbsolutePath())); + final RsyncCopier copier = new RsyncCopier(loggingRsyncBinary, null, false, true); + copier.copy(sourceFile, destinationDirectory); + String rsyncParameters = FileUtilities.loadToString(parametersLogFile); + assertTrue(rsyncParameters.indexOf("--whole-file") >= 0); + assertFalse(rsyncParameters.indexOf("--append") >= 0); + } + @Test(groups = { "requires_unix" }) public void testRsyncTermination() throws IOException, InterruptedException { - final File sleepyRsyncBinary = createRsync("#! /bin/sh", "/bin/sleep 100"); - final RsyncCopier copier = new RsyncCopier(sleepyRsyncBinary, null, false); + final File sleepyRsyncBinary = createRsync("2.6.9", "/bin/sleep 100"); + final RsyncCopier copier = new RsyncCopier(sleepyRsyncBinary, null, false, false); (new Thread(new Runnable() { public void run()