Java线程池是使用频率很高的开源框架。也是在面试中常被问到的组件。它的实现源码在J.U.C包下,本人也经常使用线程池,简单方便。大多是浮于表面的一些API的调用,对于框架实现中具体做了哪些事情,却是知之甚少。本文将从源码角度,深入了聊一聊Java线程池的源码实现中到底做了什么事情。Java线程池的源码编写者也是大名鼎鼎的 Doug Lea 。这老爷子对Java并发编程的贡献相当大,说他是这个世界上对Java影响力最大的一个人,一点也不为过。因为两次Java历史上的大变革,他都间接或直接的扮演了举足轻重的角色。2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准,其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。所以看J.U.C包下的源码,也是理解老爷子对于并发编程的思维方式的最好材料。
为什么使用线程池
在Java开发过程中,总是不可避免的要使用各种池,比如线程池,连接池,缓存池等等。那么既然可以使用原生的方式实现功能,为什么还要使用池呢?其实虽然池管理的对象不一样,但原因大同小异。对于线程池来说,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:
- 降低资源消耗。对于任何在池中的对象而言,肯定是使用频率比较高,而且创建的代价比较昂贵。如果频繁的被创建和销毁,就会过多的消耗系统资源,降低了服务的性能。
- 提高响应速度。正因为创建的代价高,比较消耗资源,因此在需要对象的时候直接去线程池中获取就行了,不需要等待创建线程,提高了响应速度
- 提高线程的可管理性。连接池可帮我们做一些公共的事情,比如想让线程被阻塞时间过长后返回,想知道当前线程的状态。回收一些长期空闲的线程等
在Java1.5中引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行、被哪个线程执行,以及什么时候执行。在深入源码之前先来看看Executor框架线程池的类图:
它们的最顶层是一个Executor接口,它只有一个方法:
public interface Executor {
void execute(Runnable command);
}
线程池状态
ExecutorService扩展了Executor,添加了操控线程池生命周期的方法,如shutDown(),shutDownNow()等,以及扩展了可异步跟踪执行任务生成返回值Future的方法,如submit()等方法。ThreadPoolExecutor继承自AbstractExecutorService,同时实现了ExecutorService接口,也是Executor框架默认的线程池实现类,也是这篇文章重点分析的对象,一般我们使用线程池,如没有特殊要求,直接创建ThreadPoolExecutor,初始化一个线程池,如果需要特殊的线程池,则直接继承ThreadPoolExecutor,并实现特定的功能,如ScheduledThreadPoolExecutor,它是一个具有定时执行任务的线程池。Executor框架中创建线程池非常简单,使用线程池提交线程也很简单。代码如下
// 创建线程池
ExecutorService executeService = Executors.newCachedThreadPool();
// 提交线程
executeService.submit(thread);
executeService.execute(thread);
executeService.invokeAll(threads);
submit,execute,invokeAll都可以提交线程。其中invokeAll可以批量提交线程,并提供了返回值。而这三个方法最终都会执行顶层接口中的execute方法。因此直接看execute方法的实现。该方法的实现在ThreadPoolExecutor类中,在看该方法的实现之前需要先了解一些线程池的是属性信息,因为在提交线程到线程池时,都围绕着这些属性的改变。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
从代码中可以看到,线程池一共有状态5种状态,分别是:
- RUNNING:线程池初始化时默认的状态,表示线程池正处于运行状态,能够接受新提交的任务,同时也能够处理阻塞队列中的任务
- SHUTDOWN:调用shutdown()方法会使线程池进入到该状态,该状态下不再继续接受新提交的任务,但是还会处理阻塞队列中的任务
- STOP:调用shutdownNow()方法会使线程池进入到该状态,该状态下不再继续接受新提交的任务,同时不再处理阻塞队列中的任务
- TIDYING:如果线程池中workerCount=0,即有效线程数量为0时,会进入该状态
- TERMINATED:在terminated()方法执行完后进入该状态,只不过terminated()方法需要我们自行实现。
这些状态均由int型表示,大小关系为 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这个顺序基本上也是遵循线程池从 运行 到 终止这个过程。此外下面还有三个方法用于获取线程池当前的状态,有效线程数等。其中CAPACITY经过(1 << COUNT_BITS) - 1运算后得到的是一个高3位是0,低29位是1的整数。
- runStateOf(int c): CAPACITY取反码,即高3位是1,低29位是0。c & ~CAPACITY用于获取高3位保存的线程池状态。
- workerCountOf(int c):运算 c & CAPACITY,用于获取c变量低29位的值,也就是获取线程池中的有效线程数。
- ctlOf(int rs, int wc):参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl。
仔细思考的朋友肯定会有疑问,我刚看的时候也非常困惑。Doug Lea大神为什么要用一个int变量的高3位表示线程池状态,低29位表示线程池当前的有效线程数,为什么不使用两个变量分别表示。有人会觉得是为了节省了存储空间,但是这里很显然不是为了节省空间而设计的,两个变量也就多4个字节而已。为了这4个字节而去大费周章地设计一通,不但麻烦而且增加了代码复杂度。从上面关于线程状态的解释可以看出,线程状态的转变很大程度上决定于线程池中的有效线程数,在多线程运行环境下,线程池中的线程数量和线程池状态往往是动态改变的,并且运行状态和有效线程数量需要保证统一,不能出现一个改而另一个没有改的情况。因此把他们放到一个变量中,用一个变量表示两个值,那么在修改的时候只需要修改一次,就统一了两个值的修改,同时该值又是AtomicInteger类型,保证了多线程下修改它的值是一个原子操作。试想一下,如果使用两个值会是什么情况,首先要保证两个值各自的修改是原子操作,因此必须将两个变量设置为原子类AtomicInteger类型,同时又得保证两个值的修改是统一的同步的,所以得把两个值的修改操作放到一个方法中并使用synchronize关键字修饰该方法保证同步,这样做就能实现源码中用一个类表示两个值的最终效果,但是性能消耗就大大增加了,因为synchronize是一把重量级锁,虽然JDK对它做了优化,但是在竞争激烈的情况下synchronize还是会升级为重量级锁,当然也可以使用Lock锁,但肯定也不会比一个int类型数据的修改性能消耗低。分析完这一波,是不是更加佩服老爷子的神操作了。
线程池运行过程
execute() – 提交任务
excute方法是提交任务到线程池的核心方法,下面是ThreadPoolExecutor类中关于该方法的源码。源码中的注释也很好的解释了提交任务的逻辑
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* 在要执行给定的任务时。这个任务会用一个新线程执行,或者用线程池中的一个线程执行
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
* 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,
* 要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,
* command作为这个线程的第一个任务
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 如果任务被成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程
*(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了
* 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 如果无法将任务入队列(可能队列满了),需要添加一个新线程(线程池扩容:往maxPoolSize发展)
* 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务
*/
int c = ctl.get();
/**
* 1、如果当前线程数少于corePoolSize,则添加任务
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
/**
* 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
* 失败的原因可能是:
* 1、线程池已经shutdown,shutdown的线程池不再接收新任务
* 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
*/
c = ctl.get();
}
/**
* 2、如果线程池RUNNING状态,且入队列成功
* offer方法是往队列中添加任务,队列已满无法添加,返回false
*/
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取ctl的值
int recheck = ctl.get();
/**
* 再次校验放入workerQueue中的任务是否能被执行
* 1、如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务
* 2、如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了)
*/
if (! isRunning(recheck) && remove(command))
reject(command);
/**
* 如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null
* 为什么只检查运行的worker数量是否等于0而不和corePoolSize比较呢?
* 因为只保证有一个worker线程可以从queue中获取任务执行就行了
* 因为只要还有活动的worker线程,就可以消费workerQueue中的任务
*/
else if (workerCountOf(recheck) == 0)
//第一个参数为null,说明只为新建一个worker线程,没有指定firstTask
//第二个参数为true代表占用corePoolSize,false占用maxPoolSize
addWorker(null, false);
}
/**
* 3、如果线程池不是running状态 或者 无法入队列
* 尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
*/
else if (!addWorker(command, false))
reject(command);
}
单从execute()方法来说,这个方法就干了一件事情,就是添加任务。添加任务分为三种情况,分别对应代码中的三个大 if 条件。下面来说说每种情况的具体条件
参数: command <执行任务的线程,不能为空>
执行流程:
1 . 添加任务到线程池(coreSizePool): 第一个 if 判断,当前线程池中的线程数小于corePoolSize,则通过addWorker方法创建新线程,并添加到线程池中。如果添加成功则返回,如果添加失败继续后续步骤。核心线程池中添加失败的原因可能有:
- 线程池已经被其他线程shutdown,shutdown的线程池不再接收新任务。
- workerCountOf(c) < corePoolSize 判断通过后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
2 . 添加任务到队列(BlockingQueue): 第二个 if 判断,当往核心线程池中添加任务失败,说明线程池中的有效线程池已经等于设置的核心线程数大小。这时继续被添加的任务只能添加到阻塞队列中,等待核心线程池中有线程空闲到队列中取任务执行。往队列中添加任务失败的原因可能有:
- 如果线程池已经不是running状态了,应该拒绝添加新任务,从workQueue中删除刚刚添加的任务任务。
- 如果线程池是运行状态,或者从workQueue中删除任务失败(刚好核心线程池中有一个线程执行完毕,并消耗了刚添加到队列的这个任务),确保还有线程执行任务(只要有一个就够了)
3 . 添加任务到线程池(maxSizePool): 第三个 if 判断,当核心线程池中添加任务失败,任务队列中添加任务也失败时。会将线程池的线程数扩容到设置的最大线程数。新添加的任务会通过addWorker方法创建新线程,并添加到线程池中,这部分新创建的线程是一些临时工。当任务执行完后会被回收,使线程池的大小最终维持在coreSize。
execute方法的大致流程就分析完了,下面是它的执行流程图。
addWorker() – 添加worker线程
此方法的作用是添加任务到线程池,并运行任务。该方法的源码如下
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
* 检查根据当前线程池的状态和给定的边界(core or maximum)是否可以创建一个新的worker
* 如果是这样的话,worker的数量做相应的调整,如果可能的话,创建一个新的worker并启动,
* 参数中的firstTask作为worker的第一个任务
* 如果方法返回false,可能因为pool已经关闭或者调用过了shutdown
* 如果线程工厂创建线程失败,也会失败,返回false
* 如果线程创建失败,要么是因为线程工厂返回null,要么是发生了OutOfMemoryError
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//外层循环,负责判断线程池状态
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//内层循环,负责对worker数量的+1
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//调用unsafe CAS操作,使得worker数量+1,成功则跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// else CAS失败时因为workerCount改变了,继续内层循环尝试CAS对worker数量+1
}
}
//worker数量+1成功的后续操作,添加到workers Set集合,并启动worker线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//如果往HashSet中添加worker成功,启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
当线程池是可添加任务状态的时候,都会执行到addWorker方法。这个方法的功能简单理解就是创建任务线程并执行任务,具体流程逻辑如下
参数: firstTask < worker线程的初始任务,可以为null >,core < true:将corePoolSize作为上限,false:将maximumPoolSize作为上限 >
执行流程:
1 . 判断线程池状态: 判断线程池当前是否为可以添加worker线程的状态,可以则继续下一步,不可以返回 false:
- 线程池状态>shutdown,可能为stop、tidying、terminated,不能添加worker线程
- 线程池状态==shutdown,firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务
- 线程池状态==shutdown,firstTask==null,workQueue为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取task,而workQueue为空,说明添加无任务线程已经没有意义
2 . 判断线程池边界: 线程池当前线程数量是否超过上限(corePoolSize 或 maximumPoolSize),如果超过了返回false,没超过则对workerCount+1,继续下一步
3 . 创建并执行任务: 在ReentrantLock保证下(在操作work set集合largestPoolSize时需要上锁,一个线程池只有一把可重入锁),向线程池中添加新创建的worker实例,并启动worker线程,如果都成功了,返回 true,如果向线程池添加worker实例失败或任务线程启动失败,调用addWorkerFailed()逻辑。
方法参数: addWorker有两个参数,对应于4种传参方式。作用分别如下
在execute方法中就使用了前3种 | |
---|---|
addWorker(command, true) | 线程数小于corePoolSize时,放一个需要处理的task进Workers Set。 如果Workers Set长度超过corePoolSize,就返回false |
addWorker(command, false) | 当队列被放满时,就将这个新来的task直接放入Workers Set。 此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false |
addWorker(null, false) | 放入一个空的task进workers Set,此时长度限制是maximumPoolSize。 这样一个task为空的worker在线程执行的时候会去任务队列里拿任务, 这样就相当于创建了一个新的线程,只是没有马上分配任务 |
addWorker(null, true) | 此方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时, 如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。 实际使用中是在prestartAllCoreThreads()方法, 此方法用来为线程池预先启动corePoolSize个worker等待从队列中获取任务执行 |
下面是addWorker方法的流程图
内部类Worker
Worker类本身既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务,又可以达到锁的效果。前面分析到提交任务时,线程池会把提交的线程封装成Worker实例。任务提交后通过Worker类中的t.start()运行线程。刚看到这里时,就有一个疑问了。执行线程的start方法是直接运行线程了。那么是它怎么保证线程池中的核心线程池数不变的。一切的奥秘就隐藏在Worker类中的run方法了。需要注意的是,执行t.start方法并不会直接运行提交线程的run()方法中的代码,而会运行Worker类中的run方法。因为在Worker实例创建时,通过工程创建了线程,而创建时又将自身作为参数传了进去。下面是内部类Worker类的源码实现
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
* Worker类大体上管理着运行线程的中断状态 和 一些指标。
* 此类继承了AQS吗,又实现了Runnable,说明其既是一个可运行的任务,也是一把锁(不可重入)
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
// 在运行runWorker方法之前,不可被中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
为什么要把传进来的线程再封装成一个Worker类,为什么不直接执行execute(command)提交的command呢。主要是为了控制中断,用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁,只有在等待从workQueue中获取任务getTask()时才能中断,worker实现了一个简单的不可重入的互斥锁,而不是用ReentrantLock可重入锁。
new Worker():
- 将AQS的state置为-1,在runWoker()前不允许中断
- 待执行的任务会以参数传入,并赋予firstTask
- 用Worker这个Runnable创建Thread
之所以Worker自己实现Runnable,并创建Thread,在firstTask外包一层,是因为要通过Worker控制中断,而firstTask这个工作任务只是负责执行业务
Worker控制中断的几方面:
- 初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断,其中不允许中断体现在:(a) . shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state==-1时tryLock()失败,没发interrupt()。(b) . shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>0才能interrupt的逻辑
- 为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程
Worker和firstTask的区别:Worker是线程池中的线程,而Task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。这也是线程池能保证核心线程数的原因(因为线程池并不是直接创建我们提交给线程池任务的线程,而是自己创建线程,用它创建的线程调用我们提交的线程的run()方法。)
runWorker() – 执行任务
下面是Worker类中run()方法中调用的runWorker()方法
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 1.我们可能使用一个初始化任务开始,即firstTask为null
* 然后只要线程池在运行,我们就从getTask()获取任务
* 如果getTask()返回null,则worker由于改变了线程池状态或参数配置而退出
* 其它退出因为外部代码抛异常了,这会使得completedAbruptly为true,
* 这会导致在processWorkerExit()方法中替换当前线程
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 2. 在任何任务执行之前,都需要对worker加锁去防止在任务运行时,其它任务执行时中断,
* 此外还需确保线程没有中断标记,除非线程池已经停了
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 3. 每个任务执行前会调用beforeExecute(),其中可能抛出一个异常,这种情况下会导致线程死亡
*(跳出循环,且completedAbruptly==true),没有执行任务
* 因为beforeExecute()的异常没有cache住,会上抛,跳出循环
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 4. 假定beforeExecute()正常完成,我们执行任务
* 汇总任何抛出的异常并发送给afterExecute(task, thrown)
* 因为我们不能在Runnable.run()方法中重新上抛Throwables,
* 我们将Throwables包装到Errors上抛(会到线程的UncaughtExceptionHandler去处理)
* 任何上抛的异常都会导致线程死亡
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* 5 .任务执行结束后,调用afterExecute(),也可能抛异常,也会导致线程die
* 根据JLS Sec 14.20,这个异常(finally中的异常)会生效
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 确保只有在线程stoping时,才会被设置中断标示,否则清除中断标示
// 1、如果线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt()
// 2、如果一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=stop
// 是,再次设置中断标示,wt.interrupt()
// 否,不做操作,清除中断标示后进行后续步骤
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker(Worker w)执行流程:
- 执行任务之前,首先worker.unlock(),将AQS的state置为0,允许中断当前worker线程
- 开始执行firstTask,调用task.run(),在执行任务前会上锁wroker.lock(),在执行完任务后会解锁,为了防止在任务运行时被线程池一些中断操作中断
- 在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法
- 无论在beforeExecute()、task.run()、afterExecute()发生异常上抛,都会导致worker线程终止,进入processWorkerExit()处理worker退出的流程
- 如正常执行完当前task后,会通过getTask()从阻塞队列中获取新任务,当队列中没有任务,且获取任务超时,那么当前worker也会进入退出流程
getTask() – 获取任务
这个方法也是线程池的很重要的方法,它定义了任务从线程池获取线程的具体逻辑。也是保证线程池中核心线程数量的关键方法。当有效线程数小于corePoolSize时,有阻塞的从队列中获取任务。源码如下
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 如果线程数量大于maximumPoolSize(可能maximumPoolSize被修改了)
* 要么既需要计时timed==true,也超时了timedOut==true
* worker数量-1,减一执行一次就行了,然后返回null,在runWorker()中会有逻辑减少worker线程
* 如果本次减一失败,继续内层循环再次尝试减一
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
// 大于corePoolSize
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 小于等于corePoolSize
workQueue.take();
// 如果获取到了任务就返回
if (r != null)
return r;
// 没有返回,说明超时,那么在下一次内层循环时会进入worker count减一的步骤
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
执行流程:
1、首先判断是否可以满足从workQueue中获取任务的条件,不满足return null
A、线程池状态是否满足:
(1)shutdown状态 + workQueue为空 或 stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了
(2)stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回
B、线程数量是否超过maximumPoolSize 或 获取任务是否超时
(1)线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功不会超过maximumPoolSize
(2)如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize
2、如果满足获取任务条件,根据是否需要定时获取调用不同方法:
A、workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null
B、workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务
3、在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程