diff --git a/common/source/java/ch/systemsx/cisd/common/io/ConcatFileInputStream.java b/common/source/java/ch/systemsx/cisd/common/io/ConcatFileInputStream.java new file mode 100644 index 0000000000000000000000000000000000000000..922245747f93daf3c803cb056f53340c6f39eb2a --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/io/ConcatFileInputStream.java @@ -0,0 +1,177 @@ +/* + * Copyright 2010 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.io; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +/** + * Special <code>InputStream</code> that will concatenate the contents of an array of files into one + * stream. Content of each file will be preceded by one long which tells what is the size of the + * file in bytes.<BR> + * + * @author Tomasz Pylak + */ +public class ConcatFileInputStream extends InputStream +{ + private static final int EOF = -1; + + private int currentIndex = -1; + + private boolean eof = false; + + private File[] files; + + // If true then currentStream is not the content of the file, but a short stream + // which encodes one long number which describes file size. + // This stream is created every time before a new file content is appended to the stream. + private boolean readingFileSize; + + private InputStream currentStream; + + /** + * @files content of these files will be concatenated into one stream. + */ + public ConcatFileInputStream(File... files) + { + this.files = files; + this.readingFileSize = false; + } + + /** + * @files content of these files will be concatenated into one stream. + */ + public ConcatFileInputStream(List<File> files) + { + this(files.toArray(new File[files.size()])); + } + + @Override + public void close() throws IOException + { + closeCurrentStream(); + eof = true; + } + + @Override + public int read() throws IOException + { + int result = readCurrent(); + // we have a loop instead of an if to ignore files which are empty + while (result == EOF && !eof) + { + closeCurrentStream(); + if (readingFileSize) + { + currentStream = createFileStream(getCurrentFile()); + readingFileSize = false; + } else + { + File nextFile = tryGetNextFile(); + if (nextFile == null) + { + eof = true; + return EOF; + } + currentStream = createFileSizeStream(nextFile); + readingFileSize = true; + } + result = currentStream.read(); + } + return result; + } + + private File getCurrentFile() + { + return files[currentIndex]; + } + + private int readCurrent() throws IOException + { + return (eof || currentStream == null) ? EOF : currentStream.read(); + } + + // returns the next file to read + private File tryGetNextFile() throws IOException + { + currentIndex++; + if (files != null && currentIndex < files.length) + { + return getCurrentFile(); + } else + { + return null; + } + } + + private void closeCurrentStream() + { + close(currentStream); + currentStream = null; + } + + // -------------- static helper --------------- + + private static InputStream createFileStream(File currentFile) throws FileNotFoundException + { + return new BufferedInputStream(new FileInputStream(currentFile)); + } + + private static InputStream createFileSizeStream(File file) throws IOException + { + long fileSize = file.length(); + byte[] data = longToBytes(fileSize); + return new ByteArrayInputStream(data); + } + + private static byte[] longToBytes(long fileSize) throws IOException + { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + dos.writeLong(fileSize); + dos.flush(); + byte[] data = bos.toByteArray(); + dos.close(); + return data; + } + + /** + * Close a stream without throwing any exception if something went wrong. Do not attempt to + * close it if the argument is null. + */ + private static void close(InputStream streamOrNull) + { + if (streamOrNull != null) + { + try + { + streamOrNull.close(); + } catch (IOException ioex) + { + // ignore + } + } + } +} diff --git a/common/source/java/ch/systemsx/cisd/common/io/ConcatFileOutputStreamWriter.java b/common/source/java/ch/systemsx/cisd/common/io/ConcatFileOutputStreamWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..c459604005bf395c3beaca35d3ceeb751887aae4 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/io/ConcatFileOutputStreamWriter.java @@ -0,0 +1,171 @@ +/* + * Copyright 2010 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.io; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * A helper class which allows to separate multiple blocks written to one stream in a format: + * (<block-size><block-of-bytes>)* where block-size is the long number. Useful to parse the content + * of ConcatFileInputStream. Allocates only small constant amount of memory. + * + * @author Tomasz Pylak + */ +class ConcatFileOutputStreamWriter +{ + private static final int DEFAULT_BUFFER_SIZE = 1024 * 4; + + private static final int BYTES_PER_LONG = 8; + + private final InputStream inputStream; + + // buffer to read the file size + private final byte[] blockSizeBuffer = new byte[BYTES_PER_LONG]; + + // Number of bytes which left to be read from the currently read block, 0 if end of the current + // block is reached. + private long bytesToReadFromCurrent; + + public ConcatFileOutputStreamWriter(InputStream inputStream) + { + this.inputStream = inputStream; + this.bytesToReadFromCurrent = 0; + } + + /** + * Copies the next block into the specified output stream. Returns the number of bytes copied or + * -1 if there are no more blocks to read. + */ + public long writeNextBlock(OutputStream output) throws IOException + { + long blockSize = gotoNextBlock(); + if (blockSize == -1) + { + return -1; // no more blocks + } + return copyCurrentBlock(output); + } + + private long copyCurrentBlock(OutputStream output) throws IOException + { + byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; + long count = 0; + int n = 0; + while (-1 != (n = readCurrent(buffer, 0, buffer.length))) + { + output.write(buffer, 0, n); + count += n; + } + return count; + } + + /** + * block size if there is a next block in the stream and it is non-empty. Returns 0 if the block + * is empty, -1 if there are no more blocks to read.<br> + * Can be called only at the beginning or if the end of the previous block has been reached. + * + * @throws IOException when the stream is corrupted and the size of the block cannot be read + */ + private long gotoNextBlock() throws IOException + { + if (bytesToReadFromCurrent > 0) + { + throw new IllegalStateException( + "Cannot proceed to the next file before the current one is not read till the end."); + } + bytesToReadFromCurrent = readBlockSize(); + return bytesToReadFromCurrent; + } + + // next block size in bytes if it's not the end of the stream, -1 if the end of stream has been + // reached + private long readBlockSize() throws IOException + { + int bytesRead = inputStream.read(blockSizeBuffer, 0, BYTES_PER_LONG); + if (bytesRead == -1) + { + return -1; + } else if (bytesRead < BYTES_PER_LONG) + { + throw new IOException("Stream corrupted, cannot read the block size."); + } else + { + long blockSize = bytesToLong(blockSizeBuffer); + assert blockSize >= 0 : "block size cannot be negative"; + return blockSize; + } + } + + /** + * Reads the bytes from the current block. If len is 0 returns 0, otherwise if the end of one + * block has been reached returns -1. It does not mean that the end of the whole stream has been + * reached, the stream may contain other blocks. The method {@link #gotoNextBlock()} should be + * called to start reading the next block.<br> + * See {@link InputStream#read(byte[], int, int)} for parameters details. + */ + private int readCurrent(byte b[], int off, int len) throws IOException + { + if (len == 0) + { + return 0; + } + if (bytesToReadFromCurrent == 0) + { + return -1; + } + + int wantedBytes = (int) Math.min(len, bytesToReadFromCurrent); + int bytesRead = inputStream.read(b, off, wantedBytes); + + if (bytesRead != wantedBytes) + { + throw new IOException("Corrupted stream, there should be at least " + wantedBytes + + " bytes in the block, but only " + bytesRead + " were available."); + } else + { + bytesToReadFromCurrent -= bytesRead; + } + return bytesRead; + } + + private static long bytesToLong(byte[] bytes) throws IOException + { + ByteArrayInputStream bos = new ByteArrayInputStream(bytes); + DataInputStream dos = new DataInputStream(bos); + try + { + return dos.readLong(); + } catch (IOException e) + { + throw new IOException("Cannot create the long from bytes: " + bytes); + } finally + { + try + { + dos.close(); + } catch (IOException ex) + { + // ignore + } + } + } + +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/io/ConcatFileInputStreamTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/io/ConcatFileInputStreamTest.java new file mode 100644 index 0000000000000000000000000000000000000000..19e04b46c70b3e7eb0bbe9541197861f66845135 --- /dev/null +++ b/common/sourceTest/java/ch/systemsx/cisd/common/io/ConcatFileInputStreamTest.java @@ -0,0 +1,117 @@ +/* + * Copyright 2010 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.io; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.testng.AssertJUnit; +import org.testng.annotations.Test; + +import ch.systemsx.cisd.base.tests.AbstractFileSystemTestCase; + +/** + * Tests for {@link ConcatFileInputStream} + * + * @author Tomasz Pylak + */ +public class ConcatFileInputStreamTest extends AbstractFileSystemTestCase +{ + @Test + public void testNoFiles() throws IOException + { + ConcatFileInputStream stream = new ConcatFileInputStream(new File[0]); + AssertJUnit.assertEquals(-1, stream.read()); + } + + @Test + public void testOneFile() throws IOException + { + String content = createLongString("1"); + File file = createFile(content, "f1.txt"); + ConcatFileInputStream stream = new ConcatFileInputStream(file); + List<String> streamContent = readStrings(stream); + assertEquals(1, streamContent.size()); + assertEquals(content, streamContent.get(0)); + } + + @Test + public void testManyFiles() throws IOException + { + String content1 = createLongString("1"); + File file1 = createFile(content1, "f1.txt"); + + String content2 = ""; // empty content + File file2 = createFile(content2, "f2.txt"); + + String content3 = createLongString("3"); + File file3 = createFile(content3, "f3.txt"); + + ConcatFileInputStream stream = new ConcatFileInputStream(file1, file2, file3); + List<String> streamContent = readStrings(stream); + assertEquals(3, streamContent.size()); + assertEquals(content1, streamContent.get(0)); + assertEquals(content2, streamContent.get(1)); + assertEquals(content3, streamContent.get(2)); + } + + // --------- helpers + + private static List<String> readStrings(ConcatFileInputStream stream) throws IOException + { + ConcatFileOutputStreamWriter reader = new ConcatFileOutputStreamWriter(stream); + List<String> result = new ArrayList<String>(); + while (true) + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + long blockSize = reader.writeNextBlock(out); + if (blockSize == -1) + { + break; + } + result.add(out.toString()); + } + return result; + } + + private File createFile(String content, String fileName) throws FileNotFoundException, + IOException + { + File file = new File(workingDirectory, fileName); + + IOUtils.writeLines(Arrays.asList(content), "", new FileOutputStream(file)); + return file; + } + + private static String createLongString(String text) + { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < 1000; i++) + { + sb.append(text); + } + sb.append("\n"); + return sb.toString(); + } +}