ThreadPoolExecutor(二)
接着上文继续看ThreadPoolExecutor
runWorker(Worker w)
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
* 反复从队列中获取任务并执行它们,同时应对以下几个问题:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
* 1. Worker可能还是执行一个初始化的task(firstTask),但是有时也不需要获取该task(因为task可以为null),
* 只要poll在运行,就可以从getTask()获得任务.如果它返回null,则由于更改池状态或配置参数而导致worker退出。
* 其他退出是由外部代码中的异常抛出引起的,在这种情况下,completionAbruptly成立,
* 这通常会导致processWorkerExit替换该线程
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
* 2. 在运行任何任务之前,获取锁以防止任务执行期间出现其他池中断,然后确保除非池正在停止,则此线程不能设置其中断。
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
* 3. 每个任务运行前都会调用beforeExecute,这可能会引发异常,在这种情况下,
* 会将线程杀死(断开循环completeAbruptly为true),而不处理任务。
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
* 假设beforeExecute正常执行(没有抛出任何异常),接下来就会执行任务,并且会收集任何抛出来的异常发送到afterExecute
* 分别处理RuntimeException,Error和任意Throwables。因为我们不能在Runnable.run中重新引发Throwables,
* 所以会将它们封装在出错的路径中(到线程的UncaughtExceptionHandler)。 任何抛出的异常都会保守地导致线程死亡。
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
* task.run完成后,我们调用afterExecute,这也可能会引发异常,这也会导致线程死亡。
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
* 异常机制的最终效果是afterExecute和线程的UncaughtExceptionHandler拥有我们可以提供的有关用户代码遇到的任何问题的准确信息
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//如果task为null,则从getTask()中获取任务
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果池停止,确保线程中断;如果没有,确保线程不中断。这需要在第二种情况下进行重新检查,以便在清除中断时处理shutdownNow竞争
// (STOP状态会终端线程)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
beforeExecute方法是个钩子,在ThreadPoolExcutor中是空实现.它一般用于在Runnable执行之前记录日志或初始化ThreadLocals. afterExecute方法也是个钩子,在ThreadPoolExcutor中是空实现.一般用于在Runnable执行之后处理异常。 completedAbruptly表示循环是否正常结束(beforeExecute或者afterExecute的执行或者其他地方如果抛出异常了就是非正常结束)
在获取到任务task之后会调用其run方法(所以Task虽然是Runnable,但是并非是线程,它只是被worker中的线程调用了run方法)
如果任务队列中没有任务了就进入worker退出逻辑processWorkerExit:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
如果runWorker非正常结束则表示期间有异常发生,则需要将workerCount自减。然后记录完成任务数量,将worker从现有线程池workers中移除.并且尝试终止线程池。 如果线程池状态小于STOP.则执行以下步骤:
如果runWorker非正常执行完,那么会用新的worker替换旧的(addWorker(null,false)).
如果runWorker正常执行完,并且核心线程不允许超时,则如果当前线程池的workerCount小于corePoolSize时会重新addWorker,以保证线程池的大小一直是corePoolSize大小.
如果runWorker正常执行完,任务队列中还有任务,并且核心线程允许超时,则当前线程池最少保留一个worker.
如果runWorker正常执行完,任务队列中没有任务,并且核心线程允许超时,则线程池回收全部线程.
还有一个逻辑需要结合getTask()才能明白,即大于corePoolSize的线程会在超时之后退出,然后尝试添加新的firstTask为null的Worker线程。
getTask():
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 根据当前配置设置执行时阻塞或定时等待任务.如果满足一下情况,则worker必须退出返回null:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 有多于maximumPoolSize的worker
* 2. The pool is stopped.
* 线程池已停止
* 3. The pool is shutdown and the queue is empty.
* 线程池已经关闭,队列是空的
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
* 该worker等待任务超时,worker在等待超时会被终止(即代码: allowCoreThreadTimeOut || workerCount> corePoolSize)
* 在等待超时之前和之后,如果队列不为空,则该worker不会是线程池中最后一个线程。(即如果任务队列中有任务,并且该worker是线程池中的最后一个线程则不管该worker是否等待超时都不* 会终止它,因为要保证任务被执行)
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
// 这里要强调下allowCoreThreadTimeOut标记,该值为false的话,即便核心线程数为空闲也会保持活跃状态,为true的则核心线程使用keepAliveTime超时等待工作
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//线程池已经关闭,队列是空的
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//允许核心线程超时或者当前线程大小大于corePoolSize(利用maximumPoolSize添加的线程worker)则keepAliveTime之后会超时,否则会一直阻塞直到有新的任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
整个方法的逻辑集合重要代码片段
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
可以看出,该方法除了注释中的那几种情况return null之外或阻塞在poll或take()中,直到拿到worker线程。
总结: runWorker会首先执行firstTask,如果firstTask为null的会阻塞等待获取workQueue中的任务,然后调用任务的run方法(换句话说:worker本身的任务已经执行完毕,下一次就会执行队列中的任务).
线程池的主要流程到此就结束了。我们来总结一下线程池的主要特性:
- 如果线程池的大小小于等于corePoolSize时,会添加有firstTask的线程Worker,并且立即start该线程.该线程会首先执行firstTask任务,然后再从队列中获取任务.
- 如果线程池的大小等于corePoolSize时,任务会添加到任务队列workQueue中。
- 如果线程池的大小等于corePoolSize时,任务添加到任务队列workQueue中成功后,会再次校验线程池状态,如果状态不是running状态,会移除任务,并且任务成功移除,则执行拒绝策略
- 如果线程池的大小等于corePoolSize时,并且任务队列也满了时,会在小于等于maximumPoolSize的基础上添加worker,用于首先执行不再任务队列中的任务,然后再去任务队列中获取任务.
如果线程池的大小等于maximumPoolSize时,并且任务队列也满了时,会执行拒绝策略
获取任务时,如果允许核心线程超时,那么会在等待任务队列keepAlive时间之后超时
获取任务时,如果不允许核心线程超时,并且当前线程数小于等于corePoolSize的话,会永不终止的等待任务队列中的任务.
获取任务时,如果不允许核心线程超时,并且当前线程数大于corePoolSize的话,多出来(即大于corePoolSize)的线程会在等待任务队列keepAlive时间之后超时
进行线程退出时,如果任务执行中抛出了异常(等待超时就是一种异常),则将原来的worker删除重新添加一个没有firstTask的worker(该worker优先从任务队列中获取任务).
进行线程退出时,如果任务队列中的任务执行完了,并且不允许核心线程超时的话,线程池会保持corePoolThread大小。
进行线程退出时,如果任务队列中的任务执行完了,并且允许核心线程超时的话,线程池大小为0。
进行线程退出时,如果任务队列中还有任务,并且允许核心线程超时的话,线程池的大小1.
接下来我们继续看其他几个比较重要的方法.至于拒绝策略,我们下一篇中继续讲。
shutdown()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
简单来说做了以下五步:
- 校验线程shutdown权限.
- 修改当前线程池的状态为SHUTDOWN
- 终止空闲(Idle)线程Worker
- 调用钩子onShutdown()
- 尝试终止线程池
其中interruptIdleWorkers(),tryTerminate()需要细聊下:
interruptIdleWorkers()
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {//如果可以获取到锁则说明该线程是空闲的
try {
t.interrupt();
} catch (SecurityException ignore) {
//如果抛出这种异常,说明无法修改该线程的状态,它会保持不中断
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
tryTerminate()
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
在对线程池做终止操作的时候回再一次判断线程池状态以及任务队列workQueue中的任务是否已全部执行完毕(如果任务是比较耗时的那很有可能还没执行完,注意,shutdown状态的线程池虽然不接受新任务,但是必须保证任务队列中的任务全部执行完毕的).
如果当前线程池中仍有线程,则终端一个空闲的线程.
如果线程池中没有线程了,状态也非running(并且小于TIDYING),并且任务队列也空了.那么就将线程池状态设置为TIDYING,并调用钩子terminated().执行完毕之后再将线程池状态设置为TERMINATED。