diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectionalServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectionalServiceMessenger.java index b22d699b1d77eb56979f9636db1be60259f7705b..aefd3d89d54b71fc14d61314e322c28d4741cf3c 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectionalServiceMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectionalServiceMessenger.java @@ -38,7 +38,7 @@ class BidirectionalServiceMessenger private final String conversationId; - private final ISendingMessenger responseMessenger; + private final IServiceMessageTransport responseMessenger; private final int messageReceivingTimeoutMillis; @@ -49,7 +49,7 @@ class BidirectionalServiceMessenger private final AtomicBoolean interrupted = new AtomicBoolean(); BidirectionalServiceMessenger(String conversationId, int messageReceivingTimeoutMillis, - ISendingMessenger responseMessenger) + IServiceMessageTransport responseMessenger) { this.conversationId = conversationId; this.messageReceivingTimeoutMillis = messageReceivingTimeoutMillis; @@ -75,7 +75,7 @@ class BidirectionalServiceMessenger if (message == null) { final String msg = "Timeout while waiting for message from client."; - ServiceConversationCollection.operationLog.error(String.format( + ServiceConversationServer.operationLog.error(String.format( "[id: %s] %s", conversationId, msg)); throw new TimeoutExceptionUnchecked(msg); } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java index 13eddac9037dc1aed18426f0f45dad305bb0ecf8..85e592882f41af59b2dcf4e0f59d9fe3a533aec9 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java @@ -17,9 +17,6 @@ package ch.systemsx.cisd.common.serviceconversation; import java.io.Serializable; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; @@ -29,70 +26,45 @@ import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; * * @author Bernd Rinn */ -public class ClientMessenger implements IClientMessenger +class ClientMessenger implements IServiceConversation { - private final BlockingQueue<ServiceMessage> messageQueue = - new LinkedBlockingQueue<ServiceMessage>(); + private final IServiceMessageTransport transportToService; - private final ISendingMessenger senderToService; + private final String serviceConversationId; - private String serviceConversationId; + private final ClientResponseMessageQueue responseMessageQueue; - private int timeoutMillis; + private final ClientResponseMessageMultiplexer responseMessageMultiplexer; - private int messageIdxLastSeen = -1; + private int timeoutMillis; private int outgoingMessageIdx; - public ClientMessenger(ServiceConversationDTO serviceConversationDTO, - ISendingMessenger senderToService) + ClientMessenger(ServiceConversationDTO serviceConversationDTO, + IServiceMessageTransport transportToService, + ClientResponseMessageQueue responseMessageQueue, + ClientResponseMessageMultiplexer responseMessageMultiplexer) { - assert senderToService != null; + assert transportToService != null; this.serviceConversationId = serviceConversationDTO.getServiceConversationId(); assert serviceConversationId != null; this.timeoutMillis = serviceConversationDTO.getClientTimeoutInMillis(); - this.senderToService = senderToService; - } - - ClientMessenger(ISendingMessenger senderToService) - { - this.senderToService = senderToService; + this.transportToService = transportToService; + this.responseMessageQueue = responseMessageQueue; + this.responseMessageMultiplexer = responseMessageMultiplexer; + responseMessageMultiplexer.addConversation(serviceConversationId, responseMessageQueue); } - public ISendingMessenger getResponseMessenger() + public void send(Serializable message) { - return new ISendingMessenger() - { - public void send(ServiceMessage message) - { - if (serviceConversationId != null - && serviceConversationId.equals(message.getConversationId()) == false) - { - throw new IllegalArgumentException( - "Attempt to put in a message for conversation " - + message.getConversationId() - + " into queue for conversation " + serviceConversationId); - } - if (message.getMessageIdx() <= messageIdxLastSeen) - { - return; - } else - { - messageIdxLastSeen = message.getMessageIdx(); - } - if (message.hasPayload() == false) - { - messageQueue.clear(); - } - messageQueue.add(message); - } - }; + transportToService.send(new ServiceMessage(serviceConversationId, + nextOutgoingMessageIndex(), false, message)); } - public void send(Serializable message) + public void terminate() { - senderToService.send(new ServiceMessage(serviceConversationId, nextOutgoingMessageIndex(), - false, message)); + transportToService.send(ServiceMessage.terminate(serviceConversationId)); + } private int nextOutgoingMessageIndex() @@ -104,8 +76,7 @@ public class ClientMessenger implements IClientMessenger { try { - return handleMessage(messageQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS), - messageClass); + return handleMessage(responseMessageQueue.poll(timeoutMillis), messageClass); } catch (InterruptedException ex) { throw CheckedExceptionTunnel.wrapIfNecessary(ex); @@ -121,7 +92,7 @@ public class ClientMessenger implements IClientMessenger new TimeoutExceptionUnchecked("Timeout while waiting on message from service."); final String exceptionDescription = ServiceExecutionException.getDescriptionFromException(exception); - senderToService.send(new ServiceMessage(serviceConversationId, + transportToService.send(new ServiceMessage(serviceConversationId, nextOutgoingMessageIndex(), true, exceptionDescription)); throw exception; } @@ -138,15 +109,21 @@ public class ClientMessenger implements IClientMessenger return (T) payload; } - public String getServiceConversationId() + public String getId() { return serviceConversationId; } - void setServiceConversationDTO(ServiceConversationDTO serviceConversation) + public void close() + { + responseMessageMultiplexer.removeConversation(serviceConversationId); + } + + @Override + protected void finalize() throws Throwable { - this.serviceConversationId = serviceConversation.getServiceConversationId(); - this.timeoutMillis = serviceConversation.getClientTimeoutInMillis(); + close(); + super.finalize(); } } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientResponseMessageMultiplexer.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientResponseMessageMultiplexer.java new file mode 100644 index 0000000000000000000000000000000000000000..0c0c387ca8d78a0df1f09261e71045b68a082d08 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientResponseMessageMultiplexer.java @@ -0,0 +1,83 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; + +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; + +/** + * A client-side multiplexer for incoming messages from the server. + * + * @author Bernd Rinn + */ +class ClientResponseMessageMultiplexer +{ + final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, + ClientResponseMessageMultiplexer.class); + + private final Map<String, IServiceMessageTransport> conversations = + new ConcurrentHashMap<String, IServiceMessageTransport>(); + + /** + * Returns the transport for incoming response messages. + */ + IServiceMessageTransport getIncomingTransport() + { + return new IServiceMessageTransport() + { + public void send(ServiceMessage message) + { + final String conversationId = message.getConversationId(); + final IServiceMessageTransport transport = conversations.get(conversationId); + if (transport == null) + { + final String msg = String.format( + "Message for unknown service conversation '%s'", conversationId); + operationLog.error(msg); + throw new UnknownServiceConversationException(msg); + } + transport.send(message); + } + }; + } + + /** + * Adds a new conversation to the multiplexer. + */ + void addConversation(String serviceConversationId, + IServiceMessageTransport responseMessageTransport) + { + conversations.put(serviceConversationId, responseMessageTransport); + } + + /** + * Removes a conversation from the multiplexer. + * + * @return <code>true</code> if the conversation was removed and <code>false</code>, if a + * conversation with the given id could not be found. + */ + boolean removeConversation(String serviceConversationId) + { + return conversations.remove(serviceConversationId) != null; + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientResponseMessageQueue.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientResponseMessageQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..f6098f3f8c58cc96f28010556611103cf1468de8 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientResponseMessageQueue.java @@ -0,0 +1,55 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A class that holds a queue for response messages to a client. + * + * @author Bernd Rinn + */ +class ClientResponseMessageQueue implements IServiceMessageTransport +{ + private int messageIdxLastSeen = -1; + + private final BlockingQueue<ServiceMessage> messageQueue = + new LinkedBlockingQueue<ServiceMessage>(); + + ServiceMessage poll(int timeoutMillis) throws InterruptedException + { + return messageQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS); + } + + public void send(ServiceMessage message) + { + if (message.getMessageIdx() <= messageIdxLastSeen) + { + return; + } else + { + messageIdxLastSeen = message.getMessageIdx(); + } + if (message.hasPayload() == false) + { + messageQueue.clear(); + } + messageQueue.add(message); + } +} \ No newline at end of file diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IRemoteServiceConversationServer.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IRemoteServiceConversationServer.java new file mode 100644 index 0000000000000000000000000000000000000000..b4d6cab3a504d2f698e7b4f1d0ebdb8021fbfc8d --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IRemoteServiceConversationServer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation; + +/** + * A remote role for starting a service conversation. + * + * @author Bernd Rinn + */ +public interface IRemoteServiceConversationServer +{ + + /** + * Starts a service conversation of type <var>typeId</var>. + * + * @param typeId The service type of the conversation. + * @return The information about the service conversation started. + */ + public ServiceConversationDTO startConversation(final String typeId); + +} \ No newline at end of file diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceConversation.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceConversation.java new file mode 100644 index 0000000000000000000000000000000000000000..515c28acfb3d386aa22926f98c680ac5b730c311 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceConversation.java @@ -0,0 +1,56 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation; + +import java.io.Closeable; +import java.io.Serializable; + +/** + * The service conversation. + * + * @author Bernd Rinn + */ +public interface IServiceConversation extends Closeable +{ + /** + * Returns the service conversation id. + */ + public String getId(); + + /** + * Send a message to the service. + */ + public void send(Serializable message); + + /** + * Receive a message from the service. + */ + public <T extends Serializable> T receive(Class<T> messageClass); + + /** + * Tells the service to terminate. Use this for calls that have no inherent definition of + * "finished". + */ + public void terminate(); + + /** + * Closes this messenger. Do not call {@link #send(Serializable)} or {@link #receive(Class)} + * after this call. + */ + public void close(); + +} \ No newline at end of file diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ISendingMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessageTransport.java similarity index 83% rename from common/source/java/ch/systemsx/cisd/common/serviceconversation/ISendingMessenger.java rename to common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessageTransport.java index df24d4251d00b5d2ee287e4adbbb14ecf7f6fe8d..02bc17fa9b3827224bebd2e4210bec695b698b14 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ISendingMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessageTransport.java @@ -17,14 +17,14 @@ package ch.systemsx.cisd.common.serviceconversation; /** - * The sending messenger role for a service conversation. + * The role that can transport service messages. * * @author Bernd Rinn */ -public interface ISendingMessenger +public interface IServiceMessageTransport { /** - * Send the <var>message</var> using this messenger. + * Send the <var>message</var> using this transport. */ public void send(ServiceMessage message); } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationClient.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationClient.java new file mode 100644 index 0000000000000000000000000000000000000000..04806349334ecd34318a2ffe14938ad8fc50f391 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationClient.java @@ -0,0 +1,63 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation; + +/** + * The client for the service conversations. + * + * @author Bernd Rinn + */ +public class ServiceConversationClient +{ + private final IRemoteServiceConversationServer server; + + private final IServiceMessageTransport transportToServer; + + private final ClientResponseMessageMultiplexer responseMessageMultiplexer = + new ClientResponseMessageMultiplexer(); + + public ServiceConversationClient(IRemoteServiceConversationServer server, + IServiceMessageTransport transportToServer) + { + this.server = server; + this.transportToServer = transportToServer; + } + + /** + * Returns the incoming transport for response messages. + */ + public IServiceMessageTransport getIncomingResponseMessageTransport() + { + return responseMessageMultiplexer.getIncomingTransport(); + } + + /** + * Starts a service conversation of type <var>typeId</var>. + * + * @param typeId The service type of the conversation. + * @return a {@link IServiceConversation} to communicate with the service. + */ + public IServiceConversation startConversation(final String typeId) + { + final ClientResponseMessageQueue responseMessageQueue = new ClientResponseMessageQueue(); + final ServiceConversationDTO serviceConversationRecord = server.startConversation(typeId); + final ClientMessenger clientMessenger = + new ClientMessenger(serviceConversationRecord, transportToServer, + responseMessageQueue, responseMessageMultiplexer); + return clientMessenger; + } +} diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationDTO.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationDTO.java index 638a1d167db91b7bdd55ec2543e417fc98c0bd57..2625dee55a369c0fe2a90475bdfa5747ff24beb9 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationDTO.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationDTO.java @@ -19,7 +19,7 @@ package ch.systemsx.cisd.common.serviceconversation; import java.io.Serializable; /** - * A record to save to provide the information about a new service conversation. + * A data transfer object to save to provide the information about a new service conversation. * * @author Bernd Rinn */ diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationServer.java similarity index 71% rename from common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java rename to common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationServer.java index 6554d92824cab065f4c9bf49ed74d10b8b7c8911..0c505cf453d51d8a961662b897fddcabee209c6a 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationServer.java @@ -39,33 +39,74 @@ import ch.systemsx.cisd.common.logging.LogFactory; * * @author Bernd Rinn */ -public class ServiceConversationCollection implements ISendingMessenger +public class ServiceConversationServer { private final static int NUMBER_OF_CORE_THREADS = 10; private final static int SHUTDOWN_TIMEOUT_MILLIS = 10000; final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, - ServiceConversationCollection.class); + ServiceConversationServer.class); private final int messageReceivingTimeoutMillis; private final ExecutorService executor = new NamingThreadPoolExecutor("Service Conversations") .corePoolSize(NUMBER_OF_CORE_THREADS).daemonize(); - private final Random rng = new Random(); - private final Map<String, IServiceFactory> serviceFactoryMap = new ConcurrentHashMap<String, IServiceFactory>(); + private final Map<String, IServiceMessageTransport> responseMessageMap = + new ConcurrentHashMap<String, IServiceMessageTransport>(); + private final Map<String, ServiceConversationRecord> conversations = new ConcurrentHashMap<String, ServiceConversationRecord>(); - public ServiceConversationCollection(int messageReceivingTimeoutMillis) + private final Random rng = new Random(); + + private final IServiceMessageTransport incomingTransport = new IServiceMessageTransport() + { + public void send(ServiceMessage message) + { + final String conversationId = message.getConversationId(); + final ServiceConversationRecord record = conversations.get(conversationId); + if (record == null) + { + operationLog.error(String.format( + "Message for unknown service conversation '%s'", conversationId)); + return; + } + if (message.hasPayload()) + { + record.getMessenger().sendToService(message); + } else + { + if (message.isException()) + { + operationLog.error(String.format( + "[id: %s] Client execution exception.\n%s", conversationId, + message.tryGetExceptionDescription())); + } else + { + operationLog.error(String.format( + "[id: %s] Client requests termination of service conversation.", + conversationId)); + } + record.getMessenger().markAsInterrupted(); + record.getController().cancel(true); + } + } + }; + + public ServiceConversationServer(int messageReceivingTimeoutMillis) { this.messageReceivingTimeoutMillis = messageReceivingTimeoutMillis; } + // + // Initial setup + // + /** * Adds a new service type to this conversation object. */ @@ -79,37 +120,57 @@ public class ServiceConversationCollection implements ISendingMessenger serviceFactoryMap.put(id, factory); } + // + // Client setup + // + /** - * Starts a service conversation of type <var>typeId</var>. + * Adds the client transport (to be called when client connects). + */ + public void addClientResponseTransport(String clientId, + IServiceMessageTransport responseTransport) + { + responseMessageMap.put(clientId, responseTransport); + } + + /** + * Removes the client transport (to be called when client disconnects). * - * @param typeId The service type of the conversation. - * @return a {@link ClientMessenger} to communicate with the service. + * @return <code>true</code> if the client transport was removed. + */ + public boolean removeClientResponseTransport(String clientId) + { + return responseMessageMap.remove(clientId) != null; + } + + /** + * Returns the transport for incoming messages from clients. */ - public ClientMessenger startConversation(final String typeId) + public IServiceMessageTransport getIncomingMessageTransport() { - final ClientMessenger clientMessenger = new ClientMessenger(this); - final ServiceConversationDTO serviceConversationRecord = - startConversation(typeId, clientMessenger.getResponseMessenger()); - clientMessenger.setServiceConversationDTO(serviceConversationRecord); - return clientMessenger; + return incomingTransport; } /** * Starts a service conversation of type <var>typeId</var>. * * @param typeId The service type of the conversation. - * @param responseMessenger The messenger to communicate back the messages from the service to - * the client. + * @param clientId The id of the client, used to find a suitable transport to communicate back + * the messages from the service to the client. * @return The information about the service conversation started. */ - public ServiceConversationDTO startConversation(final String typeId, - final ISendingMessenger responseMessenger) + public ServiceConversationDTO startConversation(final String typeId, final String clientId) { final IServiceFactory serviceFactory = serviceFactoryMap.get(typeId); if (serviceFactory == null) { throw new UnknownServiceTypeException(typeId); } + final IServiceMessageTransport responseMessenger = responseMessageMap.get(clientId); + if (responseMessenger == null) + { + throw new UnknownClientException(clientId); + } final IService serviceInstance = serviceFactory.create(); final String serviceConversationId = Long.toString(System.currentTimeMillis()) + "-" + rng.nextInt(Integer.MAX_VALUE); @@ -202,38 +263,4 @@ public class ServiceConversationCollection implements ISendingMessenger return conversations.containsKey(conversationId); } - // - // IIncomingMessenger - // - - public void send(ServiceMessage message) - { - final String conversationId = message.getConversationId(); - final ServiceConversationRecord record = conversations.get(conversationId); - if (record == null) - { - operationLog.error(String.format("Message for unknown service conversation '%s'", - conversationId)); - return; - } - if (message.hasPayload()) - { - record.getMessenger().sendToService(message); - } else - { - if (message.isException()) - { - operationLog.error(String.format("[id: %s] Client execution exception.\n%s", - conversationId, message.tryGetExceptionDescription())); - } else - { - operationLog.error(String.format( - "[id: %s] Client requests termination of service conversation.", - conversationId)); - } - record.getMessenger().markAsInterrupted(); - record.getController().cancel(true); - } - } - } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownClientException.java similarity index 65% rename from common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java rename to common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownClientException.java index b13d04c729157ae6c0733a26bc5ab37424a522c2..4a11135bf2e140ed5f245ead26e8bda92641b0a3 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/UnknownClientException.java @@ -16,23 +16,17 @@ package ch.systemsx.cisd.common.serviceconversation; -import java.io.Serializable; - /** - * A messenger role for receiving messages from a service. - * + * An exception for signaling that a client is not known. + * * @author Bernd Rinn */ -public interface IClientMessenger +public class UnknownClientException extends RuntimeException { - /** - * Send a message to the service. - */ - public void send(Serializable message); - - /** - * Receive a message from the service. - */ - public <T extends Serializable> T receive(Class<T> messageClass); + private static final long serialVersionUID = 1L; -} \ No newline at end of file + public UnknownClientException(String clientId) + { + super("Client '" + clientId + "' is not known."); + } +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java deleted file mode 100644 index 75d281140b18c9aa08322f7482d9599057fa4b4d..0000000000000000000000000000000000000000 --- a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Copyright 2011 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.common.serviceconversation; - -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertFalse; -import static org.testng.AssertJUnit.assertTrue; -import static org.testng.AssertJUnit.fail; - -import java.io.Serializable; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.testng.annotations.BeforeTest; -import org.testng.annotations.Test; - -import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; -import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; -import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; -import ch.systemsx.cisd.common.logging.LogInitializer; - -/** - * Test cases for the {@Link ServiceConversationCollection} class. - * - * @author Bernd Rinn - */ -public class ServiceConversationCollectionTest -{ - @BeforeTest - public void init() - { - LogInitializer.init(); - } - - private static class SingleEchoService implements IService - { - public void run(IServiceMessenger messenger) - { - messenger.send(messenger.receive(String.class)); - } - } - - @Test - public void testSingleEchoServiceHappyCase() throws Exception - { - final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType(new IServiceFactory() - { - public IService create() - { - return new SingleEchoService(); - } - - public int getClientTimeoutMillis() - { - return 100; - } - - public String getServiceTypeId() - { - return "singleEcho"; - } - }); - final IClientMessenger messenger = conversations.startConversation("singleEcho"); - messenger.send("Hallo Echo"); - assertEquals("Hallo Echo", messenger.receive(String.class)); - } - - private static class EchoService implements IService - { - public void run(IServiceMessenger messenger) - { - try - { - while (true) - { - System.err.println(Thread.currentThread().getName()); - messenger.send(messenger.receive(String.class)); - } - } catch (RuntimeException ex) - { - // Show exception - ex.printStackTrace(); - // This doesn't matter: the exception goes into the void. - throw ex; - } - } - } - - @Test - public void testMultipleEchoServiceTerminateHappyCase() throws Exception - { - final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType(new IServiceFactory() - { - public IService create() - { - return new EchoService(); - } - - public int getClientTimeoutMillis() - { - return 100; - } - - public String getServiceTypeId() - { - return "echo"; - } - }); - final BlockingQueue<ServiceMessage> messageQueue = - new LinkedBlockingQueue<ServiceMessage>(); - final String id = conversations.startConversation("echo", new ISendingMessenger() - { - public void send(ServiceMessage message) - { - messageQueue.add(message); - } - }).getServiceConversationId(); - assertTrue(conversations.hasConversation(id)); - int messageIdx = 0; - - conversations.send(new ServiceMessage(id, 0, false, "One")); - ServiceMessage m = messageQueue.take(); - assertEquals(id, m.getConversationId()); - assertEquals(messageIdx++, m.getMessageIdx()); - assertEquals("One", m.getPayload()); - - conversations.send(new ServiceMessage(id, 1, false, "Two")); - // Try to resend and check that the second one is swallowed. - conversations.send(new ServiceMessage(id, 1, false, "Two")); - m = messageQueue.take(); - assertEquals(id, m.getConversationId()); - assertEquals(messageIdx++, m.getMessageIdx()); - assertEquals("Two", m.getPayload()); - - conversations.send(new ServiceMessage(id, 2, false, "Three")); - m = messageQueue.take(); - assertEquals(id, m.getConversationId()); - assertEquals(messageIdx++, m.getMessageIdx()); - assertEquals("Three", m.getPayload()); - - conversations.send(ServiceMessage.terminate(id)); - for (int i = 0; i < 100 && conversations.hasConversation(id); ++i) - { - ConcurrencyUtilities.sleep(10L); - } - assertFalse(conversations.hasConversation(id)); - } - - @Test - public void testEchoServiceTimeout() throws Exception - { - final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType(new IServiceFactory() - { - public IService create() - { - return new EchoService(); - } - - public int getClientTimeoutMillis() - { - return 100; - } - - public String getServiceTypeId() - { - return "echo"; - } - }); - final BlockingQueue<ServiceMessage> messageQueue = - new LinkedBlockingQueue<ServiceMessage>(); - final String id = conversations.startConversation("echo", new ISendingMessenger() - { - public void send(ServiceMessage message) - { - messageQueue.add(message); - } - }).getServiceConversationId(); - assertTrue(conversations.hasConversation(id)); - int messageIdx = 0; - conversations.send(new ServiceMessage(id, 0, false, "One")); - ServiceMessage m = messageQueue.take(); - assertEquals(id, m.getConversationId()); - assertEquals(messageIdx++, m.getMessageIdx()); - assertEquals("One", m.getPayload()); - - // Wait for timeout to happen. - for (int i = 0; i < 100 && conversations.hasConversation(id); ++i) - { - ConcurrencyUtilities.sleep(10L); - } - assertFalse(conversations.hasConversation(id)); - m = messageQueue.take(); - assertTrue(m.isException()); - assertTrue(m.tryGetExceptionDescription().startsWith( - "ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked:")); - } - - @Test(expectedExceptions = UnknownServiceTypeException.class) - public void testNonExistentService() throws Exception - { - final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.startConversation("echo", new ISendingMessenger() - { - public void send(ServiceMessage message) - { - } - }); - } - - private static class ExceptionThrowingService implements IService - { - public void run(IServiceMessenger messenger) - { - throw new RuntimeException("Don't like you!"); - } - } - - @Test - public void testServiceThrowsException() throws Exception - { - final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType(new IServiceFactory() - { - public IService create() - { - return new ExceptionThrowingService(); - } - - public int getClientTimeoutMillis() - { - return 100; - } - - public String getServiceTypeId() - { - return "throwException"; - } - }); - final ClientMessenger messenger = conversations.startConversation("throwException"); - try - { - messenger.receive(Serializable.class); - fail(); - } catch (ServiceExecutionException ex) - { - assertEquals(messenger.getServiceConversationId(), ex.getServiceConversationId()); - assertTrue(ex.getDescription().contains("RuntimeException")); - assertTrue(ex.getDescription().contains("Don't like you!")); - } - } - - private static class EventuallyExceptionThrowingService implements IService - { - public void run(IServiceMessenger messenger) - { - messenger.send("OK1"); - messenger.send("OK2"); - messenger.send("OK3"); - throw new RuntimeException("Don't like you!"); - } - } - - @Test - public void testServiceEventuallyThrowsException() throws Exception - { - final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType(new IServiceFactory() - { - public IService create() - { - return new EventuallyExceptionThrowingService(); - } - - public int getClientTimeoutMillis() - { - return 100; - } - - public String getServiceTypeId() - { - return "throwException"; - } - }); - final ClientMessenger messenger = conversations.startConversation("throwException"); - ConcurrencyUtilities.sleep(100L); - try - { - // The regular messages should have been cleared by now so that we get to see the - // exception immediately. - messenger.receive(Serializable.class); - fail(); - } catch (ServiceExecutionException ex) - { - assertEquals(messenger.getServiceConversationId(), ex.getServiceConversationId()); - assertTrue(ex.getDescription().contains("RuntimeException")); - assertTrue(ex.getDescription().contains("Don't like you!")); - } - } - - private static class DelayedService implements IService - { - public void run(IServiceMessenger messenger) - { - try - { - ConcurrencyUtilities.sleep(100L); - } catch (InterruptedExceptionUnchecked ex) - { - System.err.println("DelayedService got interrupted."); - } - } - } - - @Test - public void testClientTimesout() throws Exception - { - final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType(new IServiceFactory() - { - public IService create() - { - return new DelayedService(); - } - - public int getClientTimeoutMillis() - { - return 10; - } - - public String getServiceTypeId() - { - return "delayed"; - } - }); - final ClientMessenger messenger = conversations.startConversation("delayed"); - try - { - messenger.receive(Serializable.class); - fail(); - } catch (TimeoutExceptionUnchecked ex) - { - } - // Wait for service to find out that the client timed out. - ConcurrencyUtilities.sleep(100L); - } - -} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java new file mode 100644 index 0000000000000000000000000000000000000000..695fbbe8b95b20347b966e95e1968cdf9e4dd39d --- /dev/null +++ b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java @@ -0,0 +1,522 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; + +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; +import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; +import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; +import ch.systemsx.cisd.common.logging.LogInitializer; + +/** + * Test cases for the {@Link ServiceConversationCollection} class. + * + * @author Bernd Rinn + */ +public class ServiceConversationTest +{ + @BeforeTest + public void init() + { + LogInitializer.init(); + } + + /** + * This object encapsulates the client server connection for test purposes. + */ + private static class TestClientServerConnection implements IRemoteServiceConversationServer, + IServiceMessageTransport + { + private final ServiceConversationServer server; + + TestClientServerConnection(ServiceConversationServer server) + { + this.server = server; + } + + void setResponseMessageTransport(final IServiceMessageTransport responseMessageTransport) + { + server.addClientResponseTransport("dummyClient", new IServiceMessageTransport() + { + public void send(ServiceMessage message) + { + int attempt = 0; + while (attempt++ < 10) + { + try + { + // Send all messages twice to test detection of duplicate messages. + responseMessageTransport.send(message); + responseMessageTransport.send(message); + break; + } catch (Exception ex) + { + ConcurrencyUtilities.sleep(10); + } + } + } + }); + + } + + public void send(ServiceMessage message) + { + // Send all messages twice to test detection of duplicate messages. + server.getIncomingMessageTransport().send(message); + server.getIncomingMessageTransport().send(message); + } + + public ServiceConversationDTO startConversation(String typeId) + { + return server.startConversation(typeId, "dummyClient"); + } + + } + + private static class ServiceConversationServerAndClientHolder + { + final ServiceConversationServer server; + + final ServiceConversationClient client; + + ServiceConversationServerAndClientHolder(ServiceConversationServer server, + ServiceConversationClient client) + { + this.server = server; + this.client = client; + } + + } + + private ServiceConversationServerAndClientHolder createServerAndClient(IServiceFactory factory) + { + final ServiceConversationServer server = new ServiceConversationServer(100); + server.addServiceType(factory); + final TestClientServerConnection dummyRemoteServer = new TestClientServerConnection(server); + final ServiceConversationClient client = + new ServiceConversationClient(dummyRemoteServer, dummyRemoteServer); + dummyRemoteServer.setResponseMessageTransport(client.getIncomingResponseMessageTransport()); + return new ServiceConversationServerAndClientHolder(server, client); + } + + private static class SingleEchoService implements IService + { + public void run(IServiceMessenger messenger) + { + messenger.send(messenger.receive(String.class)); + } + + static IServiceFactory createFactory() + { + return new IServiceFactory() + { + public IService create() + { + return new SingleEchoService(); + } + + public int getClientTimeoutMillis() + { + return 100; + } + + public String getServiceTypeId() + { + return "singleEcho"; + } + }; + } + } + + @Test + public void testSingleEchoServiceHappyCase() throws Exception + { + final ServiceConversationClient client = + createServerAndClient(SingleEchoService.createFactory()).client; + final IServiceConversation messenger = client.startConversation("singleEcho"); + messenger.send("Hallo Echo"); + assertEquals("Hallo Echo", messenger.receive(String.class)); + messenger.close(); + } + + private static class EchoService implements IService + { + public void run(IServiceMessenger messenger) + { + try + { + while (true) + { + System.err.println(Thread.currentThread().getName()); + messenger.send(messenger.receive(String.class)); + } + } catch (RuntimeException ex) + { + // Show exception + ex.printStackTrace(); + // This doesn't matter: the exception goes into the void. + throw ex; + } + } + + static IServiceFactory createFactory() + { + return new IServiceFactory() + { + public IService create() + { + return new EchoService(); + } + + public int getClientTimeoutMillis() + { + return 100; + } + + public String getServiceTypeId() + { + return "echo"; + } + }; + } + } + + @Test + public void testMultipleEchoServiceTerminateHappyCase() throws Exception + { + final ServiceConversationServerAndClientHolder holder = + createServerAndClient(EchoService.createFactory()); + final IServiceConversation conversation = holder.client.startConversation("echo"); + + conversation.send("One"); + assertEquals("One", conversation.receive(String.class)); + + conversation.send("Two"); + assertEquals("Two", conversation.receive(String.class)); + + conversation.send("Three"); + assertEquals("Three", conversation.receive(String.class)); + + conversation.terminate(); + for (int i = 0; i < 100 + && holder.server.hasConversation(conversation.getId()); ++i) + { + ConcurrencyUtilities.sleep(10L); + } + assertFalse(holder.server.hasConversation(conversation.getId())); + } + + @Test + public void testMultipleEchoServiceTerminateLowLevelHappyCase() throws Exception + { + final ServiceConversationServer conversations = new ServiceConversationServer(100); + conversations.addServiceType(EchoService.createFactory()); + final BlockingQueue<ServiceMessage> messageQueue = + new LinkedBlockingQueue<ServiceMessage>(); + conversations.addClientResponseTransport("dummyClient", new IServiceMessageTransport() + { + public void send(ServiceMessage message) + { + messageQueue.add(message); + } + }); + final String id = + conversations.startConversation("echo", "dummyClient").getServiceConversationId(); + assertTrue(conversations.hasConversation(id)); + int messageIdx = 0; + + conversations.getIncomingMessageTransport().send(new ServiceMessage(id, 0, false, "One")); + ServiceMessage m = messageQueue.take(); + assertEquals(id, m.getConversationId()); + assertEquals(messageIdx++, m.getMessageIdx()); + assertEquals("One", m.getPayload()); + + conversations.getIncomingMessageTransport().send(new ServiceMessage(id, 1, false, "Two")); + // Try to resend and check that the second one is swallowed. + conversations.getIncomingMessageTransport().send(new ServiceMessage(id, 1, false, "Two")); + m = messageQueue.take(); + assertEquals(id, m.getConversationId()); + assertEquals(messageIdx++, m.getMessageIdx()); + assertEquals("Two", m.getPayload()); + + conversations.getIncomingMessageTransport().send(new ServiceMessage(id, 2, false, "Three")); + m = messageQueue.take(); + assertEquals(id, m.getConversationId()); + assertEquals(messageIdx++, m.getMessageIdx()); + assertEquals("Three", m.getPayload()); + + conversations.getIncomingMessageTransport().send(ServiceMessage.terminate(id)); + for (int i = 0; i < 100 && conversations.hasConversation(id); ++i) + { + ConcurrencyUtilities.sleep(10L); + } + assertFalse(conversations.hasConversation(id)); + } + + @Test + public void testEchoServiceTimeout() throws Exception + { + final ServiceConversationServer conversations = new ServiceConversationServer(100); + conversations.addServiceType(new IServiceFactory() + { + public IService create() + { + return new EchoService(); + } + + public int getClientTimeoutMillis() + { + return 100; + } + + public String getServiceTypeId() + { + return "echo"; + } + }); + final BlockingQueue<ServiceMessage> messageQueue = + new LinkedBlockingQueue<ServiceMessage>(); + conversations.addClientResponseTransport("dummyClient", new IServiceMessageTransport() + { + public void send(ServiceMessage message) + { + messageQueue.add(message); + } + }); + final String id = + conversations.startConversation("echo", "dummyClient").getServiceConversationId(); + assertTrue(conversations.hasConversation(id)); + int messageIdx = 0; + conversations.getIncomingMessageTransport().send(new ServiceMessage(id, 0, false, "One")); + ServiceMessage m = messageQueue.take(); + assertEquals(id, m.getConversationId()); + assertEquals(messageIdx++, m.getMessageIdx()); + assertEquals("One", m.getPayload()); + + // Wait for timeout to happen. + for (int i = 0; i < 100 && conversations.hasConversation(id); ++i) + { + ConcurrencyUtilities.sleep(10L); + } + assertFalse(conversations.hasConversation(id)); + m = messageQueue.take(); + assertTrue(m.isException()); + assertTrue(m.tryGetExceptionDescription().startsWith( + "ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked:")); + } + + @Test(expectedExceptions = UnknownServiceTypeException.class) + public void testUnknownService() throws Exception + { + final ServiceConversationServer conversations = new ServiceConversationServer(100); + conversations.addClientResponseTransport("dummyClient", new IServiceMessageTransport() + { + public void send(ServiceMessage message) + { + } + }); + conversations.startConversation("echo", "dummyClient"); + } + + @Test(expectedExceptions = UnknownClientException.class) + public void testUnknownClient() throws Exception + { + final ServiceConversationServer conversations = new ServiceConversationServer(100); + conversations.addServiceType(new IServiceFactory() + { + public IService create() + { + return new EchoService(); + } + + public int getClientTimeoutMillis() + { + return 100; + } + + public String getServiceTypeId() + { + return "echo"; + } + }); + conversations.startConversation("echo", "dummyClient"); + } + + private static class ExceptionThrowingService implements IService + { + public void run(IServiceMessenger messenger) + { + throw new RuntimeException("Don't like you!"); + } + + static IServiceFactory createFactory() + { + return new IServiceFactory() + { + public IService create() + { + return new ExceptionThrowingService(); + } + + public int getClientTimeoutMillis() + { + return 100; + } + + public String getServiceTypeId() + { + return "throwException"; + } + }; + } + } + + @Test + public void testServiceThrowsException() throws Exception + { + final ServiceConversationClient client = + createServerAndClient(ExceptionThrowingService.createFactory()).client; + final IServiceConversation messenger = client.startConversation("throwException"); + try + { + messenger.receive(Serializable.class); + fail(); + } catch (ServiceExecutionException ex) + { + assertEquals(messenger.getId(), ex.getServiceConversationId()); + assertTrue(ex.getDescription().contains("RuntimeException")); + assertTrue(ex.getDescription().contains("Don't like you!")); + } + } + + private static class EventuallyExceptionThrowingService implements IService + { + public void run(IServiceMessenger messenger) + { + messenger.send("OK1"); + messenger.send("OK2"); + messenger.send("OK3"); + throw new RuntimeException("Don't like you!"); + } + + static IServiceFactory createFactory() + { + return new IServiceFactory() + { + public IService create() + { + return new EventuallyExceptionThrowingService(); + } + + public int getClientTimeoutMillis() + { + return 100; + } + + public String getServiceTypeId() + { + return "throwException"; + } + }; + } + } + + @Test + public void testServiceEventuallyThrowsException() throws Exception + { + final ServiceConversationClient client = + createServerAndClient(EventuallyExceptionThrowingService.createFactory()).client; + final IServiceConversation messenger = client.startConversation("throwException"); + ConcurrencyUtilities.sleep(100L); + try + { + // The regular messages should have been cleared by now so that we get to see the + // exception immediately. + messenger.receive(Serializable.class); + fail(); + } catch (ServiceExecutionException ex) + { + assertEquals(messenger.getId(), ex.getServiceConversationId()); + assertTrue(ex.getDescription().contains("RuntimeException")); + assertTrue(ex.getDescription().contains("Don't like you!")); + } + } + + private static class DelayedService implements IService + { + public void run(IServiceMessenger messenger) + { + try + { + ConcurrencyUtilities.sleep(100L); + } catch (InterruptedExceptionUnchecked ex) + { + System.err.println("DelayedService got interrupted."); + } + } + + static IServiceFactory createFactory() + { + return new IServiceFactory() + { + public IService create() + { + return new DelayedService(); + } + + public int getClientTimeoutMillis() + { + return 50; + } + + public String getServiceTypeId() + { + return "delayed"; + } + }; + } + } + + @Test + public void testClientTimeout() throws Exception + { + final ServiceConversationClient client = + createServerAndClient(DelayedService.createFactory()).client; + final IServiceConversation messenger = client.startConversation("delayed"); + try + { + messenger.receive(Serializable.class); + fail(); + } catch (TimeoutExceptionUnchecked ex) + { + } + // Wait for service to find out that the client timed out. + ConcurrencyUtilities.sleep(100L); + } + +}