From 35463493674793d4235e558e6e8ccb4283e90a25 Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Sun, 18 Dec 2011 10:32:24 +0000 Subject: [PATCH] Add method IServiceMessenger.tryReceive(). SVN: 24046 --- .../IServiceMessenger.java | 47 +++++++++++++++++-- .../client/ClientMessenger.java | 44 ++++++++++++----- .../client/IServiceConversation.java | 16 ------- .../server/BidirectionalServiceMessenger.java | 33 +++++++++---- .../server/ServiceConversationServer.java | 2 +- .../ServiceConversationTest.java | 33 +++++++++++-- 6 files changed, 130 insertions(+), 45 deletions(-) diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java index ae080c4dee1..121d600ed7d 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceMessenger.java @@ -18,21 +18,60 @@ package ch.systemsx.cisd.common.serviceconversation; import java.io.Serializable; +import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; +import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; +import ch.systemsx.cisd.common.serviceconversation.client.ServiceExecutionException; /** * A messaging interface for a service of the service conversation framework. - * + * * @author Bernd Rinn */ public interface IServiceMessenger { + /** + * Returns the service conversation id. + */ + public String getId(); + /** * Send a message to the counter part. + * + * @param message The message to send. + * @throws InterruptedExceptionUnchecked If the client signaled termination (server-side only). */ - public void send(Serializable message); - + public void send(Serializable message) throws InterruptedExceptionUnchecked; + + /** + * Receive a message from the counter part. + * + * @param messageClass The class of the message to receive. + * @return The message. + * @throws UnexpectedMessagePayloadException If the next message is not compatible with + * <var>messageClass</var>. + * @throws TimeoutExceptionUnchecked If no message arrived in the time specified in the + * settings. + * @throws InterruptedExceptionUnchecked If the client signaled termination (server-side only). + * @throws ServiceExecutionException If the server signaled an exception (client-side only). + */ + public <T extends Serializable> T receive(Class<T> messageClass) + throws UnexpectedMessagePayloadException, TimeoutExceptionUnchecked, + InterruptedExceptionUnchecked, ServiceExecutionException; + /** * Receive a message from the counter part. + * + * @param messageClass The class of the message to receive. + * @param timeoutMillis The timeout (in milli-seconds) to wait for a message to arrive, if no + * message is queued. + * @return The message, or <code>null</code>, if no message become available during the period + * given by <var>timeoutMillis</var>. + * @throws UnexpectedMessagePayloadException If the next message is not compatible with + * <var>messageClass</var>. + * @throws TimeoutExceptionUnchecked If no message arrived in the time specified in the + * settings. + * @throws InterruptedExceptionUnchecked If the client signaled termination (server-side only). + * @throws ServiceExecutionException If the server signaled an exception (client-side only). */ - public <T extends Serializable> T receive(Class<T> messageClass); + public <T extends Serializable> T tryReceive(Class<T> messageClass, int timeoutMillis); } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientMessenger.java index 5d4366a2f1f..3e3172eeafa 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientMessenger.java @@ -40,7 +40,7 @@ class ClientMessenger implements IServiceConversation private final ClientResponseMessageMultiplexer responseMessageMultiplexer; - private int timeoutMillis; + private int serviceMessageTimeoutMillis; private int outgoingMessageIdx; @@ -52,7 +52,7 @@ class ClientMessenger implements IServiceConversation assert transportToService != null; this.serviceConversationId = serviceConversationDTO.getServiceConversationId(); assert serviceConversationId != null; - this.timeoutMillis = serviceConversationDTO.getClientTimeoutInMillis(); + this.serviceMessageTimeoutMillis = serviceConversationDTO.getClientTimeoutInMillis(); this.transportToService = transportToService; this.responseMessageQueue = responseMessageQueue; this.responseMessageMultiplexer = responseMessageMultiplexer; @@ -68,7 +68,7 @@ class ClientMessenger implements IServiceConversation public void terminate() { transportToService.send(ServiceMessage.terminate(serviceConversationId)); - + } private int nextOutgoingMessageIndex() @@ -80,7 +80,19 @@ class ClientMessenger implements IServiceConversation { try { - return handleMessage(responseMessageQueue.poll(timeoutMillis), messageClass); + return handleMessage(responseMessageQueue.poll(serviceMessageTimeoutMillis), + messageClass, true); + } catch (InterruptedException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } + + public <T extends Serializable> T tryReceive(Class<T> messageClass, int timeoutMillis) + { + try + { + return handleMessage(responseMessageQueue.poll(timeoutMillis), messageClass, false); } catch (InterruptedException ex) { throw CheckedExceptionTunnel.wrapIfNecessary(ex); @@ -88,17 +100,24 @@ class ClientMessenger implements IServiceConversation } @SuppressWarnings("unchecked") - private <T> T handleMessage(ServiceMessage message, Class<T> messageClass) + private <T> T handleMessage(ServiceMessage message, Class<T> messageClass, + boolean throwExceptionOnNull) { if (message == null) { - final TimeoutExceptionUnchecked exception = - new TimeoutExceptionUnchecked("Timeout while waiting on message from service."); - final String exceptionDescription = - ServiceExecutionException.getDescriptionFromException(exception); - transportToService.send(new ServiceMessage(serviceConversationId, - nextOutgoingMessageIndex(), true, exceptionDescription)); - throw exception; + if (throwExceptionOnNull) + { + final TimeoutExceptionUnchecked exception = + new TimeoutExceptionUnchecked("Timeout while waiting on message from service."); + final String exceptionDescription = + ServiceExecutionException.getDescriptionFromException(exception); + transportToService.send(new ServiceMessage(serviceConversationId, + nextOutgoingMessageIndex(), true, exceptionDescription)); + throw exception; + } else + { + return null; + } } if (message.isException()) { @@ -129,5 +148,4 @@ class ClientMessenger implements IServiceConversation close(); super.finalize(); } - } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/IServiceConversation.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/IServiceConversation.java index 8000c00bcc9..1436393ae4c 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/IServiceConversation.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/IServiceConversation.java @@ -28,21 +28,6 @@ import ch.systemsx.cisd.common.serviceconversation.IServiceMessenger; */ public interface IServiceConversation extends IServiceMessenger, Closeable { - /** - * Returns the service conversation id. - */ - public String getId(); - - /** - * Send a message to the service. - */ - public void send(Serializable message); - - /** - * Receive a message from the service. - */ - public <T extends Serializable> T receive(Class<T> messageClass); - /** * Tells the service to terminate. Use this for calls that have no inherent definition of * "finished". @@ -54,5 +39,4 @@ public interface IServiceConversation extends IServiceMessenger, Closeable * after this call. */ public void close(); - } \ No newline at end of file diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/BidirectionalServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/BidirectionalServiceMessenger.java index 21acb14b669..b96c61aa046 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/BidirectionalServiceMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/BidirectionalServiceMessenger.java @@ -64,24 +64,36 @@ class BidirectionalServiceMessenger { return new IServiceMessenger() { - @SuppressWarnings("unchecked") public <T extends Serializable> T receive(Class<T> messageClass) + { + final T payload = + tryReceive(messageClass, messageReceivingTimeoutMillis); + if (payload == null) + { + final String msg = "Timeout while waiting for message from client."; + ServiceConversationServer.operationLog.error(String.format( + "[id: %s] %s", conversationId, msg)); + throw new TimeoutExceptionUnchecked(msg); + } + return payload; + } + + @SuppressWarnings("unchecked") + public <T extends Serializable> T tryReceive(Class<T> messageClass, + int timeoutMillis) { if (interrupted.get()) { throw new InterruptedExceptionUnchecked(); } - final Object payload; + final Serializable payload; try { final ServiceMessage message = - incoming.poll(messageReceivingTimeoutMillis, TimeUnit.MILLISECONDS); + incoming.poll(timeoutMillis, TimeUnit.MILLISECONDS); if (message == null) { - final String msg = "Timeout while waiting for message from client."; - ServiceConversationServer.operationLog.error(String.format( - "[id: %s] %s", conversationId, msg)); - throw new TimeoutExceptionUnchecked(msg); + return null; } payload = message.getPayload(); } catch (InterruptedException ex) @@ -106,6 +118,11 @@ class BidirectionalServiceMessenger responseMessenger.send(new ServiceMessage(conversationId, nextOutgoingMessageIndex(), false, message)); } + + public String getId() + { + return conversationId; + } }; } @@ -131,7 +148,7 @@ class BidirectionalServiceMessenger { interrupted.set(true); } - + public boolean isMarkedAsInterrupted() { return interrupted.get(); diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java index 57cf326ec9f..753b488ece3 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java @@ -211,7 +211,7 @@ public class ServiceConversationServer try { serviceInstance.run(messenger.getServiceMessenger()); - } catch (Exception ex) + } catch (Throwable ex) { if (ex instanceof InterruptedExceptionUnchecked == false) { diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java index 2324be9f189..090e02dfed7 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java @@ -18,6 +18,7 @@ package ch.systemsx.cisd.common.serviceconversation; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; @@ -177,6 +178,29 @@ public class ServiceConversationTest messenger.close(); } + @Test + public void testSingleEchoServiceHappyCaseuseTryReceive() throws Exception + { + final ServiceConversationClient client = + createServerAndClient(SingleEchoService.createFactory()).client; + final IServiceConversation messenger = client.startConversation("singleEcho"); + messenger.send("Hallo Echo"); + String echo = null; + int count = 0; + while (count++ < 10) + { + echo = messenger.tryReceive(String.class, 0); + if (echo != null) + { + break; + } + ConcurrencyUtilities.sleep(10L); + } + System.err.println("Got response after " + count + " attempts."); + assertEquals("Hallo Echo", echo); + messenger.close(); + } + private static class EchoService implements IService { public void run(IServiceMessenger messenger) @@ -350,7 +374,7 @@ public class ServiceConversationTest ConcurrencyUtilities.sleep(10L); } assertFalse(holder.server.hasConversation(conversation.getId())); - + conversation.send("Three"); try { @@ -456,6 +480,8 @@ public class ServiceConversationTest messenger.send("OK1"); messenger.send("OK2"); messenger.send("OK3"); + assertNull("Received an unexpected message", + messenger.tryReceive(Serializable.class, 0)); throw new RuntimeException("Don't like you!"); } @@ -497,8 +523,8 @@ public class ServiceConversationTest } catch (ServiceExecutionException ex) { assertEquals(messenger.getId(), ex.getServiceConversationId()); - assertTrue(ex.getDescription().contains("RuntimeException")); - assertTrue(ex.getDescription().contains("Don't like you!")); + assertTrue(ex.getDescription(), ex.getDescription().contains("RuntimeException")); + assertTrue(ex.getDescription(), ex.getDescription().contains("Don't like you!")); } } @@ -543,6 +569,7 @@ public class ServiceConversationTest final ServiceConversationClient client = createServerAndClient(DelayedService.createFactory()).client; final IServiceConversation messenger = client.startConversation("delayed"); + assertNull(messenger.tryReceive(Serializable.class, 0)); try { messenger.receive(Serializable.class); -- GitLab