深入Java线程池实现源码


Java线程池是使用频率很高的开源框架。也是在面试中常被问到的组件。它的实现源码在J.U.C包下,本人也经常使用线程池,简单方便。大多是浮于表面的一些API的调用,对于框架实现中具体做了哪些事情,却是知之甚少。本文将从源码角度,深入了聊一聊Java线程池的源码实现中到底做了什么事情。Java线程池的源码编写者也是大名鼎鼎的 Doug Lea 。这老爷子对Java并发编程的贡献相当大,说他是这个世界上对Java影响力最大的一个人,一点也不为过。因为两次Java历史上的大变革,他都间接或直接的扮演了举足轻重的角色。2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准,其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。所以看J.U.C包下的源码,也是理解老爷子对于并发编程的思维方式的最好材料。

为什么使用线程池

在Java开发过程中,总是不可避免的要使用各种池,比如线程池,连接池,缓存池等等。那么既然可以使用原生的方式实现功能,为什么还要使用池呢?其实虽然池管理的对象不一样,但原因大同小异。对于线程池来说,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:

  1. 降低资源消耗。对于任何在池中的对象而言,肯定是使用频率比较高,而且创建的代价比较昂贵。如果频繁的被创建和销毁,就会过多的消耗系统资源,降低了服务的性能。
  2. 提高响应速度。正因为创建的代价高,比较消耗资源,因此在需要对象的时候直接去线程池中获取就行了,不需要等待创建线程,提高了响应速度
  3. 提高线程的可管理性。连接池可帮我们做一些公共的事情,比如想让线程被阻塞时间过长后返回,想知道当前线程的状态。回收一些长期空闲的线程等

在Java1.5中引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行、被哪个线程执行,以及什么时候执行。在深入源码之前先来看看Executor框架线程池的类图:

它们的最顶层是一个Executor接口,它只有一个方法:

public interface Executor {
    void execute(Runnable command);
}

线程池状态

ExecutorService扩展了Executor,添加了操控线程池生命周期的方法,如shutDown(),shutDownNow()等,以及扩展了可异步跟踪执行任务生成返回值Future的方法,如submit()等方法。ThreadPoolExecutor继承自AbstractExecutorService,同时实现了ExecutorService接口,也是Executor框架默认的线程池实现类,也是这篇文章重点分析的对象,一般我们使用线程池,如没有特殊要求,直接创建ThreadPoolExecutor,初始化一个线程池,如果需要特殊的线程池,则直接继承ThreadPoolExecutor,并实现特定的功能,如ScheduledThreadPoolExecutor,它是一个具有定时执行任务的线程池。Executor框架中创建线程池非常简单,使用线程池提交线程也很简单。代码如下

// 创建线程池
ExecutorService executeService = Executors.newCachedThreadPool();

// 提交线程
executeService.submit(thread);
executeService.execute(thread);
executeService.invokeAll(threads);

submit,execute,invokeAll都可以提交线程。其中invokeAll可以批量提交线程,并提供了返回值。而这三个方法最终都会执行顶层接口中的execute方法。因此直接看execute方法的实现。该方法的实现在ThreadPoolExecutor类中,在看该方法的实现之前需要先了解一些线程池的是属性信息,因为在提交线程到线程池时,都围绕着这些属性的改变。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

从代码中可以看到,线程池一共有状态5种状态,分别是:

  1. RUNNING:线程池初始化时默认的状态,表示线程池正处于运行状态,能够接受新提交的任务,同时也能够处理阻塞队列中的任务
  2. SHUTDOWN:调用shutdown()方法会使线程池进入到该状态,该状态下不再继续接受新提交的任务,但是还会处理阻塞队列中的任务
  3. STOP:调用shutdownNow()方法会使线程池进入到该状态,该状态下不再继续接受新提交的任务,同时不再处理阻塞队列中的任务
  4. TIDYING:如果线程池中workerCount=0,即有效线程数量为0时,会进入该状态
  5. TERMINATED:在terminated()方法执行完后进入该状态,只不过terminated()方法需要我们自行实现。

这些状态均由int型表示,大小关系为 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这个顺序基本上也是遵循线程池从 运行 到 终止这个过程。此外下面还有三个方法用于获取线程池当前的状态,有效线程数等。其中CAPACITY经过(1 << COUNT_BITS) - 1运算后得到的是一个高3位是0,低29位是1的整数。

  1. runStateOf(int c): CAPACITY取反码,即高3位是1,低29位是0。c & ~CAPACITY用于获取高3位保存的线程池状态。
  2. workerCountOf(int c):运算 c & CAPACITY,用于获取c变量低29位的值,也就是获取线程池中的有效线程数。
  3. ctlOf(int rs, int wc):参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl。

仔细思考的朋友肯定会有疑问,我刚看的时候也非常困惑。Doug Lea大神为什么要用一个int变量的高3位表示线程池状态,低29位表示线程池当前的有效线程数,为什么不使用两个变量分别表示。有人会觉得是为了节省了存储空间,但是这里很显然不是为了节省空间而设计的,两个变量也就多4个字节而已。为了这4个字节而去大费周章地设计一通,不但麻烦而且增加了代码复杂度。从上面关于线程状态的解释可以看出,线程状态的转变很大程度上决定于线程池中的有效线程数,在多线程运行环境下,线程池中的线程数量和线程池状态往往是动态改变的,并且运行状态和有效线程数量需要保证统一,不能出现一个改而另一个没有改的情况。因此把他们放到一个变量中,用一个变量表示两个值,那么在修改的时候只需要修改一次,就统一了两个值的修改,同时该值又是AtomicInteger类型,保证了多线程下修改它的值是一个原子操作。试想一下,如果使用两个值会是什么情况,首先要保证两个值各自的修改是原子操作,因此必须将两个变量设置为原子类AtomicInteger类型,同时又得保证两个值的修改是统一的同步的,所以得把两个值的修改操作放到一个方法中并使用synchronize关键字修饰该方法保证同步,这样做就能实现源码中用一个类表示两个值的最终效果,但是性能消耗就大大增加了,因为synchronize是一把重量级锁,虽然JDK对它做了优化,但是在竞争激烈的情况下synchronize还是会升级为重量级锁,当然也可以使用Lock锁,但肯定也不会比一个int类型数据的修改性能消耗低。分析完这一波,是不是更加佩服老爷子的神操作了。

线程池运行过程

execute() – 提交任务

excute方法是提交任务到线程池的核心方法,下面是ThreadPoolExecutor类中关于该方法的源码。源码中的注释也很好的解释了提交任务的逻辑

/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     * 在要执行给定的任务时。这个任务会用一个新线程执行,或者用线程池中的一个线程执行
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,
     * 要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,
         * command作为这个线程的第一个任务
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 如果任务被成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程
         *(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了
         * 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 如果无法将任务入队列(可能队列满了),需要添加一个新线程(线程池扩容:往maxPoolSize发展)
         * 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务
         */
        int c = ctl.get();

       /**
       * 1、如果当前线程数少于corePoolSize,则添加任务
       */
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
         /**
         * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
         * 失败的原因可能是:
         * 1、线程池已经shutdown,shutdown的线程池不再接收新任务
         * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
         */
            c = ctl.get();
        }

        /**
        * 2、如果线程池RUNNING状态,且入队列成功
        * offer方法是往队列中添加任务,队列已满无法添加,返回false
        */
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次获取ctl的值
            int recheck = ctl.get();

         /**
         * 再次校验放入workerQueue中的任务是否能被执行
         * 1、如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务
         * 2、如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了)
         */
            if (! isRunning(recheck) && remove(command))
                reject(command);
         /**
          * 如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null
          * 为什么只检查运行的worker数量是否等于0而不和corePoolSize比较呢?
          * 因为只保证有一个worker线程可以从queue中获取任务执行就行了
          * 因为只要还有活动的worker线程,就可以消费workerQueue中的任务
          */
            else if (workerCountOf(recheck) == 0)
                //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask
                //第二个参数为true代表占用corePoolSize,false占用maxPoolSize
                addWorker(null, false);
        }
        /**
        * 3、如果线程池不是running状态 或者 无法入队列
        *   尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
        */
        else if (!addWorker(command, false))
            reject(command);
    }

单从execute()方法来说,这个方法就干了一件事情,就是添加任务。添加任务分为三种情况,分别对应代码中的三个大 if 条件。下面来说说每种情况的具体条件

参数: command <执行任务的线程,不能为空>

执行流程:

1 . 添加任务到线程池(coreSizePool): 第一个 if 判断,当前线程池中的线程数小于corePoolSize,则通过addWorker方法创建新线程,并添加到线程池中。如果添加成功则返回,如果添加失败继续后续步骤。核心线程池中添加失败的原因可能有:

  1. 线程池已经被其他线程shutdown,shutdown的线程池不再接收新任务。
  2. workerCountOf(c) < corePoolSize 判断通过后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize

2 . 添加任务到队列(BlockingQueue): 第二个 if 判断,当往核心线程池中添加任务失败,说明线程池中的有效线程池已经等于设置的核心线程数大小。这时继续被添加的任务只能添加到阻塞队列中,等待核心线程池中有线程空闲到队列中取任务执行。往队列中添加任务失败的原因可能有:

  1. 如果线程池已经不是running状态了,应该拒绝添加新任务,从workQueue中删除刚刚添加的任务任务。
  2. 如果线程池是运行状态,或者从workQueue中删除任务失败(刚好核心线程池中有一个线程执行完毕,并消耗了刚添加到队列的这个任务),确保还有线程执行任务(只要有一个就够了)

3 . 添加任务到线程池(maxSizePool): 第三个 if 判断,当核心线程池中添加任务失败,任务队列中添加任务也失败时。会将线程池的线程数扩容到设置的最大线程数。新添加的任务会通过addWorker方法创建新线程,并添加到线程池中,这部分新创建的线程是一些临时工。当任务执行完后会被回收,使线程池的大小最终维持在coreSize。

execute方法的大致流程就分析完了,下面是它的执行流程图。

addWorker() – 添加worker线程

此方法的作用是添加任务到线程池,并运行任务。该方法的源码如下

 /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     * 检查根据当前线程池的状态和给定的边界(core or maximum)是否可以创建一个新的worker
     * 如果是这样的话,worker的数量做相应的调整,如果可能的话,创建一个新的worker并启动,
     * 参数中的firstTask作为worker的第一个任务
     * 如果方法返回false,可能因为pool已经关闭或者调用过了shutdown
     * 如果线程工厂创建线程失败,也会失败,返回false
     * 如果线程创建失败,要么是因为线程工厂返回null,要么是发生了OutOfMemoryError
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
         //外层循环,负责判断线程池状态
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //内层循环,负责对worker数量的+1
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                 //调用unsafe CAS操作,使得worker数量+1,成功则跳出retry循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                // else CAS失败时因为workerCount改变了,继续内层循环尝试CAS对worker数量+1
            }
        }

        //worker数量+1成功的后续操作,添加到workers Set集合,并启动worker线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //如果往HashSet中添加worker成功,启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

当线程池是可添加任务状态的时候,都会执行到addWorker方法。这个方法的功能简单理解就是创建任务线程并执行任务,具体流程逻辑如下

参数: firstTask < worker线程的初始任务,可以为null >,core < true:将corePoolSize作为上限,false:将maximumPoolSize作为上限 >

执行流程:

1 . 判断线程池状态: 判断线程池当前是否为可以添加worker线程的状态,可以则继续下一步,不可以返回 false:

  1. 线程池状态>shutdown,可能为stop、tidying、terminated,不能添加worker线程
  2. 线程池状态==shutdown,firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务
  3. 线程池状态==shutdown,firstTask==null,workQueue为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取task,而workQueue为空,说明添加无任务线程已经没有意义

2 . 判断线程池边界: 线程池当前线程数量是否超过上限(corePoolSize 或 maximumPoolSize),如果超过了返回false,没超过则对workerCount+1,继续下一步

3 . 创建并执行任务: 在ReentrantLock保证下(在操作work set集合largestPoolSize时需要上锁,一个线程池只有一把可重入锁),向线程池中添加新创建的worker实例,并启动worker线程,如果都成功了,返回 true,如果向线程池添加worker实例失败或任务线程启动失败,调用addWorkerFailed()逻辑。

方法参数: addWorker有两个参数,对应于4种传参方式。作用分别如下

在execute方法中就使用了前3种
addWorker(command, true) 线程数小于corePoolSize时,放一个需要处理的task进Workers Set。
如果Workers Set长度超过corePoolSize,就返回false
addWorker(command, false) 当队列被放满时,就将这个新来的task直接放入Workers Set。
此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
addWorker(null, false) 放入一个空的task进workers Set,此时长度限制是maximumPoolSize。
这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,
这样就相当于创建了一个新的线程,只是没有马上分配任务
addWorker(null, true) 此方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,
如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。
实际使用中是在prestartAllCoreThreads()方法,
此方法用来为线程池预先启动corePoolSize个worker等待从队列中获取任务执行

下面是addWorker方法的流程图

内部类Worker

Worker类本身既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务,又可以达到锁的效果。前面分析到提交任务时,线程池会把提交的线程封装成Worker实例。任务提交后通过Worker类中的t.start()运行线程。刚看到这里时,就有一个疑问了。执行线程的start方法是直接运行线程了。那么是它怎么保证线程池中的核心线程池数不变的。一切的奥秘就隐藏在Worker类中的run方法了。需要注意的是,执行t.start方法并不会直接运行提交线程的run()方法中的代码,而会运行Worker类中的run方法。因为在Worker实例创建时,通过工程创建了线程,而创建时又将自身作为参数传了进去。下面是内部类Worker类的源码实现

/**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     * Worker类大体上管理着运行线程的中断状态 和 一些指标。
     * 此类继承了AQS吗,又实现了Runnable,说明其既是一个可运行的任务,也是一把锁(不可重入)
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
             //设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
             // 在运行runWorker方法之前,不可被中断
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

为什么要把传进来的线程再封装成一个Worker类,为什么不直接执行execute(command)提交的command呢。主要是为了控制中断,用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁,只有在等待从workQueue中获取任务getTask()时才能中断,worker实现了一个简单的不可重入的互斥锁,而不是用ReentrantLock可重入锁。

new Worker():

  1. 将AQS的state置为-1,在runWoker()前不允许中断
  2. 待执行的任务会以参数传入,并赋予firstTask
  3. 用Worker这个Runnable创建Thread

之所以Worker自己实现Runnable,并创建Thread,在firstTask外包一层,是因为要通过Worker控制中断,而firstTask这个工作任务只是负责执行业务

Worker控制中断的几方面:

  1. 初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断,其中不允许中断体现在:(a) . shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state==-1时tryLock()失败,没发interrupt()。(b) . shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>0才能interrupt的逻辑
  2. 为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程

Worker和firstTask的区别:Worker是线程池中的线程,而Task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。这也是线程池能保证核心线程数的原因(因为线程池并不是直接创建我们提交给线程池任务的线程,而是自己创建线程,用它创建的线程调用我们提交的线程的run()方法。)

runWorker() – 执行任务

下面是Worker类中run()方法中调用的runWorker()方法

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     * 
     * 1.我们可能使用一个初始化任务开始,即firstTask为null
     * 然后只要线程池在运行,我们就从getTask()获取任务
     * 如果getTask()返回null,则worker由于改变了线程池状态或参数配置而退出
     * 其它退出因为外部代码抛异常了,这会使得completedAbruptly为true,
     * 这会导致在processWorkerExit()方法中替换当前线程
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 2. 在任何任务执行之前,都需要对worker加锁去防止在任务运行时,其它任务执行时中断,
     * 此外还需确保线程没有中断标记,除非线程池已经停了 
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     * 
     * 3. 每个任务执行前会调用beforeExecute(),其中可能抛出一个异常,这种情况下会导致线程死亡
     *(跳出循环,且completedAbruptly==true),没有执行任务
     * 因为beforeExecute()的异常没有cache住,会上抛,跳出循环
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     * 
     * 4. 假定beforeExecute()正常完成,我们执行任务
     * 汇总任何抛出的异常并发送给afterExecute(task, thrown)
     * 因为我们不能在Runnable.run()方法中重新上抛Throwables,
     * 我们将Throwables包装到Errors上抛(会到线程的UncaughtExceptionHandler去处理)
     * 任何上抛的异常都会导致线程死亡
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * 5 .任务执行结束后,调用afterExecute(),也可能抛异常,也会导致线程die
     * 根据JLS Sec 14.20,这个异常(finally中的异常)会生效
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt

                // 确保只有在线程stoping时,才会被设置中断标示,否则清除中断标示
                // 1、如果线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt()
                // 2、如果一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=stop
                // 是,再次设置中断标示,wt.interrupt()
                //  否,不做操作,清除中断标示后进行后续步骤
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker(Worker w)执行流程:

  1. 执行任务之前,首先worker.unlock(),将AQS的state置为0,允许中断当前worker线程
  2. 开始执行firstTask,调用task.run(),在执行任务前会上锁wroker.lock(),在执行完任务后会解锁,为了防止在任务运行时被线程池一些中断操作中断
  3. 在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法
  4. 无论在beforeExecute()、task.run()、afterExecute()发生异常上抛,都会导致worker线程终止,进入processWorkerExit()处理worker退出的流程
  5. 如正常执行完当前task后,会通过getTask()从阻塞队列中获取新任务,当队列中没有任务,且获取任务超时,那么当前worker也会进入退出流程

getTask() – 获取任务

这个方法也是线程池的很重要的方法,它定义了任务从线程池获取线程的具体逻辑。也是保证线程池中核心线程数量的关键方法。当有效线程数小于corePoolSize时,有阻塞的从队列中获取任务。源码如下

/**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

             /**
             * 如果线程数量大于maximumPoolSize(可能maximumPoolSize被修改了)
             * 要么既需要计时timed==true,也超时了timedOut==true
             * worker数量-1,减一执行一次就行了,然后返回null,在runWorker()中会有逻辑减少worker线程
             * 如果本次减一失败,继续内层循环再次尝试减一
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    // 大于corePoolSize
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :   
                    // 小于等于corePoolSize
                    workQueue.take();

                // 如果获取到了任务就返回
                if (r != null)
                    return r;
                // 没有返回,说明超时,那么在下一次内层循环时会进入worker count减一的步骤
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

执行流程:

1、首先判断是否可以满足从workQueue中获取任务的条件,不满足return null
​ A、线程池状态是否满足:
​ (1)shutdown状态 + workQueue为空 或 stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了
​ (2)stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回
​ B、线程数量是否超过maximumPoolSize 或 获取任务是否超时
​ (1)线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功不会超过maximumPoolSize
​ (2)如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize
2、如果满足获取任务条件,根据是否需要定时获取调用不同方法:
​ A、workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null
​ B、workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务
3、在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程


Author: 顺坚
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source 顺坚 !
评论
 Previous
聊聊对ThreadLocal的理解 聊聊对ThreadLocal的理解
JDK 1.2的版本中就提供java.lang.ThreadLocal,ThreadLocal为解决多线程程序的并发问题提供了一种新的思路。使用这个工具类可以很简洁地编写出优美的多线程程序。也是面试中出现频率比较高的知识点。ThreadLo
2019-06-22
Next 
Java实现Mysql增量同步 Java实现Mysql增量同步
最近公司有个基于Mysql做增量数据同步的需求需要我要完成。源端是两个不同业务系统数据库的两张表,需要把这两张表的数据字段做一些过滤和处理,然后增量同步到本地服务的数据库中。由于数据量不大,源端两个表都是几十万的数据,因此当时首先想到的就是
2019-06-08
  TOC