In a recent interview I was grilled on the details of thread pools and got thoroughly beaten. Today I'm filling in that gap.

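In everyday code we usually create thread pools through Executors, but most of its factory methods are built on ThreadPoolExecutor. So to really understand thread pools, we have to read the ThreadPoolExecutor source directly.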


ThreadPoolExecutor

Constructor

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
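  1. corePoolSize: the number of core threads in the pool; must be >= 0.
  2. maximumPoolSize: the maximum number of threads the pool may hold; must be > 0 (if it were 0, the pool could never create a thread, which would make it pointless).
  3. maximumPoolSize must also be >= corePoolSize.
  4. keepAliveTime: when the number of threads exceeds corePoolSize, this is the maximum time an excess idle thread waits for a new task before terminating; must be >= 0. unit is the TimeUnit of keepAliveTime (the constructor converts the value to nanoseconds).
  5. workQueue: the queue that holds tasks waiting to be executed.
  6. threadFactory: the factory used to create new threads.
  7. handler: the rejection policy applied when a task cannot be accepted (I'll cover this parameter in a separate post).

Violating any of the numeric constraints throws IllegalArgumentException; passing a null workQueue, threadFactory or handler throws NullPointerException.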
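As a quick illustration of these parameters, here is a minimal sketch (assuming Java 8+; the class name `CtorDemo` and the concrete sizes are made up for illustration) that spells out every constructor argument and checks that a maximumPoolSize smaller than corePoolSize is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CtorDemo {
    // Every constructor argument spelled out; the sizes are arbitrary example values.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(10),  // bounded workQueue
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler (also the default)
    }

    // maximumPoolSize < corePoolSize fails the constructor's argument checks.
    static boolean rejectsMaxBelowCore() {
        try {
            new ThreadPoolExecutor(4, 2, 0L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize());                  // 2
        System.out.println(pool.getMaximumPoolSize());               // 4
        System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS)); // 30
        System.out.println(rejectsMaxBelowCore());                   // true
        pool.shutdown();
    }
}
```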

Fields

To understand ThreadPoolExecutor, we first need to be clear about its key fields and about how its run state changes.

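    // ctl packs the worker count and the pool run state: the high 3 bits hold the run state, the low 29 bits the worker count.
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Number of bits used for the worker count (32 - 3 = 29)
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Maximum worker count: 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // The pool run states:
    // Accepts new tasks and processes queued tasks. High bits: 111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // Does not accept new tasks, but processes queued tasks. High bits: 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // Does not accept new tasks, does not process queued tasks, and interrupts in-progress tasks. High bits: 001
    private static final int STOP       =  1 << COUNT_BITS;
    // All tasks have terminated and workerCount is zero; the thread transitioning to TIDYING will run the terminated() hook. High bits: 010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // terminated() has completed. High bits: 011
    private static final int TERMINATED =  3 << COUNT_BITS;
    // Extract the run state: keep the high 3 bits
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Extract the number of workers: keep the low 29 bits
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // Pack a run state and a worker count into a ctl value
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private final ReentrantLock mainLock = new ReentrantLock();
    // Set containing all worker threads in the pool. Accessed only while holding mainLock.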
    private final HashSet<Worker> workers = new HashSet<Worker>();

As you can see, every state whose value is >= 0 (that is, every state except RUNNING) no longer accepts new tasks.
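To see the bit packing in action, the sketch below copies the JDK 8 constants and helpers into a hypothetical standalone class `CtlDemo` (the real ones are private inside ThreadPoolExecutor). It packs RUNNING with a worker count of 5 and unpacks both halves again.

```java
// Hypothetical standalone copy of ThreadPoolExecutor's (JDK 8) ctl packing helpers.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 2^29 - 1
    static final int RUNNING    = -1 << COUNT_BITS;      // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;      // high bits 000
    static final int STOP       =  1 << COUNT_BITS;      // high bits 001
    static final int TIDYING    =  2 << COUNT_BITS;      // high bits 010
    static final int TERMINATED =  3 << COUNT_BITS;      // high bits 011

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));  // 3 state bits, then 29 count bits
        System.out.println(runStateOf(c) == RUNNING);   // true
        System.out.println(workerCountOf(c));           // 5
        // Adding 1 to the whole int only changes the low 29 bits, i.e. the worker count.
        System.out.println(workerCountOf(c + 1));       // 6
        System.out.println(RUNNING < SHUTDOWN);         // true: only RUNNING is negative
    }
}
```

Because the worker count lives in the low bits, a plain CAS of ctl from c to c + 1 (which is what compareAndIncrementWorkerCount does) bumps the count without touching the state bits.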

The main state transitions:

  1. RUNNING -> SHUTDOWN: on a call to shutdown(), possibly implicitly in finalize().

  2. (RUNNING or SHUTDOWN) -> STOP: on a call to shutdownNow().

  3. SHUTDOWN -> TIDYING: when both the queue and the pool are empty.

  4. STOP -> TIDYING: when the pool is empty.

  5. TIDYING -> TERMINATED: when the terminated() hook method has completed.
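These transitions can be observed through public methods. Below is a minimal sketch, assuming Java 8+ (`StateDemo` is a hypothetical name). An anonymous subclass overrides the protected terminated() hook, and the pool is driven from RUNNING through shutdown() to TERMINATED.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class StateDemo {
    // Returns true if the pool went RUNNING -> SHUTDOWN -> ... -> TERMINATED and terminated() ran.
    static boolean run() {
        final AtomicBoolean hookRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                // Invoked once the pool has reached TIDYING, right before it becomes TERMINATED.
                hookRan.set(true);
            }
        };
        pool.execute(() -> { });                  // start one worker
        boolean wasRunning = !pool.isShutdown();
        pool.shutdown();                          // RUNNING -> SHUTDOWN
        boolean isShut = pool.isShutdown();
        boolean finished;
        try {
            finished = pool.awaitTermination(5, TimeUnit.SECONDS); // wait for TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return wasRunning && isShut && finished && pool.isTerminated() && hookRan.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```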

execute()

Let's start with execute(). (submit(), which is implemented in AbstractExecutorService, wraps the task in a FutureTask and then calls execute().)

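execute() runs the given task at some point in the future, either in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because the executor has been shut down or because its capacity has been reached, the task is handed to the current RejectedExecutionHandler.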
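Here is a small sketch to confirm that submit() really funnels into execute() with a FutureTask. It assumes Java 8+, and `SubmitDemo` is a hypothetical name. A subclass overrides execute() only to record what it receives, then delegates to the normal implementation.

```java
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class SubmitDemo {
    // Returns true if submit() handed a FutureTask to execute() and the result came back.
    static boolean submitGoesThroughExecute() {
        final AtomicReference<Runnable> seen = new AtomicReference<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            public void execute(Runnable command) {
                seen.set(command);        // record what submit() passed in
                super.execute(command);   // then behave exactly like the normal pool
            }
        };
        try {
            Future<Integer> f = pool.submit(() -> 21 * 2); // a Callable<Integer>
            return f.get() == 42 && seen.get() instanceof FutureTask;
        } catch (Exception e) {
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitGoesThroughExecute()); // true
    }
}
```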

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
        * Proceed in 3 steps:
        *
        * 1. If fewer than corePoolSize threads are running, try to
        * start a new thread with the given command as its first
        * task.  The call to addWorker atomically checks runState and
        * workerCount, and so prevents false alarms that would add
        * threads when it shouldn't, by returning false.
        *
        * 2. If a task can be successfully queued, then we still need
        * to double-check whether we should have added a thread
        * (because existing ones died since last checking) or that
        * the pool shut down since entry into this method. So we
        * recheck state and if necessary roll back the enqueuing if
        * stopped, or start a new thread if there are none.
        *
        * 3. If we cannot queue task, then we try to add a new
        * thread.  If it fails, we know we are shut down or saturated
        * and so reject the task.
        */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // If the worker was added and started successfully, we're done
        
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Tip: read addWorker() first, then come back to the logic below

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

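A few concepts first, to make what follows easier to understand:
Worker: a thread in the pool (more precisely, an object that wraps one thread plus its first task).
Task: an object implementing Runnable; it is not a thread itself, and a Worker calls its run() method.
maximumPoolSize: the maximum number of workers the pool can create.
corePoolSize: the number of core threads. Once corePoolSize workers exist and workQueue is also full, the pool can still create additional workers, up to maximumPoolSize. Such a worker first runs the task that could not be queued and then takes tasks from workQueue. Threads beyond corePoolSize terminate after staying idle for keepAliveTime.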

if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}else if (!addWorker(command, false))
    reject(command);

This part works as follows:

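  1. If the first addWorker call failed (or there were already corePoolSize workers), the pool is RUNNING, and the task was successfully offered to workQueue, the pool state is checked again. If the pool is no longer RUNNING and the task can be removed from workQueue, the task is rejected. Otherwise, if the number of workers is 0, a worker with a null firstTask is added to process the queued tasks.

  2. If the pool is not RUNNING, the task goes to addWorker(command, false). That call fails, so the task is handed to the rejection policy via reject(command) rather than silently discarded.

  3. If the pool is RUNNING but workQueue is full, the pool tries to add a worker bounded by maximumPoolSize. That worker first runs the task that could not be queued, and then takes further tasks from workQueue.

  4. If the pool is RUNNING, workQueue is full, and the number of workers has already reached maximumPoolSize, the rejection policy is applied.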
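These cases can be reproduced with a deliberately tiny pool. The sketch below uses a hypothetical class `BranchDemo` and assumes Java 8+. It sets corePoolSize = 1 and maximumPoolSize = 2, and the queue holds a single task. Every task blocks on a latch, so the workers stay busy while we keep submitting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BranchDemo {
    // Submits four blocking tasks and records pool/queue size (or rejection) after each.
    static List<String> trace() {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1)); // default handler: AbortPolicy
        Runnable blocking = () -> {
            try {
                release.await();                      // keep the worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> out = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocking);
                out.add("task" + i + ": pool=" + pool.getPoolSize()
                        + " queue=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                out.add("task" + i + ": rejected");
            }
        }
        release.countDown();   // let every blocked task finish
        pool.shutdown();
        return out;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The expected trace is `task1: pool=1 queue=0` (a core worker runs it directly), `task2: pool=1 queue=1` (queued), `task3: pool=2 queue=1` (queue full, so a non-core worker is added), and `task4: rejected` (queue full and maximumPoolSize reached).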

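Because whether workQueue accepts a task drives the branches of execute(), the actual behavior of a ThreadPoolExecutor depends on the concrete workQueue implementation, in particular on whether it is bounded. With an unbounded queue, for example, offer() never fails, so the pool never grows beyond corePoolSize and maximumPoolSize has no effect.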
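The effect of an unbounded queue is easy to check. In the sketch below (hypothetical class `UnboundedDemo`, Java 8+), maximumPoolSize is 10. Because the LinkedBlockingQueue is created without a capacity, only one thread is ever created while five blocking tasks are pending.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedDemo {
    // Returns {poolSize, queueSize} while five blocking tasks are pending.
    static int[] run() {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 10, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>()); // no capacity given: effectively unbounded
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("pool=" + r[0] + " queue=" + r[1]); // pool=1 queue=4
    }
}
```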

addWorker(Runnable firstTask, boolean core)

/**
    * Checks if a new worker can be added with respect to current
    * pool state and the given bound (either core or maximum). If so,
    * the worker count is adjusted accordingly, and, if possible, a
    * new worker is created and started, running firstTask as its
    * first task. This method returns false if the pool is stopped or
    * eligible to shut down. It also returns false if the thread
    * factory fails to create a thread when asked.  If the thread
    * creation fails, either due to the thread factory returning
    * null, or due to an exception (typically OutOfMemoryError in
    * Thread.start()), we roll back cleanly.
    *
    * @param firstTask the task the new thread should run first (or
    * null if none). Workers are created with an initial first task
    * (in method execute()) to bypass queuing when there are fewer
    * than corePoolSize threads (in which case we always start one),
    * or when the queue is full (in which case we must bypass queue).
    * Initially idle threads are usually created via
    * prestartCoreThread or to replace other dying workers.
    *
    * @param core if true, use corePoolSize as the bound; otherwise use maximumPoolSize.
    * @return true if successful
    */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c); // run state

        // Check if queue empty only if necessary.
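        // Read this together with the run states described above:
        // once the pool state is >= SHUTDOWN, adding a worker is refused,
        // except when the state is exactly SHUTDOWN, firstTask is null and workQueue is NOT empty.
        // That exception exists so a task-less worker can still drain the tasks left in workQueue.
        // (In SHUTDOWN with an empty workQueue, even a worker with a null firstTask is refused.)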
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c); // worker count
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
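            // Otherwise the CAS failed because workerCount changed; retry the inner loop.
            // (If the run state changed, the check above restarts the outer loop instead,
            // because both the state and workerCount may have changed in the meantime.)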
        }
    }

    // workerCount was incremented successfully; now create and register the worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Take mainLock before touching workers or checking state (workers is a HashSet, which is not thread-safe)
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                
                // A worker may be added if the pool is RUNNING, or if it is SHUTDOWN and firstTask is null
                // (SHUTDOWN accepts no new tasks, but a worker without a task can still be added to drain the queue)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // Precheck that t is startable:
                    // a thread is alive if it has been started and has not yet died
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) { // if the worker was added, start its thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
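        // addWorkerFailed(w) runs whenever the worker thread was not started, e.g. when:
        // 1. the thread factory (called inside new Worker(firstTask)) failed to create a thread (t == null)
        // 2. the worker was not added because the pool state check failed
        // 3. an exception was thrown (typically OutOfMemoryError from t.start())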
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted; // whether the worker thread was started
}
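The two nested loops in addWorker() boil down to a state guard plus a bounded CAS increment. The sketch below is a hypothetical simplification (`AddWorkerSketch`, with made-up state constants), not the JDK code. It keeps the SHUTDOWN exception of the outer check and the CAS retry of the inner loop, but drops the `continue retry` that restarts the outer loop when the run state changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplification of addWorker()'s two loops; not the JDK source.
class AddWorkerSketch {
    // Made-up constants that keep only the ordering of the real states.
    static final int RUNNING = -1, SHUTDOWN = 0, STOP = 1;

    // Outer-loop guard: at or beyond SHUTDOWN nothing may be added, except a
    // task-less worker in SHUTDOWN while queued tasks still need draining.
    static boolean stateAllowsAdd(int rs, boolean firstTaskIsNull, boolean queueEmpty) {
        return !(rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTaskIsNull && !queueEmpty));
    }

    // Inner loop: CAS-increment the count unless the bound is already reached.
    static boolean casIncrement(AtomicInteger count, int bound) {
        for (;;) {
            int c = count.get();
            if (c >= bound) {
                return false;                 // like wc >= corePoolSize/maximumPoolSize
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                  // like compareAndIncrementWorkerCount(c)
            }
            // CAS lost a race with another thread: re-read and retry
        }
    }

    // Races several threads against one bound; returns the final count, or -1 on a mismatch.
    static int race(int threads, int attempts, int bound) {
        final AtomicInteger count = new AtomicInteger();
        final AtomicInteger wins = new AtomicInteger();
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < attempts; j++) {
                    if (casIncrement(count, bound)) {
                        wins.incrementAndGet();
                    }
                }
            });
            ts.add(t);
            t.start();
        }
        try {
            for (Thread t : ts) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return wins.get() == count.get() ? count.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(race(8, 100, 50));                       // 50
        System.out.println(stateAllowsAdd(SHUTDOWN, true, false));  // true
        System.out.println(stateAllowsAdd(SHUTDOWN, false, false)); // false
    }
}
```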

addWorkerFailed(Worker w)

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

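This rolls back the worker creation. It removes the worker from the workers set (if it was added), restores workerCount by decrementing it, and calls tryTerminate() in case this worker's existence was holding up termination.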
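One way to watch this rollback happen is a ThreadFactory that refuses to create threads, since ThreadFactory.newThread is allowed to return null. In the sketch below (hypothetical class `NullFactoryDemo`, Java 8+), both addWorker() calls made by execute() fail and roll back: first the core attempt, then addWorker(null, false) after the task is queued. As a result no worker remains and the task just sits in the queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class NullFactoryDemo {
    // Returns {poolSize, queueSize} after one execute() on a pool whose factory never creates threads.
    static int[] run() {
        ThreadFactory refusing = r -> null;   // allowed: newThread may return null to refuse
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), refusing);
        pool.execute(() -> System.out.println("never runs"));
        int[] result = { pool.getPoolSize(), pool.getQueue().size() };
        pool.shutdownNow();                   // drains the queued task; there is no worker to stop
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("pool=" + r[0] + " queue=" + r[1]); // pool=0 queue=1
    }
}
```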

Worker

Let's first look at the declaration of the Worker class:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    /**
    * Creates with given first task and thread from ThreadFactory.
    * @param firstTask the first task (null if none)
    */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    // ... (omitted)
}

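So Worker extends AQS and implements Runnable. A Worker therefore holds both its task information (firstTask) and its thread (thread). Note that the thread is created with the Worker itself (this) as its Runnable.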
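The JDK source comments give two reasons for Worker extending AQS. First, it acts as a simple non-reentrant lock, so a task that calls pool control methods such as setCorePoolSize cannot reacquire its own worker's lock. Second, the state starts at -1 so that interrupts are suppressed until runWorker() starts. The sketch below is a hypothetical `MutexSketch` modeled on that idea; it is not the Worker class itself.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant lock modeled on how Worker uses AQS; not the Worker class itself.
class MutexSketch extends AbstractQueuedSynchronizer {
    MutexSketch() {
        setState(-1);   // like Worker: "not started yet", so tryLock() cannot succeed
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only 0 -> 1 succeeds, so a second acquire by the owner also fails (non-reentrant).
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    static String demo() {
        MutexSketch m = new MutexSketch();
        boolean beforeStart = m.tryLock(); // false: state is -1
        m.unlock();                        // state -> 0, as runWorker() does with w.unlock()
        boolean first = m.tryLock();       // true
        boolean reentrant = m.tryLock();   // false: the owner cannot reacquire
        m.unlock();
        boolean again = m.tryLock();       // true
        return beforeStart + " " + first + " " + reentrant + " " + again;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // false true false true
    }
}
```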

run()

Because Worker implements Runnable, it provides a run() method. The t.start() call in addWorker(Runnable firstTask, boolean core) therefore ends up running Worker.run(), which delegates to runWorker().

/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}
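A custom ThreadFactory shows that the Runnable handed to the new thread is the Worker, not the submitted task. Below is a hypothetical sketch (`FactoryDemo`, Java 8+) that records the class name of the first Runnable the pool passes to the factory.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FactoryDemo {
    // Returns the class name of the Runnable the pool gave to its ThreadFactory.
    static String runnableHandedToFactory() {
        final AtomicReference<String> seen = new AtomicReference<>();
        ThreadFactory recording = r -> {
            seen.compareAndSet(null, r.getClass().getName()); // record the first Runnable only
            return new Thread(r);
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), recording);
        pool.execute(() -> { });   // addWorker() -> new Worker(task) -> factory.newThread(worker)
        pool.shutdown();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runnableHandedToFactory()); // java.util.concurrent.ThreadPoolExecutor$Worker
    }
}
```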

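Summary: after a task is submitted through execute() (submit() also ends up there), the pool checks its run state and worker count. If fewer than corePoolSize workers are running, it tries to add a core worker with the task as its firstTask. Otherwise it tries to put the task into workQueue. If the queue is full, it tries again to add a worker, this time bounded by maximumPoolSize. If that also fails, the task is rejected. Each Worker holds a thread and an optional first task (firstTask). Once a worker is added to the workers set, its thread is started immediately, and the worker runs its firstTask first (covered in the next post). So when addWorker succeeds, the task starts executing right away without going through the queue.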

That's all for this post. How tasks are actually executed and how workers are reclaimed will be covered in the next one.