Skip to content
Snippets Groups Projects
Commit 03f78d64 authored by brinn's avatar brinn
Browse files

Add: ServiceConversationServerConfig object to configure various parameters of...

Add: ServiceConversationServerConfig object to configure various parameters of the ServiceConversationServer.

SVN: 24040
parent e611e039
No related branches found
No related tags found
No related merge requests found
...@@ -19,7 +19,6 @@ package ch.systemsx.cisd.common.serviceconversation.server; ...@@ -19,7 +19,6 @@ package ch.systemsx.cisd.common.serviceconversation.server;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
...@@ -47,17 +46,14 @@ import ch.systemsx.cisd.common.serviceconversation.client.ServiceExecutionExcept ...@@ -47,17 +46,14 @@ import ch.systemsx.cisd.common.serviceconversation.client.ServiceExecutionExcept
*/ */
public class ServiceConversationServer public class ServiceConversationServer
{ {
private final static int NUMBER_OF_CORE_THREADS = 10;
private final static int SHUTDOWN_TIMEOUT_MILLIS = 10000;
final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION,
ServiceConversationServer.class); ServiceConversationServer.class);
private final int messageReceivingTimeoutMillis; private final int messageReceivingTimeoutMillis;
private final ExecutorService executor = new NamingThreadPoolExecutor("Service Conversations") private final int shutdownTimeoutMillis;
.corePoolSize(NUMBER_OF_CORE_THREADS).daemonize();
private final NamingThreadPoolExecutor executor;
private final Map<String, IServiceFactory> serviceFactoryMap = private final Map<String, IServiceFactory> serviceFactoryMap =
new ConcurrentHashMap<String, IServiceFactory>(); new ConcurrentHashMap<String, IServiceFactory>();
...@@ -104,9 +100,23 @@ public class ServiceConversationServer ...@@ -104,9 +100,23 @@ public class ServiceConversationServer
} }
}; };
public ServiceConversationServer(int messageReceivingTimeoutMillis) public ServiceConversationServer()
{
this(ServiceConversationServerConfig.create());
}
public ServiceConversationServer(ServiceConversationServerConfig config)
{ {
this.messageReceivingTimeoutMillis = messageReceivingTimeoutMillis; this.executor =
new NamingThreadPoolExecutor("Service Conversations", config.getWorkQueueSize())
.corePoolSize(config.getNumberOfCoreThreads()).maximumPoolSize(
config.getMaxNumberOfThreads());
if (config.isDaemonize())
{
this.executor.daemonize();
}
this.messageReceivingTimeoutMillis = config.getMessageReceivingTimeoutMillis();
this.shutdownTimeoutMillis = config.getShutdownTimeoutMillis();
} }
// //
...@@ -175,7 +185,7 @@ public class ServiceConversationServer ...@@ -175,7 +185,7 @@ public class ServiceConversationServer
final IServiceMessageTransport responseMessenger = responseMessageMap.get(clientId); final IServiceMessageTransport responseMessenger = responseMessageMap.get(clientId);
if (responseMessenger == null) if (responseMessenger == null)
{ {
final UnknownClientException ex = new UnknownClientException(clientId); final UnknownClientException ex = new UnknownClientException(clientId);
operationLog.error(ex.getMessage()); operationLog.error(ex.getMessage());
throw ex; throw ex;
} }
...@@ -235,7 +245,13 @@ public class ServiceConversationServer ...@@ -235,7 +245,13 @@ public class ServiceConversationServer
serviceFactory.getClientTimeoutMillis()); serviceFactory.getClientTimeoutMillis());
} }
public void shutdown() /**
* Shuts down the server, waiting for ongoing conversations to finish for
* {@link ServiceConversationServerConfig#getShutdownTimeoutMillis()} milli-seconds.
*
* @return <code>true</code>, if the server was shut down properly.
*/
public boolean shutdown()
{ {
try try
{ {
...@@ -243,14 +259,19 @@ public class ServiceConversationServer ...@@ -243,14 +259,19 @@ public class ServiceConversationServer
{ {
record.getController().cancel(true); record.getController().cancel(true);
} }
executor.awaitTermination(SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); return executor.awaitTermination(shutdownTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception ex) } catch (Exception ex)
{ {
throw new CheckedExceptionTunnel(ex); throw new CheckedExceptionTunnel(ex);
} }
} }
public void shutdownNow() /**
* Shuts down the server, not waiting for ongoing conversations to finish.
*
* @return <code>true</code>, if the server was shut down properly.
*/
public boolean shutdownNow()
{ {
try try
{ {
...@@ -258,13 +279,16 @@ public class ServiceConversationServer ...@@ -258,13 +279,16 @@ public class ServiceConversationServer
{ {
record.getController().cancel(true); record.getController().cancel(true);
} }
executor.awaitTermination(0, TimeUnit.MILLISECONDS); return executor.awaitTermination(0, TimeUnit.MILLISECONDS);
} catch (Exception ex) } catch (Exception ex)
{ {
throw new CheckedExceptionTunnel(ex); throw new CheckedExceptionTunnel(ex);
} }
} }
/**
* Returns <code>true</code> if this server has the given <var>conversationId</var>.
*/
public boolean hasConversation(String conversationId) public boolean hasConversation(String conversationId)
{ {
return conversations.containsKey(conversationId); return conversations.containsKey(conversationId);
......
/*
* Copyright 2011 ETH Zuerich, CISD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ch.systemsx.cisd.common.serviceconversation.server;
/**
* A configuration object for service conversations.
*
* @author Bernd Rinn
*/
public class ServiceConversationServerConfig
{
private int numberOfCoreThreads = 10;
private int maxNumberOfThreads = Integer.MAX_VALUE;
private int workQueueSize = 0;
private int messageReceivingTimeoutMillis = 30000;
private int shutdownTimeoutMillis = 60000;
private boolean daemonize = true;
public int getNumberOfCoreThreads()
{
return numberOfCoreThreads;
}
/**
* Creates a new configuration with default values.
*/
public static ServiceConversationServerConfig create()
{
return new ServiceConversationServerConfig();
}
/**
* Configures the number of core threads being spawned for service conversations (default: 10).
* Core threads are not shut down and all filled before any incoming service conversation
* request is queued.
*/
public ServiceConversationServerConfig numberOfCoreThreads(@SuppressWarnings("hiding")
int numberOfCoreThreads)
{
this.numberOfCoreThreads = numberOfCoreThreads;
if (this.maxNumberOfThreads < numberOfCoreThreads)
{
this.maxNumberOfThreads = numberOfCoreThreads;
}
return this;
}
public int getMaxNumberOfThreads()
{
return maxNumberOfThreads;
}
/**
* Configures the maximum number of threads being spawned for service conversations (default:
* {@link Integer#MAX_VALUE}).
*/
public ServiceConversationServerConfig maxNumberOfThreads(@SuppressWarnings("hiding")
int maxNumberOfThreads)
{
this.maxNumberOfThreads = maxNumberOfThreads;
return this;
}
public int getWorkQueueSize()
{
return workQueueSize;
}
/**
* Configures the length of the work queue (default 0). If set to a value larger than 0, new
* service conversations will be queued rather than new threads being spawned when all core
* threads are busy.
*/
public ServiceConversationServerConfig workQueueSize(@SuppressWarnings("hiding")
int workQueueSize)
{
this.workQueueSize = workQueueSize;
this.maxNumberOfThreads = this.numberOfCoreThreads;
return this;
}
public int getMessageReceivingTimeoutMillis()
{
return messageReceivingTimeoutMillis;
}
/**
* Configures the time in milli-seconds that a service conversation methods waits for an
* incoming message from the client before timing out (default: 30000).
*/
public ServiceConversationServerConfig messageReceivingTimeoutMillis(
@SuppressWarnings("hiding")
int messageReceivingTimeoutMillis)
{
this.messageReceivingTimeoutMillis = messageReceivingTimeoutMillis;
return this;
}
public int getShutdownTimeoutMillis()
{
return shutdownTimeoutMillis;
}
/**
* Configures the shutdown timeout in milli-seconds (default: 60000).
*/
public ServiceConversationServerConfig shutdownTimeoutMillis(@SuppressWarnings("hiding")
int shutdownTimeoutMillis)
{
this.shutdownTimeoutMillis = shutdownTimeoutMillis;
return this;
}
public boolean isDaemonize()
{
return daemonize;
}
/**
* Configures whether the service conversation threads are daemonized (default:
* <code>true</code>).
*/
public ServiceConversationServerConfig daemonize(@SuppressWarnings("hiding")
boolean daemonize)
{
this.daemonize = daemonize;
return this;
}
/**
* Configures that the service conversation threads are not daemonized.
*/
public ServiceConversationServerConfig undaemonize()
{
this.daemonize = false;
return this;
}
}
...@@ -39,6 +39,7 @@ import ch.systemsx.cisd.common.serviceconversation.client.ServiceExecutionExcept ...@@ -39,6 +39,7 @@ import ch.systemsx.cisd.common.serviceconversation.client.ServiceExecutionExcept
import ch.systemsx.cisd.common.serviceconversation.server.IService; import ch.systemsx.cisd.common.serviceconversation.server.IService;
import ch.systemsx.cisd.common.serviceconversation.server.IServiceFactory; import ch.systemsx.cisd.common.serviceconversation.server.IServiceFactory;
import ch.systemsx.cisd.common.serviceconversation.server.ServiceConversationServer; import ch.systemsx.cisd.common.serviceconversation.server.ServiceConversationServer;
import ch.systemsx.cisd.common.serviceconversation.server.ServiceConversationServerConfig;
/** /**
* Test cases for the {@Link ServiceConversationCollection} class. * Test cases for the {@Link ServiceConversationCollection} class.
...@@ -53,6 +54,11 @@ public class ServiceConversationTest ...@@ -53,6 +54,11 @@ public class ServiceConversationTest
LogInitializer.init(); LogInitializer.init();
} }
private ServiceConversationServerConfig config()
{
return ServiceConversationServerConfig.create().messageReceivingTimeoutMillis(100);
}
/** /**
* This object encapsulates the client server connection for test purposes. * This object encapsulates the client server connection for test purposes.
*/ */
...@@ -122,7 +128,7 @@ public class ServiceConversationTest ...@@ -122,7 +128,7 @@ public class ServiceConversationTest
private ServiceConversationServerAndClientHolder createServerAndClient(IServiceFactory factory) private ServiceConversationServerAndClientHolder createServerAndClient(IServiceFactory factory)
{ {
final ServiceConversationServer server = new ServiceConversationServer(100); final ServiceConversationServer server = new ServiceConversationServer(config());
server.addServiceType(factory); server.addServiceType(factory);
final TestClientServerConnection dummyRemoteServer = new TestClientServerConnection(server); final TestClientServerConnection dummyRemoteServer = new TestClientServerConnection(server);
final ServiceConversationClient client = final ServiceConversationClient client =
...@@ -240,7 +246,7 @@ public class ServiceConversationTest ...@@ -240,7 +246,7 @@ public class ServiceConversationTest
@Test @Test
public void testMultipleEchoServiceTerminateLowLevelHappyCase() throws Exception public void testMultipleEchoServiceTerminateLowLevelHappyCase() throws Exception
{ {
final ServiceConversationServer conversations = new ServiceConversationServer(100); final ServiceConversationServer conversations = new ServiceConversationServer(config());
conversations.addServiceType(EchoService.createFactory()); conversations.addServiceType(EchoService.createFactory());
final BlockingQueue<ServiceMessage> messageQueue = final BlockingQueue<ServiceMessage> messageQueue =
new LinkedBlockingQueue<ServiceMessage>(); new LinkedBlockingQueue<ServiceMessage>();
...@@ -320,7 +326,7 @@ public class ServiceConversationTest ...@@ -320,7 +326,7 @@ public class ServiceConversationTest
@Test(expectedExceptions = UnknownServiceTypeException.class) @Test(expectedExceptions = UnknownServiceTypeException.class)
public void testUnknownService() throws Exception public void testUnknownService() throws Exception
{ {
final ServiceConversationServer conversations = new ServiceConversationServer(100); final ServiceConversationServer conversations = new ServiceConversationServer(config());
conversations.addClientResponseTransport("dummyClient", new IServiceMessageTransport() conversations.addClientResponseTransport("dummyClient", new IServiceMessageTransport()
{ {
public void send(ServiceMessage message) public void send(ServiceMessage message)
...@@ -333,7 +339,7 @@ public class ServiceConversationTest ...@@ -333,7 +339,7 @@ public class ServiceConversationTest
@Test(expectedExceptions = UnknownClientException.class) @Test(expectedExceptions = UnknownClientException.class)
public void testUnknownClient() throws Exception public void testUnknownClient() throws Exception
{ {
final ServiceConversationServer conversations = new ServiceConversationServer(100); final ServiceConversationServer conversations = new ServiceConversationServer(config());
conversations.addServiceType(new IServiceFactory() conversations.addServiceType(new IServiceFactory()
{ {
public IService create() public IService create()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment