diff --git a/common/source/java/ch/systemsx/cisd/common/io/MonitoredIOStreamCopier.java b/common/source/java/ch/systemsx/cisd/common/io/MonitoredIOStreamCopier.java index a840a283df8fcd29838dc6bcaf74d1327b00cb00..646b5dd814df1d0f9fa2672eeca507d997d58d3d 100644 --- a/common/source/java/ch/systemsx/cisd/common/io/MonitoredIOStreamCopier.java +++ b/common/source/java/ch/systemsx/cisd/common/io/MonitoredIOStreamCopier.java @@ -22,7 +22,10 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.logging.ISimpleLogger; @@ -33,6 +36,8 @@ import ch.systemsx.cisd.common.utilities.SystemTimeProvider; /** * Helper class which copies bytes from an {@link InputStream} to an {@link OutputStream}. + * Copying is either done in a series of reading/writing a specified amount of bytes (i.e. bufferSize) + * or in parallel with an ad-hoc writing thread. * If {@link ISimpleLogger} is injected (via {@link #setLogger(ISimpleLogger)} statistical information * especially reading/writing speed will be logged after copying has been finished. * <p> @@ -42,7 +47,7 @@ import ch.systemsx.cisd.common.utilities.SystemTimeProvider; */ public class MonitoredIOStreamCopier { - private final byte[] buffer; + private int bufferSize; private ISimpleLogger logger; @@ -51,10 +56,37 @@ public class MonitoredIOStreamCopier private Statistics readStatistics; private Statistics writeStatistics; + + private LinkedBlockingQueue<WritingItem> queue; + private Throwable writingException; + + private Thread writingThread; + + /** + * Creates an instance for copying in a series of reading/writing chunks of specified size. + */ public MonitoredIOStreamCopier(int bufferSize) { - buffer = new byte[bufferSize]; + this(bufferSize, null); + } + + /** + * Creates an instance for copying in chunks of specified size in parallel using a queue of specified size. + */ + public MonitoredIOStreamCopier(int bufferSize, Long maxQueueSizeInBytes) + { + this.bufferSize = bufferSize; + if (maxQueueSizeInBytes != null) + { + long maxQueueSize = maxQueueSizeInBytes / bufferSize; + if (maxQueueSize < 1) + { + throw new ConfigurationFailureException("Maximum queue size " + + maxQueueSize + " should be larger than buffer size " + bufferSize + "."); + } + queue = new LinkedBlockingQueue<WritingItem>((int) maxQueueSize); + } } public void setLogger(ISimpleLogger logger) @@ -62,8 +94,8 @@ public class MonitoredIOStreamCopier this.logger = logger; if (logger != null) { - readStatistics = new Statistics(buffer.length); - writeStatistics = new Statistics(buffer.length); + readStatistics = new Statistics(bufferSize); + writeStatistics = new Statistics(bufferSize); } } @@ -77,14 +109,18 @@ public class MonitoredIOStreamCopier long totalNumberOfBytes = 0; try { + startWritingThread(); int numberOfBytes = 0; - while (-1 != (numberOfBytes = readBytes(input))) + byte[] buffer = new byte[bufferSize]; + while (-1 != (numberOfBytes = readBytes(input, buffer))) { - writeBytes(output, numberOfBytes); + writeBytes(output, numberOfBytes, buffer); totalNumberOfBytes += numberOfBytes; + buffer = new byte[bufferSize]; } + waitOnFinished(); return totalNumberOfBytes; - } catch (IOException ex) + } catch (Throwable ex) { throw new EnvironmentFailureException("Error after " + totalNumberOfBytes + " bytes copied: " + ex, ex); @@ -100,7 +136,7 @@ public class MonitoredIOStreamCopier } } - private int readBytes(InputStream input) throws IOException + private int readBytes(InputStream input, byte[] buffer) throws IOException { long t0 = startIO(); int numberOfBytes = input.read(buffer); @@ -108,11 +144,92 @@ public class MonitoredIOStreamCopier return numberOfBytes; } - private void writeBytes(OutputStream output, int numberOfBytes) throws IOException + private void writeBytes(OutputStream output, int numberOfBytes, byte[] buffer) throws Throwable { - long t0 = startIO(); - output.write(buffer, 0, numberOfBytes); - recordTime(numberOfBytes, t0, writeStatistics); + WritingItem writingItem = new WritingItem(output, buffer, numberOfBytes); + if (queue == null) + { + writingItem.write(); + } else + { + addToQueue(writingItem); + } + } + + private void startWritingThread() + { + if (queue == null) + { + return; + } + writingThread = new Thread(new Runnable() + { + @Override + public void run() + { + synchronized (queue) + { + while (true) + { + try + { + WritingItem writingItem = queue.take(); + if (writingItem.data == null) + { + break; + } + writingItem.write(); + } catch (InterruptedException ex) + { + // silently ignored + } catch (Throwable ex) + { + writingException = ex; + break; + } + } + } + } + }); + writingThread.start(); + } + + private void waitOnFinished() throws Throwable + { + if (queue == null) + { + return; + } + addToQueue(new WritingItem(null, null, 0)); + synchronized (queue) + { + try + { + writingThread.join(); + } catch (InterruptedException ex) + { + // silently ignored + } + if (writingException != null) + { + throw writingException; + } + } + } + + private void addToQueue(WritingItem writingItem) throws Throwable + { + try + { + if (writingException != null) + { + throw writingException; + } + queue.put(writingItem); + } catch (InterruptedException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } } private long startIO() @@ -128,6 +245,27 @@ public class MonitoredIOStreamCopier } } + private final class WritingItem + { + private OutputStream output; + private byte[] data; + private int numberOfBytes; + + public WritingItem(OutputStream output, byte[] data, int numberOfBytes) + { + this.output = output; + this.data = data; + this.numberOfBytes = numberOfBytes; + } + + public void write() throws IOException + { + long t0 = startIO(); + output.write(data, 0, numberOfBytes); + recordTime(numberOfBytes, t0, writeStatistics); + } + } + private static final class Statistics { private final List<NumberOfBytesAndIOTime> data = new ArrayList<NumberOfBytesAndIOTime>(); diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/io/MonitoredIOStreamCopierTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/io/MonitoredIOStreamCopierTest.java index 3420f1897d2d4d293f8f6f04937dc88ec7ae65fa..5f8736af5787c90b5ddec67779039d9104ba99af 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/io/MonitoredIOStreamCopierTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/io/MonitoredIOStreamCopierTest.java @@ -18,12 +18,16 @@ package ch.systemsx.cisd.common.io; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.testng.AssertJUnit; import org.testng.annotations.Test; +import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.logging.MockLogger; +import ch.systemsx.cisd.common.utilities.ITimeAndWaitingProvider; import ch.systemsx.cisd.common.utilities.ITimeProvider; import ch.systemsx.cisd.common.utilities.MockTimeProvider; @@ -46,7 +50,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit public void testCopyingInThreeChunks() { MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB); - ITimeProvider timeProvider = new MockTimeProvider(123456789, 1125, 0, 2500, 0, 3400, 0, 6790, + ITimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 2500, 0, 3400, 0, 6790, 0, 700, 0, 1400); copier.setTimeProvider(timeProvider); MockLogger logger = new MockLogger(); @@ -65,7 +69,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit public void testCopyingInFourChunks() { MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB); - ITimeProvider timeProvider = new MockTimeProvider(123456789, 1125, 0, 2500, 0, 3400, 0, 6790, + ITimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 2500, 0, 3400, 0, 6790, 0, 0, 0, 5700, 0, 7750, 0, 700, 0, 1400); copier.setTimeProvider(timeProvider); MockLogger logger = new MockLogger(); @@ -85,7 +89,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit public void testCopyingWithChunksToSmallForMedianCalculation() { MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB); - ITimeProvider timeProvider = new MockTimeProvider(123456789, 1125, 0, 2500, 0, 0, 0, 3400, 0, 6790); + ITimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 2500, 0, 0, 0, 3400, 0, 6790); copier.setTimeProvider(timeProvider); MockLogger logger = new MockLogger(); copier.setLogger(logger); @@ -104,7 +108,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit public void testCopyingWithChunksToSmallForAverageCalculation() { MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB); - ITimeProvider timeProvider = new MockTimeProvider(123456789, 1125, 0, 2500, 0, 0, 0, 3400, 0, 6790); + ITimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 2500, 0, 0, 0, 3400, 0, 6790); copier.setTimeProvider(timeProvider); MockLogger logger = new MockLogger(); copier.setLogger(logger); @@ -118,6 +122,65 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit logger.toString()); } + @Test + public void testCopyingInThreeChunksWithQueue() + { + MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB, 5 * FileUtils.ONE_MB); + MultiThreadMockTimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 3400, 0, 700); + timeProvider.defineMockTimeProviderForThread(null, 123456789, 2500, 0, 6790, 0, 1400); + copier.setTimeProvider(timeProvider); + MockLogger logger = new MockLogger(); + copier.setLogger(logger); + + copy(copier, (int) (2.5 * FileUtils.ONE_MB)); + copier.close(); + + assertEquals("INFO: Reading statistics for input stream: 2.50 MB in 3 chunks took 5sec. " + + "Average speed: 489.95 KB/sec. Median speed: 605.70 KB/sec.\n" + + "INFO: Writing statistics for output stream: 2.50 MB in 3 chunks took 11sec. " + + "Average speed: 239.48 KB/sec. Median speed: 280.21 KB/sec.\n", logger.toString()); + } + + @Test + public void testCopyingInThreeChunksWithLimitedQueue() + { + MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB, 2 * FileUtils.ONE_MB); + MultiThreadMockTimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 3400, 0, 700); + timeProvider.defineMockTimeProviderForThread(null, 123456789, 2500, 0, 6790, 0, 1400); + copier.setTimeProvider(timeProvider); + MockLogger logger = new MockLogger(); + copier.setLogger(logger); + + copy(copier, (int) (2.5 * FileUtils.ONE_MB)); + copier.close(); + + assertEquals("INFO: Reading statistics for input stream: 2.50 MB in 3 chunks took 5sec. " + + "Average speed: 489.95 KB/sec. Median speed: 605.70 KB/sec.\n" + + "INFO: Writing statistics for output stream: 2.50 MB in 3 chunks took 11sec. " + + "Average speed: 239.48 KB/sec. Median speed: 280.21 KB/sec.\n", logger.toString()); + } + + @Test + public void testCopyingInThreeChunksWithLimitedQueueFails() + { + MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB, 2 * FileUtils.ONE_MB); + MultiThreadMockTimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 3400, 0, 700); + timeProvider.defineMockTimeProviderForThread(null, -1, 2500, 0, 6790, 0, 1400); + copier.setTimeProvider(timeProvider); + MockLogger logger = new MockLogger(); + copier.setLogger(logger); + + try + { + copy(copier, (int) (2.5 * FileUtils.ONE_MB)); + fail("EnvironmentFailureException expected"); + } catch (EnvironmentFailureException ex) + { + assertEquals("writing error", ex.getCause().getMessage()); + } + copier.close(); + } + private void copy(MonitoredIOStreamCopier copier, int numberOfBytes) { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); @@ -144,5 +207,52 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit } assertEquals(numberOfBytes, data.length); } + + private static final class MultiThreadMockTimeProvider implements ITimeAndWaitingProvider + { + private Map<Thread, MockTimeProvider> timeProvidersByThread = new HashMap<Thread, MockTimeProvider>(); + + private MockTimeProvider timeProvider; + + public MultiThreadMockTimeProvider(long startTime, long... timeSteps) + { + timeProvidersByThread.put(Thread.currentThread(), new MockTimeProvider(startTime, timeSteps)); + } + + void defineMockTimeProviderForThread(Thread threadOrNull, long startTime, long... timeSteps) + { + MockTimeProvider mockTimeProvider = new MockTimeProvider(startTime, timeSteps); + if (threadOrNull == null) + { + timeProvider = mockTimeProvider; + } else + { + timeProvidersByThread.put(threadOrNull, mockTimeProvider); + } + } + + @Override + public long getTimeInMilliseconds() + { + long timeInMilliseconds = getTimeProvider().getTimeInMilliseconds(); + if (timeInMilliseconds < 0) + { + throw new RuntimeException("writing error"); + } + return timeInMilliseconds; + } + + @Override + public void sleep(long milliseconds) + { + getTimeProvider().sleep(milliseconds); + } + + private ITimeAndWaitingProvider getTimeProvider() + { + MockTimeProvider provider = timeProvidersByThread.get(Thread.currentThread()); + return provider == null ? timeProvider : provider; + } + } }