From 789de58c4cb5bf550a85112477e7da2a583bdda5 Mon Sep 17 00:00:00 2001
From: brinn <brinn>
Date: Thu, 15 Dec 2011 22:02:20 +0000
Subject: [PATCH] Add the first version of the service conversations.

SVN: 24005
---
 .../BidirectinoalServiceMessenger.java        | 110 +++++++++
 .../serviceconversation/ClientMessenger.java  | 126 ++++++++++
 .../serviceconversation/IClientMessenger.java |  41 ++++
 .../ISendingMessenger.java                    |  30 +++
 .../common/serviceconversation/IService.java  |  33 +++
 .../serviceconversation/IServiceFactory.java  |  30 +++
 .../IServiceMessenger.java                    |  36 +++
 .../ServiceConversationCollection.java        | 191 +++++++++++++++
 .../ServiceConversationRecord.java            |  52 ++++
 .../ServiceExecutionException.java            |  80 ++++++
 .../serviceconversation/ServiceMessage.java   | 101 ++++++++
 .../UnexpectedMessagePayloadException.java    |  34 +++
 .../UnknownServiceConversationException.java  |  32 +++
 .../UnknownServiceTypeException.java          |  32 +++
 .../ServiceConversationCollectionTest.java    | 228 ++++++++++++++++++
 15 files changed, 1156 insertions(+)
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ISendingMessenger.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/IService.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationRecord.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceExecutionException.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/UnexpectedMessagePayloadException.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownServiceConversationException.java
 create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownServiceTypeException.java
 create mode 100644 common/sourceTest/java/ch/systemsx/cisd/common/serviceconversarions/ServiceConversationCollectionTest.java

diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java
new file mode 100644
index 00000000000..d40c9a8df5b
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
+import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked;
+
+/**
+ * A bidirectional messenger on the server side.
+ * 
+ * @author Bernd Rinn
+ */
+class BidirectinoalServiceMessenger
+{
+    private final BlockingQueue<ServiceMessage> incoming =
+            new LinkedBlockingQueue<ServiceMessage>();
+
+    private final String conversationId;
+
+    private final ISendingMessenger responseMessenger;
+
+    private final int messageReceivingTimeoutMillis;
+
+    private int outgoingMessageIdx;
+
+    private int messageIdxLastSeen = -1;
+
+    BidirectinoalServiceMessenger(String conversationId, int messageReceivingTimeoutMillis,
+            ISendingMessenger responseMessenger)
+    {
+        this.conversationId = conversationId;
+        this.messageReceivingTimeoutMillis = messageReceivingTimeoutMillis;
+        this.responseMessenger = responseMessenger;
+    }
+
+    IServiceMessenger getServiceMessenger()
+    {
+        return new IServiceMessenger()
+            {
+                @SuppressWarnings("unchecked")
+                public <T> T receive(Class<T> messageClass)
+                {
+                    final Object payload;
+                    try
+                    {
+                        final ServiceMessage message =
+                                incoming.poll(messageReceivingTimeoutMillis, TimeUnit.MILLISECONDS);
+                        if (message == null)
+                        {
+                            throw new TimeoutExceptionUnchecked(
+                                    "Timeout while waiting for message to return.");
+                        }
+                        payload = message.getPayload();
+                    } catch (InterruptedException ex)
+                    {
+                        throw CheckedExceptionTunnel.wrapIfNecessary(ex);
+                    }
+                    if (messageClass != null
+                            && messageClass.isAssignableFrom(payload.getClass()) == false)
+                    {
+                        throw new UnexpectedMessagePayloadException(payload.getClass(),
+                                messageClass);
+                    }
+                    return (T) payload;
+                }
+
+                public void send(Object message)
+                {
+                    responseMessenger.send(new ServiceMessage(conversationId,
+                            nextOutgoingMessageIndex(), message));
+                }
+            };
+    }
+
+    int nextOutgoingMessageIndex()
+    {
+        return outgoingMessageIdx++;
+    }
+
+    public void sendToService(ServiceMessage message)
+    {
+        if (message.getMessageIdx() <= messageIdxLastSeen)
+        {
+            // Drop duplicate message.
+            return;
+        } else
+        {
+            messageIdxLastSeen = message.getMessageIdx();
+        }
+        incoming.add(message);
+    }
+}
\ No newline at end of file
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java
new file mode 100644
index 00000000000..1c824ef843f
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
+
+/**
+ * A class that a client can use to receive messages from a service.
+ * 
+ * @author Bernd Rinn
+ */
+public class ClientMessenger implements IClientMessenger
+{
+    private final BlockingQueue<ServiceMessage> messageQueue =
+            new LinkedBlockingQueue<ServiceMessage>();
+
+    private final ISendingMessenger senderToService;
+
+    private String serviceConversationId;
+
+    private int messageIdxLastSeen = -1;
+
+    private int outgoingMessageIdx;
+
+    public ClientMessenger(ISendingMessenger senderToService)
+    {
+        this.senderToService = senderToService;
+    }
+
+    public ISendingMessenger getResponseMessenger()
+    {
+        return new ISendingMessenger()
+            {
+                public void send(ServiceMessage message)
+                {
+                    if (message.getMessageIdx() <= messageIdxLastSeen)
+                    {
+                        return;
+                    } else
+                    {
+                        messageIdxLastSeen = message.getMessageIdx();
+                    }
+                    messageQueue.add(message);
+                }
+            };
+    }
+
+    public void send(Object message)
+    {
+        senderToService.send(new ServiceMessage(serviceConversationId, nextOutgoingMessageIndex(),
+                message));
+    }
+
+    private int nextOutgoingMessageIndex()
+    {
+        return outgoingMessageIdx++;
+    }
+
+    public <T> T receive(Class<T> messageClass)
+    {
+        try
+        {
+            return handleMessage(messageQueue.take(), messageClass);
+        } catch (InterruptedException ex)
+        {
+            throw CheckedExceptionTunnel.wrapIfNecessary(ex);
+        }
+    }
+
+    public <T> T receive(Class<T> messageClass, int timeoutMillis)
+    {
+        try
+        {
+            return handleMessage(messageQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS),
+                    messageClass);
+        } catch (InterruptedException ex)
+        {
+            throw CheckedExceptionTunnel.wrapIfNecessary(ex);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T handleMessage(ServiceMessage message, Class<T> messageClass)
+    {
+        if (message.isException())
+        {
+            throw new ServiceExecutionException(message.getConversationId(),
+                    message.tryGetExceptionDescription());
+        }
+        final Object payload = message.getPayload();
+        if (messageClass != null && messageClass.isAssignableFrom(payload.getClass()) == false)
+        {
+            throw new UnexpectedMessagePayloadException(payload.getClass(), messageClass);
+        }
+        return (T) payload;
+    }
+
+    public String getServiceConversationId()
+    {
+        return serviceConversationId;
+    }
+
+    public void setServiceConversationId(String serviceConversationId)
+    {
+        this.serviceConversationId = serviceConversationId;
+    }
+
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java
new file mode 100644
index 00000000000..bcdda844e36
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+/**
+ * A messenger role for receiving messages from a service.
+ *
+ * @author Bernd Rinn
+ */
+public interface IClientMessenger
+{
+    /**
+     * Send a message to the service.
+     */
+    public void send(Object message);
+    
+    /**
+     * Receive a message from the service.
+     */
+    public <T> T receive(Class<T> messageClass);
+
+    /**
+     * Receive a message from the service.
+     */
+    public <T> T receive(Class<T> messageClass, int timeoutMillis);
+
+}
\ No newline at end of file
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ISendingMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ISendingMessenger.java
new file mode 100644
index 00000000000..df24d4251d0
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ISendingMessenger.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+/**
+ * The sending messenger role for a service conversation.
+ *
+ * @author Bernd Rinn
+ */
+public interface ISendingMessenger
+{
+    /**
+     * Send the <var>message</var> using this messenger.
+     */
+    public void send(ServiceMessage message);
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IService.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IService.java
new file mode 100644
index 00000000000..04013cd0720
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IService.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+/**
+ * The interface for a service that can participate in a service conversation.
+ *
+ * @author Bernd Rinn
+ */
+public interface IService
+{
+    /**
+     * Calling this method starts the service conversation.
+     * <p>
+     * This method is called by the service conversation framework.
+     */
+    public void run(IServiceMessenger messenger);
+
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java
new file mode 100644
index 00000000000..7032806d485
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+/**
+ * A factory for services.
+ *
+ * @author Bernd Rinn
+ */
+public interface IServiceFactory
+{
+    /**
+     * Create a new service.
+     */
+    public IService create();
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java
new file mode 100644
index 00000000000..a43206aff14
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+
+/**
+ * A messaging interface for a service of the service conversation framework.
+ *
+ * @author Bernd Rinn
+ */
+public interface IServiceMessenger
+{
+    /**
+     * Send a message to the client.
+     */
+    public void send(Object message);
+    
+    /**
+     * Receive a message from the client.
+     */
+    public <T> T receive(Class<T> messageClass);
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java
new file mode 100644
index 00000000000..f0b9ea350d1
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+
+import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
+import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
+import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor;
+import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
+import ch.systemsx.cisd.common.concurrent.ITerminableFuture;
+import ch.systemsx.cisd.common.concurrent.TerminableCallable.ICallable;
+import ch.systemsx.cisd.common.concurrent.TerminableCallable.IStoppableExecutor;
+
+/**
+ * The service conversation collection.
+ * 
+ * @author Bernd Rinn
+ */
+public class ServiceConversationCollection implements ISendingMessenger
+{
+    private final static int NUMBER_OF_CORE_THREADS = 10;
+
+    private final static int SHUTDOWN_TIMEOUT_MILLIS = 10000;
+
+    private final int messageReceivingTimeoutMillis;
+
+    private final ExecutorService executor = new NamingThreadPoolExecutor("Service Conversations")
+            .corePoolSize(NUMBER_OF_CORE_THREADS).daemonize();
+
+    private Random rng = new Random();
+
+    private final Map<String, IServiceFactory> serviceFactoryMap =
+            new ConcurrentHashMap<String, IServiceFactory>();
+
+    private Map<String, ServiceConversationRecord> conversations =
+            new ConcurrentHashMap<String, ServiceConversationRecord>();
+
+    public ServiceConversationCollection(int messageReceivingTimeoutMillis)
+    {
+        this.messageReceivingTimeoutMillis = messageReceivingTimeoutMillis;
+    }
+
+    /**
+     * Adds a new service type to this conversation object.
+     */
+    public void addServiceType(String id, IServiceFactory factory)
+    {
+        serviceFactoryMap.put(id, factory);
+    }
+
+    /**
+     * Starts a service conversation of type <var>typeId</var>.
+     * 
+     * @param typeId The service type of the conversation.
+     * @return a {@link ClientMessenger} to communicate with the service.
+     */
+    public ClientMessenger startConversation(final String typeId)
+    {
+        final ClientMessenger clientMessenger = new ClientMessenger(this);
+        final String serviceConversationId =
+                startConversation(typeId, clientMessenger.getResponseMessenger());
+        clientMessenger.setServiceConversationId(serviceConversationId);
+        return clientMessenger;
+    }
+
+    /**
+     * Starts a service conversation of type <var>typeId</var>.
+     * 
+     * @param typeId The service type of the conversation.
+     * @param responseMessenger The messenger to communicate back the messages from the service to
+     *            the client.
+     * @return The service conversation id.
+     */
+    public String startConversation(final String typeId, final ISendingMessenger responseMessenger)
+    {
+        final IServiceFactory serviceFactory = serviceFactoryMap.get(typeId);
+        if (serviceFactory == null)
+        {
+            throw new UnknownServiceTypeException(typeId);
+        }
+        final IService serviceInstance = serviceFactory.create();
+        final String conversationId =
+                Long.toString(System.currentTimeMillis()) + "-" + rng.nextInt(Integer.MAX_VALUE);
+        final BidirectinoalServiceMessenger messenger =
+                new BidirectinoalServiceMessenger(conversationId, messageReceivingTimeoutMillis,
+                        responseMessenger);
+        final ServiceConversationRecord record = new ServiceConversationRecord(messenger);
+        conversations.put(conversationId, record);
+        final ITerminableFuture<Void> controller =
+                ConcurrencyUtilities.submit(executor, new ICallable<Void>()
+                    {
+                        public Void call(IStoppableExecutor<Void> stoppableExecutor)
+                                throws Exception
+                        {
+                            try
+                            {
+                                serviceInstance.run(messenger.getServiceMessenger());
+                            } catch (Exception ex)
+                            {
+                                if (ex instanceof InterruptedExceptionUnchecked == false)
+                                {
+                                    final ByteArrayOutputStream os = new ByteArrayOutputStream();
+                                    final PrintWriter pw = new PrintWriter(os);
+                                    ex.printStackTrace(pw);
+                                    pw.close();
+                                    final String errorMessage = new String(os.toByteArray());
+                                    responseMessenger.send(new ServiceMessage(conversationId,
+                                            messenger.nextOutgoingMessageIndex(), errorMessage));
+                                }
+                            } finally
+                            {
+                                conversations.remove(conversationId);
+                            }
+                            return null;
+                        }
+
+                        // TODO: uncomment once we can name an ICallable.
+                        // public String getCallableName()
+                        // {
+                        // return conversationId + " (" + typeId + ")";
+                        // }
+
+                    });
+        record.setController(controller);
+        return conversationId;
+    }
+
+    public void shutdown()
+    {
+        try
+        {
+            for (ServiceConversationRecord record : conversations.values())
+            {
+                record.getController().cancel(true);
+            }
+            executor.awaitTermination(SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (Exception ex)
+        {
+            throw new CheckedExceptionTunnel(ex);
+        }
+    }
+
+    public boolean hasConversation(String conversationId)
+    {
+        return conversations.containsKey(conversationId);
+    }
+
+    //
+    // IIncomingMessenger
+    //
+
+    public void send(ServiceMessage message)
+    {
+        final String conversationId = message.getConversationId();
+        final ServiceConversationRecord record = conversations.get(conversationId);
+        if (record == null)
+        {
+            throw new UnknownServiceConversationException(conversationId);
+        }
+        if (message.isTerminate())
+        {
+            record.getController().cancel(true);
+        } else
+        {
+            record.getMessenger().sendToService(message);
+        }
+    }
+
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationRecord.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationRecord.java
new file mode 100644
index 00000000000..6e6b415969f
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationRecord.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+import ch.systemsx.cisd.common.concurrent.ITerminableFuture;
+
+/**
+ * The record holding information about a service conversation.
+ *
+ * @author Bernd Rinn
+ */
+class ServiceConversationRecord
+{
+    private final BidirectinoalServiceMessenger messenger;
+    
+    private ITerminableFuture<Void> controller;
+
+    ServiceConversationRecord(BidirectinoalServiceMessenger messenger)
+    {
+        super();
+        this.messenger = messenger;
+    }
+
+    BidirectinoalServiceMessenger getMessenger()
+    {
+        return messenger;
+    }
+
+    ITerminableFuture<Void> getController()
+    {
+        return controller;
+    }
+
+    void setController(ITerminableFuture<Void> controller)
+    {
+        this.controller = controller;
+    }
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceExecutionException.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceExecutionException.java
new file mode 100644
index 00000000000..7ae3f882a69
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceExecutionException.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+
+/**
+ * An exception that signals to the client that an exception happened during service execution. 
+ *
+ * @author Bernd Rinn
+ */
+public class ServiceExecutionException extends RuntimeException
+{
+    private static final long serialVersionUID = 1L;
+
+    private final String serviceConversationId;
+    
+    private final String description;
+    
+    ServiceExecutionException(String serviceConversationId, String description)
+    {
+        super("Execution exception in service conversation " + serviceConversationId);
+        this.serviceConversationId = serviceConversationId;
+        this.description = description;
+    }
+
+    public String getServiceConversationId()
+    {
+        return serviceConversationId;
+    }
+
+    public String getDescription()
+    {
+        return description;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ServiceExecutionException [serviceConversationId=" + serviceConversationId
+                + ", description=" + description + "]";
+    }
+
+    @Override
+    public void printStackTrace()
+    {
+        System.err.println(getMessage());
+        System.err.println(getDescription());
+    }
+
+    @Override
+    public void printStackTrace(PrintStream s)
+    {
+        s.println(getMessage());
+        s.println(getDescription());
+    }
+
+    @Override
+    public void printStackTrace(PrintWriter s)
+    {
+        s.println(getMessage());
+        s.println(getDescription());
+    }
+    
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java
new file mode 100644
index 00000000000..afff492c9a6
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+/**
+ * A service message which is part of a service conversation.
+ *
+ * @author Bernd Rinn
+ */
+public class ServiceMessage
+{
+    private final String conversationId;
+    
+    private final int messageIdx;
+    
+    private final Object payload;
+    
+    private final String exceptionDescription;
+    
+    public static ServiceMessage terminate(String conversationId)
+    {
+        return new ServiceMessage(conversationId, 0, null);
+    }
+
+    public ServiceMessage(String conversationId, int messageId, Object payload)
+    {
+        this.conversationId = conversationId;
+        this.messageIdx = messageId;
+        this.payload = payload;
+        this.exceptionDescription = null;
+    }
+
+    ServiceMessage(String conversationId, int messageId, String exceptionDescription)
+    {
+        this.conversationId = conversationId;
+        this.messageIdx = messageId;
+        this.payload = null;
+        this.exceptionDescription = exceptionDescription;
+    }
+
+    public String getConversationId()
+    {
+        return conversationId;
+    }
+
+    public int getMessageIdx()
+    {
+        return messageIdx;
+    }
+
+    public Object getPayload()
+    {
+        return payload;
+    }
+
+    public boolean isTerminate()
+    {
+        return (payload == null) && (exceptionDescription == null);
+    }
+
+    public boolean isException()
+    {
+        return (payload == null) && (exceptionDescription != null);
+    }
+    
+    public String tryGetExceptionDescription()
+    {
+        return exceptionDescription;
+    }
+
+    @Override
+    public String toString()
+    {
+        if (isTerminate())
+        {
+            return "ServiceMessage [conversationId=" + conversationId + ", TERMINATE]";
+        } else if (isException())
+        {
+            return "ServiceMessage [conversationId=" + conversationId + ", messageIdx=" + messageIdx
+                    + ", exceptionDescription=" + exceptionDescription + "]";
+        } else
+        {
+            return "ServiceMessage [conversationId=" + conversationId + ", messageIdx=" + messageIdx
+                    + ", payload=" + payload + "]";
+        }
+    }
+}
\ No newline at end of file
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnexpectedMessagePayloadException.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnexpectedMessagePayloadException.java
new file mode 100644
index 00000000000..b90c60f08fe
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnexpectedMessagePayloadException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+/**
+ * An exception for signaling that a service method payload is of unexpected type.
+ * 
+ * @author Bernd Rinn
+ */
+public class UnexpectedMessagePayloadException extends RuntimeException
+{
+    private static final long serialVersionUID = 1L;
+
+    public UnexpectedMessagePayloadException(Class<?> payloadClassFound,
+            Class<?> payloadClassExpected)
+    {
+        super("Unexpected service message payload: '" + payloadClassFound.getSimpleName()
+                + "', expected: '" + payloadClassExpected.getSimpleName() + "'.");
+    }
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownServiceConversationException.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownServiceConversationException.java
new file mode 100644
index 00000000000..39a7c016a91
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownServiceConversationException.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+/**
+ * An exception for signaling that a service conversation is not known.
+ * 
+ * @author Bernd Rinn
+ */
+public class UnknownServiceConversationException extends RuntimeException
+{
+    private static final long serialVersionUID = 1L;
+
+    public UnknownServiceConversationException(String serviceConversationIf)
+    {
+        super("Service conversation '" + serviceConversationIf + "' is not known.");
+    }
+}
diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownServiceTypeException.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownServiceTypeException.java
new file mode 100644
index 00000000000..d0679180c34
--- /dev/null
+++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownServiceTypeException.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversation;
+
+/**
+ * An exception for signaling that a service type is not known.
+ * 
+ * @author Bernd Rinn
+ */
+public class UnknownServiceTypeException extends RuntimeException
+{
+    private static final long serialVersionUID = 1L;
+
+    public UnknownServiceTypeException(String serviceType)
+    {
+        super("Service type '" + serviceType + "' is not known.");
+    }
+}
diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversarions/ServiceConversationCollectionTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversarions/ServiceConversationCollectionTest.java
new file mode 100644
index 00000000000..e14a0664678
--- /dev/null
+++ b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversarions/ServiceConversationCollectionTest.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2011 ETH Zuerich, CISD
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ch.systemsx.cisd.common.serviceconversarions;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.testng.annotations.Test;
+
+import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
+import ch.systemsx.cisd.common.serviceconversation.ClientMessenger;
+import ch.systemsx.cisd.common.serviceconversation.IClientMessenger;
+import ch.systemsx.cisd.common.serviceconversation.ISendingMessenger;
+import ch.systemsx.cisd.common.serviceconversation.IService;
+import ch.systemsx.cisd.common.serviceconversation.IServiceFactory;
+import ch.systemsx.cisd.common.serviceconversation.IServiceMessenger;
+import ch.systemsx.cisd.common.serviceconversation.ServiceConversationCollection;
+import ch.systemsx.cisd.common.serviceconversation.ServiceExecutionException;
+import ch.systemsx.cisd.common.serviceconversation.ServiceMessage;
+import ch.systemsx.cisd.common.serviceconversation.UnknownServiceTypeException;
+
+/**
+ * Test cases for the {@Link ServiceConversationCollection} class.
+ * 
+ * @author Bernd Rinn
+ */
+public class ServiceConversationCollectionTest
+{
+    private static class SingleEchoService implements IService
+    {
+        public void run(IServiceMessenger messenger)
+        {
+            messenger.send(messenger.receive(String.class));
+        }
+    }
+
+    @Test
+    public void testSingleEchoServiceHappyCase() throws Exception
+    {
+        final ServiceConversationCollection conversations = new ServiceConversationCollection(100);
+        conversations.addServiceType("singleEcho", new IServiceFactory()
+            {
+                public IService create()
+                {
+                    return new SingleEchoService();
+                }
+            });
+        final IClientMessenger messenger = conversations.startConversation("singleEcho");
+        messenger.send("Hallo Echo");
+        assertEquals("Hallo Echo", messenger.receive(String.class));
+    }
+
+    private static class EchoService implements IService
+    {
+        public void run(IServiceMessenger messenger)
+        {
+            try
+            {
+                while (true)
+                {
+                    System.err.println(Thread.currentThread().getName());
+                    messenger.send(messenger.receive(String.class));
+                }
+            } catch (RuntimeException ex)
+            {
+                // Show exception
+                ex.printStackTrace();
+                // This doesn't matter: the exception goes into the void.
+                throw ex;
+            }
+        }
+    }
+
+    @Test
+    public void testMultipleEchoServiceTerminateHappyCase() throws Exception
+    {
+        final ServiceConversationCollection conversations = new ServiceConversationCollection(100);
+        conversations.addServiceType("echo", new IServiceFactory()
+            {
+                public IService create()
+                {
+                    return new EchoService();
+                }
+            });
+        final BlockingQueue<ServiceMessage> messageQueue =
+                new LinkedBlockingQueue<ServiceMessage>();
+        final String id = conversations.startConversation("echo", new ISendingMessenger()
+            {
+                public void send(ServiceMessage message)
+                {
+                    messageQueue.add(message);
+                }
+            });
+        assertTrue(conversations.hasConversation(id));
+        int messageIdx = 0;
+
+        conversations.send(new ServiceMessage(id, 0, "One"));
+        ServiceMessage m = messageQueue.take();
+        assertEquals(id, m.getConversationId());
+        assertEquals(messageIdx++, m.getMessageIdx());
+        assertEquals("One", m.getPayload());
+
+        conversations.send(new ServiceMessage(id, 1, "Two"));
+        // Try to resend and check that the second one is swallowed.
+        conversations.send(new ServiceMessage(id, 1, "Two"));
+        m = messageQueue.take();
+        assertEquals(id, m.getConversationId());
+        assertEquals(messageIdx++, m.getMessageIdx());
+        assertEquals("Two", m.getPayload());
+
+        conversations.send(new ServiceMessage(id, 2, "Three"));
+        m = messageQueue.take();
+        assertEquals(id, m.getConversationId());
+        assertEquals(messageIdx++, m.getMessageIdx());
+        assertEquals("Three", m.getPayload());
+
+        conversations.send(ServiceMessage.terminate(id));
+        for (int i = 0; i < 100 && conversations.hasConversation(id); ++i)
+        {
+            ConcurrencyUtilities.sleep(10L);
+        }
+        assertFalse(conversations.hasConversation(id));
+    }
+
+    @Test
+    public void testEchoServiceTimeout() throws Exception
+    {
+        final ServiceConversationCollection conversations = new ServiceConversationCollection(100);
+        conversations.addServiceType("echo", new IServiceFactory()
+            {
+                public IService create()
+                {
+                    return new EchoService();
+                }
+            });
+        final BlockingQueue<ServiceMessage> messageQueue =
+                new LinkedBlockingQueue<ServiceMessage>();
+        final String id = conversations.startConversation("echo", new ISendingMessenger()
+            {
+                public void send(ServiceMessage message)
+                {
+                    messageQueue.add(message);
+                }
+            });
+        assertTrue(conversations.hasConversation(id));
+        int messageIdx = 0;
+        conversations.send(new ServiceMessage(id, 0, "One"));
+        ServiceMessage m = messageQueue.take();
+        assertEquals(id, m.getConversationId());
+        assertEquals(messageIdx++, m.getMessageIdx());
+        assertEquals("One", m.getPayload());
+
+        // Wait for timeout to happen.
+        for (int i = 0; i < 100 && conversations.hasConversation(id); ++i)
+        {
+            ConcurrencyUtilities.sleep(10L);
+        }
+        assertFalse(conversations.hasConversation(id));
+        m = messageQueue.take();
+        assertTrue(m.isException());
+        assertTrue(m.tryGetExceptionDescription().startsWith(
+                "ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked:"));
+    }
+
+    @Test(expectedExceptions = UnknownServiceTypeException.class)
+    public void testNonExistentService() throws Exception
+    {
+        final ServiceConversationCollection conversations = new ServiceConversationCollection(100);
+        conversations.startConversation("echo", new ISendingMessenger()
+            {
+                public void send(ServiceMessage message)
+                {
+                }
+            });
+    }
+
+    private static class ExceptionThrowingService implements IService
+    {
+        public void run(IServiceMessenger messenger)
+        {
+            throw new RuntimeException("Don't like you!");
+        }
+    }
+
+    @Test
+    public void testServiceThrowsException() throws Exception
+    {
+        final ServiceConversationCollection conversations = new ServiceConversationCollection(100);
+        conversations.addServiceType("throwException", new IServiceFactory()
+            {
+                public IService create()
+                {
+                    return new ExceptionThrowingService();
+                }
+            });
+        final ClientMessenger messenger = conversations.startConversation("throwException");
+        try
+        {
+            messenger.receive(Object.class);
+            fail();
+        } catch (ServiceExecutionException ex)
+        {
+            assertEquals(messenger.getServiceConversationId(),
+                    ex.getServiceConversationId());
+            assertTrue(ex.getDescription().contains("RuntimeException"));
+            assertTrue(ex.getDescription().contains("Don't like you!"));
+        }
+    }
+}
-- 
GitLab