ThreadPoolExecutor 和 ThreadPoolTaskExecutor
您目前处于:技术核心竞争力  2014-01-11

1. ThreadPoolExecutor

Spring 中的 ThreadPoolTaskExecutor 是借助于 JDK 并发包中的 java.util.concurrent.ThreadPoolExecutor 来实现的,下面先学习下 ThreadPoolExecutor 中的相关信息,ThreadPoolExecutor 构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
	                      int maximumPoolSize,
	                      long keepAliveTime,
	                      TimeUnit unit,
	                      BlockingQueue<Runnable> workQueue,
	                      ThreadFactory threadFactory,
	                      RejectedExecutionHandler handler) {
}

参数具体意义:

int corePoolSize:线程池维护线程的最小数量
int maximumPoolSize:线程池维护线程的最大数量
long keepAliveTime:空闲线程的存活时间
TimeUnit unit:时间单位,现有纳秒、微秒、毫秒、秒枚举值
BlockingQueue<Runnable> workQueue:持有等待执行的任务队列
RejectedExecutionHandler handler:用来拒绝一个任务的执行,有两种情况会发生这种情况
  一是在 execute 方法中若 addIfUnderMaximumPoolSize(command) 为 false,即线程池已经饱和
  二是在 execute 方法中, 发现 runState!=RUNNING || poolSize == 0,即已经 shutdown,就调用 ensureQueuedTaskHandled(Runnable command),在该方法中有可能调用 reject
Reject策略预定义有四种:
  (1) ThreadPoolExecutor.AbortPolicy 策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
  (2) ThreadPoolExecutor.CallerRunsPolicy 策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃
  (3) ThreadPoolExecutor.DiscardPolicy 策略,不能执行的任务将被丢弃
  (4) ThreadPoolExecutor.DiscardOldestPolicy 策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

ThreadPoolExecutor 源码分析:

public class ThreadPoolExecutor extends AbstractExecutorService {
    ……
}
public abstract class AbstractExecutorService implements ExecutorService {
    ……
}
public interface ExecutorService extends Executor {
    ……
}

2. Spring 中 ThreadPoolTaskExecutor

直接调用

ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
//线程池所使用的缓冲队列
poolTaskExecutor.setQueueCapacity(200);
//线程池维护线程的最少数量
poolTaskExecutor.setCorePoolSize(5);
//线程池维护线程的最大数量
poolTaskExecutor.setMaxPoolSize(1000);
//线程池维护线程所允许的空闲时间
poolTaskExecutor.setKeepAliveSeconds(30000);
poolTaskExecutor.initialize();

Spring配置文件

<!-- 配置Spring线程池 -->
<bean id="threadPoolTaskExecutor" 
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="30"/>
    <property name="maxPoolSize" value="200"/>
    <property name="keepAliveSeconds" value="5"/>
    <property name="queueCapacity" value="1000"/>
    <property name="rejectedExecutionHandler">
        <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
    </property>
</bean>

ThreadPoolTaskExecutor 执行器的处理流程:

(1) 当线程池大小小于 corePoolSize 就新建线程,并处理请求
(2) 当线程池大小等于 corePoolSize,把请求放入 workQueue 中,池子里的空闲线程就去从 workQueue 中取任务并处理
(3) 当 workQueue 放不下新入的任务时,新建线程加入线程池,并处理请求,如果池子大小撑到了 maximumPoolSize 就用 RejectedExecutionHandler 来做拒绝处理
(4) 另外,当线程池的线程数大于 corePoolSize 的时候,多余的线程会等待 keepAliveTime 长的时间,如果无请求可处理就自行销毁

ThreadPoolTaskExecutor 源码分析:

public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
                implements SchedulingTaskExecutor, Executor, 
                            BeanNameAware, InitializingBean, DisposableBean {
   ……
}

Spring的 ThreadPoolTaskExecutor 类最终是通过调用 Java 的 ThreadPoolExecutor 的 void execute(Runnable task) 方法或 Future submit(Runnable task) 方法执行任务的。

public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
		implements SchedulingTaskExecutor, Executor, 
		            BeanNameAware, InitializingBean, DisposableBean {

	private ThreadPoolExecutor threadPoolExecutor;

        ……

	public void execute(Runnable task) {
		Executor executor = getThreadPoolExecutor();
		try {
			executor.execute(task);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor 
			            + "] did not accept task: " + task, ex);
		}
	}

	public ThreadPoolExecutor getThreadPoolExecutor() 
	        throws IllegalStateException {
		Assert.state(this.threadPoolExecutor != null, 
		            "ThreadPoolTaskExecutor not initialized");
		return this.threadPoolExecutor;
	}
}

转载请并标注: “本文转载自 linkedkeeper.com ”  ©著作权归作者所有