From b4eba1fa7cd666bafa8e346fc17df540c2b0d520 Mon Sep 17 00:00:00 2001
From: pkupczyk <pkupczyk>
Date: Wed, 26 Sep 2012 13:05:09 +0000
Subject: [PATCH] BIS-185 / Make long-running method calls in IDataStoreService
 use service conversations BIS-196 / Make service conversations timeout
 configurable - junit tests and bugfixes

SVN: 26822
---
 .../systemtests/ServiceConversationTest.java  | 136 ++++++++++++------
 .../BaseServiceConversationClientManager.java |  17 ++-
 .../BaseServiceConversationServerManager.java |  14 +-
 3 files changed, 117 insertions(+), 50 deletions(-)

diff --git a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/datastoreserver/systemtests/ServiceConversationTest.java b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/datastoreserver/systemtests/ServiceConversationTest.java
index dad63f2ce86..dcda1f090c5 100644
--- a/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/datastoreserver/systemtests/ServiceConversationTest.java
+++ b/datastore_server/sourceTest/java/ch/systemsx/cisd/openbis/datastoreserver/systemtests/ServiceConversationTest.java
@@ -47,6 +47,7 @@ import org.testng.annotations.Test;
 
 import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked;
 import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
+import ch.systemsx.cisd.common.concurrent.MessageChannel;
 import ch.systemsx.cisd.common.conversation.annotation.Conversational;
 import ch.systemsx.cisd.common.conversation.annotation.Progress;
 import ch.systemsx.cisd.common.conversation.client.ServiceConversationClientDetails;
@@ -399,59 +400,107 @@ public class ServiceConversationTest
     @Test
     public void testMultipleClientsWithSameService()
     {
-        context.checking(new Expectations()
-            {
-                {
-                    one(serviceOnServerSideWrapper2.getService()).echo(1);
-                    will(returnValue(1));
-
-                    one(serviceOnServerSideWrapper2.getService()).echo(2);
-                    will(returnValue(2));
-
-                    one(serviceOnServerSideWrapper2.getService()).echo(3);
-                    will(returnValue(3));
-
-                    one(serviceOnServerSideWrapper2.getService()).echo(4);
-                    will(returnValue(4));
-                }
-            });
-        Assert.assertEquals(getServiceOnClientSide1(TestService2.class).echo(1), 1);
-        Assert.assertEquals(getServiceOnClientSide2(TestService2.class).echo(2), 2);
-        Assert.assertEquals(getServiceOnClientSide2(TestService2.class).echo(3), 3);
-        Assert.assertEquals(getServiceOnClientSide1(TestService2.class).echo(4), 4);
-        assertNoMoreConversations();
+        testMultipleClients(TestService2.class, serviceOnServerSideWrapper2.getService(),
+                TestService2.class, serviceOnServerSideWrapper2.getService());
     }
 
     @Test
     public void testMultipleClientsWithDifferentService()
     {
+        testMultipleClients(TestService1.class, serviceOnServerSideWrapper1.getService(),
+                TestService2.class, serviceOnServerSideWrapper2.getService());
+    }
+
+    private void testMultipleClients(final Class<? extends TestService> serviceAInterface,
+            final TestService serviceA, final Class<? extends TestService> serviceBInterface,
+            final TestService serviceB)
+    {
+        final int NUMBER_OF_CALLS = 10;
+        final MessageChannel channel = new MessageChannel();
+
         context.checking(new Expectations()
             {
                 {
-                    one(serviceOnServerSideWrapper1.getService()).echo(1);
-                    will(returnValue(1));
+                    for (int i = 0; i < NUMBER_OF_CALLS; i++)
+                    {
+                        one(serviceA).echo(i);
+                        will(returnValue(i));
+                    }
+                    for (int i = 0; i < NUMBER_OF_CALLS; i++)
+                    {
+                        one(serviceB).echo(NUMBER_OF_CALLS + i);
+                        will(returnValue(NUMBER_OF_CALLS + i));
+                    }
+                }
+            });
 
-                    one(serviceOnServerSideWrapper2.getService()).echo(2);
-                    will(returnValue(2));
+        Thread client1Thread = new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    for (int i = 0; i < NUMBER_OF_CALLS; i++)
+                    {
+                        Assert.assertEquals(getServiceOnClientSide1(serviceAInterface).echo(i), i);
+                        ConcurrencyUtilities.sleep(TIMEOUT / 10);
+                    }
+                    channel.send("finished");
                 }
             });
-        Assert.assertEquals(getServiceOnClientSide1(TestService1.class).echo(1), 1);
-        Assert.assertEquals(getServiceOnClientSide2(TestService2.class).echo(2), 2);
+
+        Thread client2Thread = new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    for (int i = 0; i < NUMBER_OF_CALLS; i++)
+                    {
+                        Assert.assertEquals(
+                                getServiceOnClientSide2(serviceBInterface)
+                                        .echo(NUMBER_OF_CALLS + i), NUMBER_OF_CALLS + i);
+                        ConcurrencyUtilities.sleep(TIMEOUT / 10);
+                    }
+                    channel.send("finished");
+                }
+            });
+
+        client1Thread.start();
+        client2Thread.start();
+
+        channel.assertNextMessage("finished");
+        channel.assertNextMessage("finished");
+
         assertNoMoreConversations();
     }
 
     private void assertNoMoreConversations()
     {
-        assertNoMoreConversations(0);
+        assertNoMoreConversations(TIMEOUT);
     }
 
-    private void assertNoMoreConversations(int delayBeforeChecking)
+    private void assertNoMoreConversations(int additionalWaitingTime)
     {
-        // wait for the server thread to run and clean up
-        ConcurrencyUtilities.sleep(delayBeforeChecking + 100);
-        Assert.assertEquals(clientManager1.getConversationCount(), 0);
-        Assert.assertEquals(clientManager2.getConversationCount(), 0);
-        Assert.assertEquals(serverManager.getConversationCount(), 0);
+        long maxFinishTime = System.currentTimeMillis() + additionalWaitingTime + 100;
+
+        while (System.currentTimeMillis() < maxFinishTime)
+        {
+            boolean noMoreConversations =
+                    clientManager1.getConversationCount() == 0
+                            && clientManager2.getConversationCount() == 0
+                            && serverManager.getConversationCount() == 0;
+            if (noMoreConversations)
+            {
+                return;
+            } else
+            {
+                ConcurrencyUtilities.sleep(100);
+            }
+        }
+
+        Assert.fail("Some conversations still remain open - client1: "
+                + clientManager1.getConversationCount() + " client2: "
+                + clientManager2.getConversationCount() + " server: "
+                + serverManager.getConversationCount());
     }
 
     private BaseServiceConversationClientManager createClientManager()
@@ -573,7 +622,15 @@ public class ServiceConversationTest
 
     }
 
-    public static interface TestService1
+    public static interface TestService
+    {
+
+        @Conversational(progress = Progress.MANUAL)
+        public Object echo(Object parameter);
+
+    }
+
+    public static interface TestService1 extends TestService
     {
 
         public void nonConversationalMethod();
@@ -596,13 +653,11 @@ public class ServiceConversationTest
         @Conversational(progress = Progress.AUTOMATIC)
         public Object methodWithAutomaticProgress();
 
-        @Conversational(progress = Progress.MANUAL)
-        public Object echo(Object parameter);
-
     }
 
-    public static interface TestService2
+    public static interface TestService2 extends TestService
     {
+
         @Conversational(progress = Progress.MANUAL)
         public void methodWithPrimitiveParameter(int parameter);
 
@@ -612,9 +667,6 @@ public class ServiceConversationTest
         @Conversational(progress = Progress.MANUAL)
         public void methodWithObjectParameter(Object parameter);
 
-        @Conversational(progress = Progress.MANUAL)
-        public Object echo(Object parameter);
-
     }
 
     public static interface TestServiceLocal
diff --git a/openbis-common/source/java/ch/systemsx/cisd/common/conversation/manager/BaseServiceConversationClientManager.java b/openbis-common/source/java/ch/systemsx/cisd/common/conversation/manager/BaseServiceConversationClientManager.java
index 6806fa0a190..df49d481dbc 100644
--- a/openbis-common/source/java/ch/systemsx/cisd/common/conversation/manager/BaseServiceConversationClientManager.java
+++ b/openbis-common/source/java/ch/systemsx/cisd/common/conversation/manager/BaseServiceConversationClientManager.java
@@ -16,6 +16,7 @@
 
 package ch.systemsx.cisd.common.conversation.manager;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -41,11 +42,19 @@ public class BaseServiceConversationClientManager implements
         IServiceConversationClientManagerRemote
 {
 
-    private Map<ClientConfig, ServiceConversationClientWithConversationTracking> clientConfigToClientMap =
-            new HashMap<ClientConfig, ServiceConversationClientWithConversationTracking>();
+    private Map<ClientConfig, ServiceConversationClientWithConversationTracking> clientConfigToClientMap;
 
-    private Map<String, ServiceConversationClientWithConversationTracking> conversationIdToClientMap =
-            new HashMap<String, ServiceConversationClientWithConversationTracking>();
+    private Map<String, ServiceConversationClientWithConversationTracking> conversationIdToClientMap;
+
+    public BaseServiceConversationClientManager()
+    {
+        clientConfigToClientMap =
+                Collections
+                        .synchronizedMap(new HashMap<ClientConfig, ServiceConversationClientWithConversationTracking>());
+        conversationIdToClientMap =
+                Collections
+                        .synchronizedMap(new HashMap<String, ServiceConversationClientWithConversationTracking>());
+    }
 
     @Override
     public void send(ServiceMessage message)
diff --git a/openbis-common/source/java/ch/systemsx/cisd/common/conversation/manager/BaseServiceConversationServerManager.java b/openbis-common/source/java/ch/systemsx/cisd/common/conversation/manager/BaseServiceConversationServerManager.java
index e288398532d..6290055ebf9 100644
--- a/openbis-common/source/java/ch/systemsx/cisd/common/conversation/manager/BaseServiceConversationServerManager.java
+++ b/openbis-common/source/java/ch/systemsx/cisd/common/conversation/manager/BaseServiceConversationServerManager.java
@@ -16,6 +16,7 @@
 
 package ch.systemsx.cisd.common.conversation.manager;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,15 +45,20 @@ public abstract class BaseServiceConversationServerManager implements
 
     private ServiceConversationServer server;
 
-    private Map<ServiceConversationClientDetails, IServiceConversationClientManagerRemote> clientDetailsToClientMap =
-            new HashMap<ServiceConversationClientDetails, IServiceConversationClientManagerRemote>();
+    private Map<ServiceConversationClientDetails, IServiceConversationClientManagerRemote> clientDetailsToClientMap;
 
-    private Map<String, ServiceConversationClientDetails> conversationIdToClientDetailsMap =
-            new HashMap<String, ServiceConversationClientDetails>();
+    private Map<String, ServiceConversationClientDetails> conversationIdToClientDetailsMap;
 
     public BaseServiceConversationServerManager()
     {
         server = new ServiceConversationServer();
+
+        clientDetailsToClientMap =
+                Collections
+                        .synchronizedMap(new HashMap<ServiceConversationClientDetails, IServiceConversationClientManagerRemote>());
+        conversationIdToClientDetailsMap =
+                Collections
+                        .synchronizedMap(new HashMap<String, ServiceConversationClientDetails>());
     }
 
     /**
-- 
GitLab