diff --git a/common/source/java/ch/systemsx/cisd/common/collections/ExtendedBlockingQueueFactory.java b/common/source/java/ch/systemsx/cisd/common/collections/ExtendedBlockingQueueFactory.java index 0c42ae45ecd2c872cd2c8c3685ec50d11378e599..e1c05ca94541e04d1a97393af4eb6d04644aed30 100644 --- a/common/source/java/ch/systemsx/cisd/common/collections/ExtendedBlockingQueueFactory.java +++ b/common/source/java/ch/systemsx/cisd/common/collections/ExtendedBlockingQueueFactory.java @@ -40,7 +40,7 @@ public class ExtendedBlockingQueueFactory { final IExtendedBlockingQueue<E> queue = new ExtendedLinkedBlockingQueue<E>(); final IQueuePersister<E> queuePersister = - new SmartQueuePersister<E>(queue, queueFile, autoSync); + new QueuePersister<E>(queue, queueFile, autoSync); return new PersistentExtendedBlockingQueueDecorator<E>(queue, queuePersister); } diff --git a/common/source/java/ch/systemsx/cisd/common/collections/QueuePersister.java b/common/source/java/ch/systemsx/cisd/common/collections/QueuePersister.java new file mode 100644 index 0000000000000000000000000000000000000000..4b1129798f8ab29af714e426690ed70fb3ec79a9 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/collections/QueuePersister.java @@ -0,0 +1,870 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; + +import org.apache.log4j.Logger; + +import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.base.exceptions.IOExceptionUnchecked; +import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; + +/** + * An {@link IQueuePersister} that is based on records in a file. This class uses Java serialization + * on the queue elements and thus requires queue elements to be serializable. + * + * @author Pawel Glyzewski + */ +public class QueuePersister<E> implements IQueuePersister<E> +{ + private static final int MAX_RETRIES_ON_FAILURE = 3; + + private static final long MILLIS_TO_SLEEP_ON_FAILURE = 3000L; + + private final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, + QueuePersister.class); + + final static int QUEUE_IMPLEMENTATION_MARKER = -123456789; + + private final static int HEADER_LENGTH = 3 * 4; // 3 * sizeof(int) + + private final static int RECORD_HEADER_LENGTH = 4; // sizeof(int) + + private final static int MAX_SLICK = 100000; + + private final Queue<E> queue; + + private final File queueFile; + + private final File newQueueFile; + + private final boolean autoSync; + + private RandomAccessFile randomAccessFile; + + private int firstRecord = HEADER_LENGTH; + + private int lastRecord; + + /** + * Returns a list of the content of the <var>queueFile</var>. + */ + public static <E> List<E> list(Class<E> clazz, File queueFile) + { + final File newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); + List<E> result = new ArrayList<E>(); + RandomAccessFile randomAccessFile = null; + try + { + if (queueFile.exists() == false && newQueueFile.exists()) + { + randomAccessFile = new RandomAccessFile(newQueueFile, "r"); + } else + { + randomAccessFile = new RandomAccessFile(queueFile, "r"); + } + if (randomAccessFile.readInt() != QUEUE_IMPLEMENTATION_MARKER) + { + return LegacyQueuePersister.list(clazz, queueFile); + } else + { + final int firstRecord = randomAccessFile.readInt(); + final int lastRecord = randomAccessFile.readInt(); + load(randomAccessFile, result, firstRecord, lastRecord); + } + } catch (IOException ex) + { + return Collections.emptyList(); + } finally + { + if (randomAccessFile != null) + { + try + { + randomAccessFile.close(); + } catch (IOException ex) + { + // Silence + } + } + } + return Collections.unmodifiableList(result); + } + + public QueuePersister(Queue<E> queue, File queueFile) + { + this(queue, queueFile, false); + } + + /** + * Create a {@link QueuePersister} for <var>queue</var>. + * + * @param queue The queue to persist. + * @param queueFile The file to persist the queue in. + * @param autoSync If <code>true</code>, the underlying file will be synchronized after each + * write operation. This is safer, but costs a lot of performance. + */ + public QueuePersister(Queue<E> queue, File queueFile, boolean autoSync) + { + this.queue = queue; + this.queueFile = queueFile; + this.newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); + this.autoSync = autoSync; + if (queueFile.exists() == false && newQueueFile.exists()) + { + if (newQueueFile.renameTo(queueFile) == false) + { + throw CheckedExceptionTunnel.wrapIfNecessary(new IOException("Cannot rename file '" + + newQueueFile.getPath() + "' to '" + queueFile.getPath() + "'")); + } + } + open(); + } + + private void open() throws IOExceptionUnchecked + { + synchronized (queueFile) + { + for (int i = 0; true /* See EXIT LOOP below */; ++i) + { + try + { + this.randomAccessFile = new RandomAccessFile(queueFile, "rw"); + if (randomAccessFile.length() >= 4 + && randomAccessFile.readInt() != QUEUE_IMPLEMENTATION_MARKER) + { + LegacyQueuePersister<E> oldPersister = + new LegacyQueuePersister<E>(queue, queueFile); + oldPersister.close(); + persist(); + } else + { + + if (randomAccessFile.length() < HEADER_LENGTH) + { + writeFullHeader(randomAccessFile, firstRecord, lastRecord); + } else + { + this.firstRecord = randomAccessFile.readInt(); + if (this.firstRecord < 0) + { + this.firstRecord = HEADER_LENGTH; + } + this.lastRecord = randomAccessFile.readInt(); + if (this.lastRecord < 0) + { + this.lastRecord = 0; + } + } + load(randomAccessFile, queue, firstRecord, lastRecord); + // Clean up + if (firstRecord > 0) + { + persist(); + } + } + // EXIT LOOP: break on successful execution + break; + } catch (IOException ex) + { + operationLog.error(String.format("Error opening queue file '%s', position %d, " + + "trying to re-open.", queueFile.getPath(), lastRecord), ex); + closeQuietly(); + if (i == MAX_RETRIES_ON_FAILURE) + { + // EXIT LOOP: throw exception on unsuccessful execution + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + ConcurrencyUtilities.sleep(MILLIS_TO_SLEEP_ON_FAILURE); + } + } + } + } + + private byte[] toByteArray(E o) + { + final ByteArrayOutputStream ba = new ByteArrayOutputStream(); + try + { + final ObjectOutputStream oo = new ObjectOutputStream(ba); + oo.writeObject(o); + oo.close(); + return ba.toByteArray(); + } catch (Exception ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private static Object objFromByteArray(byte[] data) + { + final ByteArrayInputStream bi = new ByteArrayInputStream(data); + try + { + final ObjectInputStream oi = new ObjectInputStream(bi); + return oi.readObject(); + } catch (Exception ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private void writeHeader() throws IOException + { + writeFullHeader(randomAccessFile, firstRecord, lastRecord); + } + + private static void writeFullHeader(RandomAccessFile raf, int firstRecord, int lastRecord) + throws IOException + { + raf.seek(0L); + raf.writeInt(QUEUE_IMPLEMENTATION_MARKER); + raf.writeInt(firstRecord); + raf.writeInt(lastRecord); + } + + private static <E> void load(RandomAccessFile randomAccessFile, Collection<E> collection, + int firstRecord, int lastRecord) throws IOException + { + long pos = firstRecord; + while (pos < lastRecord) + { + randomAccessFile.seek(pos); + final int len = randomAccessFile.readInt(); + pos += len + RECORD_HEADER_LENGTH; + final byte[] data = new byte[len]; + randomAccessFile.read(data, 0, len); + deserializeAndAdd(collection, data); + } + } + + @SuppressWarnings("unchecked") + private static <E> void deserializeAndAdd(Collection<E> collection, final byte[] data) + { + collection.add((E) objFromByteArray(data)); + } + + // + // IQueuePersister + // + public void persist() + { + synchronized (queueFile) + { + try + { + closeQuietly(); + fillNewQueueFile(); + if (queueFile.delete() == false) + { + throw new IOException("Cannot delete file '" + queueFile.getPath() + "'"); + } + if (newQueueFile.renameTo(queueFile) == false) + { + throw new IOException("Cannot rename file '" + newQueueFile.getPath() + + "' to '" + queueFile.getPath() + "'"); + } + randomAccessFile = new RandomAccessFile(queueFile, "rw"); + if (autoSync) + { + sync(); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + private void fillNewQueueFile() throws IOException + { + RandomAccessFile newRandomAccessFile = null; + try + { + newRandomAccessFile = new RandomAccessFile(newQueueFile, "rw"); + firstRecord = HEADER_LENGTH; + lastRecord = -1; + writeFullHeader(newRandomAccessFile, firstRecord, lastRecord); + int pos = HEADER_LENGTH; + int elementSize = 0; + for (E elem : queue) + { + pos += elementSize; + newRandomAccessFile.seek(pos); + final byte[] data = toByteArray(elem); + elementSize = data.length + RECORD_HEADER_LENGTH; + newRandomAccessFile.writeInt(data.length); + newRandomAccessFile.write(data); + } + lastRecord = pos + elementSize; + writeFullHeader(newRandomAccessFile, firstRecord, lastRecord); + } finally + { + if (newRandomAccessFile != null) + { + newRandomAccessFile.close(); + } + } + } + + public void addToTail(E elem) + { + synchronized (queueFile) + { + for (int i = 0; true /* See EXIT LOOP below */; ++i) + { + try + { + randomAccessFile.seek(lastRecord); + final byte[] data = toByteArray(elem); + final int elementSize = data.length + RECORD_HEADER_LENGTH; + randomAccessFile.writeInt(data.length); + randomAccessFile.write(data); + lastRecord += elementSize; + writeHeader(); + if (autoSync) + { + sync(); + } + // EXIT LOOP: break on successful execution + break; + } catch (IOException ex) + { + operationLog.error(String.format( + "Error adding to tail of queue file '%s', position %d, " + + "trying to re-open.", queueFile.getPath(), lastRecord), ex); + closeQuietly(); + if (i == MAX_RETRIES_ON_FAILURE) + { + // EXIT LOOP: throw exception on unsuccessful execution + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + ConcurrencyUtilities.sleep(MILLIS_TO_SLEEP_ON_FAILURE); + open(); + } + } + } + } + + private void closeQuietly() + { + synchronized (queueFile) + { + try + { + randomAccessFile.close(); + } catch (IOException ex) + { + operationLog.error(String.format("Error on closing file '%s'", queueFile)); + } + } + } + + public void removeFromHead(E elem) throws IOExceptionUnchecked + { + synchronized (queueFile) + { + for (int i = 0; true /* See EXIT LOOP below */; ++i) + { + try + { + if (firstRecord > MAX_SLICK) + { + persist(); + } else + { + randomAccessFile.seek(firstRecord); + firstRecord += randomAccessFile.readInt() + RECORD_HEADER_LENGTH; + writeHeader(); + if (autoSync) + { + sync(); + } + } + // EXIT LOOP: break on successful execution + break; + } catch (IOException ex) + { + operationLog.error(String.format( + "Error removing from head of queue file '%s', position %d, " + + "trying to re-open.", queueFile.getPath(), lastRecord), ex); + closeQuietly(); + if (i == MAX_RETRIES_ON_FAILURE) + { + // EXIT LOOP: throw exception on unsuccessful execution + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + ConcurrencyUtilities.sleep(MILLIS_TO_SLEEP_ON_FAILURE); + open(); + } + } + } + } + + public void check() throws IllegalStateException + { + synchronized (queueFile) + { + try + { + if (randomAccessFile.getFD().valid() == false + || false == randomAccessFile.getChannel().isOpen()) + { + throw new IllegalStateException("Cannot persist: file is closed."); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void close() + { + synchronized (queueFile) + { + try + { + randomAccessFile.close(); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void sync() + { + synchronized (queueFile) + { + try + { + randomAccessFile.getFD().sync(); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + static class LegacyQueuePersister<E> implements IQueuePersister<E> + { + final static int DEFAULT_INITIAL_RECORD_SIZE = 32; + + private final static int LEGACY_HEADER_LENGTH = 3 * 4; // 3 * sizeof(int) + + private final static int LEGACY_RECORD_HEADER_LENGTH = 4; // sizeof(int) + + private final static int LEGACY_MAX_SLICK = 1000; + + private final Queue<E> queue; + + private final File queueFile; + + private final File newQueueFile; + + private final boolean autoSync; + + private int recordSize; + + private RandomAccessFile randomAccessFile; + + private int firstRecord; + + private int lastRecord; + + /** + * Returns a list of the content of the <var>queueFile</var>. + */ + static <E> List<E> list(Class<E> clazz, File queueFile) + { + final File newQueueFile = + new File(queueFile.getParentFile(), queueFile.getName() + ".new"); + final List<E> result = new ArrayList<E>(); + RandomAccessFile randomAccessFile = null; + try + { + if (queueFile.exists() == false && newQueueFile.exists()) + { + randomAccessFile = new RandomAccessFile(newQueueFile, "r"); + } else + { + randomAccessFile = new RandomAccessFile(queueFile, "r"); + } + final int firstRecord = randomAccessFile.readInt(); + final int lastRecord = randomAccessFile.readInt(); + final int recordSize = randomAccessFile.readInt(); + load(randomAccessFile, result, firstRecord, lastRecord, recordSize); + } catch (IOException ex) + { + return Collections.emptyList(); + } finally + { + if (randomAccessFile != null) + { + try + { + randomAccessFile.close(); + } catch (IOException ex) + { + // Silence + } + } + } + return Collections.unmodifiableList(result); + } + + /** + * Create a {@link QueuePersister.LegacyQueuePersister} for <var>queue</var>. (Uses a + * default initial record size of 32 and switches off auto-sync.) + * + * @param queue The queue to persist. + * @param queueFile The file to persist the queue in. + */ + LegacyQueuePersister(Queue<E> queue, File queueFile) + { + this(queue, queueFile, DEFAULT_INITIAL_RECORD_SIZE, false); + } + + /** + * Create a {@link QueuePersister.LegacyQueuePersister} for <var>queue</var>. + * + * @param queue The queue to persist. + * @param queueFile The file to persist the queue in. + * @param initialRecordSize The initial size of the record. If an element of the queue is + * larger than this, the whole queue file has to be re-written with a larger + * record size. + * @param autoSync If <code>true</code>, the underlying file will be synchronized after each + * write operation. This is safer, but costs a lot of performance. + */ + LegacyQueuePersister(Queue<E> queue, File queueFile, int initialRecordSize, boolean autoSync) + { + this.queue = queue; + this.queueFile = queueFile; + this.newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); + this.autoSync = autoSync; + if (queueFile.exists() == false && newQueueFile.exists()) + { + if (newQueueFile.renameTo(queueFile) == false) + { + throw CheckedExceptionTunnel.wrapIfNecessary(new IOException( + "Cannot rename file '" + newQueueFile.getPath() + "' to '" + + queueFile.getPath() + "'")); + } + } + try + { + this.randomAccessFile = new RandomAccessFile(queueFile, "rw"); + if (randomAccessFile.length() < LEGACY_HEADER_LENGTH) + { + this.recordSize = initialRecordSize; + writeFullHeader(randomAccessFile, firstRecord, lastRecord, initialRecordSize); + } else + { + this.firstRecord = randomAccessFile.readInt(); + if (this.firstRecord < 0) + { + this.firstRecord = 0; + } + this.lastRecord = randomAccessFile.readInt(); + if (this.lastRecord < 0) + { + this.lastRecord = 0; + } + this.recordSize = randomAccessFile.readInt(); + if (this.recordSize < 0) + { + this.recordSize = 0; + } + } + load(randomAccessFile, queue, firstRecord, lastRecord, recordSize); + // Clean up + if (firstRecord > 0) + { + persist(); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private byte[] toByteArray(E o) + { + final ByteArrayOutputStream ba = new ByteArrayOutputStream(); + try + { + final ObjectOutputStream oo = new ObjectOutputStream(ba); + oo.writeObject(o); + oo.close(); + return ba.toByteArray(); + } catch (Exception ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private static Object objFromByteArray(byte[] data) + { + final ByteArrayInputStream bi = new ByteArrayInputStream(data); + try + { + final ObjectInputStream oi = new ObjectInputStream(bi); + return oi.readObject(); + } catch (Exception ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private void writeHeader() throws IOException + { + randomAccessFile.seek(0L); + randomAccessFile.writeInt(firstRecord); + randomAccessFile.writeInt(lastRecord); + } + + private static void writeFullHeader(RandomAccessFile raf, int firstRecord, int lastRecord, + int initialRecordSize) throws IOException + { + raf.seek(0L); + raf.writeInt(firstRecord); + raf.writeInt(lastRecord); + raf.writeInt(initialRecordSize); + } + + private static <E> void load(RandomAccessFile randomAccessFile, Collection<E> collection, + int firstRecord, int lastRecord, int recordSize) + { + long pos = LEGACY_HEADER_LENGTH + ((long) recordSize) * firstRecord; + for (int i = firstRecord; i < lastRecord; ++i) + { + try + { + randomAccessFile.seek(pos); + pos += recordSize; + final int len = randomAccessFile.readInt(); + final byte[] data = new byte[len]; + randomAccessFile.read(data, 0, len); + deserializeAndAdd(collection, data); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + @SuppressWarnings("unchecked") + private static <E> void deserializeAndAdd(Collection<E> collection, final byte[] data) + { + collection.add((E) objFromByteArray(data)); + } + + private static int getNewRecordSize(int oldRecordSize, int elementSize) + { + return (oldRecordSize < 1) ? elementSize : oldRecordSize + * (elementSize / oldRecordSize + 1); + } + + // + // IQueuePersister + // + + public void persist() + { + primPersist(recordSize); + } + + private void primPersist(int newRecordSize) + { + synchronized (queueFile) + { + try + { + randomAccessFile.close(); + recordSize = fillNewQueueFile(newRecordSize); + if (queueFile.delete() == false) + { + throw new IOException("Cannot delete file '" + queueFile.getPath() + "'"); + } + if (newQueueFile.renameTo(queueFile) == false) + { + throw new IOException("Cannot rename file '" + newQueueFile.getPath() + + "' to '" + queueFile.getPath() + "'"); + } + randomAccessFile = new RandomAccessFile(queueFile, "rw"); + if (autoSync) + { + sync(); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + private int fillNewQueueFile(int newRecordSize) throws IOException + { + RandomAccessFile newRandomAccessFile = null; + try + { + newRandomAccessFile = new RandomAccessFile(newQueueFile, "rw"); + firstRecord = 0; + lastRecord = queue.size(); + writeFullHeader(newRandomAccessFile, firstRecord, lastRecord, newRecordSize); + long pos = LEGACY_HEADER_LENGTH; + for (E elem : queue) + { + newRandomAccessFile.seek(pos); + pos += newRecordSize; + final byte[] data = toByteArray(elem); + final int elementSize = data.length + LEGACY_RECORD_HEADER_LENGTH; + if (elementSize > newRecordSize) + { + newRandomAccessFile.close(); + return fillNewQueueFile(getNewRecordSize(newRecordSize, elementSize)); + } + newRandomAccessFile.writeInt(data.length); + newRandomAccessFile.write(data); + } + return newRecordSize; + } finally + { + if (newRandomAccessFile != null) + { + newRandomAccessFile.close(); + } + } + } + + public void addToTail(E elem) + { + synchronized (queueFile) + { + try + { + long pos = LEGACY_HEADER_LENGTH + ((long) lastRecord) * recordSize; + randomAccessFile.seek(pos); + final byte[] data = toByteArray(elem); + final int elementSize = data.length + LEGACY_RECORD_HEADER_LENGTH; + if (elementSize > recordSize) + { + primPersist(getNewRecordSize(recordSize, elementSize)); + return; + } + randomAccessFile.writeInt(data.length); + randomAccessFile.write(data); + ++lastRecord; + writeHeader(); + if (autoSync) + { + sync(); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void removeFromHead(E elem) + { + synchronized (queueFile) + { + try + { + if (firstRecord > LEGACY_MAX_SLICK) + { + persist(); + } else + { + ++firstRecord; + writeHeader(); + if (autoSync) + { + sync(); + } + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void check() throws IllegalStateException + { + synchronized (queueFile) + { + try + { + if (randomAccessFile.getFD().valid() == false) + { + throw new IllegalStateException("Cannot persist: file is closed."); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void close() + { + synchronized (queueFile) + { + try + { + randomAccessFile.close(); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void sync() + { + synchronized (queueFile) + { + try + { + randomAccessFile.getFD().sync(); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + } +} diff --git a/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java b/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java deleted file mode 100644 index 09ffc06e16923ebb6e8a7c3ac5332fa0ca1079be..0000000000000000000000000000000000000000 --- a/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java +++ /dev/null @@ -1,431 +0,0 @@ -/* - * Copyright 2008 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.common.collections; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Queue; - -import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; - -/** - * An {@link IQueuePersister} that is based on records in a file. This class uses Java serialization - * on the queue elements and thus requires queue elements to be serializable. - * - * @deprecated Use the {@link SmartQueuePersister} instead. - * @author Bernd Rinn - */ -@Deprecated -class RecordBasedQueuePersister<E> implements IQueuePersister<E> -{ - final static int DEFAULT_INITIAL_RECORD_SIZE = 32; - - private final static int HEADER_LENGTH = 3 * 4; // 3 * sizeof(int) - - private final static int RECORD_HEADER_LENGTH = 4; // sizeof(int) - - private final static int MAX_SLICK = 1000; - - private final Queue<E> queue; - - private final File queueFile; - - private final File newQueueFile; - - private final boolean autoSync; - - private int recordSize; - - private RandomAccessFile randomAccessFile; - - private int firstRecord; - - private int lastRecord; - - /** - * Returns a list of the content of the <var>queueFile</var>. - */ - public static <E> List<E> list(Class<E> clazz, File queueFile) - { - final File newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); - final List<E> result = new ArrayList<E>(); - RandomAccessFile randomAccessFile = null; - try - { - if (queueFile.exists() == false && newQueueFile.exists()) - { - randomAccessFile = new RandomAccessFile(newQueueFile, "r"); - } else - { - randomAccessFile = new RandomAccessFile(queueFile, "r"); - } - final int firstRecord = randomAccessFile.readInt(); - final int lastRecord = randomAccessFile.readInt(); - final int recordSize = randomAccessFile.readInt(); - load(randomAccessFile, result, firstRecord, lastRecord, recordSize); - } catch (IOException ex) - { - return Collections.emptyList(); - } finally - { - if (randomAccessFile != null) - { - try - { - randomAccessFile.close(); - } catch (IOException ex) - { - // Silence - } - } - } - return Collections.unmodifiableList(result); - } - - /** - * Create a {@link RecordBasedQueuePersister} for <var>queue</var>. (Uses a default initial - * record size of 32 and switches off auto-sync.) - * - * @param queue The queue to persist. - * @param queueFile The file to persist the queue in. - */ - public RecordBasedQueuePersister(Queue<E> queue, File queueFile) - { - this(queue, queueFile, DEFAULT_INITIAL_RECORD_SIZE, false); - } - - /** - * Create a {@link RecordBasedQueuePersister} for <var>queue</var>. - * - * @param queue The queue to persist. - * @param queueFile The file to persist the queue in. - * @param initialRecordSize The initial size of the record. If an element of the queue is larger - * than this, the whole queue file has to be re-written with a larger record size. - * @param autoSync If <code>true</code>, the underlying file will be synchronized after each - * write operation. This is safer, but costs a lot of performance. - */ - public RecordBasedQueuePersister(Queue<E> queue, File queueFile, int initialRecordSize, - boolean autoSync) - { - this.queue = queue; - this.queueFile = queueFile; - this.newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); - this.autoSync = autoSync; - if (queueFile.exists() == false && newQueueFile.exists()) - { - if (newQueueFile.renameTo(queueFile) == false) - { - throw CheckedExceptionTunnel.wrapIfNecessary(new IOException("Cannot rename file '" - + newQueueFile.getPath() + "' to '" + queueFile.getPath() + "'")); - } - } - try - { - this.randomAccessFile = new RandomAccessFile(queueFile, "rw"); - if (randomAccessFile.length() < HEADER_LENGTH) - { - this.recordSize = initialRecordSize; - writeFullHeader(randomAccessFile, firstRecord, lastRecord, initialRecordSize); - } else - { - this.firstRecord = randomAccessFile.readInt(); - if (this.firstRecord < 0) - { - this.firstRecord = 0; - } - this.lastRecord = randomAccessFile.readInt(); - if (this.lastRecord < 0) - { - this.lastRecord = 0; - } - this.recordSize = randomAccessFile.readInt(); - if (this.recordSize < 0) - { - this.recordSize = 0; - } - } - load(randomAccessFile, queue, firstRecord, lastRecord, recordSize); - // Clean up - if (firstRecord > 0) - { - persist(); - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - - private byte[] toByteArray(E o) - { - final ByteArrayOutputStream ba = new ByteArrayOutputStream(); - try - { - final ObjectOutputStream oo = new ObjectOutputStream(ba); - oo.writeObject(o); - oo.close(); - return ba.toByteArray(); - } catch (Exception ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - - private static Object objFromByteArray(byte[] data) - { - final ByteArrayInputStream bi = new ByteArrayInputStream(data); - try - { - final ObjectInputStream oi = new ObjectInputStream(bi); - return oi.readObject(); - } catch (Exception ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - - private void writeHeader() throws IOException - { - randomAccessFile.seek(0L); - randomAccessFile.writeInt(firstRecord); - randomAccessFile.writeInt(lastRecord); - } - - private static void writeFullHeader(RandomAccessFile raf, int firstRecord, int lastRecord, - int initialRecordSize) throws IOException - { - raf.seek(0L); - raf.writeInt(firstRecord); - raf.writeInt(lastRecord); - raf.writeInt(initialRecordSize); - } - - private static <E> void load(RandomAccessFile randomAccessFile, Collection<E> collection, - int firstRecord, int lastRecord, int recordSize) - { - long pos = HEADER_LENGTH + ((long) recordSize) * firstRecord; - for (int i = firstRecord; i < lastRecord; ++i) - { - try - { - randomAccessFile.seek(pos); - pos += recordSize; - final int len = randomAccessFile.readInt(); - final byte[] data = new byte[len]; - randomAccessFile.read(data, 0, len); - deserializeAndAdd(collection, data); - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - @SuppressWarnings("unchecked") - private static <E> void deserializeAndAdd(Collection<E> collection, final byte[] data) - { - collection.add((E) objFromByteArray(data)); - } - - private static int getNewRecordSize(int oldRecordSize, int elementSize) - { - return (oldRecordSize < 1) ? elementSize : oldRecordSize - * (elementSize / oldRecordSize + 1); - } - - // - // IQueuePersister - // - - public void persist() - { - primPersist(recordSize); - } - - private void primPersist(int newRecordSize) - { - synchronized (queueFile) - { - try - { - randomAccessFile.close(); - recordSize = fillNewQueueFile(newRecordSize); - if (queueFile.delete() == false) - { - throw new IOException("Cannot delete file '" + queueFile.getPath() + "'"); - } - if (newQueueFile.renameTo(queueFile) == false) - { - throw new IOException("Cannot rename file '" + newQueueFile.getPath() - + "' to '" + queueFile.getPath() + "'"); - } - randomAccessFile = new RandomAccessFile(queueFile, "rw"); - if (autoSync) - { - sync(); - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - private int fillNewQueueFile(int newRecordSize) throws IOException - { - RandomAccessFile newRandomAccessFile = null; - try - { - newRandomAccessFile = new RandomAccessFile(newQueueFile, "rw"); - firstRecord = 0; - lastRecord = queue.size(); - writeFullHeader(newRandomAccessFile, firstRecord, lastRecord, newRecordSize); - long pos = HEADER_LENGTH; - for (E elem : queue) - { - newRandomAccessFile.seek(pos); - pos += newRecordSize; - final byte[] data = toByteArray(elem); - final int elementSize = data.length + RECORD_HEADER_LENGTH; - if (elementSize > newRecordSize) - { - newRandomAccessFile.close(); - return fillNewQueueFile(getNewRecordSize(newRecordSize, elementSize)); - } - newRandomAccessFile.writeInt(data.length); - newRandomAccessFile.write(data); - } - return newRecordSize; - } finally - { - if (newRandomAccessFile != null) - { - newRandomAccessFile.close(); - } - } - } - - public void addToTail(E elem) - { - synchronized (queueFile) - { - try - { - long pos = HEADER_LENGTH + ((long) lastRecord) * recordSize; - randomAccessFile.seek(pos); - final byte[] data = toByteArray(elem); - final int elementSize = data.length + RECORD_HEADER_LENGTH; - if (elementSize > recordSize) - { - primPersist(getNewRecordSize(recordSize, elementSize)); - return; - } - randomAccessFile.writeInt(data.length); - randomAccessFile.write(data); - ++lastRecord; - writeHeader(); - if (autoSync) - { - sync(); - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - public void removeFromHead(E elem) - { - synchronized (queueFile) - { - try - { - if (firstRecord > MAX_SLICK) - { - persist(); - } else - { - ++firstRecord; - writeHeader(); - if (autoSync) - { - sync(); - } - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - public void check() throws IllegalStateException - { - synchronized (queueFile) - { - try - { - if (randomAccessFile.getFD().valid() == false) - { - throw new IllegalStateException("Cannot persist: file is closed."); - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - public void close() - { - synchronized (queueFile) - { - try - { - randomAccessFile.close(); - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - public void sync() - { - synchronized (queueFile) - { - try - { - randomAccessFile.getFD().sync(); - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - -} diff --git a/common/source/java/ch/systemsx/cisd/common/collections/SmartQueuePersister.java b/common/source/java/ch/systemsx/cisd/common/collections/SmartQueuePersister.java deleted file mode 100644 index 4c3fe4ea3e183cb12f587bbf5c29a4a3ab8cb91c..0000000000000000000000000000000000000000 --- a/common/source/java/ch/systemsx/cisd/common/collections/SmartQueuePersister.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright 2011 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.common.collections; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Queue; - -import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; - -/** - * An {@link IQueuePersister} that is based on records in a file. This class uses Java serialization - * on the queue elements and thus requires queue elements to be serializable. - * - * @author Pawel Glyzewski - */ -public class SmartQueuePersister<E> implements IQueuePersister<E> -{ - final static int QUEUE_IMPLEMENTATION_MARKER = -123456789; - - private final static int HEADER_LENGTH = 3 * 4; // 3 * sizeof(int) - - private final static int RECORD_HEADER_LENGTH = 4; // sizeof(int) - - private final static int MAX_SLICK = 100000; - - private final Queue<E> queue; - - private final File queueFile; - - private final File newQueueFile; - - private final boolean autoSync; - - private RandomAccessFile randomAccessFile; - - private int firstRecord = HEADER_LENGTH; - - private int lastRecord; - - /** - * Returns a list of the content of the <var>queueFile</var>. - */ - @SuppressWarnings("deprecation") - public static <E> List<E> list(Class<E> clazz, File queueFile) - { - final File newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); - List<E> result = new ArrayList<E>(); - RandomAccessFile randomAccessFile = null; - try - { - if (queueFile.exists() == false && newQueueFile.exists()) - { - randomAccessFile = new RandomAccessFile(newQueueFile, "r"); - } else - { - randomAccessFile = new RandomAccessFile(queueFile, "r"); - } - if (randomAccessFile.readInt() != QUEUE_IMPLEMENTATION_MARKER) - { - return RecordBasedQueuePersister.list(clazz, queueFile); - } else - { - final int firstRecord = randomAccessFile.readInt(); - final int lastRecord = randomAccessFile.readInt(); - load(randomAccessFile, result, firstRecord, lastRecord); - } - } catch (IOException ex) - { - return Collections.emptyList(); - } finally - { - if (randomAccessFile != null) - { - try - { - randomAccessFile.close(); - } catch (IOException ex) - { - // Silence - } - } - } - return Collections.unmodifiableList(result); - } - - public SmartQueuePersister(Queue<E> queue, File queueFile) - { - this(queue, queueFile, false); - } - - /** - * Create a {@link SmartQueuePersister} for <var>queue</var>. - * - * @param queue The queue to persist. - * @param queueFile The file to persist the queue in. - * @param autoSync If <code>true</code>, the underlying file will be synchronized after each - * write operation. This is safer, but costs a lot of performance. - */ - @SuppressWarnings("deprecation") - public SmartQueuePersister(Queue<E> queue, File queueFile, boolean autoSync) - { - this.queue = queue; - this.queueFile = queueFile; - this.newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); - this.autoSync = autoSync; - if (queueFile.exists() == false && newQueueFile.exists()) - { - if (newQueueFile.renameTo(queueFile) == false) - { - throw CheckedExceptionTunnel.wrapIfNecessary(new IOException("Cannot rename file '" - + newQueueFile.getPath() + "' to '" + queueFile.getPath() + "'")); - } - } - try - { - this.randomAccessFile = new RandomAccessFile(queueFile, "rw"); - if (randomAccessFile.length() >= 4 - && randomAccessFile.readInt() != QUEUE_IMPLEMENTATION_MARKER) - { - RecordBasedQueuePersister<E> oldPersister = - new RecordBasedQueuePersister<E>(queue, queueFile); - oldPersister.close(); - persist(); - } else - { - - if (randomAccessFile.length() < HEADER_LENGTH) - { - writeFullHeader(randomAccessFile, firstRecord, lastRecord); - } else - { - this.firstRecord = randomAccessFile.readInt(); - if (this.firstRecord < 0) - { - this.firstRecord = HEADER_LENGTH; - } - this.lastRecord = randomAccessFile.readInt(); - if (this.lastRecord < 0) - { - this.lastRecord = 0; - } - } - load(randomAccessFile, queue, firstRecord, lastRecord); - // Clean up - if (firstRecord > 0) - { - persist(); - } - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - - private byte[] toByteArray(E o) - { - final ByteArrayOutputStream ba = new ByteArrayOutputStream(); - try - { - final ObjectOutputStream oo = new ObjectOutputStream(ba); - oo.writeObject(o); - oo.close(); - return ba.toByteArray(); - } catch (Exception ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - - private static Object objFromByteArray(byte[] data) - { - final ByteArrayInputStream bi = new ByteArrayInputStream(data); - try - { - final ObjectInputStream oi = new ObjectInputStream(bi); - return oi.readObject(); - } catch (Exception ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - - private void writeHeader() throws IOException - { - writeFullHeader(randomAccessFile, firstRecord, lastRecord); - } - - private static void writeFullHeader(RandomAccessFile raf, int firstRecord, int lastRecord) - throws IOException - { - raf.seek(0L); - raf.writeInt(QUEUE_IMPLEMENTATION_MARKER); - raf.writeInt(firstRecord); - raf.writeInt(lastRecord); - } - - private static <E> void load(RandomAccessFile randomAccessFile, Collection<E> collection, - int firstRecord, int lastRecord) - { - long pos = firstRecord; - while (pos < lastRecord) - { - try - { - randomAccessFile.seek(pos); - final int len = randomAccessFile.readInt(); - pos += len + RECORD_HEADER_LENGTH; - final byte[] data = new byte[len]; - randomAccessFile.read(data, 0, len); - deserializeAndAdd(collection, data); - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - @SuppressWarnings("unchecked") - private static <E> void deserializeAndAdd(Collection<E> collection, final byte[] data) - { - collection.add((E) objFromByteArray(data)); - } - - // - // IQueuePersister - // - public void persist() - { - synchronized (queueFile) - { - try - { - randomAccessFile.close(); - fillNewQueueFile(); - if (queueFile.delete() == false) - { - throw new IOException("Cannot delete file '" + queueFile.getPath() + "'"); - } - if (newQueueFile.renameTo(queueFile) == false) - { - throw new IOException("Cannot rename file '" + newQueueFile.getPath() - + "' to '" + queueFile.getPath() + "'"); - } - randomAccessFile = new RandomAccessFile(queueFile, "rw"); - if (autoSync) - { - sync(); - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - private void fillNewQueueFile() throws IOException - { - RandomAccessFile newRandomAccessFile = null; - try - { - newRandomAccessFile = new RandomAccessFile(newQueueFile, "rw"); - firstRecord = HEADER_LENGTH; - lastRecord = -1; - writeFullHeader(newRandomAccessFile, firstRecord, lastRecord); - int pos = HEADER_LENGTH; - int elementSize = 0; - for (E elem : queue) - { - pos += elementSize; - newRandomAccessFile.seek(pos); - final byte[] data = toByteArray(elem); - elementSize = data.length + RECORD_HEADER_LENGTH; - newRandomAccessFile.writeInt(data.length); - newRandomAccessFile.write(data); - } - lastRecord = pos + elementSize; - writeFullHeader(newRandomAccessFile, firstRecord, lastRecord); - } finally - { - if (newRandomAccessFile != null) - { - newRandomAccessFile.close(); - } - } - } - - public void addToTail(E elem) - { - synchronized (queueFile) - { - try - { - randomAccessFile.seek(lastRecord); - final byte[] data = toByteArray(elem); - final int elementSize = data.length + RECORD_HEADER_LENGTH; - randomAccessFile.writeInt(data.length); - randomAccessFile.write(data); - lastRecord += elementSize; - writeHeader(); - if (autoSync) - { - sync(); - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - public void removeFromHead(E elem) - { - synchronized (queueFile) - { - try - { - if (firstRecord > MAX_SLICK) - { - persist(); - } else - { - randomAccessFile.seek(firstRecord); - firstRecord += randomAccessFile.readInt() + RECORD_HEADER_LENGTH; - writeHeader(); - if (autoSync) - { - sync(); - } - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - public void check() throws IllegalStateException - { - synchronized (queueFile) - { - try - { - if (randomAccessFile.getFD().valid() == false - || false == randomAccessFile.getChannel().isOpen()) - { - throw new IllegalStateException("Cannot persist: file is closed."); - } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - public void close() - { - synchronized (queueFile) - { - try - { - randomAccessFile.close(); - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } - - public void sync() - { - synchronized (queueFile) - { - try - { - randomAccessFile.getFD().sync(); - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - } -} diff --git a/common/source/java/ch/systemsx/cisd/common/filesystem/QueueingPathRemoverService.java b/common/source/java/ch/systemsx/cisd/common/filesystem/QueueingPathRemoverService.java index e27eecb9723568ef43867ebc922902b99c9ae690..d513dcca50450470c53b05330eb021a6ea10c675 100644 --- a/common/source/java/ch/systemsx/cisd/common/filesystem/QueueingPathRemoverService.java +++ b/common/source/java/ch/systemsx/cisd/common/filesystem/QueueingPathRemoverService.java @@ -30,7 +30,7 @@ import ch.systemsx.cisd.common.collections.ExtendedBlockingQueueFactory; import ch.systemsx.cisd.common.collections.ExtendedLinkedBlockingQueue; import ch.systemsx.cisd.common.collections.IExtendedBlockingQueue; import ch.systemsx.cisd.common.collections.PersistentExtendedBlockingQueueDecorator; -import ch.systemsx.cisd.common.collections.SmartQueuePersister; +import ch.systemsx.cisd.common.collections.QueuePersister; import ch.systemsx.cisd.common.logging.ISimpleLogger; import ch.systemsx.cisd.common.logging.Log4jSimpleLogger; import ch.systemsx.cisd.common.logging.LogCategory; @@ -236,7 +236,7 @@ public class QueueingPathRemoverService */ public static final List<File> listShredderItems(File queueFile) { - return SmartQueuePersister.list(File.class, queueFile); + return QueuePersister.list(File.class, queueFile); } private QueueingPathRemoverService() diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersisterTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/collections/LegacyQueuePersisterTest.java similarity index 77% rename from common/sourceTest/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersisterTest.java rename to common/sourceTest/java/ch/systemsx/cisd/common/collections/LegacyQueuePersisterTest.java index 3e65d4d7be18a8a114f6acedc13dd02e197c831f..9f0747570076b317fb68c54e0f67012d24c3458e 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersisterTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/collections/LegacyQueuePersisterTest.java @@ -25,21 +25,21 @@ import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; +import ch.systemsx.cisd.common.collections.QueuePersister.LegacyQueuePersister; import ch.systemsx.cisd.common.filesystem.FileUtilities; /** * @author Franz-Josef Elmer */ -@SuppressWarnings("deprecation") -public class RecordBasedQueuePersisterTest extends AssertJUnit +public class LegacyQueuePersisterTest extends AssertJUnit { private static final File TMP = new File("targets/unit-test-wd"); - private static final File QUEUE_FILE = new File(TMP, "RecordBasedQueuePersisterTestQueue"); + private static final File QUEUE_FILE = new File(TMP, "LegacyQueuePersisterTestQueue"); private ArrayBlockingQueue<String> queue; - private RecordBasedQueuePersister<String> persister; + private LegacyQueuePersister<String> persister; @BeforeTest public void setUp() @@ -47,7 +47,7 @@ public class RecordBasedQueuePersisterTest extends AssertJUnit assertEquals("Couldn't delete " + TMP, true, FileUtilities.deleteRecursively(TMP)); TMP.mkdirs(); queue = new ArrayBlockingQueue<String>(10); - persister = new RecordBasedQueuePersister<String>(queue, QUEUE_FILE); + persister = new LegacyQueuePersister<String>(queue, QUEUE_FILE); } @AfterTest @@ -61,15 +61,15 @@ public class RecordBasedQueuePersisterTest extends AssertJUnit { String element = "a string with more characters than " - + "RecordBasedQueuePersister.DEFAULT_INITIAL_RECORD_SIZE"; - assertEquals(true, element.length() > RecordBasedQueuePersister.DEFAULT_INITIAL_RECORD_SIZE); + + "LegacyQueuePersister.DEFAULT_INITIAL_RECORD_SIZE"; + assertEquals(true, element.length() > LegacyQueuePersister.DEFAULT_INITIAL_RECORD_SIZE); queue.add(element); persister.persist(); persister.close(); queue.clear(); - persister = new RecordBasedQueuePersister<String>(queue, QUEUE_FILE); + persister = new LegacyQueuePersister<String>(queue, QUEUE_FILE); assertEquals(1, queue.size()); assertEquals(element, queue.peek()); @@ -89,14 +89,14 @@ public class RecordBasedQueuePersisterTest extends AssertJUnit assertEquals("Couldn't delete " + TMP, true, FileUtilities.deleteRecursively(TMP)); TMP.mkdirs(); queue = new ArrayBlockingQueue<String>(10); - persister = new RecordBasedQueuePersister<String>(queue, QUEUE_FILE, 0, false); + persister = new LegacyQueuePersister<String>(queue, QUEUE_FILE, 0, false); queue.add(""); queue.add("foo"); persister.persist(); persister.close(); queue.clear(); - persister = new RecordBasedQueuePersister<String>(queue, QUEUE_FILE); + persister = new LegacyQueuePersister<String>(queue, QUEUE_FILE); assertEquals(2, queue.size()); assertEquals("", queue.poll()); @@ -115,13 +115,13 @@ public class RecordBasedQueuePersisterTest extends AssertJUnit tooShortRaf.writeInt(1); tooShortRaf.close(); - persister = new RecordBasedQueuePersister<String>(queue, QUEUE_FILE); + persister = new LegacyQueuePersister<String>(queue, QUEUE_FILE); queue.add("test"); persister.persist(); persister.close(); queue.clear(); - persister = new RecordBasedQueuePersister<String>(queue, QUEUE_FILE); + persister = new LegacyQueuePersister<String>(queue, QUEUE_FILE); assertEquals(1, queue.size()); assertEquals("test", queue.poll()); diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/collections/SmartQueuePersisterTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/collections/QueuePersisterTest.java similarity index 82% rename from common/sourceTest/java/ch/systemsx/cisd/common/collections/SmartQueuePersisterTest.java rename to common/sourceTest/java/ch/systemsx/cisd/common/collections/QueuePersisterTest.java index 69d172c00514ffb667df81884f4d3737fae0c2ce..f2c45c3b6a7e0ed906d32932f8059cafb463540c 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/collections/SmartQueuePersisterTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/collections/QueuePersisterTest.java @@ -25,12 +25,13 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import ch.systemsx.cisd.common.collections.QueuePersister.LegacyQueuePersister; import ch.systemsx.cisd.common.filesystem.FileUtilities; /** * @author Pawel Glyzewski */ -public class SmartQueuePersisterTest extends AssertJUnit +public class QueuePersisterTest extends AssertJUnit { private static final File TMP = new File("targets/unit-test-wd"); @@ -40,7 +41,7 @@ public class SmartQueuePersisterTest extends AssertJUnit private ArrayBlockingQueue<String> queue; - private SmartQueuePersister<String> persister; + private QueuePersister<String> persister; @BeforeMethod public void setUp() @@ -48,7 +49,7 @@ public class SmartQueuePersisterTest extends AssertJUnit assertEquals("Couldn't delete " + TMP, true, FileUtilities.deleteRecursively(TMP)); TMP.mkdirs(); queue = new ArrayBlockingQueue<String>(10); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE); + persister = new QueuePersister<String>(queue, QUEUE_FILE); } @AfterMethod @@ -66,14 +67,14 @@ public class SmartQueuePersisterTest extends AssertJUnit { String element = "a string with more characters than " - + "RecordBasedQueuePersister.DEFAULT_INITIAL_RECORD_SIZE"; + + "LegacyQueuePersister.DEFAULT_INITIAL_RECORD_SIZE"; queue.add(element); persister.persist(); persister.close(); queue.clear(); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE); + persister = new QueuePersister<String>(queue, QUEUE_FILE); assertEquals(1, queue.size()); assertEquals(element, queue.peek()); @@ -89,14 +90,14 @@ public class SmartQueuePersisterTest extends AssertJUnit public void testPersistMoreElements() { queue = new ArrayBlockingQueue<String>(10); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE, false); + persister = new QueuePersister<String>(queue, QUEUE_FILE, false); queue.add(""); queue.add("foo"); persister.persist(); persister.close(); queue.clear(); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE); + persister = new QueuePersister<String>(queue, QUEUE_FILE); assertEquals(2, queue.size()); assertEquals("", queue.poll()); @@ -117,27 +118,26 @@ public class SmartQueuePersisterTest extends AssertJUnit tooShortRaf.writeInt(1); tooShortRaf.close(); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE); + persister = new QueuePersister<String>(queue, QUEUE_FILE); queue.add("test"); persister.persist(); persister.close(); queue.clear(); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE); + persister = new QueuePersister<String>(queue, QUEUE_FILE); assertEquals(1, queue.size()); assertEquals("test", queue.poll()); } - @SuppressWarnings("deprecation") @Test public void testMigrateRecordBasedToSmartQueue() throws Exception { assertEquals("Couldn't delete " + TMP, true, FileUtilities.deleteRecursively(TMP)); TMP.mkdirs(); queue = new ArrayBlockingQueue<String>(10); - RecordBasedQueuePersister<String> recordBasedPersister = - new RecordBasedQueuePersister<String>(queue, QUEUE_FILE, 0, false); + LegacyQueuePersister<String> recordBasedPersister = + new LegacyQueuePersister<String>(queue, QUEUE_FILE, 0, false); queue.add(""); queue.add("foobar"); @@ -148,7 +148,7 @@ public class SmartQueuePersisterTest extends AssertJUnit recordBasedPersister.close(); queue.clear(); long fileSize = QUEUE_FILE.length(); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE); + persister = new QueuePersister<String>(queue, QUEUE_FILE); assertTrue(fileSize > 3 * QUEUE_FILE.length()); assertEquals(4, queue.size()); @@ -165,7 +165,7 @@ public class SmartQueuePersisterTest extends AssertJUnit public void testAddAndRemove() throws Exception { queue = new ArrayBlockingQueue<String>(10); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE); + persister = new QueuePersister<String>(queue, QUEUE_FILE); queue.add("one"); queue.add("two"); queue.add("three"); @@ -176,7 +176,7 @@ public class SmartQueuePersisterTest extends AssertJUnit persister.removeFromHead(queue.remove()); persister.close(); queue.clear(); - persister = new SmartQueuePersister<String>(queue, QUEUE_FILE); + persister = new QueuePersister<String>(queue, QUEUE_FILE); assertEquals(2, queue.size()); } }