线程池
线程池
ThreadPool源码学习
zodiac ·2020-12-10 ·20 次阅读
ThreadPoolExecutor
jdk1.5在juc包里提供了方便快捷的线程池api,并提供了基于工厂模式的Executors工具类用于快捷创建线程池,在实际开发过程中,需要使用线程池时,应当优先考虑使用Executors
1、类继承关系
Executor为顶级接口,其主要目标是将任务、任务的提交与任务的执行解耦
ExecutorService接口则定义了正常的线程池应该有的功能与行为,诸如任务提交,异步执行等等
ScheduledExecutorService接口则定义了一些定时的特性
2、Executor
Executor执行提交的任务(Runnable),该接口提供了一种将任务提交与任务执行(包括执行细节:线程、定时等)解耦的途径。
通过Executor包装的线程(Thread)对象,避免直接使用Thread对象来执行任务,可以有效将线程信息屏蔽,避免直接对线程的操作。
Executor本身并不强制要求执行的任务必须是异步执行
用于执行任务的执行器,而Runnable则表示可以用于执行的任务。
/**
- 在未来某个时刻执行给定的指令,命令可以在新的线程中、线程池或调用线程中执行
- 实际情况取决于Executor的实现
*/
void execute(Runnable command);
3、ExecutorService
能够管理任务终止、能够产生追踪一个或多个异步任务处理进度的Future的执行器(Executor)
接口的核心定义:
提交任务
Future<?> submit(Runnable task);
第一种提交Callable
第二种方式,Runnable task和 T result作为参数,实际上内部通过包装将入参result作为返回值与Runnable task一同包装为一个Callable,最后任务完成时将Callable结果放至Future中。因此通过这种方式,入参result,返回结果则是可能在任务中被修改的result。
AbstractExecutorService
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture
execute(ftask);
return ftask;
}
//异步任务的抽象,内部封装了实际的任务、任务状态、真正的执行线程以及等待任务完成的线程等细节
//实际上是调用FutureTask.run()的线程被阻塞,作为真正的执行线程
protected
return new FutureTask
}
FutureTask
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);//将Runnable适配为Callable
this.state = NEW; // ensure visibility of callable
}
Tips:
FutureTask里有很多特性(比如对等待任务完成的线程进行阻塞,任务完成后对等待线程的唤醒,防止任务被并发调用等等)都可以使用AbstractQueuedSynchronizer,但在JDK1.8中的源码却没有发现AQS的痕迹,想想这是为何?
早期FutureTask确实是使用AQS实现,后续修改为了目前的样子(很多通过内存直接修改对象的操作,Unsafe类),核心是为了性能。这一部分可以再单独深入看看
Executors.RunnableAdapter
//简单的适配,将Runnable包装为Callable
static final class RunnableAdapter
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
第三种方式,与第二种类似,会创一个result 为 null的Runnable适配器。
异步执行任务
通过任务提交、invokeAny、invokeAll
等待任一/全部任务执行完成
invokeAny()
一次性提交批量任务,有任一任务完成时返回该任务的处理结果,调用线程阻塞。
private
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future
ExecutorCompletionService
new ExecutorCompletionService
try {
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
//1、提交第一个任务
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (; ; ) {
//2、判断任务是否完成
Future<T> f = ecs.poll();
if (f == null) {
//3、没有完成,且还有任务可以提交时,继续提交
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
} else if (active == 0)//4、没有完成、没有任务可以提交、无处理中任务跳出
break;
else if (timed) {//5、无任务提交,任务在处理中时,设置等待任务完成
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
} else//6、无限期阻塞,等待完成任务的队列有完成任务可以获得
f = ecs.take();
}
if (f != null) {//7、获取到完成任务
--active;
try {
return f.get();//8、返回完成任务的结果
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {//9、最终取消所有任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);//FutureTask.cancel()只有当未NEW状态才会取消
}
}
invokeAll
如果无超时时间(较为简单)
则遍历FutureTask,通过FutureTask.get()方法阻塞调用线程即可。
如果存在超时时间
遍历FutureTask,每提交一次任务检查一次是否超时。任务提交完成后,遍历未结束Future,调用Future.get(timeout),最终返回结果,任务清理。
public
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future
boolean done = false;
try {
for (Callable
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));//执行任务,executor不保证异步执行
nanos = deadline - System.nanoTime();//检测超时时间
if (nanos <= 0L)
return futures;//超时间内未提交的任务,不会再被执行
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {//未完成future
if (nanos <= 0L)//阻塞前先判断一次是否超时
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);//超时时间内获取
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();//更新超时时间
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);//最终未完成的任务,不会再被执行
}
}
终止任务提交
shutdown()
已提交的任务,仍将被执行,但新的任务不再被接收。如果已经被shutdown,再次调用无影响
终止任务执行
List
立即尝试停止所有正在执行的任务,返回等待执行的任务,不会等待正在执行的任务终结。
该方法会尝试尽最大努力终结执行中的任务,但无法保证正在执行的任务被终结,因此,如果有任务终结失败,该任务也许永远无法被终止。
等待执行器进入终止状态
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
当调用了showdown()后,在超时间内、调用线程被中断前阻塞调用线程等待已提交任务完成。
4、AbstractExecutorService
通过模板模式的设计模式,对ExecutorService接口中定义的某些方法,进行了通用实现。
public Future<?> submit(Runnable task){…}
public
public
public
public
submit(…):
主要逻辑:
1、包装Callable、Runnable、result为RunnableFuture(实际默认实现是FutureTask,适配Runable接口)
2、调用实现类execute()方法执行RunnableFuture
invokeAny(…)
invokeAll(…)
上述方法的具体源码可以参见ExecutorService部分
5、ThreadPoolExecutor
虽然AbstractExecutorService进行了一些通用实现,但诸如execute()、shutdown()等依赖实际执行路径与执行器内部状态的方法并未被实现,因此这些方法的实现逻辑将是对Executor进行区分的重要因素。
ThreadPoolExecutor直译为线程池执行器,该类通过维护内线程池,最大程度复用线程,减少线程创建、销毁与维护的开销,提高任务的执行效率。通常会作为系统的一个异步处理模块出现,最大程度降低系统对硬件资源的占用。同时也提供了友善的api,通过对线程池核心参数的设计,可以设计出不同类型的线程池,扩展线程池的可应用场景。
基础概念
包含线程控制状态、作业队列、工人(worker)、工人集合(worker set)、工人数量(worker count)、终止条件、线程工厂、任务拒绝处理器、核心池大小、最大池大小等等
控制状态、工人数量
控制状态:
标记了执行器的生命周期,记录线程池的当前状态,分为RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED,主要是为了实现shutdown()等涉及到执行器状态的方法,如果不存在控制状态,则无法实现类似:拒绝新任务的添加、终结执行器等功能。
阶段 说明 状态转换
RUNING 任务执行中,此时可以接收新的任务 SHUTDOWN:调用shutdown()
STOP:调用shutdownNow()
SHUTDOWN 任务执行中,但不再接收新的任务 STOP:调用shutdownNow()
TIDYING:任务完成、线程池清空
STOP 执行中的任务将被终止,且不再接收新的任务,不再继续处理任务 TIDYING:线程池清空
TIDYING 整理阶段,主要是回收资源与收尾工作。所有的任务已经结束,工人数量为0(线程已经完成回收),在该阶段对应线程将调用钩子函数terminated()告知子类即将要终结 TERMINATED:terminated()执行后
TERMINATED terminated()完成
ThreadPoolExecutor实现中将线程池状态与工人数量整合到一个Integer内,为了保证并发安全,Integer使用AtomicInteger。控制状态一共5种,在设计中通过保证各个状态在状态空间中的有序,直接使用数值的方式判断当前状态。
工人数量(workerCount):
记录当前线程池中的有效线程数量,主要用于状态变更、判断是否需要新增线程、线程数量是否已达设定值等。
状态字段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//0001111111111…
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//RUNING状态下,ctl<0
private static final int SHUTDOWN = 0 << COUNT_BITS;//SHUTDOWN状态下,当workercount=0时,ctl=0
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//取高三位的结果
private static int workerCountOf(int c) { return c & CAPACITY; }//低位即wokerCount
private static int ctlOf(int rs, int wc) { return rs | wc; }
作业队列(workQueue)-BlockingQueue
用于保存待处理任务并将任务移交给工作线程。声明类型为阻塞队列(BlockingQueue)
不要求poll()返回null时代表队列为空(isEmpty),在决定是否进行线程池状态转移时(比如由SHUTDOWN转移为TIDYING需要判断任务队列是否为空)使用isEmpty判断队列是否为空。这种设计就可以让workQueue使用一些特殊设计的队列,比如延迟队列(DelayQueue,即便poll()为null,但延迟一段时间后可以返回non-null,即无法通过poll()是否为null判断队列是否为空),ThreadPoolExecutor通过isEmpty而非poll() == null 的方式判断队列是否为空,能够对工作队列的实现类型更加包容。
ThreadPoolExecutor并没有为BlokingQueue提供默认实现(声明时没有指定实例,且所有的构造方法没有为其提供默认实现),但使用Executors创建ThreadPoolExecutors时默认会使用LinkedBlockedQueue作为默认实现。
关于阻塞队列(BlockingQueue)
合适的阻塞队列,当队列空时会阻塞消费者,队列满时阻塞生产者。
如果不适用阻塞队列,可以使用线程安全队列+标志锁实现~(但自己实现的方式可能会存在很多细节问题,有空可以把这个细节深入研究一下)
工人(worker)
保存了所有在线程池中的工作线程的集合
private final HashSet
该声明并未使用线程安全的Set类,而是使用了最为简单的HashSet,因此其内部要求,任何对该集合的访问都需要获取private final ReentrantLock mainLock = new ReentrantLock();的锁权限。
Worker类属于ThreadPoolExecutor的私有内部类,因此只有ThreadPoolExecutor能够创建该类的实例,作为内部类,该类的实例能够直接访问ThreadPoolExecutor的属性与方法。
Woker源码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//通过ThreadPoolExecutor内的ThreadFactory生成的执行线程
//woker持有该thread,主要是为了保证可以获取到是哪个线程是该woker的运行线程
final Thread thread;
//该woker的第一个任务,可能为null
Runnable firstTask;
//该woker完成的任务总量
volatile long completedTasks;
//AQS中state标志位,0:无锁状态,1:被持有锁状态,>=0:可被中断状态
Worker(Runnable firstTask) {
setState(-1); // 禁止中断该worker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//worker也是runnable,将worker作为其执行线程的target,当其执行线程start()时,JVM会调用worker.run()
}
//该方法表明woker是个Runnable,当其执行线程start时,会调用该方法
public void run() {
runWorker(this);//将自己作为参数,运行自己
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
//仅当woker线程运行时,允许中断
void interruptIfStarted() {
Thread t;
//getState() >= 0 通过判断state是否大于0,初始情况下state=-1
//当state < 0,即state = -1时,其执行线程尚未start,因此无需中断其执行线程
//并不要求获取了woker的锁
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();//中断执行线程
} catch (SecurityException ignore) {
}
}
}
}
ThreadPoolExecutor.runWoker()
final void runWorker(ThreadPoolExecutor.Worker w) {
//runWoker()方法只会被Woker.run()调用,而Woker.run()只会在其执行线程start后由jvm调用,因此runWoker()必定只会被对应Worker的执行线程调用
Thread wt = Thread.currentThread();//所以这里当前线程就是w.thread,也就是woker的执行线程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();//此时woker的执行线程已经启动,允许外部进行中断,因此将woker的state置为0
boolean completedAbruptly = true;
try {
//getTask()重点,内部其实是从任务队列里获取任务,包含了线程阻塞、挂起、淘汰等一系列逻辑
//正常情况下就是该线程不断从任务队列里取任务
while (task != null || (task = getTask()) != null) {
//这里的这个lock很有意思
//如果仅仅是执行线程执行任务的话,其实执行线程获取woker锁并无太大意义,一方面是因为该方法(runWoker(…))只会运行在执行线程中,不可能有并发情况,另一方面woker本身没有可以共享的资源(这个地方可以再考虑一下:获取的任务、本身执行的线程是否算可共享资源呢),没必要获取这个锁,但这个地方有个隐藏逻辑,就是一旦woker在执行任务,则woker必定是被其执行线程锁住的,因此通过woker锁的状态可以判断woker是否在执行任务。当外部线程想要让worker正常执行完任务,然后再停止woker时,就必须获取该woker的锁,如此就能够保证woker当前正在执行的任务被正常完成
w.lock();
//简短的代码,逻辑复杂,具体分析参见“Worker检测ThreadPoolExecutor是否Stoping及线程状态机制”
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//单个任务执行之前的钩子函数
Throwable thrown = null;
try {
task.run();//真正的执行任务
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);//任务执行之后的钩子函数
}
} finally {
task = null;
w.completedTasks++;//完成任务计数
w.unlock();//释放锁,表明当前worker已经完成某个任务的执行
}
}
completedAbruptly = false;//表明线程正常执行完任务,如果为true,则说明执行形成可能中途被中断,或是用户任务发生了异常
} finally {
/* *处理将要结束的worker的收尾工作。
* 1、将当前worker从workerSet中移除
* 2、尝试将线程池过度为Terminated状态
* 3、在线程池仍然需要执行任务的状态下(RUNING、SHUTDOWN),判断是否需要添加新的worker至线程池,添加条件为:1)当前worker为突然终止;2)当前线程池的线程数量小于最小需求线程数量
* 4、走到这行代码一般有两种场景:1)无任务可做,且不会有新任务来了;2)用户任务执行期间发生了异常
*/
processWorkerExit(w, completedAbruptly);
}
}
Worker检测ThreadPoolExecutor是否Stoping及线程状态机制
1、如果ThreadPool处于Stop、Tidying或Terminated,且当前线程未被中断,则中断当前线程;2、如果处理Runing、Shutdown状态,则清除当前中断标志位,返回之前的中断标志,如果之前是中断的,且ThreadPool状态变为了Stop、Tidying或Terminated,则再次中断执行线程;3、这段逻辑保证了:如果当前ThreadPoolExecutor处于RUNING、SHUTDOWN,则中断标志位被清除;否则,直接中断执行线程。4、进一步思考:如果不这样实现会有什么问题?
下图为实际的判断逻辑,如果没有红框部分逻辑,则之前的清除中断标志位可能会导致在箭头处插入的ShutdownNow事件被忽略。
处理woker结束的工作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn’t adjusted
//正常结束不需要再去减少workerCount,原因是在getTask()的地方已经预先减过了
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
线程池里的线程创建与销毁
失效场景:1、空闲worker(当前worker数量超过corePoolSize,且线程keepAliveTime时间内未获取到任务,这也是线程池控制worker超时失效的机制);2、用户程序异常;3、任务完全处理完成
线程池在处理失效线程时,如果仍然需要增加线程,那么会再次通过ThreadFactory创建一个新的线程。
假设如下场景:线程池持续接收新任务,如果新的任务正常执行,那么执行该任务的线程还会继续服役,处理后续的新任务,但是如果新的任务无法正常执行,抛出了异常,那么该线程将由于该异常而而终结,线程池会创建新的线程继续处理后续任务。所以当所有的新任务都抛出异常时,可能会导致线程池单个线程的寿命极短,频繁创建线程,事实上导致线程池失效。
实际上呢?
通过submit()提交给线程池的RunnableTask都被包装为FutureTask了,而FutureTask在run()的时候,对异常进行包装,将outCome输出为Exception,也就是说正常的用户异常根本不会抛出到Worker的runWoker loop里…
但是如果直接调用ThreadPoolExecutor.execute()方法,则不会使用FutureTask包装该Runnable了,包装的过程都是在AbstractExecutorService里完成的,直接通过execute执行任务的话,则会产生之前所述的场景,线程将频繁创建
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5, new NamedThreadFactory());
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(() -> {//使用Executor.execute()执行,则会出现频繁创建线程池的情况,使用submit()则不会,但其实execute与submit都是异步执行
throw new RuntimeException();
});
}
6、总结
线程池的核心其实是一个基于BlockingQueue的任务生产消费模型,任务的生产方为ThreadPool的使用方,通过submit与execute生产Runnable任务,任务的消费方则为ThreadPool内部的Worker。TheadPool做了很多关于线程池特性的控制:比如核心线程池大小、最大线程池大小、空闲线程的最大存活时间等等,主要就是通过Worker从BlockingQueue中获取任务的状态来控制Worker的销毁。
源码十分精巧,部分代码写的相当简练,但包含了很深的关于状态定义、任务生产、任务消费、超时控制、同步控制、空闲线程销毁、新增线程等一系列特性的实现逻辑,每一步的动作都值得进一步思考。
如果让自己去实现一遍这样的线程池,该如何去实现呢?
最后附上向ThreadPoolExecutor submit任务的执行流程图~