Skip to content
Snippets Groups Projects
Commit 9bec39f3 authored by brinn's avatar brinn
Browse files

Detect service execution exceptions in client also during send.

SVN: 24047
parent 35463493
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,7 @@
package ch.systemsx.cisd.common.serviceconversation.client;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import ch.systemsx.cisd.base.exceptions.CheckedExceptionTunnel;
import ch.systemsx.cisd.base.exceptions.TimeoutExceptionUnchecked;
......@@ -44,6 +45,8 @@ class ClientMessenger implements IServiceConversation
private int outgoingMessageIdx;
private final AtomicBoolean serviceExceptionSignaled = new AtomicBoolean();
ClientMessenger(ServiceConversationDTO serviceConversationDTO,
IServiceMessageTransport transportToService,
ClientResponseMessageQueue responseMessageQueue,
......@@ -56,24 +59,52 @@ class ClientMessenger implements IServiceConversation
this.transportToService = transportToService;
this.responseMessageQueue = responseMessageQueue;
this.responseMessageMultiplexer = responseMessageMultiplexer;
responseMessageMultiplexer.addConversation(serviceConversationId, responseMessageQueue);
responseMessageMultiplexer.addConversation(serviceConversationId,
new IServiceMessageTransportWithControl()
{
public void send(ServiceMessage message)
{
ClientMessenger.this.responseMessageQueue.send(message);
}
public void sendException(ServiceMessage message)
{
ClientMessenger.this.serviceExceptionSignaled.set(true);
ClientMessenger.this.responseMessageQueue.send(message);
}
});
}
public void send(Serializable message)
{
checkServiceException();
transportToService.send(new ServiceMessage(serviceConversationId,
nextOutgoingMessageIndex(), false, message));
}
public void terminate()
{
checkServiceException();
transportToService.send(ServiceMessage.terminate(serviceConversationId));
}
private int nextOutgoingMessageIndex()
private void checkServiceException() throws ServiceExecutionException
{
return outgoingMessageIdx++;
if (serviceExceptionSignaled.getAndSet(false))
{
try
{
final ServiceMessage messageOrNull = responseMessageQueue.poll(0);
if (messageOrNull != null && messageOrNull.isException())
{
throw new ServiceExecutionException(messageOrNull.getConversationId(),
messageOrNull.tryGetExceptionDescription());
}
} catch (InterruptedException ex)
{
throw CheckedExceptionTunnel.wrapIfNecessary(ex);
}
}
}
public <T extends Serializable> T receive(Class<T> messageClass)
......@@ -108,7 +139,8 @@ class ClientMessenger implements IServiceConversation
if (throwExceptionOnNull)
{
final TimeoutExceptionUnchecked exception =
new TimeoutExceptionUnchecked("Timeout while waiting on message from service.");
new TimeoutExceptionUnchecked(
"Timeout while waiting on message from service.");
final String exceptionDescription =
ServiceExecutionException.getDescriptionFromException(exception);
transportToService.send(new ServiceMessage(serviceConversationId,
......@@ -132,6 +164,11 @@ class ClientMessenger implements IServiceConversation
return (T) payload;
}
private int nextOutgoingMessageIndex()
{
return outgoingMessageIdx++;
}
public String getId()
{
return serviceConversationId;
......
......@@ -37,9 +37,9 @@ class ClientResponseMessageMultiplexer
final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION,
ClientResponseMessageMultiplexer.class);
private final Map<String, IServiceMessageTransport> conversations =
new ConcurrentHashMap<String, IServiceMessageTransport>();
private final Map<String, IServiceMessageTransportWithControl> conversations =
new ConcurrentHashMap<String, IServiceMessageTransportWithControl>();
/**
* Returns the transport for incoming response messages.
*/
......@@ -50,24 +50,32 @@ class ClientResponseMessageMultiplexer
public void send(ServiceMessage message)
{
final String conversationId = message.getConversationId();
final IServiceMessageTransport transport = conversations.get(conversationId);
final IServiceMessageTransportWithControl transport =
conversations.get(conversationId);
if (transport == null)
{
final String msg = String.format(
"Message for unknown service conversation '%s'", conversationId);
final String msg =
String.format("Message for unknown service conversation '%s'",
conversationId);
operationLog.error(msg);
throw new UnknownServiceConversationException(msg);
}
transport.send(message);
if (message.isException())
{
transport.sendException(message);
} else
{
transport.send(message);
}
}
};
}
/**
* Adds a new conversation to the multiplexer.
*/
void addConversation(String serviceConversationId,
IServiceMessageTransport responseMessageTransport)
IServiceMessageTransportWithControl responseMessageTransport)
{
conversations.put(serviceConversationId, responseMessageTransport);
}
......
/*
* Copyright 2011 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.serviceconversation.client;
import ch.systemsx.cisd.common.serviceconversation.IServiceMessageTransport;
import ch.systemsx.cisd.common.serviceconversation.ServiceMessage;
/**
* The role that can transport both regular payload and exception service messages separately.
*
* @author Bernd Rinn
*/
interface IServiceMessageTransportWithControl extends IServiceMessageTransport
{
/**
* Send the <var>message</var> using this transport.
*/
public void sendException(ServiceMessage message);
}
......@@ -375,10 +375,9 @@ public class ServiceConversationTest
}
assertFalse(holder.server.hasConversation(conversation.getId()));
conversation.send("Three");
try
{
conversation.receive(String.class);
conversation.send("Three");
fail("Server timeout not signaled to client");
} catch (ServiceExecutionException ex)
{
......@@ -464,7 +463,26 @@ public class ServiceConversationTest
try
{
messenger.receive(Serializable.class);
fail();
fail("Failed to detect error state on serve-side while receiving.");
} catch (ServiceExecutionException ex)
{
assertEquals(messenger.getId(), ex.getServiceConversationId());
assertTrue(ex.getDescription().contains("RuntimeException"));
assertTrue(ex.getDescription().contains("Don't like you!"));
}
}
@Test
public void testServiceThrowsExceptionOnSend() throws Exception
{
final ServiceConversationClient client =
createServerAndClient(ExceptionThrowingService.createFactory()).client;
final IServiceConversation messenger = client.startConversation("throwException");
ConcurrencyUtilities.sleep(20L);
try
{
messenger.send("Test");
fail("Failed to detect error state on serve-side while sending.");
} catch (ServiceExecutionException ex)
{
assertEquals(messenger.getId(), ex.getServiceConversationId());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment