Skip to content
Snippets Groups Projects
Commit 3c50c330 authored by brinn's avatar brinn
Browse files

Make the service factory specify the client timeout. Notify server of client...

Make the service factory specify the client timeout. Notify server of client timeout. Change message payload from Object to Serializable.

SVN: 24008
parent e442e335
No related branches found
No related tags found
No related merge requests found
Showing
with 322 additions and 102 deletions
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package ch.systemsx.cisd.common.serviceconversation; package ch.systemsx.cisd.common.serviceconversation;
import java.io.Serializable;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -56,7 +57,7 @@ class BidirectinoalServiceMessenger ...@@ -56,7 +57,7 @@ class BidirectinoalServiceMessenger
return new IServiceMessenger() return new IServiceMessenger()
{ {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> T receive(Class<T> messageClass) public <T extends Serializable> T receive(Class<T> messageClass)
{ {
final Object payload; final Object payload;
try try
...@@ -66,7 +67,12 @@ class BidirectinoalServiceMessenger ...@@ -66,7 +67,12 @@ class BidirectinoalServiceMessenger
if (message == null) if (message == null)
{ {
throw new TimeoutExceptionUnchecked( throw new TimeoutExceptionUnchecked(
"Timeout while waiting for message to return."); "Timeout while waiting for message from client.");
}
if (message.isException())
{
throw new ClientExecutionException(conversationId,
message.tryGetExceptionDescription());
} }
payload = message.getPayload(); payload = message.getPayload();
} catch (InterruptedException ex) } catch (InterruptedException ex)
...@@ -82,10 +88,10 @@ class BidirectinoalServiceMessenger ...@@ -82,10 +88,10 @@ class BidirectinoalServiceMessenger
return (T) payload; return (T) payload;
} }
public void send(Object message) public void send(Serializable message)
{ {
responseMessenger.send(new ServiceMessage(conversationId, responseMessenger.send(new ServiceMessage(conversationId,
nextOutgoingMessageIndex(), message)); nextOutgoingMessageIndex(), false, message));
} }
}; };
} }
......
/*
* Copyright 2011 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.serviceconversation;
import java.io.PrintStream;
import java.io.PrintWriter;
/**
* An exception that signals to the server that an exception happened during service execution on
* the client.
*
* @author Bernd Rinn
*/
public class ClientExecutionException extends RuntimeException
{
private static final long serialVersionUID = 1L;
private final String serviceConversationId;
private final String description;
ClientExecutionException(String serviceConversationId, String description)
{
super("Client execution exception in service conversation " + serviceConversationId);
this.serviceConversationId = serviceConversationId;
this.description = description;
}
public String getServiceConversationId()
{
return serviceConversationId;
}
public String getDescription()
{
return description;
}
@Override
public String toString()
{
return "ClientExecutionException [serviceConversationId=" + serviceConversationId
+ ", description=" + description + "]";
}
@Override
public void printStackTrace()
{
System.err.println(getMessage());
System.err.println(getDescription());
}
@Override
public void printStackTrace(PrintStream s)
{
s.println(getMessage());
s.println(getDescription());
}
@Override
public void printStackTrace(PrintWriter s)
{
s.println(getMessage());
s.println(getDescription());
}
}
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package ch.systemsx.cisd.common.serviceconversation; package ch.systemsx.cisd.common.serviceconversation;
import java.io.Serializable;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -37,6 +38,8 @@ public class ClientMessenger implements IClientMessenger ...@@ -37,6 +38,8 @@ public class ClientMessenger implements IClientMessenger
private String serviceConversationId; private String serviceConversationId;
private int timeoutMillis;
private int messageIdxLastSeen = -1; private int messageIdxLastSeen = -1;
private int outgoingMessageIdx; private int outgoingMessageIdx;
...@@ -70,10 +73,10 @@ public class ClientMessenger implements IClientMessenger ...@@ -70,10 +73,10 @@ public class ClientMessenger implements IClientMessenger
}; };
} }
public void send(Object message) public void send(Serializable message)
{ {
senderToService.send(new ServiceMessage(serviceConversationId, nextOutgoingMessageIndex(), senderToService.send(new ServiceMessage(serviceConversationId, nextOutgoingMessageIndex(),
message)); false, message));
} }
private int nextOutgoingMessageIndex() private int nextOutgoingMessageIndex()
...@@ -81,18 +84,7 @@ public class ClientMessenger implements IClientMessenger ...@@ -81,18 +84,7 @@ public class ClientMessenger implements IClientMessenger
return outgoingMessageIdx++; return outgoingMessageIdx++;
} }
public <T> T receive(Class<T> messageClass) public <T extends Serializable> T receive(Class<T> messageClass)
{
try
{
return handleMessage(messageQueue.take(), messageClass);
} catch (InterruptedException ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
}
}
public <T> T receive(Class<T> messageClass, int timeoutMillis)
{ {
try try
{ {
...@@ -109,7 +101,13 @@ public class ClientMessenger implements IClientMessenger ...@@ -109,7 +101,13 @@ public class ClientMessenger implements IClientMessenger
{ {
if (message == null) if (message == null)
{ {
throw new TimeoutExceptionUnchecked("Timeout while waiting on message from service."); final TimeoutExceptionUnchecked exception =
new TimeoutExceptionUnchecked("Timeout while waiting on message from service.");
final String exceptionDescription =
ServiceExecutionException.getDescriptionFromException(exception);
senderToService.send(new ServiceMessage(serviceConversationId,
nextOutgoingMessageIndex(), true, exceptionDescription));
throw exception;
} }
if (message.isException()) if (message.isException())
{ {
...@@ -129,9 +127,10 @@ public class ClientMessenger implements IClientMessenger ...@@ -129,9 +127,10 @@ public class ClientMessenger implements IClientMessenger
return serviceConversationId; return serviceConversationId;
} }
void setServiceConversationId(String serviceConversationId) void setServiceConversationDTO(ServiceConversationDTO serviceConversation)
{ {
this.serviceConversationId = serviceConversationId; this.serviceConversationId = serviceConversation.getServiceConversationId();
this.timeoutMillis = serviceConversation.getClientTimeoutInMillis();
} }
} }
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
package ch.systemsx.cisd.common.serviceconversation; package ch.systemsx.cisd.common.serviceconversation;
import java.io.Serializable;
/** /**
* A messenger role for receiving messages from a service. * A messenger role for receiving messages from a service.
* *
...@@ -26,16 +28,11 @@ public interface IClientMessenger ...@@ -26,16 +28,11 @@ public interface IClientMessenger
/** /**
* Send a message to the service. * Send a message to the service.
*/ */
public void send(Object message); public void send(Serializable message);
/** /**
* Receive a message from the service. * Receive a message from the service.
*/ */
public <T> T receive(Class<T> messageClass); public <T extends Serializable> T receive(Class<T> messageClass);
/**
* Receive a message from the service.
*/
public <T> T receive(Class<T> messageClass, int timeoutMillis);
} }
\ No newline at end of file
...@@ -18,7 +18,7 @@ package ch.systemsx.cisd.common.serviceconversation; ...@@ -18,7 +18,7 @@ package ch.systemsx.cisd.common.serviceconversation;
/** /**
* A factory for services. * A factory for services.
* *
* @author Bernd Rinn * @author Bernd Rinn
*/ */
public interface IServiceFactory public interface IServiceFactory
...@@ -27,4 +27,10 @@ public interface IServiceFactory ...@@ -27,4 +27,10 @@ public interface IServiceFactory
* Create a new service. * Create a new service.
*/ */
public IService create(); public IService create();
/**
* Returns the suggested timeout (in milli-seconds) of the client when waiting for a message
* from this service.
*/
public int getClientTimeoutMillis();
} }
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
package ch.systemsx.cisd.common.serviceconversation; package ch.systemsx.cisd.common.serviceconversation;
import java.io.Serializable;
/** /**
* A messaging interface for a service of the service conversation framework. * A messaging interface for a service of the service conversation framework.
...@@ -27,10 +29,10 @@ public interface IServiceMessenger ...@@ -27,10 +29,10 @@ public interface IServiceMessenger
/** /**
* Send a message to the client. * Send a message to the client.
*/ */
public void send(Object message); public void send(Serializable message);
/** /**
* Receive a message from the client. * Receive a message from the client.
*/ */
public <T> T receive(Class<T> messageClass); public <T extends Serializable> T receive(Class<T> messageClass);
} }
...@@ -16,15 +16,12 @@ ...@@ -16,15 +16,12 @@
package ch.systemsx.cisd.common.serviceconversation; package ch.systemsx.cisd.common.serviceconversation;
import java.io.PrintWriter;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.io.output.ByteArrayOutputStream;
import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel; import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked; import ch.systemsx.cisd.base.exceptions.InterruptedExceptionUnchecked;
import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor; import ch.systemsx.cisd.base.namedthread.NamingThreadPoolExecutor;
...@@ -79,9 +76,9 @@ public class ServiceConversationCollection implements ISendingMessenger ...@@ -79,9 +76,9 @@ public class ServiceConversationCollection implements ISendingMessenger
public ClientMessenger startConversation(final String typeId) public ClientMessenger startConversation(final String typeId)
{ {
final ClientMessenger clientMessenger = new ClientMessenger(this); final ClientMessenger clientMessenger = new ClientMessenger(this);
final String serviceConversationId = final ServiceConversationDTO serviceConversationRecord =
startConversation(typeId, clientMessenger.getResponseMessenger()); startConversation(typeId, clientMessenger.getResponseMessenger());
clientMessenger.setServiceConversationId(serviceConversationId); clientMessenger.setServiceConversationDTO(serviceConversationRecord);
return clientMessenger; return clientMessenger;
} }
...@@ -91,9 +88,10 @@ public class ServiceConversationCollection implements ISendingMessenger ...@@ -91,9 +88,10 @@ public class ServiceConversationCollection implements ISendingMessenger
* @param typeId The service type of the conversation. * @param typeId The service type of the conversation.
* @param responseMessenger The messenger to communicate back the messages from the service to * @param responseMessenger The messenger to communicate back the messages from the service to
* the client. * the client.
* @return The service conversation id. * @return The information about the service conversation started.
*/ */
public String startConversation(final String typeId, final ISendingMessenger responseMessenger) public ServiceConversationDTO startConversation(final String typeId,
final ISendingMessenger responseMessenger)
{ {
final IServiceFactory serviceFactory = serviceFactoryMap.get(typeId); final IServiceFactory serviceFactory = serviceFactoryMap.get(typeId);
if (serviceFactory == null) if (serviceFactory == null)
...@@ -101,13 +99,13 @@ public class ServiceConversationCollection implements ISendingMessenger ...@@ -101,13 +99,13 @@ public class ServiceConversationCollection implements ISendingMessenger
throw new UnknownServiceTypeException(typeId); throw new UnknownServiceTypeException(typeId);
} }
final IService serviceInstance = serviceFactory.create(); final IService serviceInstance = serviceFactory.create();
final String conversationId = final String serviceConversationId =
Long.toString(System.currentTimeMillis()) + "-" + rng.nextInt(Integer.MAX_VALUE); Long.toString(System.currentTimeMillis()) + "-" + rng.nextInt(Integer.MAX_VALUE);
final BidirectinoalServiceMessenger messenger = final BidirectinoalServiceMessenger messenger =
new BidirectinoalServiceMessenger(conversationId, messageReceivingTimeoutMillis, new BidirectinoalServiceMessenger(serviceConversationId,
responseMessenger); messageReceivingTimeoutMillis, responseMessenger);
final ServiceConversationRecord record = new ServiceConversationRecord(messenger); final ServiceConversationRecord record = new ServiceConversationRecord(messenger);
conversations.put(conversationId, record); conversations.put(serviceConversationId, record);
final ITerminableFuture<Void> controller = final ITerminableFuture<Void> controller =
ConcurrencyUtilities.submit(executor, new ICallable<Void>() ConcurrencyUtilities.submit(executor, new ICallable<Void>()
{ {
...@@ -119,18 +117,18 @@ public class ServiceConversationCollection implements ISendingMessenger ...@@ -119,18 +117,18 @@ public class ServiceConversationCollection implements ISendingMessenger
serviceInstance.run(messenger.getServiceMessenger()); serviceInstance.run(messenger.getServiceMessenger());
} catch (Exception ex) } catch (Exception ex)
{ {
if (ex instanceof InterruptedExceptionUnchecked == false) if (ex instanceof InterruptedExceptionUnchecked == false
&& ex instanceof ClientExecutionException == false)
{ {
final ByteArrayOutputStream os = new ByteArrayOutputStream(); final String errorMessage =
final PrintWriter pw = new PrintWriter(os); ServiceExecutionException
ex.printStackTrace(pw); .getDescriptionFromException(ex);
pw.close();
final String errorMessage = new String(os.toByteArray());
try try
{ {
responseMessenger responseMessenger.send(new ServiceMessage(
.send(new ServiceMessage(conversationId, messenger serviceConversationId, messenger
.nextOutgoingMessageIndex(), errorMessage)); .nextOutgoingMessageIndex(), true,
errorMessage));
} catch (Exception ex2) } catch (Exception ex2)
{ {
// TODO: improve logging // TODO: improve logging
...@@ -139,7 +137,7 @@ public class ServiceConversationCollection implements ISendingMessenger ...@@ -139,7 +137,7 @@ public class ServiceConversationCollection implements ISendingMessenger
} }
} finally } finally
{ {
conversations.remove(conversationId); conversations.remove(serviceConversationId);
} }
return null; return null;
} }
...@@ -152,7 +150,8 @@ public class ServiceConversationCollection implements ISendingMessenger ...@@ -152,7 +150,8 @@ public class ServiceConversationCollection implements ISendingMessenger
}); });
record.setController(controller); record.setController(controller);
return conversationId; return new ServiceConversationDTO(serviceConversationId,
serviceFactory.getClientTimeoutMillis());
} }
public void shutdown() public void shutdown()
...@@ -200,7 +199,14 @@ public class ServiceConversationCollection implements ISendingMessenger ...@@ -200,7 +199,14 @@ public class ServiceConversationCollection implements ISendingMessenger
final ServiceConversationRecord record = conversations.get(conversationId); final ServiceConversationRecord record = conversations.get(conversationId);
if (record == null) if (record == null)
{ {
throw new UnknownServiceConversationException(conversationId); if (message.isException() == false)
{
throw new UnknownServiceConversationException(conversationId);
} else
{
// If it was an exception on the client side, be silent.
return;
}
} }
if (message.isTerminate()) if (message.isTerminate())
{ {
......
/*
* Copyright 2011 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.serviceconversation;
import java.io.Serializable;
/**
* A record to save to provide the information about a new service conversation.
*
* @author Bernd Rinn
*/
public class ServiceConversationDTO implements Serializable
{
private static final long serialVersionUID = 1L;
private final String serviceConversationId;
private int clientTimeoutInMillis;
ServiceConversationDTO(String serviceConversationId, int clientTimeout)
{
this.serviceConversationId = serviceConversationId;
this.clientTimeoutInMillis = clientTimeout;
}
public int getClientTimeoutInMillis()
{
return clientTimeoutInMillis;
}
public void setClientTimeoutInMillis(int clientTimeoutInMillis)
{
this.clientTimeoutInMillis = clientTimeoutInMillis;
}
public String getServiceConversationId()
{
return serviceConversationId;
}
}
...@@ -19,9 +19,12 @@ package ch.systemsx.cisd.common.serviceconversation; ...@@ -19,9 +19,12 @@ package ch.systemsx.cisd.common.serviceconversation;
import java.io.PrintStream; import java.io.PrintStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import org.apache.commons.io.output.ByteArrayOutputStream;
/** /**
* An exception that signals to the client that an exception happened during service execution. * An exception that signals to the client that an exception happened during service execution on
* * the server.
*
* @author Bernd Rinn * @author Bernd Rinn
*/ */
public class ServiceExecutionException extends RuntimeException public class ServiceExecutionException extends RuntimeException
...@@ -29,9 +32,9 @@ public class ServiceExecutionException extends RuntimeException ...@@ -29,9 +32,9 @@ public class ServiceExecutionException extends RuntimeException
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final String serviceConversationId; private final String serviceConversationId;
private final String description; private final String description;
ServiceExecutionException(String serviceConversationId, String description) ServiceExecutionException(String serviceConversationId, String description)
{ {
super("Execution exception in service conversation " + serviceConversationId); super("Execution exception in service conversation " + serviceConversationId);
...@@ -76,5 +79,18 @@ public class ServiceExecutionException extends RuntimeException ...@@ -76,5 +79,18 @@ public class ServiceExecutionException extends RuntimeException
s.println(getMessage()); s.println(getMessage());
s.println(getDescription()); s.println(getDescription());
} }
/**
* Creates a text description from an exception.
*/
public static String getDescriptionFromException(Throwable th)
{
final ByteArrayOutputStream os = new ByteArrayOutputStream();
final PrintWriter pw = new PrintWriter(os);
th.printStackTrace(pw);
pw.close();
final String errorMessage = new String(os.toByteArray());
return errorMessage;
}
} }
...@@ -16,40 +16,42 @@ ...@@ -16,40 +16,42 @@
package ch.systemsx.cisd.common.serviceconversation; package ch.systemsx.cisd.common.serviceconversation;
import java.io.Serializable;
/** /**
* A service message which is part of a service conversation. * A service message which is part of a service conversation.
* *
* @author Bernd Rinn * @author Bernd Rinn
*/ */
public class ServiceMessage public class ServiceMessage
{ {
private final String conversationId; private final String conversationId;
private final int messageIdx; private final int messageIdx;
private final Object payload; private final Serializable payload;
private final String exceptionDescription; private final String exceptionDescription;
public static ServiceMessage terminate(String conversationId)
{
return new ServiceMessage(conversationId, 0, null);
}
public ServiceMessage(String conversationId, int messageId, Object payload) public static ServiceMessage terminate(String conversationId)
{ {
this.conversationId = conversationId; return new ServiceMessage(conversationId, 0, false, null);
this.messageIdx = messageId;
this.payload = payload;
this.exceptionDescription = null;
} }
ServiceMessage(String conversationId, int messageId, String exceptionDescription) public ServiceMessage(String conversationId, int messageId, boolean exception,
Serializable payload)
{ {
this.conversationId = conversationId; this.conversationId = conversationId;
this.messageIdx = messageId; this.messageIdx = messageId;
this.payload = null; if (exception)
this.exceptionDescription = exceptionDescription; {
this.payload = null;
this.exceptionDescription = payload.toString();
} else
{
this.payload = payload;
this.exceptionDescription = null;
}
} }
public String getConversationId() public String getConversationId()
...@@ -62,7 +64,7 @@ public class ServiceMessage ...@@ -62,7 +64,7 @@ public class ServiceMessage
return messageIdx; return messageIdx;
} }
public Object getPayload() public Serializable getPayload()
{ {
return payload; return payload;
} }
...@@ -76,7 +78,7 @@ public class ServiceMessage ...@@ -76,7 +78,7 @@ public class ServiceMessage
{ {
return (payload == null) && (exceptionDescription != null); return (payload == null) && (exceptionDescription != null);
} }
public String tryGetExceptionDescription() public String tryGetExceptionDescription()
{ {
return exceptionDescription; return exceptionDescription;
...@@ -90,12 +92,12 @@ public class ServiceMessage ...@@ -90,12 +92,12 @@ public class ServiceMessage
return "ServiceMessage [conversationId=" + conversationId + ", TERMINATE]"; return "ServiceMessage [conversationId=" + conversationId + ", TERMINATE]";
} else if (isException()) } else if (isException())
{ {
return "ServiceMessage [conversationId=" + conversationId + ", messageIdx=" + messageIdx return "ServiceMessage [conversationId=" + conversationId + ", messageIdx="
+ ", exceptionDescription=" + exceptionDescription + "]"; + messageIdx + ", exceptionDescription=" + exceptionDescription + "]";
} else } else
{ {
return "ServiceMessage [conversationId=" + conversationId + ", messageIdx=" + messageIdx return "ServiceMessage [conversationId=" + conversationId + ", messageIdx="
+ ", payload=" + payload + "]"; + messageIdx + ", payload=" + payload + "]";
} }
} }
} }
\ No newline at end of file
...@@ -14,29 +14,21 @@ ...@@ -14,29 +14,21 @@
* limitations under the License. * limitations under the License.
*/ */
package ch.systemsx.cisd.common.serviceconversarions; package ch.systemsx.cisd.common.serviceconversation;
import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail; import static org.testng.AssertJUnit.fail;
import java.io.Serializable;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked;
import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities; import ch.systemsx.cisd.common.concurrent.ConcurrencyUtilities;
import ch.systemsx.cisd.common.serviceconversation.ClientMessenger;
import ch.systemsx.cisd.common.serviceconversation.IClientMessenger;
import ch.systemsx.cisd.common.serviceconversation.ISendingMessenger;
import ch.systemsx.cisd.common.serviceconversation.IService;
import ch.systemsx.cisd.common.serviceconversation.IServiceFactory;
import ch.systemsx.cisd.common.serviceconversation.IServiceMessenger;
import ch.systemsx.cisd.common.serviceconversation.ServiceConversationCollection;
import ch.systemsx.cisd.common.serviceconversation.ServiceExecutionException;
import ch.systemsx.cisd.common.serviceconversation.ServiceMessage;
import ch.systemsx.cisd.common.serviceconversation.UnknownServiceTypeException;
/** /**
* Test cases for the {@Link ServiceConversationCollection} class. * Test cases for the {@Link ServiceConversationCollection} class.
...@@ -63,6 +55,10 @@ public class ServiceConversationCollectionTest ...@@ -63,6 +55,10 @@ public class ServiceConversationCollectionTest
{ {
return new SingleEchoService(); return new SingleEchoService();
} }
public int getClientTimeoutMillis()
{
return 100;
}
}); });
final IClientMessenger messenger = conversations.startConversation("singleEcho"); final IClientMessenger messenger = conversations.startConversation("singleEcho");
messenger.send("Hallo Echo"); messenger.send("Hallo Echo");
...@@ -100,6 +96,10 @@ public class ServiceConversationCollectionTest ...@@ -100,6 +96,10 @@ public class ServiceConversationCollectionTest
{ {
return new EchoService(); return new EchoService();
} }
public int getClientTimeoutMillis()
{
return 100;
}
}); });
final BlockingQueue<ServiceMessage> messageQueue = final BlockingQueue<ServiceMessage> messageQueue =
new LinkedBlockingQueue<ServiceMessage>(); new LinkedBlockingQueue<ServiceMessage>();
...@@ -109,25 +109,25 @@ public class ServiceConversationCollectionTest ...@@ -109,25 +109,25 @@ public class ServiceConversationCollectionTest
{ {
messageQueue.add(message); messageQueue.add(message);
} }
}); }).getServiceConversationId();
assertTrue(conversations.hasConversation(id)); assertTrue(conversations.hasConversation(id));
int messageIdx = 0; int messageIdx = 0;
conversations.send(new ServiceMessage(id, 0, "One")); conversations.send(new ServiceMessage(id, 0, false, "One"));
ServiceMessage m = messageQueue.take(); ServiceMessage m = messageQueue.take();
assertEquals(id, m.getConversationId()); assertEquals(id, m.getConversationId());
assertEquals(messageIdx++, m.getMessageIdx()); assertEquals(messageIdx++, m.getMessageIdx());
assertEquals("One", m.getPayload()); assertEquals("One", m.getPayload());
conversations.send(new ServiceMessage(id, 1, "Two")); conversations.send(new ServiceMessage(id, 1, false, "Two"));
// Try to resend and check that the second one is swallowed. // Try to resend and check that the second one is swallowed.
conversations.send(new ServiceMessage(id, 1, "Two")); conversations.send(new ServiceMessage(id, 1, false, "Two"));
m = messageQueue.take(); m = messageQueue.take();
assertEquals(id, m.getConversationId()); assertEquals(id, m.getConversationId());
assertEquals(messageIdx++, m.getMessageIdx()); assertEquals(messageIdx++, m.getMessageIdx());
assertEquals("Two", m.getPayload()); assertEquals("Two", m.getPayload());
conversations.send(new ServiceMessage(id, 2, "Three")); conversations.send(new ServiceMessage(id, 2, false, "Three"));
m = messageQueue.take(); m = messageQueue.take();
assertEquals(id, m.getConversationId()); assertEquals(id, m.getConversationId());
assertEquals(messageIdx++, m.getMessageIdx()); assertEquals(messageIdx++, m.getMessageIdx());
...@@ -151,6 +151,10 @@ public class ServiceConversationCollectionTest ...@@ -151,6 +151,10 @@ public class ServiceConversationCollectionTest
{ {
return new EchoService(); return new EchoService();
} }
public int getClientTimeoutMillis()
{
return 100;
}
}); });
final BlockingQueue<ServiceMessage> messageQueue = final BlockingQueue<ServiceMessage> messageQueue =
new LinkedBlockingQueue<ServiceMessage>(); new LinkedBlockingQueue<ServiceMessage>();
...@@ -160,10 +164,10 @@ public class ServiceConversationCollectionTest ...@@ -160,10 +164,10 @@ public class ServiceConversationCollectionTest
{ {
messageQueue.add(message); messageQueue.add(message);
} }
}); }).getServiceConversationId();
assertTrue(conversations.hasConversation(id)); assertTrue(conversations.hasConversation(id));
int messageIdx = 0; int messageIdx = 0;
conversations.send(new ServiceMessage(id, 0, "One")); conversations.send(new ServiceMessage(id, 0, false, "One"));
ServiceMessage m = messageQueue.take(); ServiceMessage m = messageQueue.take();
assertEquals(id, m.getConversationId()); assertEquals(id, m.getConversationId());
assertEquals(messageIdx++, m.getMessageIdx()); assertEquals(messageIdx++, m.getMessageIdx());
...@@ -211,11 +215,15 @@ public class ServiceConversationCollectionTest ...@@ -211,11 +215,15 @@ public class ServiceConversationCollectionTest
{ {
return new ExceptionThrowingService(); return new ExceptionThrowingService();
} }
public int getClientTimeoutMillis()
{
return 100;
}
}); });
final ClientMessenger messenger = conversations.startConversation("throwException"); final ClientMessenger messenger = conversations.startConversation("throwException");
try try
{ {
messenger.receive(Object.class); messenger.receive(Serializable.class);
fail(); fail();
} catch (ServiceExecutionException ex) } catch (ServiceExecutionException ex)
{ {
...@@ -225,4 +233,47 @@ public class ServiceConversationCollectionTest ...@@ -225,4 +233,47 @@ public class ServiceConversationCollectionTest
assertTrue(ex.getDescription().contains("Don't like you!")); assertTrue(ex.getDescription().contains("Don't like you!"));
} }
} }
private static class DelayedService implements IService
{
public void run(IServiceMessenger messenger)
{
ConcurrencyUtilities.sleep(100L);
try
{
messenger.receive(Serializable.class);
} catch (ClientExecutionException ex)
{
System.err.println("Client timed out.");
}
}
}
@Test
public void testClientTimesout() throws Exception
{
final ServiceConversationCollection conversations = new ServiceConversationCollection(100);
conversations.addServiceType("delayed", new IServiceFactory()
{
public IService create()
{
return new DelayedService();
}
public int getClientTimeoutMillis()
{
return 10;
}
});
final ClientMessenger messenger = conversations.startConversation("delayed");
try
{
messenger.receive(Serializable.class);
fail();
} catch (TimeoutExceptionUnchecked ex)
{
}
// Wait for service to find out that the client timed out.
ConcurrencyUtilities.sleep(100L);
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment