diff --git a/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java b/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java index 035118c0df1d97f39236c865a3493978f1bee0b0..b228d96deaf3a46ec7716d3488be12c40482e9bf 100644 --- a/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java +++ b/common/source/java/ch/systemsx/cisd/common/collections/RecordBasedQueuePersister.java @@ -382,15 +382,18 @@ public class RecordBasedQueuePersister<E> implements IQueuePersister<E> public void check() throws IllegalStateException { - try + synchronized (queueFile) { - if (randomAccessFile.getFD().valid() == false) + try { - throw new IllegalStateException("Cannot persist: file is closed."); + if (randomAccessFile.getFD().valid() == false) + { + throw new IllegalStateException("Cannot persist: file is closed."); + } + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); } - } catch (IOException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); } } @@ -410,12 +413,15 @@ public class RecordBasedQueuePersister<E> implements IQueuePersister<E> public void sync() { - try - { - randomAccessFile.getFD().sync(); - } catch (IOException ex) + synchronized (queueFile) { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); + try + { + randomAccessFile.getFD().sync(); + } catch (IOException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } } } diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecoratorTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecoratorTest.java index f7a5866e58aaab87aa72d01acebf5912d5285102..947dcca3e5d8e8848fc4adbfd62a963093dd54b6 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecoratorTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/collections/PersistentExtendedBlockingQueueDecoratorTest.java @@ -26,12 +26,15 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Queue; +import java.util.Random; import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import ch.systemsx.cisd.base.exceptions.IOExceptionUnchecked; +import ch.systemsx.cisd.common.concurrent.MessageChannel; +import ch.systemsx.cisd.common.filesystem.FileUtilities; /** * Test cases for the {@link PersistentExtendedBlockingQueueDecorator}. @@ -42,7 +45,7 @@ public class PersistentExtendedBlockingQueueDecoratorTest { private static final File workingDirectory = new File("targets" + File.separator - + "unit-test-wd"); + + "unit-test-wd/" + PersistentExtendedBlockingQueueDecoratorTest.class.getName()); private static final File queueFile = new File(workingDirectory, "persistentQueue.dat"); @@ -76,7 +79,8 @@ public class PersistentExtendedBlockingQueueDecoratorTest @BeforeMethod public void deleteQueueFile() { - queueFile.delete(); + FileUtilities.deleteRecursively(workingDirectory); + workingDirectory.mkdirs(); } @Test @@ -257,6 +261,56 @@ public class PersistentExtendedBlockingQueueDecoratorTest assertEquals(items, asList(createQueue())); } + + @Test + public void testThreadSafetiness() throws Exception + { + final PersistentExtendedBlockingQueueDecorator<Integer> queue = + ExtendedBlockingQueueFactory.<Integer> createPersistRecordBased(queueFile); + final MessageChannel messageChannel = new MessageChannel(20000); + new Thread(new Runnable() + { + public void run() + { + try + { + while (true) + { + Integer number = queue.peekWait(); + if (number < 0) + { + System.out.println("BREAK"); + break; + } + beBusy(number); + queue.take(); + } + messageChannel.send("finished"); + } catch (InterruptedException ex) + { + ex.printStackTrace(); + System.out.println("Queue size: " + queue.size()); + } + } + }).start(); + Random random = new Random(37); + for (int i = 0; i < 10000; i++) + { + try + { + queue.add(random.nextInt(100)); + } catch (Exception ex) + { + throw new RuntimeException("Crash for queue size " + queue.size(), ex); + } + + beBusy(0); + } + queue.add(-1); + messageChannel.assertNextMessage("finished"); + + queueFile.delete(); + } private String createItemOfLength(int itemLength) { @@ -268,4 +322,14 @@ public class PersistentExtendedBlockingQueueDecoratorTest return item; } + private double beBusy(Integer number) + { + double sum = 0; + for (int i = 0; i < (10 + number) * 100; i++) + { + sum += Math.sin(i); + } + return sum; + } + }