From 9bec39f34248a77c20e99a11d842d87381eb7212 Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Sun, 18 Dec 2011 11:07:58 +0000 Subject: [PATCH] Detect service execution exceptions in client also during send. SVN: 24047 --- .../client/ClientMessenger.java | 47 +++++++++++++++++-- .../ClientResponseMessageMultiplexer.java | 26 ++++++---- .../IServiceMessageTransportWithControl.java | 33 +++++++++++++ .../ServiceConversationTest.java | 24 ++++++++-- 4 files changed, 113 insertions(+), 17 deletions(-) create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/client/IServiceMessageTransportWithControl.java diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientMessenger.java index 3e3172eeafa..1b231ad90f6 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientMessenger.java @@ -17,6 +17,7 @@ package ch.systemsx.cisd.common.serviceconversation.client; import java.io.Serializable; +import java.util.concurrent.atomic.AtomicBoolean; import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; @@ -44,6 +45,8 @@ class ClientMessenger implements IServiceConversation private int outgoingMessageIdx; + private final AtomicBoolean serviceExceptionSignaled = new AtomicBoolean(); + ClientMessenger(ServiceConversationDTO serviceConversationDTO, IServiceMessageTransport transportToService, ClientResponseMessageQueue responseMessageQueue, @@ -56,24 +59,52 @@ class ClientMessenger implements IServiceConversation this.transportToService = transportToService; this.responseMessageQueue = responseMessageQueue; this.responseMessageMultiplexer = responseMessageMultiplexer; - responseMessageMultiplexer.addConversation(serviceConversationId, responseMessageQueue); + responseMessageMultiplexer.addConversation(serviceConversationId, + new IServiceMessageTransportWithControl() + { + public void send(ServiceMessage message) + { + ClientMessenger.this.responseMessageQueue.send(message); + } + + public void sendException(ServiceMessage message) + { + ClientMessenger.this.serviceExceptionSignaled.set(true); + ClientMessenger.this.responseMessageQueue.send(message); + } + }); } public void send(Serializable message) { + checkServiceException(); transportToService.send(new ServiceMessage(serviceConversationId, nextOutgoingMessageIndex(), false, message)); } public void terminate() { + checkServiceException(); transportToService.send(ServiceMessage.terminate(serviceConversationId)); - } - private int nextOutgoingMessageIndex() + private void checkServiceException() throws ServiceExecutionException { - return outgoingMessageIdx++; + if (serviceExceptionSignaled.getAndSet(false)) + { + try + { + final ServiceMessage messageOrNull = responseMessageQueue.poll(0); + if (messageOrNull != null && messageOrNull.isException()) + { + throw new ServiceExecutionException(messageOrNull.getConversationId(), + messageOrNull.tryGetExceptionDescription()); + } + } catch (InterruptedException ex) + { + throw CheckedExceptionTunnel.wrapIfNecessary(ex); + } + } } public <T extends Serializable> T receive(Class<T> messageClass) @@ -108,7 +139,8 @@ class ClientMessenger implements IServiceConversation if (throwExceptionOnNull) { final TimeoutExceptionUnchecked exception = - new TimeoutExceptionUnchecked("Timeout while waiting on message from service."); + new TimeoutExceptionUnchecked( + "Timeout while waiting on message from service."); final String exceptionDescription = ServiceExecutionException.getDescriptionFromException(exception); transportToService.send(new ServiceMessage(serviceConversationId, @@ -132,6 +164,11 @@ class ClientMessenger implements IServiceConversation return (T) payload; } + private int nextOutgoingMessageIndex() + { + return outgoingMessageIdx++; + } + public String getId() { return serviceConversationId; diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientResponseMessageMultiplexer.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientResponseMessageMultiplexer.java index 71049741044..834c1980a6b 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientResponseMessageMultiplexer.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/ClientResponseMessageMultiplexer.java @@ -37,9 +37,9 @@ class ClientResponseMessageMultiplexer final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, ClientResponseMessageMultiplexer.class); - private final Map<String, IServiceMessageTransport> conversations = - new ConcurrentHashMap<String, IServiceMessageTransport>(); - + private final Map<String, IServiceMessageTransportWithControl> conversations = + new ConcurrentHashMap<String, IServiceMessageTransportWithControl>(); + /** * Returns the transport for incoming response messages. */ @@ -50,24 +50,32 @@ class ClientResponseMessageMultiplexer public void send(ServiceMessage message) { final String conversationId = message.getConversationId(); - final IServiceMessageTransport transport = conversations.get(conversationId); + final IServiceMessageTransportWithControl transport = + conversations.get(conversationId); if (transport == null) { - final String msg = String.format( - "Message for unknown service conversation '%s'", conversationId); + final String msg = + String.format("Message for unknown service conversation '%s'", + conversationId); operationLog.error(msg); throw new UnknownServiceConversationException(msg); } - transport.send(message); + if (message.isException()) + { + transport.sendException(message); + } else + { + transport.send(message); + } } }; } - + /** * Adds a new conversation to the multiplexer. */ void addConversation(String serviceConversationId, - IServiceMessageTransport responseMessageTransport) + IServiceMessageTransportWithControl responseMessageTransport) { conversations.put(serviceConversationId, responseMessageTransport); } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/IServiceMessageTransportWithControl.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/IServiceMessageTransportWithControl.java new file mode 100644 index 00000000000..854400005ba --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/client/IServiceMessageTransportWithControl.java @@ -0,0 +1,33 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation.client; + +import ch.systemsx.cisd.common.serviceconversation.IServiceMessageTransport; +import ch.systemsx.cisd.common.serviceconversation.ServiceMessage; + +/** + * The role that can transport both regular payload and exception service messages separately. + * + * @author Bernd Rinn + */ +interface IServiceMessageTransportWithControl extends IServiceMessageTransport +{ + /** + * Send the <var>message</var> using this transport. + */ + public void sendException(ServiceMessage message); +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java index 090e02dfed7..29d01b9a2a2 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java @@ -375,10 +375,9 @@ public class ServiceConversationTest } assertFalse(holder.server.hasConversation(conversation.getId())); - conversation.send("Three"); try { - conversation.receive(String.class); + conversation.send("Three"); fail("Server timeout not signaled to client"); } catch (ServiceExecutionException ex) { @@ -464,7 +463,26 @@ public class ServiceConversationTest try { messenger.receive(Serializable.class); - fail(); + fail("Failed to detect error state on serve-side while receiving."); + } catch (ServiceExecutionException ex) + { + assertEquals(messenger.getId(), ex.getServiceConversationId()); + assertTrue(ex.getDescription().contains("RuntimeException")); + assertTrue(ex.getDescription().contains("Don't like you!")); + } + } + + @Test + public void testServiceThrowsExceptionOnSend() throws Exception + { + final ServiceConversationClient client = + createServerAndClient(ExceptionThrowingService.createFactory()).client; + final IServiceConversation messenger = client.startConversation("throwException"); + ConcurrencyUtilities.sleep(20L); + try + { + messenger.send("Test"); + fail("Failed to detect error state on serve-side while sending."); } catch (ServiceExecutionException ex) { assertEquals(messenger.getId(), ex.getServiceConversationId()); -- GitLab