Skip to content
Snippets Groups Projects
Commit 216bab5f authored by felmer's avatar felmer
Browse files

SSDM-2069: MonitoredIoStreamCopier extended to allow parallel reading/writing.

SVN: 34330
parent 57c9f6f5
No related branches found
No related tags found
No related merge requests found
...@@ -22,7 +22,10 @@ import java.io.OutputStream; ...@@ -22,7 +22,10 @@ import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.common.exceptions.ConfigurationFailureException;
import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException; import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
import ch.systemsx.cisd.common.filesystem.FileUtilities; import ch.systemsx.cisd.common.filesystem.FileUtilities;
import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.ISimpleLogger;
...@@ -33,6 +36,8 @@ import ch.systemsx.cisd.common.utilities.SystemTimeProvider; ...@@ -33,6 +36,8 @@ import ch.systemsx.cisd.common.utilities.SystemTimeProvider;
/** /**
* Helper class which copies bytes from an {@link InputStream} to an {@link OutputStream}. * Helper class which copies bytes from an {@link InputStream} to an {@link OutputStream}.
* Copying is either done in a series of reading/writing a specified amount of bytes (i.e. bufferSize)
* or in parallel with an ad-hoc writing thread.
* If {@link ISimpleLogger} is injected (via {@link #setLogger(ISimpleLogger)} statistical information * If {@link ISimpleLogger} is injected (via {@link #setLogger(ISimpleLogger)} statistical information
* especially reading/writing speed will be logged after copying has been finished. * especially reading/writing speed will be logged after copying has been finished.
* <p> * <p>
...@@ -42,7 +47,7 @@ import ch.systemsx.cisd.common.utilities.SystemTimeProvider; ...@@ -42,7 +47,7 @@ import ch.systemsx.cisd.common.utilities.SystemTimeProvider;
*/ */
public class MonitoredIOStreamCopier public class MonitoredIOStreamCopier
{ {
private final byte[] buffer; private int bufferSize;
private ISimpleLogger logger; private ISimpleLogger logger;
...@@ -51,10 +56,37 @@ public class MonitoredIOStreamCopier ...@@ -51,10 +56,37 @@ public class MonitoredIOStreamCopier
private Statistics readStatistics; private Statistics readStatistics;
private Statistics writeStatistics; private Statistics writeStatistics;
private LinkedBlockingQueue<WritingItem> queue;
private Throwable writingException;
private Thread writingThread;
/**
* Creates an instance for copying in a series of reading/writing chunks of specified size.
*/
public MonitoredIOStreamCopier(int bufferSize) public MonitoredIOStreamCopier(int bufferSize)
{ {
buffer = new byte[bufferSize]; this(bufferSize, null);
}
/**
* Creates an instance for copying in chunks of specified size in parallel using a queue of specified size.
*/
public MonitoredIOStreamCopier(int bufferSize, Long maxQueueSizeInBytes)
{
this.bufferSize = bufferSize;
if (maxQueueSizeInBytes != null)
{
long maxQueueSize = maxQueueSizeInBytes / bufferSize;
if (maxQueueSize < 1)
{
throw new ConfigurationFailureException("Maximum queue size "
+ maxQueueSize + " should be larger than buffer size " + bufferSize + ".");
}
queue = new LinkedBlockingQueue<WritingItem>((int) maxQueueSize);
}
} }
public void setLogger(ISimpleLogger logger) public void setLogger(ISimpleLogger logger)
...@@ -62,8 +94,8 @@ public class MonitoredIOStreamCopier ...@@ -62,8 +94,8 @@ public class MonitoredIOStreamCopier
this.logger = logger; this.logger = logger;
if (logger != null) if (logger != null)
{ {
readStatistics = new Statistics(buffer.length); readStatistics = new Statistics(bufferSize);
writeStatistics = new Statistics(buffer.length); writeStatistics = new Statistics(bufferSize);
} }
} }
...@@ -77,14 +109,18 @@ public class MonitoredIOStreamCopier ...@@ -77,14 +109,18 @@ public class MonitoredIOStreamCopier
long totalNumberOfBytes = 0; long totalNumberOfBytes = 0;
try try
{ {
startWritingThread();
int numberOfBytes = 0; int numberOfBytes = 0;
while (-1 != (numberOfBytes = readBytes(input))) byte[] buffer = new byte[bufferSize];
while (-1 != (numberOfBytes = readBytes(input, buffer)))
{ {
writeBytes(output, numberOfBytes); writeBytes(output, numberOfBytes, buffer);
totalNumberOfBytes += numberOfBytes; totalNumberOfBytes += numberOfBytes;
buffer = new byte[bufferSize];
} }
waitOnFinished();
return totalNumberOfBytes; return totalNumberOfBytes;
} catch (IOException ex) } catch (Throwable ex)
{ {
throw new EnvironmentFailureException("Error after " + totalNumberOfBytes throw new EnvironmentFailureException("Error after " + totalNumberOfBytes
+ " bytes copied: " + ex, ex); + " bytes copied: " + ex, ex);
...@@ -100,7 +136,7 @@ public class MonitoredIOStreamCopier ...@@ -100,7 +136,7 @@ public class MonitoredIOStreamCopier
} }
} }
private int readBytes(InputStream input) throws IOException private int readBytes(InputStream input, byte[] buffer) throws IOException
{ {
long t0 = startIO(); long t0 = startIO();
int numberOfBytes = input.read(buffer); int numberOfBytes = input.read(buffer);
...@@ -108,11 +144,92 @@ public class MonitoredIOStreamCopier ...@@ -108,11 +144,92 @@ public class MonitoredIOStreamCopier
return numberOfBytes; return numberOfBytes;
} }
private void writeBytes(OutputStream output, int numberOfBytes) throws IOException private void writeBytes(OutputStream output, int numberOfBytes, byte[] buffer) throws Throwable
{ {
long t0 = startIO(); WritingItem writingItem = new WritingItem(output, buffer, numberOfBytes);
output.write(buffer, 0, numberOfBytes); if (queue == null)
recordTime(numberOfBytes, t0, writeStatistics); {
writingItem.write();
} else
{
addToQueue(writingItem);
}
}
private void startWritingThread()
{
if (queue == null)
{
return;
}
writingThread = new Thread(new Runnable()
{
@Override
public void run()
{
synchronized (queue)
{
while (true)
{
try
{
WritingItem writingItem = queue.take();
if (writingItem.data == null)
{
break;
}
writingItem.write();
} catch (InterruptedException ex)
{
// silently ignored
} catch (Throwable ex)
{
writingException = ex;
break;
}
}
}
}
});
writingThread.start();
}
private void waitOnFinished() throws Throwable
{
if (queue == null)
{
return;
}
addToQueue(new WritingItem(null, null, 0));
synchronized (queue)
{
try
{
writingThread.join();
} catch (InterruptedException ex)
{
// silently ignored
}
if (writingException != null)
{
throw writingException;
}
}
}
private void addToQueue(WritingItem writingItem) throws Throwable
{
try
{
if (writingException != null)
{
throw writingException;
}
queue.put(writingItem);
} catch (InterruptedException ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
}
} }
private long startIO() private long startIO()
...@@ -128,6 +245,27 @@ public class MonitoredIOStreamCopier ...@@ -128,6 +245,27 @@ public class MonitoredIOStreamCopier
} }
} }
private final class WritingItem
{
private OutputStream output;
private byte[] data;
private int numberOfBytes;
public WritingItem(OutputStream output, byte[] data, int numberOfBytes)
{
this.output = output;
this.data = data;
this.numberOfBytes = numberOfBytes;
}
public void write() throws IOException
{
long t0 = startIO();
output.write(data, 0, numberOfBytes);
recordTime(numberOfBytes, t0, writeStatistics);
}
}
private static final class Statistics private static final class Statistics
{ {
private final List<NumberOfBytesAndIOTime> data = new ArrayList<NumberOfBytesAndIOTime>(); private final List<NumberOfBytesAndIOTime> data = new ArrayList<NumberOfBytesAndIOTime>();
......
...@@ -18,12 +18,16 @@ package ch.systemsx.cisd.common.io; ...@@ -18,12 +18,16 @@ package ch.systemsx.cisd.common.io;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.testng.AssertJUnit; import org.testng.AssertJUnit;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import ch.systemsx.cisd.common.exceptions.EnvironmentFailureException;
import ch.systemsx.cisd.common.logging.MockLogger; import ch.systemsx.cisd.common.logging.MockLogger;
import ch.systemsx.cisd.common.utilities.ITimeAndWaitingProvider;
import ch.systemsx.cisd.common.utilities.ITimeProvider; import ch.systemsx.cisd.common.utilities.ITimeProvider;
import ch.systemsx.cisd.common.utilities.MockTimeProvider; import ch.systemsx.cisd.common.utilities.MockTimeProvider;
...@@ -46,7 +50,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit ...@@ -46,7 +50,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit
public void testCopyingInThreeChunks() public void testCopyingInThreeChunks()
{ {
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB); MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB);
ITimeProvider timeProvider = new MockTimeProvider(123456789, 1125, 0, 2500, 0, 3400, 0, 6790, ITimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 2500, 0, 3400, 0, 6790,
0, 700, 0, 1400); 0, 700, 0, 1400);
copier.setTimeProvider(timeProvider); copier.setTimeProvider(timeProvider);
MockLogger logger = new MockLogger(); MockLogger logger = new MockLogger();
...@@ -65,7 +69,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit ...@@ -65,7 +69,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit
public void testCopyingInFourChunks() public void testCopyingInFourChunks()
{ {
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB); MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB);
ITimeProvider timeProvider = new MockTimeProvider(123456789, 1125, 0, 2500, 0, 3400, 0, 6790, ITimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 2500, 0, 3400, 0, 6790,
0, 0, 0, 5700, 0, 7750, 0, 700, 0, 1400); 0, 0, 0, 5700, 0, 7750, 0, 700, 0, 1400);
copier.setTimeProvider(timeProvider); copier.setTimeProvider(timeProvider);
MockLogger logger = new MockLogger(); MockLogger logger = new MockLogger();
...@@ -85,7 +89,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit ...@@ -85,7 +89,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit
public void testCopyingWithChunksToSmallForMedianCalculation() public void testCopyingWithChunksToSmallForMedianCalculation()
{ {
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB); MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB);
ITimeProvider timeProvider = new MockTimeProvider(123456789, 1125, 0, 2500, 0, 0, 0, 3400, 0, 6790); ITimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 2500, 0, 0, 0, 3400, 0, 6790);
copier.setTimeProvider(timeProvider); copier.setTimeProvider(timeProvider);
MockLogger logger = new MockLogger(); MockLogger logger = new MockLogger();
copier.setLogger(logger); copier.setLogger(logger);
...@@ -104,7 +108,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit ...@@ -104,7 +108,7 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit
public void testCopyingWithChunksToSmallForAverageCalculation() public void testCopyingWithChunksToSmallForAverageCalculation()
{ {
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB); MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB);
ITimeProvider timeProvider = new MockTimeProvider(123456789, 1125, 0, 2500, 0, 0, 0, 3400, 0, 6790); ITimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 2500, 0, 0, 0, 3400, 0, 6790);
copier.setTimeProvider(timeProvider); copier.setTimeProvider(timeProvider);
MockLogger logger = new MockLogger(); MockLogger logger = new MockLogger();
copier.setLogger(logger); copier.setLogger(logger);
...@@ -118,6 +122,65 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit ...@@ -118,6 +122,65 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit
logger.toString()); logger.toString());
} }
@Test
public void testCopyingInThreeChunksWithQueue()
{
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB, 5 * FileUtils.ONE_MB);
MultiThreadMockTimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 3400, 0, 700);
timeProvider.defineMockTimeProviderForThread(null, 123456789, 2500, 0, 6790, 0, 1400);
copier.setTimeProvider(timeProvider);
MockLogger logger = new MockLogger();
copier.setLogger(logger);
copy(copier, (int) (2.5 * FileUtils.ONE_MB));
copier.close();
assertEquals("INFO: Reading statistics for input stream: 2.50 MB in 3 chunks took 5sec. "
+ "Average speed: 489.95 KB/sec. Median speed: 605.70 KB/sec.\n"
+ "INFO: Writing statistics for output stream: 2.50 MB in 3 chunks took 11sec. "
+ "Average speed: 239.48 KB/sec. Median speed: 280.21 KB/sec.\n", logger.toString());
}
@Test
public void testCopyingInThreeChunksWithLimitedQueue()
{
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB, 2 * FileUtils.ONE_MB);
MultiThreadMockTimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 3400, 0, 700);
timeProvider.defineMockTimeProviderForThread(null, 123456789, 2500, 0, 6790, 0, 1400);
copier.setTimeProvider(timeProvider);
MockLogger logger = new MockLogger();
copier.setLogger(logger);
copy(copier, (int) (2.5 * FileUtils.ONE_MB));
copier.close();
assertEquals("INFO: Reading statistics for input stream: 2.50 MB in 3 chunks took 5sec. "
+ "Average speed: 489.95 KB/sec. Median speed: 605.70 KB/sec.\n"
+ "INFO: Writing statistics for output stream: 2.50 MB in 3 chunks took 11sec. "
+ "Average speed: 239.48 KB/sec. Median speed: 280.21 KB/sec.\n", logger.toString());
}
@Test
public void testCopyingInThreeChunksWithLimitedQueueFails()
{
MonitoredIOStreamCopier copier = new MonitoredIOStreamCopier((int) FileUtils.ONE_MB, 2 * FileUtils.ONE_MB);
MultiThreadMockTimeProvider timeProvider = new MultiThreadMockTimeProvider(123456789, 1125, 0, 3400, 0, 700);
timeProvider.defineMockTimeProviderForThread(null, -1, 2500, 0, 6790, 0, 1400);
copier.setTimeProvider(timeProvider);
MockLogger logger = new MockLogger();
copier.setLogger(logger);
try
{
copy(copier, (int) (2.5 * FileUtils.ONE_MB));
fail("EnvironmentFailureException expected");
} catch (EnvironmentFailureException ex)
{
assertEquals("writing error", ex.getCause().getMessage());
}
copier.close();
}
private void copy(MonitoredIOStreamCopier copier, int numberOfBytes) private void copy(MonitoredIOStreamCopier copier, int numberOfBytes)
{ {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
...@@ -144,5 +207,52 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit ...@@ -144,5 +207,52 @@ public class MonitoredIOStreamCopierTest extends AssertJUnit
} }
assertEquals(numberOfBytes, data.length); assertEquals(numberOfBytes, data.length);
} }
private static final class MultiThreadMockTimeProvider implements ITimeAndWaitingProvider
{
private Map<Thread, MockTimeProvider> timeProvidersByThread = new HashMap<Thread, MockTimeProvider>();
private MockTimeProvider timeProvider;
public MultiThreadMockTimeProvider(long startTime, long... timeSteps)
{
timeProvidersByThread.put(Thread.currentThread(), new MockTimeProvider(startTime, timeSteps));
}
void defineMockTimeProviderForThread(Thread threadOrNull, long startTime, long... timeSteps)
{
MockTimeProvider mockTimeProvider = new MockTimeProvider(startTime, timeSteps);
if (threadOrNull == null)
{
timeProvider = mockTimeProvider;
} else
{
timeProvidersByThread.put(threadOrNull, mockTimeProvider);
}
}
@Override
public long getTimeInMilliseconds()
{
long timeInMilliseconds = getTimeProvider().getTimeInMilliseconds();
if (timeInMilliseconds < 0)
{
throw new RuntimeException("writing error");
}
return timeInMilliseconds;
}
@Override
public void sleep(long milliseconds)
{
getTimeProvider().sleep(milliseconds);
}
private ITimeAndWaitingProvider getTimeProvider()
{
MockTimeProvider provider = timeProvidersByThread.get(Thread.currentThread());
return provider == null ? timeProvider : provider;
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment