diff --git a/common/source/java/ch/systemsx/cisd/common/collections/ExtendedBlockingQueueFactory.java b/common/source/java/ch/systemsx/cisd/common/collections/ExtendedBlockingQueueFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..20f257d091d110ce87310f63e4a76bb57a712d97 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/collections/ExtendedBlockingQueueFactory.java @@ -0,0 +1,76 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +import java.io.File; + +/** + * A factory class for {@link IExtendedBlockingQueue}s. + * + * @author Bernd Rinn + */ +public class ExtendedBlockingQueueFactory +{ + + /** + * Creates a {@link PersistentExtendedBlockingQueueDecorator} with a + * {@link ExtendedLinkedBlockingQueue} that persists record-based. + * + * @param queueFile The file to persist the queue in. + * @param initialRecordSize The initial size of the record. If an element of the queue is larger + * than this, the whole queue file has to be re-written with a larger record size. + * @param autoSync If <code>true</code>, the underlying file will be synchronized after each + * write operation. This is safer, but costs a lot of performance. + */ + public static <E> PersistentExtendedBlockingQueueDecorator<E> createPersistRecordBased( + File queueFile, int initialRecordSize, boolean autoSync) + { + final IExtendedBlockingQueue<E> queue = new ExtendedLinkedBlockingQueue<E>(); + final IQueuePersister<E> queuePersister = + new RecordBasedQueuePersister<E>(queue, queueFile, initialRecordSize, autoSync); + return new PersistentExtendedBlockingQueueDecorator<E>(queue, queuePersister); + } + + /** + * Creates a {@link PersistentExtendedBlockingQueueDecorator} with a + * {@link ExtendedLinkedBlockingQueue} that persists record-based. (Switches off auto-sync.) + * + * @param queueFile The file to persist the queue in. + * @param initialRecordSize The initial size of the record. If an element of the queue is larger + * than this, the whole queue file has to be re-written with a larger record size. + */ + public static <E> PersistentExtendedBlockingQueueDecorator<E> createPersistRecordBased( + File queueFile, int initialRecordSize) + { + return createPersistRecordBased(queueFile, initialRecordSize, false); + } + + /** + * Creates a {@link PersistentExtendedBlockingQueueDecorator} with a + * {@link ExtendedLinkedBlockingQueue} that persists record-based. (Uses default record size of + * 32 and switches off auto-sync.) + * + * @param queueFile The file to persist the queue in. + */ + public static <E> PersistentExtendedBlockingQueueDecorator<E> createPersistRecordBased( + File queueFile) + { + return createPersistRecordBased(queueFile, + RecordBasedQueuePersister.DEFAULT_INITIAL_RECORD_SIZE, false); + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/collections/ExtendedLinkedBlockingQueue.java b/common/source/java/ch/systemsx/cisd/common/collections/ExtendedLinkedBlockingQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..5d26d68aa0e473cce062b98b6a8b0ffe6a4d48c1 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/collections/ExtendedLinkedBlockingQueue.java @@ -0,0 +1,80 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A {@link IExtendedBlockingQueue} implementation based on {@link LinkedBlockingQueue}. + * + * @author Bernd Rinn + */ +public class ExtendedLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> implements + IExtendedBlockingQueue<E> +{ + + public ExtendedLinkedBlockingQueue() + { + super(); + } + + public ExtendedLinkedBlockingQueue(Collection<? extends E> c) + { + super(c); + } + + public ExtendedLinkedBlockingQueue(int capacity) + { + super(capacity); + } + + private static final long serialVersionUID = -6903933977591709194L; + + private static final long PEEK_INTERVAL_MILLIS = 100L; + + public E peekWait() throws InterruptedException + { + while (true) + { + final E item = peek(); + if (item != null) + { + return item; + } + Thread.sleep(PEEK_INTERVAL_MILLIS); + } + } + + public E peekWait(long timeout, TimeUnit unit) throws InterruptedException + { + final long timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit); + final long maxCount = Math.max(1, timeoutMillis / PEEK_INTERVAL_MILLIS); + for (int i = 0; i < maxCount; ++i) + { + final E item = peek(); + if (item != null) + { + return item; + } + Thread.sleep(PEEK_INTERVAL_MILLIS); + } + return null; + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/collections/IExtendedBlockingQueue.java b/common/source/java/ch/systemsx/cisd/common/collections/IExtendedBlockingQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..73bc7fa32e1c580dbb651cb9a7d0b6776cd98291 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/collections/IExtendedBlockingQueue.java @@ -0,0 +1,51 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A {@link BlockingQueue} with methods that are able to retrieve, but not remove, the head of the + * queue, waiting if no elements are present. + * + * @author Bernd Rinn + */ +public interface IExtendedBlockingQueue<E> extends BlockingQueue<E> +{ + + /** + * Retrieves, but does not remove, the head of this queue, waiting if no elements are present on + * this queue. + * + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting. + */ + public E peekWait() throws InterruptedException; + + /** + * Retrieves, but does not remove, the head of this queue, waiting if necessary up to the + * specified wait time if no elements are present on this queue. + * + * @param timeout how long to wait before giving up, in units of <tt>unit</tt> + * @param unit a <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting. + */ + public E peekWait(long timeout, TimeUnit unit) throws InterruptedException; + +} diff --git a/common/source/java/ch/systemsx/cisd/common/collections/IQueuePersister.java b/common/source/java/ch/systemsx/cisd/common/collections/IQueuePersister.java new file mode 100644 index 0000000000000000000000000000000000000000..570eec838ecffbaa4d94d13f35c1599eb5cb1640 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/collections/IQueuePersister.java @@ -0,0 +1,66 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +/** + * A role that can persist a {@link java.util.Queue}. It is assumed that the persister gets a + * reference to the queue in the constructor. + * + * @author Bernd Rinn + */ +public interface IQueuePersister<E> +{ + + /** + * Checks if the persister is in good working order. + * + * @throws IllegalStateException If the persister has been closed. + */ + public void check() throws IllegalStateException; + + /** + * Close the persister. The persister must not be used after this method has been called. + */ + public void close(); + + /** + * Synchronize the persistence store, ensuring that everything is written. + */ + public void sync(); + + /** + * Persist the current form of the queue. + * <p> + * This method needs to be thread-safe if the queue is supposed to be thread-safe. + */ + public void persist(); + + /** + * Add <var>elem</var> to the tail of the queue. + * <p> + * This method needs to be thread-safe if the queue is supposed to be thread-safe. + */ + public void addToTail(E elem); + + /** + * Remove <var>elem</var> from the head of the queue. + * <p> + * This method needs to be thread-safe if the queue is supposed to be thread-safe. + */ + public void removeFromHead(E elem); + +} diff --git a/common/source/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecorator.java b/common/source/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecorator.java new file mode 100644 index 0000000000000000000000000000000000000000..aa6df74f8b4cfa31cc6d9d9af609c4c80afc9245 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecorator.java @@ -0,0 +1,317 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import ch.systemsx.cisd.common.filesystem.ICloseable; +import ch.systemsx.cisd.common.filesystem.ISynchronizable; + +/** + * A decorator of a {@link IExtendedBlockingQueue} that keeps the current state of the queue current + * with a file specified in the constructor. + * + * @author Bernd Rinn + */ +public class PersistentExtendedBlockingQueueDecorator<E> implements IExtendedBlockingQueue<E>, + ICloseable, ISynchronizable +{ + + private final IExtendedBlockingQueue<E> delegate; + + private final IQueuePersister<E> persister; + + public PersistentExtendedBlockingQueueDecorator(IExtendedBlockingQueue<E> delegate, + IQueuePersister<E> persister) + { + this.delegate = delegate; + this.persister = persister; + } + + // + // Closeable + // + + public void close() + { + persister.close(); + } + + // + // ISynchronizable + // + + public void synchronize() + { + persister.sync(); + } + + // + // IExtendedBlockingQueue + // + + public boolean contains(Object o) + { + return delegate.contains(o); + } + + public boolean containsAll(Collection<?> c) + { + return delegate.containsAll(c); + } + + public boolean isEmpty() + { + return delegate.isEmpty(); + } + + public int size() + { + return delegate.size(); + } + + public int remainingCapacity() + { + return delegate.remainingCapacity(); + } + + public Object[] toArray() + { + return delegate.toArray(); + } + + public <T> T[] toArray(T[] a) + { + return delegate.toArray(a); + } + + public E element() + { + return delegate.element(); + } + + public E peek() + { + return delegate.peek(); + } + + public E peekWait() throws InterruptedException + { + return delegate.peekWait(); + } + + public E peekWait(long timeout, TimeUnit unit) throws InterruptedException + { + return delegate.peekWait(timeout, unit); + } + + public E poll() + { + persister.check(); + final E elementOrNull = delegate.poll(); + if (elementOrNull != null) + { + persister.removeFromHead(elementOrNull); + } + return elementOrNull; + } + + public E remove() + { + persister.check(); + final E element = delegate.remove(); + persister.removeFromHead(element); + return element; + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException + { + persister.check(); + final E elementOrNull = delegate.poll(timeout, unit); + if (elementOrNull != null) + { + persister.removeFromHead(elementOrNull); + } + return elementOrNull; + } + + public E take() throws InterruptedException + { + persister.check(); + final E element = delegate.take(); + persister.removeFromHead(element); + return element; + } + + public boolean add(E o) + { + persister.check(); + final boolean ok = delegate.add(o); + if (ok) + { + persister.addToTail(o); + } + return ok; + } + + public boolean offer(E o) + { + persister.check(); + final boolean ok = delegate.offer(o); + if (ok) + { + persister.addToTail(o); + } + return ok; + } + + public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException + { + persister.check(); + final boolean ok = delegate.offer(o, timeout, unit); + if (ok) + { + persister.addToTail(o); + } + return ok; + } + + public void put(E o) throws InterruptedException + { + persister.check(); + delegate.put(o); + persister.addToTail(o); + } + + public boolean addAll(Collection<? extends E> c) + { + persister.check(); + final boolean ok = delegate.addAll(c); + if (ok) + { + persister.persist(); + } + return ok; + } + + public boolean remove(Object o) + { + persister.check(); + final boolean ok = delegate.remove(o); + if (ok) + { + persister.persist(); + } + return ok; + } + + public boolean removeAll(Collection<?> c) + { + persister.check(); + final boolean ok = delegate.removeAll(c); + if (ok) + { + persister.persist(); + } + return ok; + } + + public boolean retainAll(Collection<?> c) + { + persister.check(); + final boolean ok = delegate.retainAll(c); + if (ok) + { + persister.persist(); + } + return ok; + } + + public int drainTo(Collection<? super E> c, int maxElements) + { + persister.check(); + final int elementsDrained = delegate.drainTo(c, maxElements); + if (elementsDrained > 0) + { + persister.persist(); + } + return elementsDrained; + } + + public int drainTo(Collection<? super E> c) + { + persister.check(); + final int elementsDrained = delegate.drainTo(c); + if (elementsDrained > 0) + { + persister.persist(); + } + return elementsDrained; + } + + public void clear() + { + persister.check(); + delegate.clear(); + persister.persist(); + } + + public Iterator<E> iterator() + { + return new Iterator<E>() + { + private final Iterator<E> delegateIterator = delegate.iterator(); + + public boolean hasNext() + { + return delegateIterator.hasNext(); + } + + public E next() + { + return delegateIterator.next(); + } + + public void remove() + { + persister.check(); + delegateIterator.remove(); + persister.persist(); + } + + }; + } + + // + // Object + // + + @Override + public boolean equals(Object o) + { + return delegate.equals(o); + } + + @Override + public int hashCode() + { + return delegate.hashCode(); + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java b/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java new file mode 100644 index 0000000000000000000000000000000000000000..826a143e22ff8ce271b12bbb1f54463ea7cf5d41 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java @@ -0,0 +1,395 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; + +import ch.systemsx.cisd.common.exceptions.CheckedExceptionTunnel; + +/** + * An {@link IQueuePersister} that is based on records in a file. This class uses Java serialization + * on the queue elements and thus requires queue elements to be serializable. + * + * @author Bernd Rinn + */ +public class RecordBasedQueuePersister<E> implements IQueuePersister<E> +{ + final static int DEFAULT_INITIAL_RECORD_SIZE = 32; + + private final static int HEADER_LENGTH = 3 * 4; // 3 * sizeof(int) + + private final static int RECORD_HEADER_LENGTH = 4; // sizeof(int) + + private final static int MAX_SLICK = 1000; + + private final Queue<E> queue; + + private final File queueFile; + + private final File newQueueFile; + + private final boolean autoSync; + + private int recordSize; + + private RandomAccessFile randomAccessFile; + + private int firstRecord; + + private int lastRecord; + + /** + * Returns a list of the content of the <var>queueFile</var>. + */ + public static <E> List<E> list(Class<E> clazz, File queueFile) + { + final File newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); + final List<E> result = new ArrayList<E>(); + RandomAccessFile randomAccessFile = null; + try + { + if (queueFile.exists() == false && newQueueFile.exists()) + { + randomAccessFile = new RandomAccessFile(newQueueFile, "r"); + } else + { + randomAccessFile = new RandomAccessFile(queueFile, "r"); + } + final int firstRecord = randomAccessFile.readInt(); + final int lastRecord = randomAccessFile.readInt(); + final int recordSize = randomAccessFile.readInt(); + load(randomAccessFile, result, firstRecord, lastRecord, recordSize); + } catch (IOException ex) + { + return Collections.emptyList(); + } finally + { + if (randomAccessFile != null) + { + try + { + randomAccessFile.close(); + } catch (IOException ex) + { + // Silence + } + } + } + return Collections.unmodifiableList(result); + } + + /** + * Create a {@link RecordBasedQueuePersister} for <var>queue</var>. (Uses a default initial + * record size of 32 and switches off auto-sync.) + * + * @param queue The queue to persist. + * @param queueFile The file to persist the queue in. + */ + public RecordBasedQueuePersister(Queue<E> queue, File queueFile) + { + this(queue, queueFile, DEFAULT_INITIAL_RECORD_SIZE, false); + } + + /** + * Create a {@link RecordBasedQueuePersister} for <var>queue</var>. + * + * @param queue The queue to persist. + * @param queueFile The file to persist the queue in. + * @param initialRecordSize The initial size of the record. If an element of the queue is larger + * than this, the whole queue file has to be re-written with a larger record size. + * @param autoSync If <code>true</code>, the underlying file will be synchronized after each + * write operation. This is safer, but costs a lot of performance. + */ + public RecordBasedQueuePersister(Queue<E> queue, File queueFile, int initialRecordSize, + boolean autoSync) + { + this.queue = queue; + this.queueFile = queueFile; + this.newQueueFile = new File(queueFile.getParentFile(), queueFile.getName() + ".new"); + this.autoSync = autoSync; + if (queueFile.exists() == false && newQueueFile.exists()) + { + if (newQueueFile.renameTo(queueFile) == false) + { + throw CheckedExceptionTunnel.wrapIfNecessary(new IOException("Cannot rename file '" + + newQueueFile.getPath() + "' to '" + queueFile.getPath() + "'")); + } + } + try + { + this.randomAccessFile = new RandomAccessFile(queueFile, "rw"); + if (randomAccessFile.length() == 0) + { + this.recordSize = initialRecordSize; + writeFullHeader(randomAccessFile, firstRecord, lastRecord, initialRecordSize); + } else + { + this.firstRecord = randomAccessFile.readInt(); + this.lastRecord = randomAccessFile.readInt(); + this.recordSize = randomAccessFile.readInt(); + } + load(randomAccessFile, queue, firstRecord, lastRecord, recordSize); + // Clean up + if (firstRecord > 0) + { + persist(); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private byte[] toByteArray(E o) + { + final ByteArrayOutputStream ba = new ByteArrayOutputStream(); + try + { + final ObjectOutputStream oo = new ObjectOutputStream(ba); + oo.writeObject(o); + oo.close(); + return ba.toByteArray(); + } catch (Exception ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private static Object objFromByteArray(byte[] data) + { + final ByteArrayInputStream bi = new ByteArrayInputStream(data); + try + { + final ObjectInputStream oi = new ObjectInputStream(bi); + return oi.readObject(); + } catch (Exception ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + private void writeHeader() throws IOException + { + randomAccessFile.seek(0L); + randomAccessFile.writeInt(firstRecord); + randomAccessFile.writeInt(lastRecord); + } + + private static void writeFullHeader(RandomAccessFile raf, int firstRecord, int lastRecord, + int initialRecordSize) throws IOException + { + raf.seek(0L); + raf.writeInt(firstRecord); + raf.writeInt(lastRecord); + raf.writeInt(initialRecordSize); + } + + private static <E> void load(RandomAccessFile randomAccessFile, Collection<E> collection, + int firstRecord, int lastRecord, int recordSize) + { + int pos = HEADER_LENGTH + recordSize * firstRecord; + for (int i = firstRecord; i < lastRecord; ++i) + { + try + { + randomAccessFile.seek(pos); + pos += recordSize; + final int len = randomAccessFile.readInt(); + final byte[] data = new byte[len]; + randomAccessFile.read(data, 0, len); + deserializeAndAdd(collection, data); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + @SuppressWarnings("unchecked") + private static <E> void deserializeAndAdd(Collection<E> collection, final byte[] data) + { + collection.add((E) objFromByteArray(data)); + } + + private static int getNewRecordSize(int oldRecordSize, int elementSize) + { + return oldRecordSize * (elementSize / oldRecordSize + 1); + } + + // + // IQueuePersister + // + + public void persist() + { + primPersist(recordSize); + } + + private void primPersist(int newRecordSize) + { + synchronized (queueFile) + { + try + { + randomAccessFile.close(); + final RandomAccessFile newRandomAccessFile = + new RandomAccessFile(newQueueFile, "rw"); + firstRecord = 0; + lastRecord = queue.size(); + writeFullHeader(newRandomAccessFile, firstRecord, lastRecord, newRecordSize); + int pos = HEADER_LENGTH; + for (E elem : queue) + { + newRandomAccessFile.seek(pos); + pos += newRecordSize; + final byte[] data = toByteArray(elem); + final int elementSize = data.length + RECORD_HEADER_LENGTH; + if (elementSize > newRecordSize) + { + primPersist(getNewRecordSize(newRecordSize, elementSize)); + return; + } + newRandomAccessFile.writeInt(data.length); + newRandomAccessFile.write(data); + } + randomAccessFile = newRandomAccessFile; + recordSize = newRecordSize; + if (queueFile.delete() == false) + { + throw new IOException("Cannot delete file '" + queueFile.getPath() + "'"); + } + if (newQueueFile.renameTo(queueFile) == false) + { + throw new IOException("Cannot rename file '" + newQueueFile.getPath() + + "' to '" + queueFile.getPath() + "'"); + } + if (autoSync) + { + sync(); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void addToTail(E elem) + { + synchronized (queueFile) + { + try + { + randomAccessFile.seek(HEADER_LENGTH + lastRecord * recordSize); + final byte[] data = toByteArray(elem); + final int elementSize = data.length + RECORD_HEADER_LENGTH; + if (elementSize > recordSize) + { + primPersist(getNewRecordSize(recordSize, elementSize)); + return; + } + randomAccessFile.writeInt(data.length); + randomAccessFile.write(data); + ++lastRecord; + writeHeader(); + if (autoSync) + { + sync(); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void removeFromHead(E elem) + { + synchronized (queueFile) + { + try + { + if (firstRecord > MAX_SLICK) + { + persist(); + } else + { + ++firstRecord; + writeHeader(); + if (autoSync) + { + sync(); + } + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void check() throws IllegalStateException + { + try + { + if (randomAccessFile.getFD().valid() == false) + { + throw new IllegalStateException("Cannot persist: file is closed."); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + public void close() + { + synchronized (queueFile) + { + try + { + randomAccessFile.close(); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + } + + public void sync() + { + try + { + randomAccessFile.getFD().sync(); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/collections/ToStringIdentityConverter.java b/common/source/java/ch/systemsx/cisd/common/collections/ToStringIdentityConverter.java new file mode 100644 index 0000000000000000000000000000000000000000..32844730198d231e2a3d935d39ad01beaa9aa37c --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/collections/ToStringIdentityConverter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +/** + * An identity {@link IToStringConverter} for {@link String}s. + * + * @author Bernd Rinn + */ +public class ToStringIdentityConverter implements IToStringConverter<String> +{ + + private final static ToStringIdentityConverter instance = new ToStringIdentityConverter(); + + private ToStringIdentityConverter() + { + // This is a singleton. + } + + /** + * @return The instance of the {@link ToStringIdentityConverter}. + */ + public final static ToStringIdentityConverter getInstance() + { + return instance; + } + + // + // IToStringConverter + // + + public String toString(String value) + { + return value; + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/filesystem/ICloseable.java b/common/source/java/ch/systemsx/cisd/common/filesystem/ICloseable.java new file mode 100644 index 0000000000000000000000000000000000000000..c301a0f31d2b6eace25ede75f105d368d598cdc2 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/filesystem/ICloseable.java @@ -0,0 +1,35 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.filesystem; + +import java.io.IOException; + +/** + * A roles that allows to close resource. Like {@link java.io.Closeable} but doesn't throw an + * {@link IOException} but instead an unchecked exception. + * + * @author Bernd Rinn + */ +public interface ICloseable +{ + + /** + * Closes the resource. + */ + public void close(); + +} diff --git a/common/source/java/ch/systemsx/cisd/common/filesystem/ISynchronizable.java b/common/source/java/ch/systemsx/cisd/common/filesystem/ISynchronizable.java new file mode 100644 index 0000000000000000000000000000000000000000..227276b5b48f3807ac61f6224a36a34a00adb5f7 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/filesystem/ISynchronizable.java @@ -0,0 +1,32 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.filesystem; + +/** + * A roles that allows to synchronize a resource with an underlying resource. + * + * @author Bernd Rinn + */ +public interface ISynchronizable +{ + + /** + * Performs a synchronization of the resource. + */ + public void synchronize(); + +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecoratorTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecoratorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f1750e1a70bc8714393247d3d6ee23a3480e174d --- /dev/null +++ b/common/sourceTest/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecoratorTest.java @@ -0,0 +1,233 @@ +/* + * Copyright 2008 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.collections; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; + +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +/** + * Test cases for the {@link PersistentExtendedBlockingQueueDecorator}. + * + * @author Bernd Rinn + */ +public class PersistentExtendedBlockingQueueDecoratorTest +{ + + private static final File workingDirectory = + new File("targets" + File.separator + "unit-test-wd"); + + private static final File queueFile = new File(workingDirectory, "persistentQueue.dat"); + + private PersistentExtendedBlockingQueueDecorator<String> createQueue() + { + return ExtendedBlockingQueueFactory.createPersistRecordBased(queueFile, 16, false); + } + + private List<String> asList(Queue<String> queue) + { + return Arrays.asList(queue.toArray(new String[queue.size()])); + } + + private void createQueueFile(List<String> entries) + { + queueFile.delete(); + assertFalse(queueFile.exists()); + final PersistentExtendedBlockingQueueDecorator<String> q = createQueue(); + q.addAll(entries); + q.close(); + assertTrue(queueFile.exists()); + } + + @BeforeTest + public void setUp() + { + workingDirectory.mkdirs(); + queueFile.deleteOnExit(); + } + + @BeforeMethod + public void deleteQueueFile() + { + queueFile.delete(); + } + + @Test + public void testCreateEmpty() + { + final PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + assertTrue(persistentQueue.isEmpty()); + } + + @Test + public void testCreateWithEntries() + { + final List<String> itemsWritten = Arrays.asList("a", "b", "c'"); + createQueueFile(itemsWritten); + final PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + assertEquals(itemsWritten.size(), persistentQueue.size()); + assertEquals(itemsWritten, asList(persistentQueue)); + } + + @Test + public void testAdd() + { + final List<String> items = Arrays.asList("one", "two"); + PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + for (String item : items) + { + persistentQueue.add(item); + } + assertEquals(items, asList(createQueue())); + } + + @Test + public void testAddLongItem() + { + final List<String> items = Arrays.asList("one", "two", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + for (String item : items) + { + persistentQueue.add(item); + } + assertEquals(items, asList(createQueue())); + } + + @Test + public void testAddAllLongItems() + { + final List<String> items = Arrays.asList("one", "two"); + final List<String> longItems = + Arrays.asList("aaaaaaaaaaaaaaaaaaaa", "bbbbbbbbbbbbbbbbbbbbbbbb"); + PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + final List<String> allItems = new ArrayList<String>(); + allItems.addAll(items); + allItems.addAll(longItems); + persistentQueue.addAll(items); + persistentQueue.addAll(longItems); + + assertEquals(allItems, asList(createQueue())); + } + + @Test + public void testPut() throws InterruptedException + { + final PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + persistentQueue.put("one"); + persistentQueue.put("two"); + assertEquals(asList(persistentQueue), asList(createQueue())); + } + + @Test + public void testRemove() + { + final List<String> itemsWritten = Arrays.asList("1", "2", "3", "4"); + createQueueFile(itemsWritten); + final PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + persistentQueue.remove("3"); + assertEquals(asList(persistentQueue), asList(createQueue())); + } + + @Test + public void testRemoveAll() + { + createQueueFile(Arrays.asList("1", "2", "3", "4")); + final PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + persistentQueue.removeAll(Arrays.asList("4", "1", "17")); + assertEquals(asList(persistentQueue), asList(createQueue())); + } + + @Test + public void testRetainAll() + { + createQueueFile(Arrays.asList("1", "2", "3", "4")); + final PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + persistentQueue.retainAll(Arrays.asList("4", "1", "17")); + assertEquals(asList(persistentQueue), asList(createQueue())); + } + + @Test + public void testRemoveUntilEmpty() + { + createQueueFile(Arrays.asList("1", "2", "3", "4")); + PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + while (persistentQueue.isEmpty() == false) + { + persistentQueue.remove(); + final List<String> snapshot = asList(persistentQueue); + persistentQueue = createQueue(); + assertEquals(snapshot, asList(persistentQueue)); + } + } + + @Test + public void testTakeUntilEmpty() throws InterruptedException + { + createQueueFile(Arrays.asList("1", "2", "3", "4")); + while (true) + { + PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + if (persistentQueue.isEmpty()) + { + break; + } + persistentQueue.take(); + final List<String> snapshot = asList(persistentQueue); + persistentQueue = createQueue(); + assertEquals(snapshot, asList(persistentQueue)); + } + } + + @Test + public void testClear() + { + final List<String> itemsWritten = Arrays.asList("1", "2", "3", "4"); + createQueueFile(itemsWritten); + final PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + persistentQueue.clear(); + assertEquals(asList(persistentQueue), asList(createQueue())); + } + + @Test + public void testIterator() + { + final List<String> itemsWritten = Arrays.asList("1", "2", "3", "4"); + createQueueFile(itemsWritten); + PersistentExtendedBlockingQueueDecorator<String> persistentQueue = createQueue(); + final Iterator<String> iterItemsWritten = itemsWritten.iterator(); + final Iterator<String> iter = persistentQueue.iterator(); + while (iter.hasNext()) + { + assertEquals(iterItemsWritten.next(), iter.next()); + iter.remove(); + } + assertTrue(persistentQueue.isEmpty()); + assertTrue(createQueue().isEmpty()); + } + +}