Skip to content
Snippets Groups Projects
Commit 1775fa5d authored by tpylak's avatar tpylak
Browse files

LMS-1511 input stream to merge several file, output stream writer to separate...

LMS-1511 input stream to merge several file, output stream writer to separate files from the merged stream

SVN: 15742
parent 8e87e344
No related branches found
No related tags found
No related merge requests found
/*
* Copyright 2010 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.io;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/**
* Special <code>InputStream</code> that will concatenate the contents of an array of files into one
* stream. Content of each file will be preceded by one long which tells what is the size of the
* file in bytes.<BR>
*
* @author Tomasz Pylak
*/
public class ConcatFileInputStream extends InputStream
{
private static final int EOF = -1;
private int currentIndex = -1;
private boolean eof = false;
private File[] files;
// If true then currentStream is not the content of the file, but a short stream
// which encodes one long number which describes file size.
// This stream is created every time before a new file content is appended to the stream.
private boolean readingFileSize;
private InputStream currentStream;
/**
* @files content of these files will be concatenated into one stream.
*/
public ConcatFileInputStream(File... files)
{
this.files = files;
this.readingFileSize = false;
}
/**
* @files content of these files will be concatenated into one stream.
*/
public ConcatFileInputStream(List<File> files)
{
this(files.toArray(new File[files.size()]));
}
@Override
public void close() throws IOException
{
closeCurrentStream();
eof = true;
}
@Override
public int read() throws IOException
{
int result = readCurrent();
// we have a loop instead of an if to ignore files which are empty
while (result == EOF && !eof)
{
closeCurrentStream();
if (readingFileSize)
{
currentStream = createFileStream(getCurrentFile());
readingFileSize = false;
} else
{
File nextFile = tryGetNextFile();
if (nextFile == null)
{
eof = true;
return EOF;
}
currentStream = createFileSizeStream(nextFile);
readingFileSize = true;
}
result = currentStream.read();
}
return result;
}
private File getCurrentFile()
{
return files[currentIndex];
}
private int readCurrent() throws IOException
{
return (eof || currentStream == null) ? EOF : currentStream.read();
}
// returns the next file to read
private File tryGetNextFile() throws IOException
{
currentIndex++;
if (files != null && currentIndex < files.length)
{
return getCurrentFile();
} else
{
return null;
}
}
private void closeCurrentStream()
{
close(currentStream);
currentStream = null;
}
// -------------- static helper ---------------
private static InputStream createFileStream(File currentFile) throws FileNotFoundException
{
return new BufferedInputStream(new FileInputStream(currentFile));
}
private static InputStream createFileSizeStream(File file) throws IOException
{
long fileSize = file.length();
byte[] data = longToBytes(fileSize);
return new ByteArrayInputStream(data);
}
private static byte[] longToBytes(long fileSize) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(fileSize);
dos.flush();
byte[] data = bos.toByteArray();
dos.close();
return data;
}
/**
* Close a stream without throwing any exception if something went wrong. Do not attempt to
* close it if the argument is null.
*/
private static void close(InputStream streamOrNull)
{
if (streamOrNull != null)
{
try
{
streamOrNull.close();
} catch (IOException ioex)
{
// ignore
}
}
}
}
/*
* Copyright 2010 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.io;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* A helper class which allows to separate multiple blocks written to one stream in a format:
* (<block-size><block-of-bytes>)* where block-size is the long number. Useful to parse the content
* of ConcatFileInputStream. Allocates only small constant amount of memory.
*
* @author Tomasz Pylak
*/
class ConcatFileOutputStreamWriter
{
private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
private static final int BYTES_PER_LONG = 8;
private final InputStream inputStream;
// buffer to read the file size
private final byte[] blockSizeBuffer = new byte[BYTES_PER_LONG];
// Number of bytes which left to be read from the currently read block, 0 if end of the current
// block is reached.
private long bytesToReadFromCurrent;
public ConcatFileOutputStreamWriter(InputStream inputStream)
{
this.inputStream = inputStream;
this.bytesToReadFromCurrent = 0;
}
/**
* Copies the next block into the specified output stream. Returns the number of bytes copied or
* -1 if there are no more blocks to read.
*/
public long writeNextBlock(OutputStream output) throws IOException
{
long blockSize = gotoNextBlock();
if (blockSize == -1)
{
return -1; // no more blocks
}
return copyCurrentBlock(output);
}
private long copyCurrentBlock(OutputStream output) throws IOException
{
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
long count = 0;
int n = 0;
while (-1 != (n = readCurrent(buffer, 0, buffer.length)))
{
output.write(buffer, 0, n);
count += n;
}
return count;
}
/**
* block size if there is a next block in the stream and it is non-empty. Returns 0 if the block
* is empty, -1 if there are no more blocks to read.<br>
* Can be called only at the beginning or if the end of the previous block has been reached.
*
* @throws IOException when the stream is corrupted and the size of the block cannot be read
*/
private long gotoNextBlock() throws IOException
{
if (bytesToReadFromCurrent > 0)
{
throw new IllegalStateException(
"Cannot proceed to the next file before the current one is not read till the end.");
}
bytesToReadFromCurrent = readBlockSize();
return bytesToReadFromCurrent;
}
// next block size in bytes if it's not the end of the stream, -1 if the end of stream has been
// reached
private long readBlockSize() throws IOException
{
int bytesRead = inputStream.read(blockSizeBuffer, 0, BYTES_PER_LONG);
if (bytesRead == -1)
{
return -1;
} else if (bytesRead < BYTES_PER_LONG)
{
throw new IOException("Stream corrupted, cannot read the block size.");
} else
{
long blockSize = bytesToLong(blockSizeBuffer);
assert blockSize >= 0 : "block size cannot be negative";
return blockSize;
}
}
/**
* Reads the bytes from the current block. If len is 0 returns 0, otherwise if the end of one
* block has been reached returns -1. It does not mean that the end of the whole stream has been
* reached, the stream may contain other blocks. The method {@link #gotoNextBlock()} should be
* called to start reading the next block.<br>
* See {@link InputStream#read(byte[], int, int)} for parameters details.
*/
private int readCurrent(byte b[], int off, int len) throws IOException
{
if (len == 0)
{
return 0;
}
if (bytesToReadFromCurrent == 0)
{
return -1;
}
int wantedBytes = (int) Math.min(len, bytesToReadFromCurrent);
int bytesRead = inputStream.read(b, off, wantedBytes);
if (bytesRead != wantedBytes)
{
throw new IOException("Corrupted stream, there should be at least " + wantedBytes
+ " bytes in the block, but only " + bytesRead + " were available.");
} else
{
bytesToReadFromCurrent -= bytesRead;
}
return bytesRead;
}
private static long bytesToLong(byte[] bytes) throws IOException
{
ByteArrayInputStream bos = new ByteArrayInputStream(bytes);
DataInputStream dos = new DataInputStream(bos);
try
{
return dos.readLong();
} catch (IOException e)
{
throw new IOException("Cannot create the long from bytes: " + bytes);
} finally
{
try
{
dos.close();
} catch (IOException ex)
{
// ignore
}
}
}
}
/*
* Copyright 2010 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.io;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
import ch.systemsx.cisd.base.tests.AbstractFileSystemTestCase;
/**
* Tests for {@link ConcatFileInputStream}
*
* @author Tomasz Pylak
*/
public class ConcatFileInputStreamTest extends AbstractFileSystemTestCase
{
@Test
public void testNoFiles() throws IOException
{
ConcatFileInputStream stream = new ConcatFileInputStream(new File[0]);
AssertJUnit.assertEquals(-1, stream.read());
}
@Test
public void testOneFile() throws IOException
{
String content = createLongString("1");
File file = createFile(content, "f1.txt");
ConcatFileInputStream stream = new ConcatFileInputStream(file);
List<String> streamContent = readStrings(stream);
assertEquals(1, streamContent.size());
assertEquals(content, streamContent.get(0));
}
@Test
public void testManyFiles() throws IOException
{
String content1 = createLongString("1");
File file1 = createFile(content1, "f1.txt");
String content2 = ""; // empty content
File file2 = createFile(content2, "f2.txt");
String content3 = createLongString("3");
File file3 = createFile(content3, "f3.txt");
ConcatFileInputStream stream = new ConcatFileInputStream(file1, file2, file3);
List<String> streamContent = readStrings(stream);
assertEquals(3, streamContent.size());
assertEquals(content1, streamContent.get(0));
assertEquals(content2, streamContent.get(1));
assertEquals(content3, streamContent.get(2));
}
// --------- helpers
private static List<String> readStrings(ConcatFileInputStream stream) throws IOException
{
ConcatFileOutputStreamWriter reader = new ConcatFileOutputStreamWriter(stream);
List<String> result = new ArrayList<String>();
while (true)
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
long blockSize = reader.writeNextBlock(out);
if (blockSize == -1)
{
break;
}
result.add(out.toString());
}
return result;
}
private File createFile(String content, String fileName) throws FileNotFoundException,
IOException
{
File file = new File(workingDirectory, fileName);
IOUtils.writeLines(Arrays.asList(content), "", new FileOutputStream(file));
return file;
}
private static String createLongString(String text)
{
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 1000; i++)
{
sb.append(text);
}
sb.append("\n");
return sb.toString();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment