From 666145910bf5d09f9a50f7747f623ef3d755eb54 Mon Sep 17 00:00:00 2001 From: brinn <brinn> Date: Mon, 9 Jun 2008 06:21:07 +0000 Subject: [PATCH] fix: have NamingThreadPoolExecutor use a SynchronousQueue in order to get caching rather than blocking thread pool behavior change: improve the configuration of NamingThreadPoolExecutor add: - allow to easily set the daemon state of the created threads - make it optional to use the pool name in the thread names SVN: 6496 --- .../concurrent/NamingThreadFactory.java | 40 ++++- .../concurrent/NamingThreadPoolExecutor.java | 141 +++++++++++------- .../common/concurrent/PoolNameThread.java | 17 ++- .../process/ProcessExecutionHelper.java | 3 +- .../concurrent/ConcurrencyUtilitiesTest.java | 33 ++-- .../NamingThreadPoolExecutorTest.java | 102 ++++++++++--- 6 files changed, 242 insertions(+), 94 deletions(-) diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadFactory.java b/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadFactory.java index 06c1fb2e377..c87f18fb386 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadFactory.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadFactory.java @@ -31,22 +31,56 @@ public class NamingThreadFactory implements ThreadFactory private final String poolName; + private boolean createDaemonThreads; + + private boolean addPoolName; + private int threadCount; public NamingThreadFactory(String poolName) { this.poolName = poolName; - this.threadCount = 1; + this.addPoolName = true; + this.createDaemonThreads = false; + this.threadCount = 0; } public Thread newThread(Runnable r) { - final String completePoolName = poolName + "-T" + threadCount++; - return new PoolNameThread(r, completePoolName); + ++threadCount; + final String completePoolName = poolName + "-T" + threadCount; + final Thread thread = new PoolNameThread(r, completePoolName, addPoolName); + thread.setDaemon(createDaemonThreads); + return thread; } String getPoolName() { return poolName; } + + public final boolean isCreateDaemonThreads() + { + return createDaemonThreads; + } + + public final void setCreateDaemonThreads(boolean createDaemonThreads) + { + this.createDaemonThreads = createDaemonThreads; + } + + public final int getThreadCount() + { + return threadCount; + } + + public final boolean isAddPoolName() + { + return addPoolName; + } + + public final void setAddPoolName(boolean addPoolName) + { + this.addPoolName = addPoolName; + } } diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutor.java b/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutor.java index de16227920c..5ac6954e4ed 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutor.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutor.java @@ -20,8 +20,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -41,62 +42,15 @@ public class NamingThreadPoolExecutor extends ThreadPoolExecutor public final static long DEFAULT_KEEP_ALIVE_TIME_MILLIS = 10000L; /** - * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. + * Creates a new (caching) <tt>NamingThreadPoolExecutor</tt> with the given initial + * parameters. This executor will create new threads as needed. * * @param poolName the default name for new threads */ public NamingThreadPoolExecutor(String poolName) { super(1, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(poolName)); - } - - /** - * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. - * - * @param poolName the default name for new threads - * @param corePoolSize the number of threads to keep in the pool, even if they are idle. - * @throws IllegalArgumentException if corePoolSize less than zero. - */ - public NamingThreadPoolExecutor(String poolName, int corePoolSize) - { - super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(poolName)); - } - - /** - * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. - * - * @param poolName the default name for new threads - * @param corePoolSize the number of threads to keep in the pool, even if they are idle. - * @param maximumPoolSize the maximum number of threads to allow in the pool. - * @param keepAliveTimeMillis when the number of threads is greater than the core, this is the - * maximum time in milliseconds that excess idle threads will wait for new tasks - * before terminating. - * @throws IllegalArgumentException if corePoolSize, or keepAliveTime less than zero, or if - * maximumPoolSize less than or equal to zero, or if corePoolSize greater than - * maximumPoolSize. - */ - public NamingThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, - long keepAliveTimeMillis) - { - super(corePoolSize, maximumPoolSize, keepAliveTimeMillis, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(poolName)); - } - - /** - * Creates a new <tt>NamingThreadPoolExecutor</tt> with the given initial parameters. - * - * @param poolName the default name for new threads - * @param corePoolSize the number of threads to keep in the pool, even if they are idle. - * @param maximumPoolSize the maximum number of threads to allow in the pool. - * @throws IllegalArgumentException if corePoolSize less than zero, or if maximumPoolSize less - * than or equal to zero, or if corePoolSize greater than maximumPoolSize. - */ - public NamingThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize) - { - super(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(poolName)); + new SynchronousQueue<Runnable>(), new NamingThreadFactory(poolName)); } /** @@ -201,6 +155,91 @@ public class NamingThreadPoolExecutor extends ThreadPoolExecutor super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } + /** + * Sets the thread factory of this pool executor to daemon creation mode. + * <p> + * This method is supposed to be used in chaining mode, i.e. + * + * <pre> + * final ExecutorService executor = new NamingThreadPoolExecutor("name").daemonize(); + * </pre> + * + * @return This class itself. + */ + public NamingThreadPoolExecutor daemonize() + { + getThreadFactory().setCreateDaemonThreads(true); + return this; + } + + /** + * Same as {@link #setCorePoolSize(int)}, but returns the object itself for chaining. + */ + public NamingThreadPoolExecutor corePoolSize(int corePoolSize) + { + setCorePoolSize(corePoolSize); + return this; + } + + /** + * Same as {@link #setMaximumPoolSize(int)}, but returns the object itself for chaining. + */ + public NamingThreadPoolExecutor maximumPoolSize(int maximumPoolSize) + { + setMaximumPoolSize(maximumPoolSize); + return this; + } + + /** + * Same as {@link #setKeepAliveTime(long, TimeUnit)}, but uses always + * {@link TimeUnit#MILLISECONDS} and returns the object itself for chaining. + */ + public NamingThreadPoolExecutor keepAliveTime(long keepAliveTimeMillis) + { + setKeepAliveTime(keepAliveTimeMillis, TimeUnit.MILLISECONDS); + return this; + } + + /** + * If <var>addPoolName</var> is <code>true</code>, the threads will contain the pool name as + * the first part of the thread names. + */ + public NamingThreadPoolExecutor addPoolName(boolean addPoolName) + { + getThreadFactory().setAddPoolName(addPoolName); + return this; + } + + @Override + public NamingThreadFactory getThreadFactory() + { + return (NamingThreadFactory) super.getThreadFactory(); + } + + /** + * Sets the thread factory of this pool executor. + */ + public void setThreadFactory(NamingThreadFactory threadFactory) + { + super.setThreadFactory(threadFactory); + } + + /** + * @deprecated Use {@link #setThreadFactory(NamingThreadFactory)} instead! + */ + @Override + @Deprecated + public void setThreadFactory(ThreadFactory threadFactory) + { + if (threadFactory instanceof NamingThreadFactory == false) + { + throw new IllegalArgumentException("thread factory is of type '" + + threadFactory.getClass().getCanonicalName() + ", but needs to be of type " + + NamingThreadFactory.class.getCanonicalName()); + } + super.setThreadFactory(threadFactory); + } + @Override protected void beforeExecute(Thread t, Runnable r) { diff --git a/common/source/java/ch/systemsx/cisd/common/concurrent/PoolNameThread.java b/common/source/java/ch/systemsx/cisd/common/concurrent/PoolNameThread.java index 96ba890677a..3601019a2dc 100644 --- a/common/source/java/ch/systemsx/cisd/common/concurrent/PoolNameThread.java +++ b/common/source/java/ch/systemsx/cisd/common/concurrent/PoolNameThread.java @@ -25,19 +25,28 @@ public class PoolNameThread extends Thread { private final String poolName; - public PoolNameThread(Runnable target, String poolName) + private final boolean addPoolName; + + public PoolNameThread(Runnable target, String poolName, boolean addPoolName) { super(target, poolName); this.poolName = poolName; + this.addPoolName = addPoolName; } /** - * Sets the thread's name to be a combination of the name of the thread in the pool and the name - * of the runnable, separated by '::'. + * Sets the thread's name to the <var>runnableName</var>, possibly adding the pool name if + * <var>addPoolName</var> has been set to <code>true</code> in the constructor. */ public void setRunnableName(String runnableName) { - setName(poolName + "::" + runnableName); + if (addPoolName) + { + setName(poolName + "::" + runnableName); + } else + { + setName(runnableName); + } } /** Clears the name of the runnable, setting the name of the thread to the pool name. */ diff --git a/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java b/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java index e301399d508..e24dd6b468a 100644 --- a/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java +++ b/common/source/java/ch/systemsx/cisd/common/process/ProcessExecutionHelper.java @@ -74,7 +74,8 @@ public final class ProcessExecutionHelper private static final long SHORT_TIMEOUT = 100; /** The executor service handling the threads that OS processes are spawned in. */ - private static final ExecutorService executor = new NamingThreadPoolExecutor("osproc", 10); + private static final ExecutorService executor = + new NamingThreadPoolExecutor("osproc").corePoolSize(10).daemonize(); /** The counter to draw the <var>processNumber</var> from. */ private static final AtomicInteger processCounter = new AtomicInteger(); diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java index 0470de2f4eb..38c1f753892 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/ConcurrencyUtilitiesTest.java @@ -51,7 +51,8 @@ public class ConcurrencyUtilitiesTest public void testTryGetFutureOK() { final String valueProvided = "This is the execution return value"; - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception @@ -68,7 +69,8 @@ public class ConcurrencyUtilitiesTest public void testGetExecutionResultOK() { final String valueProvided = "This is the execution return value"; - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception @@ -86,7 +88,8 @@ public class ConcurrencyUtilitiesTest @Test(groups = "slow") public void testTryGetFutureTimeout() { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception @@ -109,7 +112,8 @@ public class ConcurrencyUtilitiesTest @Test(groups = "slow") public void testGetExecutionResultTimeout() { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception @@ -135,7 +139,8 @@ public class ConcurrencyUtilitiesTest @Test(groups = "slow") public void testGetExecutionResultTimeoutWithoutCancelation() { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception @@ -162,7 +167,8 @@ public class ConcurrencyUtilitiesTest @Test public void testTryGetFutureInterrupted() { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Thread thread = Thread.currentThread(); final Future<String> future = eservice.submit(new Callable<String>() { @@ -194,10 +200,12 @@ public class ConcurrencyUtilitiesTest assertFalse(Thread.interrupted()); } - @Test(expectedExceptions = { StopException.class }) + @Test(expectedExceptions = + { StopException.class }) public void testTryGetFutureStop() { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Thread thread = Thread.currentThread(); final Future<String> future = eservice.submit(new Callable<String>() { @@ -229,7 +237,8 @@ public class ConcurrencyUtilitiesTest @Test public void testGetExecutionResultInterrupted() { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Thread thread = Thread.currentThread(); final Future<String> future = eservice.submit(new Callable<String>() { @@ -271,7 +280,8 @@ public class ConcurrencyUtilitiesTest @Test public void testGetExecutionResultException() { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception @@ -289,7 +299,8 @@ public class ConcurrencyUtilitiesTest @Test public void testTryGetFutureException() { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); final Future<String> future = eservice.submit(new Callable<String>() { public String call() throws Exception diff --git a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutorTest.java b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutorTest.java index 3aa75557ea1..46213d98acd 100644 --- a/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutorTest.java +++ b/common/sourceTest/java/ch/systemsx/cisd/common/concurrent/NamingThreadPoolExecutorTest.java @@ -18,6 +18,7 @@ package ch.systemsx.cisd.common.concurrent; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.fail; import java.util.Collections; @@ -26,6 +27,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -36,7 +38,7 @@ import ch.systemsx.cisd.common.logging.LogInitializer; /** * Test cases for the {@link NamingThreadPoolExecutor}. - * + * * @author Bernd Rinn */ public class NamingThreadPoolExecutorTest @@ -53,7 +55,8 @@ public class NamingThreadPoolExecutorTest @Test public void testNamedPool() throws Throwable { - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 2); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); assertEquals(1, eservice.getCorePoolSize()); assertEquals(2, eservice.getMaximumPoolSize()); final Future<?> future = eservice.submit(new Runnable() @@ -72,11 +75,58 @@ public class NamingThreadPoolExecutorTest } } + @Test + public void testDaemonize() + { + final NamingThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); + assertFalse(eservice.getThreadFactory().isCreateDaemonThreads()); + eservice.daemonize(); + assertTrue(eservice.getThreadFactory().isCreateDaemonThreads()); + } + + @Test + public void testSetNamedThreadFactory() + { + final NamingThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); + final NamingThreadFactory factory = new NamingThreadFactory("name"); + eservice.setThreadFactory(factory); + assertEquals(factory, eservice.getThreadFactory()); + } + + @Test + public void testSetThreadFactory() + { + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); + final ThreadFactory factory = new NamingThreadFactory("name"); + eservice.setThreadFactory(factory); + assertEquals(factory, eservice.getThreadFactory()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testSetThreadFactoryFailed() + { + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(2); + final ThreadFactory factory = new ThreadFactory() + { + public Thread newThread(Runnable r) + { + return null; // Doesn't matter, never used + } + }; + // It needs to be NamingThreadFactory, thus it will throw an IllegalArgumentException. + eservice.setThreadFactory(factory); + } + @Test(groups = "slow") public void testThreadDefaultNames() throws Throwable { final int max = 10; - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, max, max); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(max).maximumPoolSize(max); assertEquals(max, eservice.getCorePoolSize()); assertEquals(max, eservice.getMaximumPoolSize()); final Set<String> expectedThreadNameSet = new HashSet<String>(); @@ -120,7 +170,8 @@ public class NamingThreadPoolExecutorTest public void testSubmitNamedRunnable() throws Throwable { final String runnableName = "This is the special runnable name"; - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 1); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(1); assertEquals(1, eservice.getCorePoolSize()); assertEquals(1, eservice.getMaximumPoolSize()); final Future<?> future = eservice.submit(new NamedRunnable() @@ -148,7 +199,8 @@ public class NamingThreadPoolExecutorTest public void testExecuteNamedRunnable() throws Throwable { final String runnableName = "This is the special runnable name"; - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 1); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(1); assertEquals(1, eservice.getCorePoolSize()); assertEquals(1, eservice.getMaximumPoolSize()); final Semaphore sem = new Semaphore(0); @@ -172,7 +224,8 @@ public class NamingThreadPoolExecutorTest public void testSubmitNamedCallable() throws Throwable { final String callableName = "This is the special callable name"; - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 1); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(1); assertEquals(1, eservice.getCorePoolSize()); assertEquals(1, eservice.getMaximumPoolSize()); final Future<?> future = eservice.submit(new NamedCallable<Object>() @@ -201,7 +254,8 @@ public class NamingThreadPoolExecutorTest public void testSubmitNamedCallables() throws Throwable { final String callableName1 = "This is the first special callable name"; - final ThreadPoolExecutor eservice = new NamingThreadPoolExecutor(name, 1, 1); + final ThreadPoolExecutor eservice = + new NamingThreadPoolExecutor(name).corePoolSize(1).maximumPoolSize(1); assertEquals(1, eservice.getCorePoolSize()); assertEquals(1, eservice.getMaximumPoolSize()); final Future<?> future1 = eservice.submit(new NamedCallable<Object>() @@ -226,25 +280,25 @@ public class NamingThreadPoolExecutorTest } final String callableName2 = "This is the second special callable name"; final Future<?> future2 = eservice.submit(new NamedCallable<Object>() + { + public Object call() throws Exception { - public Object call() throws Exception - { - assertEquals(name + "-T1::" + callableName2, Thread.currentThread().getName()); - return null; - } + assertEquals(name + "-T1::" + callableName2, Thread.currentThread().getName()); + return null; + } - public String getCallableName() - { - return callableName2; - } - }); - try - { - future2.get(200L, TimeUnit.MILLISECONDS); - } catch (ExecutionException ex) - { - throw ex.getCause(); - } + public String getCallableName() + { + return callableName2; + } + }); + try + { + future2.get(200L, TimeUnit.MILLISECONDS); + } catch (ExecutionException ex) + { + throw ex.getCause(); + } } } -- GitLab