From 12981a6cedbf7de1dcb103e994f600aa11dee056 Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Sat, 17 Dec 2011 12:13:58 +0000 Subject: [PATCH] Exception and termination messages now bypass the queue for both, client and server and become effective immediately. Introduce log4j logging. Make a service factory the source of the type id. SVN: 24021 --- ...ava => BidirectionalServiceMessenger.java} | 32 ++++-- .../ClientExecutionException.java | 81 ------------- .../serviceconversation/ClientMessenger.java | 9 +- .../serviceconversation/IServiceFactory.java | 5 + .../ServiceConversationCollection.java | 59 ++++++---- .../ServiceConversationRecord.java | 6 +- .../serviceconversation/ServiceMessage.java | 5 + .../ServiceConversationCollectionTest.java | 107 ++++++++++++++++-- 8 files changed, 179 insertions(+), 125 deletions(-) rename common/source/java/ch/systemsx/cisd/common/serviceconversation/{BidirectinoalServiceMessenger.java => BidirectionalServiceMessenger.java} (79%) delete mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientExecutionException.java diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectionalServiceMessenger.java similarity index 79% rename from common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java rename to common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectionalServiceMessenger.java index 3133b9a6fc7..b22d699b1d7 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectinoalServiceMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/BidirectionalServiceMessenger.java @@ -20,8 +20,10 @@ import java.io.Serializable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; /** @@ -29,7 +31,7 @@ import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; * * @author Bernd Rinn */ -class BidirectinoalServiceMessenger +class BidirectionalServiceMessenger { private final BlockingQueue<ServiceMessage> incoming = new LinkedBlockingQueue<ServiceMessage>(); @@ -44,7 +46,9 @@ class BidirectinoalServiceMessenger private int messageIdxLastSeen = -1; - BidirectinoalServiceMessenger(String conversationId, int messageReceivingTimeoutMillis, + private final AtomicBoolean interrupted = new AtomicBoolean(); + + BidirectionalServiceMessenger(String conversationId, int messageReceivingTimeoutMillis, ISendingMessenger responseMessenger) { this.conversationId = conversationId; @@ -59,6 +63,10 @@ class BidirectinoalServiceMessenger @SuppressWarnings("unchecked") public <T extends Serializable> T receive(Class<T> messageClass) { + if (interrupted.get()) + { + throw new InterruptedExceptionUnchecked(); + } final Object payload; try { @@ -66,13 +74,10 @@ class BidirectinoalServiceMessenger incoming.poll(messageReceivingTimeoutMillis, TimeUnit.MILLISECONDS); if (message == null) { - throw new TimeoutExceptionUnchecked( - "Timeout while waiting for message from client."); - } - if (message.isException()) - { - throw new ClientExecutionException(conversationId, - message.tryGetExceptionDescription()); + final String msg = "Timeout while waiting for message from client."; + ServiceConversationCollection.operationLog.error(String.format( + "[id: %s] %s", conversationId, msg)); + throw new TimeoutExceptionUnchecked(msg); } payload = message.getPayload(); } catch (InterruptedException ex) @@ -90,6 +95,10 @@ class BidirectinoalServiceMessenger public void send(Serializable message) { + if (interrupted.get()) + { + throw new InterruptedExceptionUnchecked(); + } responseMessenger.send(new ServiceMessage(conversationId, nextOutgoingMessageIndex(), false, message)); } @@ -113,4 +122,9 @@ class BidirectinoalServiceMessenger } incoming.add(message); } + + public void markAsInterrupted() + { + interrupted.set(true); + } } \ No newline at end of file diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientExecutionException.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientExecutionException.java deleted file mode 100644 index 5d04897b05d..00000000000 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientExecutionException.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2011 ETH Zuerich, CISD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ch.systemsx.cisd.common.serviceconversation; - -import java.io.PrintStream; -import java.io.PrintWriter; - -/** - * An exception that signals to the server that an exception happened during service execution on - * the client. - * - * @author Bernd Rinn - */ -public class ClientExecutionException extends RuntimeException -{ - private static final long serialVersionUID = 1L; - - private final String serviceConversationId; - - private final String description; - - ClientExecutionException(String serviceConversationId, String description) - { - super("Client execution exception in service conversation " + serviceConversationId); - this.serviceConversationId = serviceConversationId; - this.description = description; - } - - public String getServiceConversationId() - { - return serviceConversationId; - } - - public String getDescription() - { - return description; - } - - @Override - public String toString() - { - return "ClientExecutionException [serviceConversationId=" + serviceConversationId - + ", description=" + description + "]"; - } - - @Override - public void printStackTrace() - { - System.err.println(getMessage()); - System.err.println(getDescription()); - } - - @Override - public void printStackTrace(PrintStream s) - { - s.println(getMessage()); - s.println(getDescription()); - } - - @Override - public void printStackTrace(PrintWriter s) - { - s.println(getMessage()); - s.println(getDescription()); - } - -} diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java index 774566cc2c7..13eddac9037 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java @@ -47,7 +47,9 @@ public class ClientMessenger implements IClientMessenger public ClientMessenger(ServiceConversationDTO serviceConversationDTO, ISendingMessenger senderToService) { + assert senderToService != null; this.serviceConversationId = serviceConversationDTO.getServiceConversationId(); + assert serviceConversationId != null; this.timeoutMillis = serviceConversationDTO.getClientTimeoutInMillis(); this.senderToService = senderToService; } @@ -63,7 +65,8 @@ public class ClientMessenger implements IClientMessenger { public void send(ServiceMessage message) { - if (serviceConversationId.equals(message.getConversationId()) == false) + if (serviceConversationId != null + && serviceConversationId.equals(message.getConversationId()) == false) { throw new IllegalArgumentException( "Attempt to put in a message for conversation " @@ -77,6 +80,10 @@ public class ClientMessenger implements IClientMessenger { messageIdxLastSeen = message.getMessageIdx(); } + if (message.hasPayload() == false) + { + messageQueue.clear(); + } messageQueue.add(message); } }; diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java index 44e9b17f4ae..5aaef6c070f 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/IServiceFactory.java @@ -23,6 +23,11 @@ package ch.systemsx.cisd.common.serviceconversation; */ public interface IServiceFactory { + /** + * Returns id for this service type. + */ + public String getServiceTypeId(); + /** * Create a new service. */ diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java index ea48c28d2fa..6554d92824c 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java @@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; + import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor; @@ -29,9 +31,11 @@ import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; import ch.systemsx.cisd.common.concurrent.ITerminableFuture; import ch.systemsx.cisd.common.concurrent.TerminableCallable.ICallable; import ch.systemsx.cisd.common.concurrent.TerminableCallable.IStoppableExecutor; +import ch.systemsx.cisd.common.logging.LogCategory; +import ch.systemsx.cisd.common.logging.LogFactory; /** - * The service conversation collection. + * A collection of service conversations. * * @author Bernd Rinn */ @@ -41,17 +45,20 @@ public class ServiceConversationCollection implements ISendingMessenger private final static int SHUTDOWN_TIMEOUT_MILLIS = 10000; + final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, + ServiceConversationCollection.class); + private final int messageReceivingTimeoutMillis; private final ExecutorService executor = new NamingThreadPoolExecutor("Service Conversations") .corePoolSize(NUMBER_OF_CORE_THREADS).daemonize(); - private Random rng = new Random(); + private final Random rng = new Random(); private final Map<String, IServiceFactory> serviceFactoryMap = new ConcurrentHashMap<String, IServiceFactory>(); - private Map<String, ServiceConversationRecord> conversations = + private final Map<String, ServiceConversationRecord> conversations = new ConcurrentHashMap<String, ServiceConversationRecord>(); public ServiceConversationCollection(int messageReceivingTimeoutMillis) @@ -62,8 +69,13 @@ public class ServiceConversationCollection implements ISendingMessenger /** * Adds a new service type to this conversation object. */ - public void addServiceType(String id, IServiceFactory factory) + public void addServiceType(IServiceFactory factory) { + final String id = factory.getServiceTypeId(); + if (serviceFactoryMap.containsKey(id)) + { + throw new IllegalArgumentException("Service type '" + id + "' is already registered."); + } serviceFactoryMap.put(id, factory); } @@ -101,8 +113,8 @@ public class ServiceConversationCollection implements ISendingMessenger final IService serviceInstance = serviceFactory.create(); final String serviceConversationId = Long.toString(System.currentTimeMillis()) + "-" + rng.nextInt(Integer.MAX_VALUE); - final BidirectinoalServiceMessenger messenger = - new BidirectinoalServiceMessenger(serviceConversationId, + final BidirectionalServiceMessenger messenger = + new BidirectionalServiceMessenger(serviceConversationId, messageReceivingTimeoutMillis, responseMessenger); final ServiceConversationRecord record = new ServiceConversationRecord(messenger); conversations.put(serviceConversationId, record); @@ -117,8 +129,7 @@ public class ServiceConversationCollection implements ISendingMessenger serviceInstance.run(messenger.getServiceMessenger()); } catch (Exception ex) { - if (ex instanceof InterruptedExceptionUnchecked == false - && ex instanceof ClientExecutionException == false) + if (ex instanceof InterruptedExceptionUnchecked == false) { final String errorMessage = ServiceExecutionException @@ -131,8 +142,10 @@ public class ServiceConversationCollection implements ISendingMessenger errorMessage)); } catch (Exception ex2) { - // TODO: improve logging - ex2.printStackTrace(); + operationLog.error( + String.format( + "[id: %s] Cannot send message about exception to client.", + serviceConversationId), ex2); } } } finally @@ -199,21 +212,27 @@ public class ServiceConversationCollection implements ISendingMessenger final ServiceConversationRecord record = conversations.get(conversationId); if (record == null) { - if (message.isException() == false) + operationLog.error(String.format("Message for unknown service conversation '%s'", + conversationId)); + return; + } + if (message.hasPayload()) + { + record.getMessenger().sendToService(message); + } else + { + if (message.isException()) { - throw new UnknownServiceConversationException(conversationId); + operationLog.error(String.format("[id: %s] Client execution exception.\n%s", + conversationId, message.tryGetExceptionDescription())); } else { - // If it was an exception on the client side, be silent. - return; + operationLog.error(String.format( + "[id: %s] Client requests termination of service conversation.", + conversationId)); } - } - if (message.isTerminate()) - { + record.getMessenger().markAsInterrupted(); record.getController().cancel(true); - } else - { - record.getMessenger().sendToService(message); } } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationRecord.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationRecord.java index 6e6b415969f..005fa10b773 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationRecord.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationRecord.java @@ -25,17 +25,17 @@ import ch.systemsx.cisd.common.concurrent.ITerminableFuture; */ class ServiceConversationRecord { - private final BidirectinoalServiceMessenger messenger; + private final BidirectionalServiceMessenger messenger; private ITerminableFuture<Void> controller; - ServiceConversationRecord(BidirectinoalServiceMessenger messenger) + ServiceConversationRecord(BidirectionalServiceMessenger messenger) { super(); this.messenger = messenger; } - BidirectinoalServiceMessenger getMessenger() + BidirectionalServiceMessenger getMessenger() { return messenger; } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java index 90b6b8a79be..b7dd0d2ef0c 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceMessage.java @@ -78,6 +78,11 @@ public class ServiceMessage { return (payload == null) && (exceptionDescription != null); } + + public boolean hasPayload() + { + return (payload != null); + } public String tryGetExceptionDescription() { diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java index 9660ddc81e1..75d281140b1 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollectionTest.java @@ -25,10 +25,13 @@ import java.io.Serializable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; +import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; +import ch.systemsx.cisd.common.logging.LogInitializer; /** * Test cases for the {@Link ServiceConversationCollection} class. @@ -37,6 +40,12 @@ import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; */ public class ServiceConversationCollectionTest { + @BeforeTest + public void init() + { + LogInitializer.init(); + } + private static class SingleEchoService implements IService { public void run(IServiceMessenger messenger) @@ -49,16 +58,22 @@ public class ServiceConversationCollectionTest public void testSingleEchoServiceHappyCase() throws Exception { final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType("singleEcho", new IServiceFactory() + conversations.addServiceType(new IServiceFactory() { public IService create() { return new SingleEchoService(); } + public int getClientTimeoutMillis() { return 100; } + + public String getServiceTypeId() + { + return "singleEcho"; + } }); final IClientMessenger messenger = conversations.startConversation("singleEcho"); messenger.send("Hallo Echo"); @@ -90,16 +105,22 @@ public class ServiceConversationCollectionTest public void testMultipleEchoServiceTerminateHappyCase() throws Exception { final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType("echo", new IServiceFactory() + conversations.addServiceType(new IServiceFactory() { public IService create() { return new EchoService(); } + public int getClientTimeoutMillis() { return 100; } + + public String getServiceTypeId() + { + return "echo"; + } }); final BlockingQueue<ServiceMessage> messageQueue = new LinkedBlockingQueue<ServiceMessage>(); @@ -145,16 +166,22 @@ public class ServiceConversationCollectionTest public void testEchoServiceTimeout() throws Exception { final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType("echo", new IServiceFactory() + conversations.addServiceType(new IServiceFactory() { public IService create() { return new EchoService(); } + public int getClientTimeoutMillis() { return 100; } + + public String getServiceTypeId() + { + return "echo"; + } }); final BlockingQueue<ServiceMessage> messageQueue = new LinkedBlockingQueue<ServiceMessage>(); @@ -209,26 +236,79 @@ public class ServiceConversationCollectionTest public void testServiceThrowsException() throws Exception { final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType("throwException", new IServiceFactory() + conversations.addServiceType(new IServiceFactory() { public IService create() { return new ExceptionThrowingService(); } + + public int getClientTimeoutMillis() + { + return 100; + } + + public String getServiceTypeId() + { + return "throwException"; + } + }); + final ClientMessenger messenger = conversations.startConversation("throwException"); + try + { + messenger.receive(Serializable.class); + fail(); + } catch (ServiceExecutionException ex) + { + assertEquals(messenger.getServiceConversationId(), ex.getServiceConversationId()); + assertTrue(ex.getDescription().contains("RuntimeException")); + assertTrue(ex.getDescription().contains("Don't like you!")); + } + } + + private static class EventuallyExceptionThrowingService implements IService + { + public void run(IServiceMessenger messenger) + { + messenger.send("OK1"); + messenger.send("OK2"); + messenger.send("OK3"); + throw new RuntimeException("Don't like you!"); + } + } + + @Test + public void testServiceEventuallyThrowsException() throws Exception + { + final ServiceConversationCollection conversations = new ServiceConversationCollection(100); + conversations.addServiceType(new IServiceFactory() + { + public IService create() + { + return new EventuallyExceptionThrowingService(); + } + public int getClientTimeoutMillis() { return 100; } + + public String getServiceTypeId() + { + return "throwException"; + } }); final ClientMessenger messenger = conversations.startConversation("throwException"); + ConcurrencyUtilities.sleep(100L); try { + // The regular messages should have been cleared by now so that we get to see the + // exception immediately. messenger.receive(Serializable.class); fail(); } catch (ServiceExecutionException ex) { - assertEquals(messenger.getServiceConversationId(), - ex.getServiceConversationId()); + assertEquals(messenger.getServiceConversationId(), ex.getServiceConversationId()); assertTrue(ex.getDescription().contains("RuntimeException")); assertTrue(ex.getDescription().contains("Don't like you!")); } @@ -238,13 +318,12 @@ public class ServiceConversationCollectionTest { public void run(IServiceMessenger messenger) { - ConcurrencyUtilities.sleep(100L); try { - messenger.receive(Serializable.class); - } catch (ClientExecutionException ex) + ConcurrencyUtilities.sleep(100L); + } catch (InterruptedExceptionUnchecked ex) { - System.err.println("Client timed out."); + System.err.println("DelayedService got interrupted."); } } } @@ -253,16 +332,22 @@ public class ServiceConversationCollectionTest public void testClientTimesout() throws Exception { final ServiceConversationCollection conversations = new ServiceConversationCollection(100); - conversations.addServiceType("delayed", new IServiceFactory() + conversations.addServiceType(new IServiceFactory() { public IService create() { return new DelayedService(); } + public int getClientTimeoutMillis() { return 10; } + + public String getServiceTypeId() + { + return "delayed"; + } }); final ClientMessenger messenger = conversations.startConversation("delayed"); try -- GitLab