ThreadPoolExecutor(一)
之前去面试被问及到线程池的细节,各种被虐。今天将这个短板补上来.
一般使用线程池都会使用Excutors,但是其内部实现是ThreadPoolExecutor。所以要理解多线程还是需要直接看ThreadPoolExecutor源码的。
目录
ThreadPoolExecutor
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize: 线程池中核心线程数量,必须大于等于0
- maximumPoolSize: 线程池最大容量大小,必须大于0(如果也等于0的话,那这个线程池就无法创建线程了,没啥意义了)
- maximumPoolSize必须大于或者等于corePoolSize
- keepAliveTime: 当线程数大于核心数时,这是多余空闲线程在终止之前等待新任务的最长时间。必须大于等于0
- workQueue: 承载任务的队列
- handler: 线程拒绝(抛弃)策略(这个参数,我后面专门写篇博客来聊)
属性
要弄明白ThreadPoolExecutor需要先弄清楚它的关键属性,以及状态的变化
//ctl保存当前线程数量和当前线程池状态。其中高3位位状态,低29位位数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大线程容量2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//线程池状态如下:
//接受新任务并处理排队的任务。 111
private static final int RUNNING = -1 << COUNT_BITS;
//不再接收新任务,但是处理排队的任务。 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接受新任务,不处理排队的任务,并中断正在进行的任务. 001
private static final int STOP = 1 << COUNT_BITS;
//所有任务都已终止,workerCount为零,转换为状态TIDYING的线程将运行terminate()钩子方法。 010
private static final int TIDYING = 2 << COUNT_BITS;
//terminated()执行完毕. 011
private static final int TERMINATED = 3 << COUNT_BITS;
//获取线程池状态,取前3位
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取当前正在工作的worker,主要是取后面29位
private static int workerCountOf(int c) { return c & CAPACITY; }
//获取ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
private final ReentrantLock mainLock = new ReentrantLock();
//存放池中的所有工作线程。 只有在持有mainLock的情况下才能访问。
private final HashSet<Worker> workers = new HashSet<Worker>();
可以看出当状态值大于等于0时都不在接受新任务
状态的主要变化: 1. RUNNING -> SHUTDOWN: 在调用shutdown()时发生该变化,可能会隐式地在finalize()中调用。
(RUNNING or SHUTDOWN) -> STOP: 当调用shutdownNow()时。
SHUTDOWN -> TIDYING: 当队列和池都是空的时
STOP -> TIDYING: 当池中没有线程时.
TIDYING -> TERMINATED: 当terminated()钩子方法被调用时.
execute()
我们先看execute方法(Executors.submit()方法的作用是创建一个FutureTask,然后调用execute()):
execute()方法用来执行给定的任务,该任务可以在新的线程池中或者现有的线程池中执行.如果任务无法提交执行,无论是因为该执行程序已关闭或因其容量已达到,该任务将由当前的RejectedExecutionHandler处理。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果正在运行的线程数少于corePoolSize,则将给定的command作为第一个任务来启动一个新线程(worker).
* addWorker()方法以原子方式检查runState(线程池状态)和workerCount(worker数量),从而防止线程池在不能添加线程的情况下添加线程。
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 如果线程入队成功,还需要进行double-check确认是否可以添加新线程(因为现有线程在上一次检查之后已经died)或者进入该方法后线程池是否关闭了该线程.
* 如果有必要的话就回滚队列,否则就启动一个新的线程。
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 如果我们不能排队任务,那么我们尝试添加一个新线程。 如果添加失败,那么池已关闭或饱和,然后会拒绝任务。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//如果woker已经添加并且启动成功了直接返回
if (addWorker(command, true))
return;
c = ctl.get();
}
//温馨提示:请先看addWorker方法,再回来看下来逻辑
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
这里需要提些概念,方便对下面内容的理解: Woker: 当前线程池中的线程. Task: 实现了Runnable,但是并非是线程,会被Worker调用run()方法 maximumPoolSize: 线程池最多能创建多少个worker corePoolSize: 线程池的核心线程数量.当线程池满了时,并且workerQueue队列也满了时, 还可以创建Worker去处理WorkerQueue队列中的task,但是不能超过maximumPoolSize。超过corePoolSize之外的线程会在空闲超时后终止。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}else if (!addWorker(command, false))
reject(command);
这一段的意思是:
如果前面第一次addWorker失败,然后线程池还是running状态,并且任务还可以添加到workQueue中成功,则再次校验线程池状态,如果非running状态并且将任务从workQueue中删除成功的话,直接拒绝策略。如果可运行线程为0的话,则添加firstTask为null的worker用于执行队列中的任务.
如果线程池状态为非running状态,则直接丢弃任务.
如果线程池为running状态,但是workQueue已满,则尝试基于maximumPoolSize大小添加worker.用该worker优先执行不在队列中的任务,然后再从任务队列中获取任务
如果线程池为running状态,但是workQueue已满,而且线程池的大小已经大于等于maximumPoolSize,则执行拒绝策略
由于workQueue是否可以添加任务影响execute方法的逻辑,所以ThreadPoolExecutor的实际运行状况跟workQueue的具体实现(是否是有界队列)有关
addWorker(Runnable firstTask, boolean core)
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
* 根据线程池状态和给定的界限(核心或最大)检查是否可以添加新的worker.
* 如果可以添加的话,则调整相应的worker数量.如果可以的话,创建并启动新的woker,
* 并将firstTask作为其第一个任务. 如果线程池关闭或者状态为shutdown,则返回false.
* 如果线程工厂在被调用后未能创建新线程,该方法返回false.如果创建线程失败,或者由于线程工厂
* 返回Null,或者由于异常(一般是Thread.start()中的OutOfMemoryError),该方法会干净的回滚队列
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
* 新线程中的任务应该先运行(如果没有,则为null).当少于corePoolSize线程数(
* 在这种情况下,我们总是启动一个线程)或者队列已满时(这种情况下,我们必须绕过队列),创建
* 第一次任务的woker(在方法execute()方法中)以绕过队列.原来空闲的线程通常通过prestartCoreThread创建,
* 或者是替换其他垂死的worker.
*
* @param core 如果为true则使用corePoolSize来绑定,否则使用maximumPoolSize.
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//线程状态
// Check if queue empty only if necessary.
// 这里要结合上文的状态来看,
// 当线程池的状态大于等于SHUTDOWN时拒绝添加worker
// 当线程池状态等于SHUTDOWN时,并且workQueue为empty时也不允许添加firstTask为null的worker
// 注: 当线程池状态为SHUTDOWN,并且workQueue已满时是可以添加firstTask为null的worker,
// 这么做是为了让worker消费workQueue队列中的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//worker数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// 如果由于workerCount更改CAS失败; 重试内部循环
// 重试时要再次校验线程池状态和workerCount,因为他们都可能已经发生了变化
}
}
//workerCount修改成功之后就要做添加woker操作了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//在要对队列以及状态修改时加锁(因为workers是hashSet,非线程安全)
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//当线程池状态为RUNNING状态时或者状态为SHUTDOWN并且firstTask为null时(shutdown状态不接受新任务,但是任务为null时还是可以添加worker)
//可以添加worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 预先检查t是否可以启动
// 如果一个线程已经启动并且还没有死亡,那么这个线程是活着(alive)的
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {//提供添加成功了就启动woker线程
t.start();
workerStarted = true;
}
}
} finally {
//1. 如果调用的线程工厂(在new Worker(firstTask)中调用)创建新的线程失败
//2. 添加woker启动失败
//3. 发生异常
//以上3种情况发生的话进入addWorkerFailed(w)逻辑
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;//返回woker是否启动成功的标记
}
addWorkerFailed(Worker w)
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
这里主要是回滚队列和还原workerCount.并调用tryTerminate().
Worker
我们先看Worker类的接口定义
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//省略.....
}
由此可见Worker继续了AQS并且实现了Runnable接口,所以Worker本身包含着任务信息(firstTask)以及线程(thread)
run()
由于Worker实现了Runnable接口,所以Worker类会实现run()方法,而addWorker(Runnable firstTask, boolean core)方法中的t.start()就会调用到Worker类的run方法。
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
总结: 在用submit()提交任务之后,会对线程池状态和当前线程数进行判断,如果当前的worker数量超过了corePoolSize并且任务队列已满则使用maximumPoolSize再次尝试添加worker,如果满足条件就会创建一个Worker,该Worker包含一个线程和一个第一任务信息(firstTask).添加成功后会将Woker加入现有工作线程池workers中,并且会立即start该worker的线程.而start方法会首先执行该Worker的firstTask(下篇博文会讲到)。所以如果addWork成功的话任务会被第一时间执行。如果添加失败的话会将任务添加到workQueue
本博文先讲到这,任务是如何执行的,如何回收的,我们下个博文见