Java线程池ThreadPoolExecutor源码浅析
一、UML图
二、创建线程池
2.1、Executors工厂方法
在ThreadPoolExecutor类的文档注释中有这么一句话:An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.
也就是说,已经提供了通过Executors获取线程池的工厂方法,也就是常说的五种线程池:
Executors中的方法:
/**
* @since 1.5
* @author Doug Lea
*/
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1));
}
}
可以看到,这些方法最终都是实例化一个ThreadPoolExecutor对象。
2.2、ThreadPoolExecutor构造器
ThreadPoolExecutor类中一共提供了四个构造方法:
这几个方法只有入参不同而已,也就是说,我们可以根据所需要的自定义线程池配置,选择合适的方法;而前面三个最终都是调用第四个方法实例化ThreadPoolExecutor的对象。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
三、线程池构造参数
通过构造方法public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
我们可以知晓线程池可自定义的参数。
通过API可知,各参数意义如下:
3.1、corePoolSize—核心池容量
用于指定线程池中处于就绪状态的线程最小数,也就是说,当没有任务提交时,线程池中保持多少个工作线程。如果设置了工作线程的超时时间,则此值可能为0。注:当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize。
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
3.2、maximumPoolSize—最大池容量
用于指定线程池可容纳的最大工作线程数,线程池根据corePoolSize和maximumPoolSize来自动调节容量大小:
A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by corePoolSize and maximumPoolSize.
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
3.3、keepAliveTime—存活时间
指定空闲线程等待的超时时间(默认是纳秒)。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用。如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
3.4、TimeUnit—时间单位
指定keepAliveTime的单位,默认使用纳秒。TimeUnit是一个枚举,定义的值如下:
TimeUnit.NANOSECONDS; //纳秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.SECONDS; //秒
TimeUnit.MINUTES; //分钟
TimeUnit.HOURS; //小时
TimeUnit.DAYS; //天
public enum TimeUnit {
/**
* Time unit representing one thousandth of a microsecond
*/
NANOSECONDS {
public long toNanos(long d) { return d; }
public long toMicros(long d) { return d/(C1/C0); }
public long toMillis(long d) { return d/(C2/C0); }
public long toSeconds(long d) { return d/(C3/C0); }
public long toMinutes(long d) { return d/(C4/C0); }
public long toHours(long d) { return d/(C5/C0); }
public long toDays(long d) { return d/(C6/C0); }
public long convert(long d, TimeUnit u) { return u.toNanos(d); }
int excessNanos(long d, long m) { return (int)(d - (m*C2)); }
},
...
}
3.5、workQueue—工作队列
用于存储等待中的任务的队列。
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
而BlockingQueue的实现类如下:
而Executors工厂方法中使用的是LinkedBlockingQueue
、DelayedWorkQueue
和SynchronousQueue
。
3.6、ThreadFactory—线程工厂
用于创建线程。
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user‘s
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
3.7、RejectedExecutionHandler—饱和策略
用于指定当线程池和工作队列都处于饱和状态时,新任务的处理策略。
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
一共有四种策略:
-
AbortPolicy:终止策略
通过抛出一个
RejectedExecutionException
异常而拒绝此任务。
**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {...}
-
CallerRunsPolicy:调用者执行策略
直接让调用线程执行此任务,若调用线程已经停止,则丢弃此任务。
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {...}
-
DiscardOldestPolicy:丢弃最旧任务策略
丢弃处于等待队列头部的任务,并尝试执行此任务。
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {...}
-
DiscardPolicy:丢弃策略
静默拒绝任务,不抛出异常。
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {...}
四、提交任务
向线程池提交任务主要有两种方式
execute(Runnable command)
submit(Callable<T> task)
、submit(Runnable task, T result)
、submit(Runnable task)
4.1、execute方法
传入一个Runnable对象,无返回值。无法知晓任务是否执行成功。
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
...
}
}
4.2、submit方法
传入一个Callable或Runnable对象,返回一个Future对象。通过返回的Future对象可判断让任务是否执行成功。可通过future的get方法来获取任务执行结果,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
...
}
五、关闭线程池
ThreadPoolExecutor提供了两个关闭方法:shutdown()
和shutdownNow()
shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查终止权限
checkShutdownAccess();
//设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查终止权限
checkShutdownAccess();
//设置线程池状态为STOP
advanceRunState(STOP);
//中断线程
interruptWorkers();
//获取待执行任务的列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
}