diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java index 1c824ef843fd76489492738db09f8592ce0eb179..8719626ec522de2eb141a0fbb386728194092812 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ClientMessenger.java @@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; +import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked; /** * A class that a client can use to receive messages from a service. @@ -40,7 +41,13 @@ public class ClientMessenger implements IClientMessenger private int outgoingMessageIdx; - public ClientMessenger(ISendingMessenger senderToService) + public ClientMessenger(String serviceConversationId, ISendingMessenger senderToService) + { + this.serviceConversationId = serviceConversationId; + this.senderToService = senderToService; + } + + ClientMessenger(ISendingMessenger senderToService) { this.senderToService = senderToService; } @@ -100,6 +107,10 @@ public class ClientMessenger implements IClientMessenger @SuppressWarnings("unchecked") private <T> T handleMessage(ServiceMessage message, Class<T> messageClass) { + if (message == null) + { + throw new TimeoutExceptionUnchecked("Timeout while waiting on message from service."); + } if (message.isException()) { throw new ServiceExecutionException(message.getConversationId(), @@ -118,7 +129,7 @@ public class ClientMessenger implements IClientMessenger return serviceConversationId; } - public void setServiceConversationId(String serviceConversationId) + void setServiceConversationId(String serviceConversationId) { this.serviceConversationId = serviceConversationId; } diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java index f0b9ea350d13011112ecebb9a868f7d557fa65b1..d6fac7d4f4809c154288c0968ca12144ac12a5bf 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationCollection.java @@ -126,8 +126,16 @@ public class ServiceConversationCollection implements ISendingMessenger ex.printStackTrace(pw); pw.close(); final String errorMessage = new String(os.toByteArray()); - responseMessenger.send(new ServiceMessage(conversationId, - messenger.nextOutgoingMessageIndex(), errorMessage)); + try + { + responseMessenger + .send(new ServiceMessage(conversationId, messenger + .nextOutgoingMessageIndex(), errorMessage)); + } catch (Exception ex2) + { + // TODO: improve logging + ex2.printStackTrace(); + } } } finally { @@ -162,6 +170,21 @@ public class ServiceConversationCollection implements ISendingMessenger } } + public void shutdownNow() + { + try + { + for (ServiceConversationRecord record : conversations.values()) + { + record.getController().cancel(true); + } + executor.awaitTermination(0, TimeUnit.MILLISECONDS); + } catch (Exception ex) + { + throw new CheckedExceptionTunnel(ex); + } + } + public boolean hasConversation(String conversationId) { return conversations.containsKey(conversationId);