From 03f78d6425c4d38a8f79513c866dcca93111fa4c Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Sat, 17 Dec 2011 21:45:02 +0000 Subject: [PATCH] Add: ServiceConversationServerConfig object to configure various parameters of the ServiceConversationServer. SVN: 24040 --- .../server/ServiceConversationServer.java | 52 ++++-- .../ServiceConversationServerConfig.java | 157 ++++++++++++++++++ .../ServiceConversationTest.java | 14 +- 3 files changed, 205 insertions(+), 18 deletions(-) create mode 100644 common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServerConfig.java diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java index 16b0c04d8d8..a1510f9e29a 100644 --- a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServer.java @@ -19,7 +19,6 @@ package ch.systemsx.cisd.common.serviceconversation.server; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -47,17 +46,14 @@ import ch.systemsx.cisd.common.serviceconversation.client.ServiceExecutionExcept */ public class ServiceConversationServer { - private final static int NUMBER_OF_CORE_THREADS = 10; - - private final static int SHUTDOWN_TIMEOUT_MILLIS = 10000; - final static Logger operationLog = LogFactory.getLogger(LogCategory.OPERATION, ServiceConversationServer.class); private final int messageReceivingTimeoutMillis; - private final ExecutorService executor = new NamingThreadPoolExecutor("Service Conversations") - .corePoolSize(NUMBER_OF_CORE_THREADS).daemonize(); + private final int shutdownTimeoutMillis; + + private final NamingThreadPoolExecutor executor; private final Map<String, IServiceFactory> serviceFactoryMap = new ConcurrentHashMap<String, IServiceFactory>(); @@ -104,9 +100,23 @@ public class ServiceConversationServer } }; - public ServiceConversationServer(int messageReceivingTimeoutMillis) + public ServiceConversationServer() + { + this(ServiceConversationServerConfig.create()); + } + + public ServiceConversationServer(ServiceConversationServerConfig config) { - this.messageReceivingTimeoutMillis = messageReceivingTimeoutMillis; + this.executor = + new NamingThreadPoolExecutor("Service Conversations", config.getWorkQueueSize()) + .corePoolSize(config.getNumberOfCoreThreads()).maximumPoolSize( + config.getMaxNumberOfThreads()); + if (config.isDaemonize()) + { + this.executor.daemonize(); + } + this.messageReceivingTimeoutMillis = config.getMessageReceivingTimeoutMillis(); + this.shutdownTimeoutMillis = config.getShutdownTimeoutMillis(); } // @@ -175,7 +185,7 @@ public class ServiceConversationServer final IServiceMessageTransport responseMessenger = responseMessageMap.get(clientId); if (responseMessenger == null) { - final UnknownClientException ex = new UnknownClientException(clientId); + final UnknownClientException ex = new UnknownClientException(clientId); operationLog.error(ex.getMessage()); throw ex; } @@ -235,7 +245,13 @@ public class ServiceConversationServer serviceFactory.getClientTimeoutMillis()); } - public void shutdown() + /** + * Shuts down the server, waiting for ongoing conversations to finish for + * {@link ServiceConversationServerConfig#getShutdownTimeoutMillis()} milli-seconds. + * + * @return <code>true</code>, if the server was shut down properly. + */ + public boolean shutdown() { try { @@ -243,14 +259,19 @@ public class ServiceConversationServer { record.getController().cancel(true); } - executor.awaitTermination(SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + return executor.awaitTermination(shutdownTimeoutMillis, TimeUnit.MILLISECONDS); } catch (Exception ex) { throw new CheckedExceptionTunnel(ex); } } - public void shutdownNow() + /** + * Shuts down the server, not waiting for ongoing conversations to finish. + * + * @return <code>true</code>, if the server was shut down properly. + */ + public boolean shutdownNow() { try { @@ -258,13 +279,16 @@ public class ServiceConversationServer { record.getController().cancel(true); } - executor.awaitTermination(0, TimeUnit.MILLISECONDS); + return executor.awaitTermination(0, TimeUnit.MILLISECONDS); } catch (Exception ex) { throw new CheckedExceptionTunnel(ex); } } + /** + * Returns <code>true</code> if this server has the given <var>conversationId</var>. + */ public boolean hasConversation(String conversationId) { return conversations.containsKey(conversationId); diff --git a/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServerConfig.java b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServerConfig.java new file mode 100644 index 00000000000..365981ceaa0 --- /dev/null +++ b/common/source/java/ch/systemsx/cisd/common/serviceconversation/server/ServiceConversationServerConfig.java @@ -0,0 +1,157 @@ +/* + * Copyright 2011 ETH Zuerich, CISD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ch.systemsx.cisd.common.serviceconversation.server; + +/** + * A configuration object for service conversations. + * + * @author Bernd Rinn + */ +public class ServiceConversationServerConfig +{ + private int numberOfCoreThreads = 10; + + private int maxNumberOfThreads = Integer.MAX_VALUE; + + private int workQueueSize = 0; + + private int messageReceivingTimeoutMillis = 30000; + + private int shutdownTimeoutMillis = 60000; + + private boolean daemonize = true; + + public int getNumberOfCoreThreads() + { + return numberOfCoreThreads; + } + + /** + * Creates a new configuration with default values. + */ + public static ServiceConversationServerConfig create() + { + return new ServiceConversationServerConfig(); + } + + /** + * Configures the number of core threads being spawned for service conversations (default: 10). + * Core threads are not shut down and all filled before any incoming service conversation + * request is queued. + */ + public ServiceConversationServerConfig numberOfCoreThreads(@SuppressWarnings("hiding") + int numberOfCoreThreads) + { + this.numberOfCoreThreads = numberOfCoreThreads; + if (this.maxNumberOfThreads < numberOfCoreThreads) + { + this.maxNumberOfThreads = numberOfCoreThreads; + } + return this; + } + + public int getMaxNumberOfThreads() + { + return maxNumberOfThreads; + } + + /** + * Configures the maximum number of threads being spawned for service conversations (default: + * {@link Integer#MAX_VALUE}). + */ + public ServiceConversationServerConfig maxNumberOfThreads(@SuppressWarnings("hiding") + int maxNumberOfThreads) + { + this.maxNumberOfThreads = maxNumberOfThreads; + return this; + } + + public int getWorkQueueSize() + { + return workQueueSize; + } + + /** + * Configures the length of the work queue (default 0). If set to a value larger than 0, new + * service conversations will be queued rather than new threads being spawned when all core + * threads are busy. + */ + public ServiceConversationServerConfig workQueueSize(@SuppressWarnings("hiding") + int workQueueSize) + { + this.workQueueSize = workQueueSize; + this.maxNumberOfThreads = this.numberOfCoreThreads; + return this; + } + + public int getMessageReceivingTimeoutMillis() + { + return messageReceivingTimeoutMillis; + } + + /** + * Configures the time in milli-seconds that a service conversation methods waits for an + * incoming message from the client before timing out (default: 30000). + */ + public ServiceConversationServerConfig messageReceivingTimeoutMillis( + @SuppressWarnings("hiding") + int messageReceivingTimeoutMillis) + { + this.messageReceivingTimeoutMillis = messageReceivingTimeoutMillis; + return this; + } + + public int getShutdownTimeoutMillis() + { + return shutdownTimeoutMillis; + } + + /** + * Configures the shutdown timeout in milli-seconds (default: 60000). + */ + public ServiceConversationServerConfig shutdownTimeoutMillis(@SuppressWarnings("hiding") + int shutdownTimeoutMillis) + { + this.shutdownTimeoutMillis = shutdownTimeoutMillis; + return this; + } + + public boolean isDaemonize() + { + return daemonize; + } + + /** + * Configures whether the service conversation threads are daemonized (default: + * <code>true</code>). + */ + public ServiceConversationServerConfig daemonize(@SuppressWarnings("hiding") + boolean daemonize) + { + this.daemonize = daemonize; + return this; + } + + /** + * Configures that the service conversation threads are not daemonized. + */ + public ServiceConversationServerConfig undaemonize() + { + this.daemonize = false; + return this; + } +} diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java index 2d4e731752c..ff693199cc0 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/serviceconversation/ServiceConversationTest.java @@ -39,6 +39,7 @@ import ch.systemsx.cisd.common.serviceconversation.client.ServiceExecutionExcept import ch.systemsx.cisd.common.serviceconversation.server.IService; import ch.systemsx.cisd.common.serviceconversation.server.IServiceFactory; import ch.systemsx.cisd.common.serviceconversation.server.ServiceConversationServer; +import ch.systemsx.cisd.common.serviceconversation.server.ServiceConversationServerConfig; /** * Test cases for the {@Link ServiceConversationCollection} class. @@ -53,6 +54,11 @@ public class ServiceConversationTest LogInitializer.init(); } + private ServiceConversationServerConfig config() + { + return ServiceConversationServerConfig.create().messageReceivingTimeoutMillis(100); + } + /** * This object encapsulates the client server connection for test purposes. */ @@ -122,7 +128,7 @@ public class ServiceConversationTest private ServiceConversationServerAndClientHolder createServerAndClient(IServiceFactory factory) { - final ServiceConversationServer server = new ServiceConversationServer(100); + final ServiceConversationServer server = new ServiceConversationServer(config()); server.addServiceType(factory); final TestClientServerConnection dummyRemoteServer = new TestClientServerConnection(server); final ServiceConversationClient client = @@ -240,7 +246,7 @@ public class ServiceConversationTest @Test public void testMultipleEchoServiceTerminateLowLevelHappyCase() throws Exception { - final ServiceConversationServer conversations = new ServiceConversationServer(100); + final ServiceConversationServer conversations = new ServiceConversationServer(config()); conversations.addServiceType(EchoService.createFactory()); final BlockingQueue<ServiceMessage> messageQueue = new LinkedBlockingQueue<ServiceMessage>(); @@ -320,7 +326,7 @@ public class ServiceConversationTest @Test(expectedExceptions = UnknownServiceTypeException.class) public void testUnknownService() throws Exception { - final ServiceConversationServer conversations = new ServiceConversationServer(100); + final ServiceConversationServer conversations = new ServiceConversationServer(config()); conversations.addClientResponseTransport("dummyClient", new IServiceMessageTransport() { public void send(ServiceMessage message) @@ -333,7 +339,7 @@ public class ServiceConversationTest @Test(expectedExceptions = UnknownClientException.class) public void testUnknownClient() throws Exception { - final ServiceConversationServer conversations = new ServiceConversationServer(100); + final ServiceConversationServer conversations = new ServiceConversationServer(config()); conversations.addServiceType(new IServiceFactory() { public IService create() -- GitLab