From 3c50c3307b5fe41f69d24127caa6c3dda7e98d5c Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Fri, 16 Dec 2011 07:42:30 +0000 Subject: [PATCH] Make the service factory specify the client timeout. Notify server of client timeout. Change message payload from Object to Serializable. SVN: 24008 --- .../BidirectinoalServiceMessenger.java | 14 ++- .../ClientExecutionException.java | 81 +++++++++++++++++ .../serviceconversation/ClientMessenger.java | 33 ++++--- .../serviceconversation/IClientMessenger.java | 11 +-- .../serviceconversation/IServiceFactory.java | 8 +- .../IServiceMessenger.java | 6 +- .../ServiceConversationCollection.java | 52 ++++++----- .../ServiceConversationDTO.java | 54 +++++++++++ .../ServiceExecutionException.java | 26 ++++-- .../serviceconversation/ServiceMessage.java | 50 ++++++----- .../ServiceConversationCollectionTest.java | 89 +++++++++++++++---- 11 files changed, 322 insertions(+), 102 deletions(-) create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientExecutionException.java create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationDTO.java rename common/sourceTest/java/ch/systemsx/cisd/common/{serviceconversarions => serviceconversation}/ServiceConversationCollectionTest.java (75%) diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java index d40c9a8df5b..3133b9a6fc7 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java @@ -16,6 +16,7 @@ package ch.systemsx.cisd.common.serviceconversation; +import java.io.Serializable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -56,7 +57,7 @@ class BidirectinoalServiceMessenger return new IServiceMessenger() { @SuppressWarnings("unchecked") - public <T> T receive(Class<T> messageClass) + public <T extends Serializable> T receive(Class<T> messageClass) { final Object payload; try @@ -66,7 +67,12 @@ class BidirectinoalServiceMessenger if (message == null) { throw new TimeoutExceptionUnchecked( - "Timeout while waiting for message to return."); + "Timeout while waiting for message from client."); + } + if (message.isException()) + { + throw new ClientExecutionException(conversationId, + message.tryGetExceptionDescription()); } payload = message.getPayload(); } catch (InterruptedException ex) @@ -82,10 +88,10 @@ class BidirectinoalServiceMessenger return (T) payload; } - public void send(Object message) + public void send(Serializable message) { responseMessenger.send(new ServiceMessage(conversationId, - nextOutgoingMessageIndex(), message)); + nextOutgoingMessageIndex(), false, message)); } }; } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientExecutionException.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientExecutionException.java new file mode 100644 index 00000000000..5d04897b05d --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientExecutionException.java @@ -0,0 +1,81 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation; + +import java.io.PrintStream; +import java.io.PrintWriter; + +/** + * An exception that signals to the server that an exception happened during service execution on + * the client. + * + * @author Bernd Rinn + */ +public class ClientExecutionException extends RuntimeException +{ + private static final long serialVersionUID = 1L; + + private final String serviceConversationId; + + private final String description; + + ClientExecutionException(String serviceConversationId, String description) + { + super("Client execution exception in service conversation " + serviceConversationId); + this.serviceConversationId = serviceConversationId; + this.description = description; + } + + public String getServiceConversationId() + { + return serviceConversationId; + } + + public String getDescription() + { + return description; + } + + @Override + public String toString() + { + return "ClientExecutionException [serviceConversationId=" + serviceConversationId + + ", description=" + description + "]"; + } + + @Override + public void printStackTrace() + { + System.err.println(getMessage()); + System.err.println(getDescription()); + } + + @Override + public void printStackTrace(PrintStream s) + { + s.println(getMessage()); + s.println(getDescription()); + } + + @Override + public void printStackTrace(PrintWriter s) + { + s.println(getMessage()); + s.println(getDescription()); + } + +} diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java index 8719626ec52..83b08a84e4e 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java @@ -16,6 +16,7 @@ package ch.systemsx.cisd.common.serviceconversation; +import java.io.Serializable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -37,6 +38,8 @@ public class ClientMessenger implements IClientMessenger private String serviceConversationId; + private int timeoutMillis; + private int messageIdxLastSeen = -1; private int outgoingMessageIdx; @@ -70,10 +73,10 @@ public class ClientMessenger implements IClientMessenger }; } - public void send(Object message) + public void send(Serializable message) { senderToService.send(new ServiceMessage(serviceConversationId, nextOutgoingMessageIndex(), - message)); + false, message)); } private int nextOutgoingMessageIndex() @@ -81,18 +84,7 @@ public class ClientMessenger implements IClientMessenger return outgoingMessageIdx++; } - public <T> T receive(Class<T> messageClass) - { - try - { - return handleMessage(messageQueue.take(), messageClass); - } catch (InterruptedException ex) - { - throw CheckedExceptionTunnel.wrapIfNecessary(ex); - } - } - - public <T> T receive(Class<T> messageClass, int timeoutMillis) + public <T extends Serializable> T receive(Class<T> messageClass) { try { @@ -109,7 +101,13 @@ public class ClientMessenger implements IClientMessenger { if (message == null) { - throw new TimeoutExceptionUnchecked("Timeout while waiting on message from service."); + final TimeoutExceptionUnchecked exception = + new TimeoutExceptionUnchecked("Timeout while waiting on message from service."); + final String exceptionDescription = + ServiceExecutionException.getDescriptionFromException(exception); + senderToService.send(new ServiceMessage(serviceConversationId, + nextOutgoingMessageIndex(), true, exceptionDescription)); + throw exception; } if (message.isException()) { @@ -129,9 +127,10 @@ public class ClientMessenger implements IClientMessenger return serviceConversationId; } - void setServiceConversationId(String serviceConversationId) + void setServiceConversationDTO(ServiceConversationDTO serviceConversation) { - this.serviceConversationId = serviceConversationId; + this.serviceConversationId = serviceConversation.getServiceConversationId(); + this.timeoutMillis = serviceConversation.getClientTimeoutInMillis(); } } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java index bcdda844e36..b13d04c7291 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IClientMessenger.java @@ -16,6 +16,8 @@ package ch.systemsx.cisd.common.serviceconversation; +import java.io.Serializable; + /** * A messenger role for receiving messages from a service. * @@ -26,16 +28,11 @@ public interface IClientMessenger /** * Send a message to the service. */ - public void send(Object message); + public void send(Serializable message); /** * Receive a message from the service. */ - public <T> T receive(Class<T> messageClass); - - /** - * Receive a message from the service. - */ - public <T> T receive(Class<T> messageClass, int timeoutMillis); + public <T extends Serializable> T receive(Class<T> messageClass); } \ No newline at end of file diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java index 7032806d485..44e9b17f4ae 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java @@ -18,7 +18,7 @@ package ch.systemsx.cisd.common.serviceconversation; /** * A factory for services. - * + * * @author Bernd Rinn */ public interface IServiceFactory @@ -27,4 +27,10 @@ public interface IServiceFactory * Create a new service. */ public IService create(); + + /** + * Returns the suggested timeout (in milli-seconds) of the client when waiting for a message + * from this service. + */ + public int getClientTimeoutMillis(); } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java index a43206aff14..d5c87e8a51f 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java @@ -16,6 +16,8 @@ package ch.systemsx.cisd.common.serviceconversation; +import java.io.Serializable; + /** * A messaging interface for a service of the service conversation framework. @@ -27,10 +29,10 @@ public interface IServiceMessenger /** * Send a message to the client. */ - public void send(Object message); + public void send(Serializable message); /** * Receive a message from the client. */ - public <T> T receive(Class<T> messageClass); + public <T extends Serializable> T receive(Class<T> messageClass); } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java index d6fac7d4f48..ea48c28d2fa 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java @@ -16,15 +16,12 @@ package ch.systemsx.cisd.common.serviceconversation; -import java.io.PrintWriter; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.commons.io.output.ByteArrayOutputStream; - import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor; @@ -79,9 +76,9 @@ public class ServiceConversationCollection implements ISendingMessenger public ClientMessenger startConversation(final String typeId) { final ClientMessenger clientMessenger = new ClientMessenger(this); - final String serviceConversationId = + final ServiceConversationDTO serviceConversationRecord = startConversation(typeId, clientMessenger.getResponseMessenger()); - clientMessenger.setServiceConversationId(serviceConversationId); + clientMessenger.setServiceConversationDTO(serviceConversationRecord); return clientMessenger; } @@ -91,9 +88,10 @@ public class ServiceConversationCollection implements ISendingMessenger * @param typeId The service type of the conversation. * @param responseMessenger The messenger to communicate back the messages from the service to * the client. - * @return The service conversation id. + * @return The information about the service conversation started. */ - public String startConversation(final String typeId, final ISendingMessenger responseMessenger) + public ServiceConversationDTO startConversation(final String typeId, + final ISendingMessenger responseMessenger) { final IServiceFactory serviceFactory = serviceFactoryMap.get(typeId); if (serviceFactory == null) @@ -101,13 +99,13 @@ public class ServiceConversationCollection implements ISendingMessenger throw new UnknownServiceTypeException(typeId); } final IService serviceInstance = serviceFactory.create(); - final String conversationId = + final String serviceConversationId = Long.toString(System.currentTimeMillis()) + "-" + rng.nextInt(Integer.MAX_VALUE); final BidirectinoalServiceMessenger messenger = - new BidirectinoalServiceMessenger(conversationId, messageReceivingTimeoutMillis, - responseMessenger); + new BidirectinoalServiceMessenger(serviceConversationId, + messageReceivingTimeoutMillis, responseMessenger); final ServiceConversationRecord record = new ServiceConversationRecord(messenger); - conversations.put(conversationId, record); + conversations.put(serviceConversationId, record); final ITerminableFuture<Void> controller = ConcurrencyUtilities.submit(executor, new ICallable<Void>() { @@ -119,18 +117,18 @@ public class ServiceConversationCollection implements ISendingMessenger serviceInstance.run(messenger.getServiceMessenger()); } catch (Exception ex) { - if (ex instanceof InterruptedExceptionUnchecked == false) + if (ex instanceof InterruptedExceptionUnchecked == false + && ex instanceof ClientExecutionException == false) { - final ByteArrayOutputStream os = new ByteArrayOutputStream(); - final PrintWriter pw = new PrintWriter(os); - ex.printStackTrace(pw); - pw.close(); - final String errorMessage = new String(os.toByteArray()); + final String errorMessage = + ServiceExecutionException + .getDescriptionFromException(ex); try { - responseMessenger - .send(new ServiceMessage(conversationId, messenger - .nextOutgoingMessageIndex(), errorMessage)); + responseMessenger.send(new ServiceMessage( + serviceConversationId, messenger + .nextOutgoingMessageIndex(), true, + errorMessage)); } catch (Exception ex2) { // TODO: improve logging @@ -139,7 +137,7 @@ public class ServiceConversationCollection implements ISendingMessenger } } finally { - conversations.remove(conversationId); + conversations.remove(serviceConversationId); } return null; } @@ -152,7 +150,8 @@ public class ServiceConversationCollection implements ISendingMessenger }); record.setController(controller); - return conversationId; + return new ServiceConversationDTO(serviceConversationId, + serviceFactory.getClientTimeoutMillis()); } public void shutdown() @@ -200,7 +199,14 @@ public class ServiceConversationCollection implements ISendingMessenger final ServiceConversationRecord record = conversations.get(conversationId); if (record == null) { - throw new UnknownServiceConversationException(conversationId); + if (message.isException() == false) + { + throw new UnknownServiceConversationException(conversationId); + } else + { + // If it was an exception on the client side, be silent. + return; + } } if (message.isTerminate()) { diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationDTO.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationDTO.java new file mode 100644 index 00000000000..638a1d167db --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationDTO.java @@ -0,0 +1,54 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation; + +import java.io.Serializable; + +/** + * A record to save to provide the information about a new service conversation. + * + * @author Bernd Rinn + */ +public class ServiceConversationDTO implements Serializable +{ + private static final long serialVersionUID = 1L; + + private final String serviceConversationId; + + private int clientTimeoutInMillis; + + ServiceConversationDTO(String serviceConversationId, int clientTimeout) + { + this.serviceConversationId = serviceConversationId; + this.clientTimeoutInMillis = clientTimeout; + } + + public int getClientTimeoutInMillis() + { + return clientTimeoutInMillis; + } + + public void setClientTimeoutInMillis(int clientTimeoutInMillis) + { + this.clientTimeoutInMillis = clientTimeoutInMillis; + } + + public String getServiceConversationId() + { + return serviceConversationId; + } +} diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceExecutionException.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceExecutionException.java index 7ae3f882a69..1e30cb466fc 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceExecutionException.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceExecutionException.java @@ -19,9 +19,12 @@ package ch.systemsx.cisd.common.serviceconversation; import java.io.PrintStream; import java.io.PrintWriter; +import org.apache.commons.io.output.ByteArrayOutputStream; + /** - * An exception that signals to the client that an exception happened during service execution. - * + * An exception that signals to the client that an exception happened during service execution on + * the server. + * * @author Bernd Rinn */ public class ServiceExecutionException extends RuntimeException @@ -29,9 +32,9 @@ public class ServiceExecutionException extends RuntimeException private static final long serialVersionUID = 1L; private final String serviceConversationId; - + private final String description; - + ServiceExecutionException(String serviceConversationId, String description) { super("Execution exception in service conversation " + serviceConversationId); @@ -76,5 +79,18 @@ public class ServiceExecutionException extends RuntimeException s.println(getMessage()); s.println(getDescription()); } - + + /** + * Creates a text description from an exception. + */ + public static String getDescriptionFromException(Throwable th) + { + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + final PrintWriter pw = new PrintWriter(os); + th.printStackTrace(pw); + pw.close(); + final String errorMessage = new String(os.toByteArray()); + return errorMessage; + } + } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java index afff492c9a6..90b6b8a79be 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java @@ -16,40 +16,42 @@ package ch.systemsx.cisd.common.serviceconversation; +import java.io.Serializable; + /** * A service message which is part of a service conversation. - * + * * @author Bernd Rinn */ public class ServiceMessage { private final String conversationId; - + private final int messageIdx; - - private final Object payload; - + + private final Serializable payload; + private final String exceptionDescription; - - public static ServiceMessage terminate(String conversationId) - { - return new ServiceMessage(conversationId, 0, null); - } - public ServiceMessage(String conversationId, int messageId, Object payload) + public static ServiceMessage terminate(String conversationId) { - this.conversationId = conversationId; - this.messageIdx = messageId; - this.payload = payload; - this.exceptionDescription = null; + return new ServiceMessage(conversationId, 0, false, null); } - ServiceMessage(String conversationId, int messageId, String exceptionDescription) + public ServiceMessage(String conversationId, int messageId, boolean exception, + Serializable payload) { this.conversationId = conversationId; this.messageIdx = messageId; - this.payload = null; - this.exceptionDescription = exceptionDescription; + if (exception) + { + this.payload = null; + this.exceptionDescription = payload.toString(); + } else + { + this.payload = payload; + this.exceptionDescription = null; + } } public String getConversationId() @@ -62,7 +64,7 @@ public class ServiceMessage return messageIdx; } - public Object getPayload() + public Serializable getPayload() { return payload; } @@ -76,7 +78,7 @@ public class ServiceMessage { return (payload == null) && (exceptionDescription != null); } - + public String tryGetExceptionDescription() { return exceptionDescription; @@ -90,12 +92,12 @@ public class ServiceMessage return "ServiceMessage [conversationId=" + conversationId + ", TERMINATE]"; } else if (isException()) { - return "ServiceMessage [conversationId=" + conversationId + ", messageIdx=" + messageIdx - + ", exceptionDescription=" + exceptionDescription + "]"; + return "ServiceMessage [conversationId=" + conversationId + ", messageIdx=" + + messageIdx + ", exceptionDescription=" + exceptionDescription + "]"; } else { - return "ServiceMessage [conversationId=" + conversationId + ", messageIdx=" + messageIdx - + ", payload=" + payload + "]"; + return "ServiceMessage [conversationId=" + conversationId + ", messageIdx=" + + messageIdx + ", payload=" + payload + "]"; } } } \ No newline at end of file diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversarions/ServiceConversationCollectionTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java similarity index 75% rename from common/sourceTest/java/ch/systemsx/cisd/common/serviceconversarions/ServiceConversationCollectionTest.java rename to common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java index e14a0664678..9660ddc81e1 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversarions/ServiceConversationCollectionTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java @@ -14,29 +14,21 @@ * limitations under the License. */ -package ch.systemsx.cisd.common.serviceconversarions; +package ch.systemsx.cisd.common.serviceconversation; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; +import java.io.Serializable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.testng.annotations.Test; +import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; -import ch.systemsx.cisd.common.serviceconversation.ClientMessenger; -import ch.systemsx.cisd.common.serviceconversation.IClientMessenger; -import ch.systemsx.cisd.common.serviceconversation.ISendingMessenger; -import ch.systemsx.cisd.common.serviceconversation.IService; -import ch.systemsx.cisd.common.serviceconversation.IServiceFactory; -import ch.systemsx.cisd.common.serviceconversation.IServiceMessenger; -import ch.systemsx.cisd.common.serviceconversation.ServiceConversationCollection; -import ch.systemsx.cisd.common.serviceconversation.ServiceExecutionException; -import ch.systemsx.cisd.common.serviceconversation.ServiceMessage; -import ch.systemsx.cisd.common.serviceconversation.UnknownServiceTypeException; /** * Test cases for the {@Link ServiceConversationCollection} class. @@ -63,6 +55,10 @@ public class ServiceConversationCollectionTest { return new SingleEchoService(); } + public int getClientTimeoutMillis() + { + return 100; + } }); final IClientMessenger messenger = conversations.startConversation("singleEcho"); messenger.send("Hallo Echo"); @@ -100,6 +96,10 @@ public class ServiceConversationCollectionTest { return new EchoService(); } + public int getClientTimeoutMillis() + { + return 100; + } }); final BlockingQueue<ServiceMessage> messageQueue = new LinkedBlockingQueue<ServiceMessage>(); @@ -109,25 +109,25 @@ public class ServiceConversationCollectionTest { messageQueue.add(message); } - }); + }).getServiceConversationId(); assertTrue(conversations.hasConversation(id)); int messageIdx = 0; - conversations.send(new ServiceMessage(id, 0, "One")); + conversations.send(new ServiceMessage(id, 0, false, "One")); ServiceMessage m = messageQueue.take(); assertEquals(id, m.getConversationId()); assertEquals(messageIdx++, m.getMessageIdx()); assertEquals("One", m.getPayload()); - conversations.send(new ServiceMessage(id, 1, "Two")); + conversations.send(new ServiceMessage(id, 1, false, "Two")); // Try to resend and check that the second one is swallowed. - conversations.send(new ServiceMessage(id, 1, "Two")); + conversations.send(new ServiceMessage(id, 1, false, "Two")); m = messageQueue.take(); assertEquals(id, m.getConversationId()); assertEquals(messageIdx++, m.getMessageIdx()); assertEquals("Two", m.getPayload()); - conversations.send(new ServiceMessage(id, 2, "Three")); + conversations.send(new ServiceMessage(id, 2, false, "Three")); m = messageQueue.take(); assertEquals(id, m.getConversationId()); assertEquals(messageIdx++, m.getMessageIdx()); @@ -151,6 +151,10 @@ public class ServiceConversationCollectionTest { return new EchoService(); } + public int getClientTimeoutMillis() + { + return 100; + } }); final BlockingQueue<ServiceMessage> messageQueue = new LinkedBlockingQueue<ServiceMessage>(); @@ -160,10 +164,10 @@ public class ServiceConversationCollectionTest { messageQueue.add(message); } - }); + }).getServiceConversationId(); assertTrue(conversations.hasConversation(id)); int messageIdx = 0; - conversations.send(new ServiceMessage(id, 0, "One")); + conversations.send(new ServiceMessage(id, 0, false, "One")); ServiceMessage m = messageQueue.take(); assertEquals(id, m.getConversationId()); assertEquals(messageIdx++, m.getMessageIdx()); @@ -211,11 +215,15 @@ public class ServiceConversationCollectionTest { return new ExceptionThrowingService(); } + public int getClientTimeoutMillis() + { + return 100; + } }); final ClientMessenger messenger = conversations.startConversation("throwException"); try { - messenger.receive(Object.class); + messenger.receive(Serializable.class); fail(); } catch (ServiceExecutionException ex) { @@ -225,4 +233,47 @@ public class ServiceConversationCollectionTest assertTrue(ex.getDescription().contains("Don't like you!")); } } + + private static class DelayedService implements IService + { + public void run(IServiceMessenger messenger) + { + ConcurrencyUtilities.sleep(100L); + try + { + messenger.receive(Serializable.class); + } catch (ClientExecutionException ex) + { + System.err.println("Client timed out."); + } + } + } + + @Test + public void testClientTimesout() throws Exception + { + final ServiceConversationCollection conversations = new ServiceConversationCollection(100); + conversations.addServiceType("delayed", new IServiceFactory() + { + public IService create() + { + return new DelayedService(); + } + public int getClientTimeoutMillis() + { + return 10; + } + }); + final ClientMessenger messenger = conversations.startConversation("delayed"); + try + { + messenger.receive(Serializable.class); + fail(); + } catch (TimeoutExceptionUnchecked ex) + { + } + // Wait for service to find out that the client timed out. + ConcurrencyUtilities.sleep(100L); + } + } -- GitLab