持续创作,加速生长!这是我参加「日新方案 10 月更文应战」的第23天,点击检查活动详情

概述

线程池的好处和运用本篇文章就不赘叙了,不了解的能够参考下面两篇文章:

  • 一文全貌了解线程池的正确运用姿势
  • 学习线程池原理从手写一个线程池开端

那么本文要点是从源码层面了解jdk8中线程池的完结。

中心机制

再分析源码之前,咱们还是先回忆和熟悉下线程的中心作业机制。

线程池作业原理

线程池选用的是一种生产者-顾客的模型,如下图:

Java线程池源码深度解析

  1. 主线程调用execute、或许submit等办法提交使命给线程池。
  2. 假如线程池中正在运转的作业线程数量小于corePoolSize(中心线程数量),那么马上创立线程运转这个使命。
  3. 假如线程池中正在运转的作业线程数量大于或等于 corePoolSize(中心线程数量),那么将这个使命放入行列,稍后履行。
  4. 假如这时行列满了且正在运转的作业线程数量还小于 maximumPoolSize(最大线程数量),那么会创立非中心作业线程马上运转这个使命,这部分非中心作业线程闲暇超越必定的时刻(keepAliveTime)时,就会被毁掉收回。
  5. 假如最终提交的使命超越了maximumPoolSize(最大线程数量),那么就会履行回绝战略。

线程池状况

Java线程池源码深度解析

线程池的状况有5种,他们的状况转化如上图所示,这儿记得差异线程的状况,它们不是一回事。

ThreadPoolExecutor类寄存线程池的状况信息很特别,是存储在一个int类型原子变量的高3位,而低29位用来存储线程池当时运转的线程数量。经过将线程池的状况和线程数量合二为一,能够做到一次CAS原子操作更新数据。

状况 高3位值 阐明
RUNNING 111 运转状况,线程池被创立后的初始状况,能承受新提交的使命,也能处理堵塞行列中的使命。
SHUTDOWN 000 封闭状况,不再承受新提交的使命,但任能够处理堵塞行列中的使命。
STOP 001 中止状况,会中止正在处理的线程,不能承受新提交的使命,也不会处理堵塞行列中的使命。
TIDYING 010 一切使命都现已中止,有效作业线程为0。
TERMINATED 011 中止状况,线程池完全中止。

源码解析

Java线程池源码深度解析

上图是线程池中心类ThreadPoolExecutor的类结构图:

  • Executor: 提交使命的根底接口,只要一个execute办法。
  • ExecutorService: 承继自Executor,它供给办理中止的办法,以及能够产生Future的办法,用于盯梢一个或多个异步使命的进度。
  • AbstractExecutorService: 供给ExecutorService履行办法的默许完结。
  • ThreadPoolExecutor: 线程池类本类,完结了线程池的中心逻辑。
  • Worker: ThreadPoolExecutor的内部类,作业线程类,承继自 AQS。
  • *Policy: 其他Policy结尾的都是内置的决策战略类。

要害成员变量

  1. 线程池的状况信息和线程数量信息(ctl)相关
  • 线程的状况信息和数量信息用同一个int的原子变量存储,高3位存储状况信息,低29位存储线程数量。
// ctl,原子变量,存储状况和线程数量,初始化运转状况+0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 静态常量,表明线程数量寄存的位数29=32-3
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程数量最大的容量,低 COUNT_BITS 位所能表达的最大数值,000 11111111111111111111 => 5亿多
private static final int CAPACITY  = (1 << COUNT_BITS) - 1;
  • 经过位运算符设置各个状况的高三位值。
// 111 000000000000000000,转化成整数后其实便是一个【负数】
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;
  • 从ctl中获取线程池的状况值
// ~CAPACITY = ~000 11111111111111111111 = 111 000000000000000000000(取反)
// &运算符,和1&是它自身,和0&便是0,就能够取得高位值。
private static int runStateOf(int c)     { return c & ~CAPACITY; }
  • 从ctl中获取线程池的数量
// CAPACITY = 000 11111111111111111111
// &运算符,和1&是它自身,和0&便是0,就能够取得低29位
private static int workerCountOf(int c)  { return c & CAPACITY; }
  • 生成ctl值
// rs 表明线程池状况,wc 表明当时线程池中 worker(线程)数量,相与今后便是兼并后的状况
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 比较当时线程池 ctl 所表明的状况

线程池状况值的大小联系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED

// 比较当时线程池 ctl 所表明的状况,是否小于某个状况 s
private static boolean runStateLessThan(int c, int s) { return c < s; }
// 比较当时线程池 ctl 所表明的状况,是否大于等于某个状况s
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// 小于 SHUTDOWN 的必定是 RUNNING,SHUTDOWN == 0
private static boolean isRunning(int c) { return c < SHUTDOWN; }
  • cas设置ctl的值
// 运用 CAS 办法 让 ctl 值 +1 ,成功回来 true, 失利回来 false
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
// 运用 CAS 办法 让 ctl 值 -1 ,成功回来 true, 失利回来 false
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
// 将 ctl 值减一,do while 循环会一向重试,直到成功为止
private void decrementWorkerCount() {
    do {} while (!compareAndDecrementWorkerCount(ctl.get()));
}
  1. 线程池中的行列
// 线程池用于保存使命并将使命传递给作业线程的行列
private final BlockingQueue<Runnable> workQueue;
  1. 控制并发的锁
// 增加削减 worker 或许时修正线程池运转状况需求持有 mainLock
private final ReentrantLock mainLock = new ReentrantLock();
  1. 线程池中作业线程的调集
private final HashSet<Worker> workers = new HashSet<Worker>();
  1. 线程池结构参数联系特点
// 中心线程数量
private volatile int corePoolSize;
// 线程池最大线程数量
private volatile int maximumPoolSize;	
// 闲暇线程存活时刻
private volatile long keepAliveTime;	
// 创立线程时运用的线程工厂,默许是 DefaultThreadFactory
private volatile ThreadFactory threadFactory;	
// 【超越中心线程提交使命就放入 堵塞行列】
private final BlockingQueue<Runnable> workQueue;
// 回绝战略
private volatile RejectedExecutionHandler handler;	
  1. 线程池监控相关特点
// 记载线程池生命周期内线程数最大值
private int largestPoolSize;	
// 记载线程池所完结使命总数,当某个 worker 退出时将完结的使命累加到该特点
private long completedTaskCount;	

线程提交原理

线程池提交线程有多种办法如execute、submit或许invoke相关办法,咱们要点关注在最根底的execute()办法提交使命,把它搞清楚了,其他的都不在话下。

  1. execute(Runnable command)办法是线程提交的入口办法。
//  ThreadPoolExecutor#execute
public void execute(Runnable command) {
        // 假如使命为空,直接抛空指针
        if (command == null)
            throw new NullPointerException();
        // 获取ctl的值,其中高3位是状况信息,低3位是线程数量
        int c = ctl.get();
        // workerCountOf获取当时线程的数量
        // 当时线程数量小于中心线程数,调用addWorker创立一个作业线程
        if (workerCountOf(c) < corePoolSize) {
            // 调用addWorker办法创立作业线程,直接履行使命。假如成功的话,直接结束办法。
            if (addWorker(command, true))
                return;
            // 因为并发等原因,addWorker增加失利,会走到这儿,再次获取ctl的值
            c = ctl.get();
        }
    	// 假如线程池是运转状况的话,就把使命参加到行列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 两层检查,因为从上次检查到进入此办法,线程池或许已成为SHUTDOWN状况
            int recheck = ctl.get();
            // 假如发现线程池不是运转状况的话,那就移除这个使命
            if (!isRunning(recheck) && remove(command))
                // 使命出队成功,走回绝战略
                reject(command);
             // 履行到这阐明线程池是 running 状况,获取线程池中的线程数量,判别是否是 0
             // 【担保机制】,确保线程池在 running 状况下,最起码得有一个线程在作业
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 走到这儿阐明线程不是运转状况,或许便是行列满了,offer回来false
        // 再次调用addWoker创立新的线程,假如不成功(一般是超越了线程池最大线程数量),履行回绝战略
        else if (!addWorker(command, false))
            // 履行回绝战略
            reject(command);
    }

这个办法是提交线程的主干逻辑:

  1. 提交一个使命时,假如运转的线程少于corePoolSize,经过调用addWorker增加一个作业线程,直接开端运转。

  2. 假如作业线程大于等于corePoolSize,而且前面addWorker失利时,需求将使命参加到行列中,参加成功后,做了一层两层校验,因为这个进程或许线程池状况产生变化了,假如现已封闭,那么要移除刚刚参加的这个使命。

3.假如参加行列失利,阐明行列满了,这时候调用addWorker办法再次创立线程,假如回来false,有或许是超越最大线程数量了,那么就履行回绝战略。

  1. addWorker办法也是一个很要害的办法, 增加线程到线程池,回来 true 表明创立 Worker 成功,且发动线程。
//  ThreadPoolExecutor#addWorker
// core == true 表明选用中心线程数量限制,false 表明选用 maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
     // 自旋【判别当时线程池状况是否答应创立线程】,答应就设置线程数量 + 1
    retry:
    for (;;) {
         // 获取 ctl 的值
        int c = ctl.get();
        // 获取当时线程池运转状况
        int rs = runStateOf(c);
         // 判别当时线程池状况【是否答应增加线程】
       // 假如线程池状况大于SHUTDOWN 或许是SHUTDOWN状况,行列是空了的话,都不答应创立新的线程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // false,没有创立线程
            return false;
        // 再次自旋
        for (;;) {
            // 获取线程池中线程数量
            int wc = workerCountOf(c);
            // 假如线程数量超越阈值的话,回来false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
          // 记载线程数量现已加 1,类比于请求到了一块令牌,条件失利阐明其他线程修正了数量
            if (compareAndIncrementWorkerCount(c))
                // 请求成功,跳出了 retry 这个 for 自旋
                break retry;
             // CAS 失利,没有成功的请求到令牌
            c = ctl.get(); 
            // 判别当时线程池状况是否产生过变化,被其他线程修正了,或许其他线程调用了 shutdown() 办法
            if (runStateOf(c) != rs)
                // 从头回到retry的履行点
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 下面开端真实创立线程了
    // 运转符号,表明创立的 worker 是否现已发动,false未发动  true发动
    boolean workerStarted = false;
    // 增加符号,表明创立的 worker 是否增加到池子中了,默许false未增加,true是增加。
    boolean workerAdded = false;
    Worker w = null;
    try {
        //【创立 Worker,底层经过线程工厂 newThread 办法创立履行线程,指定了首要履行的使命】
        w = new Worker(firstTask);
        // 将新创立的 worker 节点中的线程赋值给 t
        final Thread t = w.thread;
        // 这儿的判别为了防止 程序员自定义的 ThreadFactory 完结类有 bug,创造不出线程
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加互斥锁,要增加 worker 了
            mainLock.lock();
            try {
                 // 获取最新线程池运转状况
                int rs = runStateOf(ctl.get());
            	// 判别线程池是否为RUNNING状况,不是再【判别当时是否为SHUTDOWN状况且firstTask为空,特殊状况】
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                     // 当线程start后,线程isAlive会回来true,这儿还没开端发动线程,假如被发动了就需求报错
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    //将新建的 Worker 增加到线程池中
                    workers.add(w);
                    int s = workers.size();
                    // 当时池中的线程数量是一个新高,更新 largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                     // 增加符号置为 true
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
             // 增加成功就【发动线程履行使命】
            if (workerAdded) {
                // 发动线程
                t.start();
                // 运转符号置为 true
                workerStarted = true;
            }
        }
    } finally {
        // 线程发动失利
        if (! workerStarted)
            // 整理作业,比如从线程池中移除。
            addWorkerFailed(w);
    }
    return workerStarted;
}
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // 持有线程池大局锁,因为操作的是线程池相关的东西
    mainLock.lock();
    try {
        //条件建立需求将 worker 在 workers 中整理出去。
        if (w != null)
            workers.remove(w);
        // 将线程池计数 -1,相当于偿还令牌。
        decrementWorkerCount();
        // 测验中止线程池
        tryTerminate();
    } finally {
        //开释线程池大局锁。
        mainLock.unlock();
    }
}
  • 这儿注意一个点,SHUTDOWN 状况也能增加线程,但是要求新加的 Woker 没有 firstTask,而且当时 queue 不为空,所以创立一个线程来协助线程池履行行列中的使命。

Woker运转原理

Woker类是ThreadPoolExecutor类的内部类,见明知意,它是承担了一个“工人”干活,也便是作业线程的职责。

  1. Worker类

每个 Worker 对象有一个初始使命,发动 Worker 时优先履行,这也是造成线程池不公平的原因。Worker 承继自 AQS,自身具有锁的特性,选用独占锁形式,state = 0 表明未被占用,> 0 表明被占用,< 0 表明初始状况不能被抢锁。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	// worker 内部封装的作业线程
    final Thread thread;		
    // worker 第一个履行的使命,普通的 Runnable 完结类或许是 FutureTask
    Runnable firstTask;	
    // 记载当时 worker 所完结使命数量
    volatile long completedTasks;	
    // 结构办法
    Worker(Runnable firstTask) {
        // 设置AQS独占形式为初始化中状况,这个状况不能被抢占锁
       	setState(-1);
        // firstTask不为空时,当worker发动后,内部线程会优先履行firstTask,履行完后会到queue中去获取下个使命
        this.firstTask = firstTask;
        // 运用线程工厂创立一个线程,而且【将当时worker指定为Runnable】,所以thread发动时会调用 worker.run()
        this.thread = getThreadFactory().newThread(this);
    }
    // 不可重入锁,重写了AQS中的办法
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
 protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        // 设置state为0,开端抢锁
        setState(0);
        return true;
    }
}
  1. Worker的作业办法run
// Worker#run
public void run() {
    // 调用自身的runWoker办法
    runWorker(this);
}
// Worker#runWorker
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 获取 worker 的 firstTask
    Runnable task = w.firstTask;
    // 引用置空,【防止复用该线程时重复履行该使命】
    w.firstTask = null;
	// 初始化 worker 时设置 state = -1,表明不答应抢占锁
    // 这儿需求设置 state = 0 和 exclusiveOwnerThread = null,开端独占形式抢锁
    w.unlock(); 
   // true 表明产生反常退出,false 表明正常退出。
    boolean completedAbruptly = true;
    try {
        // firstTask 不是 null 就直接运转,不然去 queue 中获取使命
        while (task != null || (task = getTask()) != null) {
            // worker 加锁,shutdown 时会判别当时 worker 状况,【依据独占锁状况判别是否闲暇】
            w.lock();
            // 阐明线程池状况大于 STOP,目前处于 STOP/TIDYING/TERMINATION,此刻给线程一个中止信号
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                // 线程不是处于中止的状况
                !wt.isInterrupted())
                 // 中止线程,设置线程的中止标志位为 true
                wt.interrupt();
            try {
                // 使命履行前的回调,空完结,能够在子类中自定义
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 真实履行使命
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                     // 钩子办法,【使命履行的后置处理】
                    afterExecute(task, thrown);
                }
            } finally {
                // 将局部变量task置为null,代表使命履行完结
                task = null;
                // 更新worker完结使命数量
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
         // getTask()办法回来null时会走到这儿,表明queue为空而且线程闲暇超越保活时刻,【当时线程履行退出逻辑】
        completedAbruptly = false;
    } finally {
        // 正常退出 completedAbruptly = false
       	// 反常退出 completedAbruptly = true,【从 task.run() 内部抛出反常】时,跳到这一行
        processWorkerExit(w, completedAbruptly);
    }
}
  1. getTask() 获取使命

这个办法首要做了下面几件事情:

  • 从堵塞行列中获取使命
  • 假如当时线程闲暇时刻超越 keepAliveTime 就会被收回,首要经过调用行列的超时获取接口poll(long timeout, TimeUnit unit)完结。
private Runnable getTask() {
     // 超时符号,表明当时线程获取使命是否超时,true 表明已超时
    boolean timedOut = false; 
    for (;;) {
        int c = ctl.get();
         // 获取线程池当时运转状况
        int rs = runStateOf(c);
        // 假如发现线程池被封闭了,直接回来null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 运用 CAS 自旋的办法让 ctl 值 -1
            decrementWorkerCount();
            return null;
        }
        // 获取线程池中的线程数量
        int wc = workerCountOf(c);
        //timed用来判别当时线程是否超越必定时刻没有获取使命就进行毁掉收回,true是需求,false不需求, 有两种状况
        //1. allowCoreThreadTimeOut为true代表答应收回中心线程,那就无所谓了,悉数线程都履行超时收回
        //2. 线程数量大于中心线程数,当时线程认为对错中心线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 一起满意下面1和2条件下,阐明线程要收回,直接回来null
        // 1. 假如线程数量超越最大线程数 或许 上面的timed和超时时刻timedOut都为true
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 2.假如线程数量大于1而且行列时空的状况
            && (wc > 1 || workQueue.isEmpty())) {
            // 运用 CAS 机制将 ctl 值 -1 ,减 1 成功的线程,回来 null,代表能够退出
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 从行列中获取使命,有下面两种办法
            // timed为true, 调用超时办法poll获取使命
            // timed为false,调用堵塞办法take获取
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            获取使命为 null 阐明超时了,将超时符号设置为 true,进入下一次循环,就能够毁掉这个线程了
            timedOut = true;
        } catch (InterruptedException retry) {
             // 堵塞线程被打断后超时符号置为 false,【阐明被打断不算超时】,要继续获取,直到超时或许获取到使命
            // 假如线程池 SHUTDOWN 状况下的打断,会在循环获取使命前判别,回来 null
            timedOut = false;
        }
    }
}
  1. processWorkerExit()作业线程退出办法
// 正常退出 completedAbruptly = false,反常退出为 true
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 条件建立代表当时 worker 是产生反常退出的,task 使命履行进程中向上抛出反常了
    if (completedAbruptly) 
        // 从反常时到这儿 ctl 一向没有 -1,需求在这儿 -1
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    // 加锁
    mainLock.lock();
    try {
        // 将当时 worker 完结的 task 数量,汇总到线程池的 completedTaskCount
        completedTaskCount += w.completedTasks;
		// 将 worker 从线程池中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();	// 解锁
    }
	// 测验中止线程池,唤醒下一个线程
    tryTerminate();
    int c = ctl.get();
    // 线程池不是中止状况就应该有线程运转【担保机制】
    if (runStateLessThan(c, STOP)) {
        // 正常退出的逻辑,是对闲暇线程收回,不是履行犯错
        if (!completedAbruptly) {
            // 依据是否收回中心线程确定【线程池中的线程数量最小值】
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 最小值为 0,但是线程行列不为空,需求一个线程来完结使命担保机制
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // 线程池中的线程数量大于最小值能够直接回来
            if (workerCountOf(c) >= min)
                return;
        }
        // 履行 task 时产生反常,有个线程因为反常中止了,需求增加
        // 或许线程池中的数量小于最小值,这儿要创立一个新 worker 加进线程池
        addWorker(null, false);
    }
}

总结

本文首要从源码层面分析了线程池的运转机理,总算知道了execute办法背后是怎么运转的。

参考

www.cnblogs.com/wang-meng/p…