Newer
Older
piotr.kupczyk@id.ethz.ch
committed
/*
* Copyright 2018 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.ethz.sis.filetransfer;
piotr.kupczyk@id.ethz.ch
committed
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
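/**
 * Default {@link IChunkDeserializer}: reads one chunk at a time from an input stream and checks the
 * CRC32 checksums of its header and of its payload.
 * <p>
 * NOTE(review): an instance keeps mutable buffers and checksum objects between calls, so it is
 * presumably not meant to be shared between threads - confirm against the callers.
 *
 * @author pkupczyk
 */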
public class DefaultChunkDeserializer implements IChunkDeserializer
{
// Logger used for the DEBUG output of the calculated header and payload checksums.
private ILogger logger;
// Converts the raw download item id bytes of a chunk into an IDownloadItemId.
private IDownloadItemIdDeserializer itemIdDeserializer;
// Reusable 4-byte buffer for the chunk sequence number (read back with getInt()).
private ByteBuffer sequenceNumberBuffer = ByteBuffer.allocate(4);
// Reusable 2-byte buffer for the length of the download item id (read back with getShort()).
private ByteBuffer downloadItemIdLengthBuffer = ByteBuffer.allocate(2);
// Reusable 1-byte buffer for the "is directory" flag (a value greater than 0 means true).
private ByteBuffer isDirectoryBuffer = ByteBuffer.allocate(1);
piotr.kupczyk@id.ethz.ch
committed
// Reusable 2-byte buffer for the length of the file path (read back with getShort()).
private ByteBuffer filePathLengthBuffer = ByteBuffer.allocate(2);
// Reusable 8-byte buffer for the file offset (read back with getLong()).
private ByteBuffer fileOffsetBuffer = ByteBuffer.allocate(8);
// Reusable 4-byte buffer for the length of the payload (read back with getInt()).
private ByteBuffer payloadLengthBuffer = ByteBuffer.allocate(4);
// Variable-size buffers: initially null, obtained through reuseOrExtendBuffer(...) on each
// deserialize(...) call, which keeps the current buffer when its capacity is sufficient.
private ByteBuffer downloadItemIdBuffer;
private ByteBuffer filePathBuffer;
private ByteBuffer payloadBuffer;
// Reusable 8-byte buffers for the checksums sent in the stream (read back with getLong()).
private ByteBuffer sentHeaderChecksumBuffer = ByteBuffer.allocate(8);
private ByteBuffer sentPayloadChecksumBuffer = ByteBuffer.allocate(8);
// Checksums calculated locally while reading; reset at the start of every deserialize(...) call
// and compared with the sent values.
private CRC32 calculatedHeaderChecksum = new CRC32();
private CRC32 calculatedPayloadChecksum = new CRC32();
/**
 * Creates a deserializer that reports through the given logger and delegates the decoding of
 * download item ids to the given id deserializer.
 *
 * @param logger logger queried and used for DEBUG output of the calculated checksums
 * @param itemIdDeserializer converts the download item id bytes of a chunk into an
 *            {@link IDownloadItemId}
 */
public DefaultChunkDeserializer(ILogger logger, IDownloadItemIdDeserializer itemIdDeserializer)
{
    this.itemIdDeserializer = itemIdDeserializer;
    this.logger = logger;
}
@Override
public Chunk deserialize(InputStream stream) throws DownloadException
piotr.kupczyk@id.ethz.ch
committed
{
calculatedHeaderChecksum.reset();
calculatedPayloadChecksum.reset();
putToBuffer(stream, sequenceNumberBuffer, "sequenceNumber", sequenceNumberBuffer.capacity(), calculatedHeaderChecksum);
int sequenceNumber = sequenceNumberBuffer.getInt();
putToBuffer(stream, downloadItemIdLengthBuffer, "downloadItemIdLength", downloadItemIdLengthBuffer.capacity(), calculatedHeaderChecksum);
short downloadItemIdLength = downloadItemIdLengthBuffer.getShort();
putToBuffer(stream, isDirectoryBuffer, "isDirectory", isDirectoryBuffer.capacity(), calculatedHeaderChecksum);
boolean isDirectory = isDirectoryBuffer.get() > 0;
piotr.kupczyk@id.ethz.ch
committed
putToBuffer(stream, filePathLengthBuffer, "filePathLength", filePathLengthBuffer.capacity(), calculatedHeaderChecksum);
short filePathLength = filePathLengthBuffer.getShort();
putToBuffer(stream, fileOffsetBuffer, "fileOffset", fileOffsetBuffer.capacity(), calculatedHeaderChecksum);
long fileOffset = fileOffsetBuffer.getLong();
putToBuffer(stream, payloadLengthBuffer, "payloadLength", payloadLengthBuffer.capacity(), calculatedHeaderChecksum);
int payloadLength = payloadLengthBuffer.getInt();
downloadItemIdBuffer = reuseOrExtendBuffer(downloadItemIdBuffer, downloadItemIdLength);
putToBuffer(stream, downloadItemIdBuffer, "downloadItemId", downloadItemIdLength, calculatedHeaderChecksum);
IDownloadItemId downloadItemId = itemIdDeserializer.deserialize(downloadItemIdBuffer);
piotr.kupczyk@id.ethz.ch
committed
filePathBuffer = reuseOrExtendBuffer(filePathBuffer, filePathLength);
putToBuffer(stream, filePathBuffer, "filePath", filePathLength, calculatedHeaderChecksum);
String filePath = new String(filePathBuffer.array(), filePathBuffer.position(), filePathBuffer.limit());
piotr.kupczyk@id.ethz.ch
committed
putToBuffer(stream, sentHeaderChecksumBuffer, "headerChecksum", sentHeaderChecksumBuffer.capacity(), null);
long sentHeaderChecksum = sentHeaderChecksumBuffer.getLong();
if (logger.isEnabled(LogLevel.DEBUG))
{
logger.log(getClass(), LogLevel.DEBUG, "Header CRC (client): " + Long.toHexString(calculatedHeaderChecksum.getValue()));
}
if (calculatedHeaderChecksum.getValue() != sentHeaderChecksum)
{
throw new DownloadException(
piotr.kupczyk@id.ethz.ch
committed
"Error in header data detected. Calculated checksum: " + calculatedHeaderChecksum.getValue() + ". Sent checksum: "
+ sentHeaderChecksum,
true);
piotr.kupczyk@id.ethz.ch
committed
}
payloadBuffer = reuseOrExtendBuffer(payloadBuffer, payloadLength);
putToBuffer(stream, payloadBuffer, "payload", payloadLength, calculatedPayloadChecksum);
byte[] payload = payloadBuffer.array();
putToBuffer(stream, sentPayloadChecksumBuffer, "payloadChecksum", sentPayloadChecksumBuffer.capacity(), null);
long sentPayloadChecksum = sentPayloadChecksumBuffer.getLong();
if (logger.isEnabled(LogLevel.DEBUG))
{
logger.log(getClass(), LogLevel.DEBUG, "Payload CRC (client): " + Long.toHexString(calculatedPayloadChecksum.getValue()));
}
if (calculatedPayloadChecksum.getValue() != sentPayloadChecksum)
{
throw new DownloadException(
piotr.kupczyk@id.ethz.ch
committed
"Error in payload data detected. Calculated checksum: " + calculatedPayloadChecksum.getValue() + ". Sent checksum: "
+ sentPayloadChecksum,
true);
piotr.kupczyk@id.ethz.ch
committed
}
return new Chunk(sequenceNumber, downloadItemId, isDirectory, filePath, fileOffset, payloadLength)
piotr.kupczyk@id.ethz.ch
committed
{
@Override
public InputStream getPayload() throws DownloadException
piotr.kupczyk@id.ethz.ch
committed
{
return new ByteArrayInputStream(payload, 0, payloadLength);
}
};
}
/**
 * Returns a buffer that can hold at least {@code length} bytes.
 *
 * @param buffer the current buffer, may be {@code null}
 * @param length the required capacity in bytes
 * @return {@code buffer} itself when it is non-null and its capacity is at least {@code length};
 *         otherwise a newly allocated buffer of exactly {@code length} bytes (the content of the
 *         old buffer is not carried over)
 */
private ByteBuffer reuseOrExtendBuffer(ByteBuffer buffer, int length)
{
    boolean needsNewBuffer = buffer == null || buffer.capacity() < length;
    return needsNewBuffer ? ByteBuffer.allocate(length) : buffer;
}
private void putToBuffer(InputStream stream, ByteBuffer buffer, String name, int length, CRC32 checksum) throws DownloadException
piotr.kupczyk@id.ethz.ch
committed
{
buffer.clear();
int i = 0;
piotr.kupczyk@id.ethz.ch
committed
{
while (i < length)
piotr.kupczyk@id.ethz.ch
committed
{
int b = stream.read();
if (b == -1)
{
throw new DownloadException("Unexpected finish of '" + name + "' field. Actual length: " + i + ". Expected length: " + length,
true);
}
piotr.kupczyk@id.ethz.ch
committed
buffer.put((byte) b);
if (checksum != null)
{
checksum.update(b);
}
i++;
}
} catch (IOException e)
piotr.kupczyk@id.ethz.ch
committed
{
throw new DownloadException("Couldn't read a byte", true);
piotr.kupczyk@id.ethz.ch
committed
}