本文主要内容

  • 什么是线程池
  • 线程池的运用
  • 线程池的原理
  • 线程池中的位运算
  • 源码解析

什么是线程池

假设一再创建线程,也是会影响全体资源或效率的,线程池的发生是为了防止重复的创建线程和收回线程。线程池有以下几个作用:

  • 下降资源消耗。经过重复利用已创建的线程下降线程创建、销毁线程形成的消耗
  • 前进响应速度。当任务抵达时,任务可以不需求比及线程创建就能当即实行。
  • 前进线程的可管理性。线程是稀缺资源,假设入约束的创建,不只会消耗系统资源,还会下降系统的稳定性,运用线程池可以进行一致的分配、调优和监控。

线程池的运用

先来看看线程池的中心结构方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //反常处理略过
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

需求用户指定以下几个要害变量

  • corePoolSize:中心线程数
  • maximumPoolSize:最大线程数量
  • allowCoreThreadTimeOut:是否容许线程超时(设置为true时与keepAliveTime,TimeUnit一同起作用)
  • keepAliveTime:线程存活时刻(当线程池容许线程超时且运转中的线程数量逾越- corePoolSize时,会按照此变量设置时刻关闭线程)
  • TimeUnit:单位(一般与keepAliveTime一同运用,供线程池判别是否满足关闭线程的条件)
  • workQueue:缓冲队伍
  • RejectedExecutionHandler:回绝处理任务类(默许:AbortPolicy 会抛反常,见下方实例)
  • threadFactory:线程工厂(默许:DefaultThreadFactory)

理解这些中心数据的作用,就可以随意运用线程池了

在Executors类中,封装好了几个接口用于初始化线程池,具体可参看源码,只处只罗列其间一个,不再胪陈

ExecutorService executor = Executors.newFixedThreadPool(5);

线程池的原理

假设让我们自己来规划一个线程池,我们应该怎么做呢?线程池最大的功能在于线程重复运用,不需求每实行一个任务就从头结构新线程。

假设用户提交的任务比较多,显然我们需求一个缓存队伍用于存储任务。

为了抵达线程重复运用的意图,线程应该不停地从缓存队伍中获取新的任务,并且实行它。

线程池ThreadPoolExecutor源码解析

线程池中的位运算

线程池中运用一个AtomicInteger方针来表征线程池的情况和大小,为了抵达1个int型数据能保存2个数据的意图,源码采用了非常精妙的位运算来完成。

//线程池的各个情况
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
//两个常量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//获取情况、线程池大小的方法以及一个或方法
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//要害变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

从源码可知,其实RUNNING等5个情况的值的前4位分别是:

  • 1110 、0000、 0010 、0100、 0110

各情况后边接了28个0。

CAPACITY的值,前4位为0,后28位为1。

假设要获取线程池情况,则调用runStateOf此方法,ctl的值将与 ~CAPACITY 作与操作,得到的正是ctl值的前4位。

获取其时线程池大小,则是与CAPACITY 作与操作,正是取ctl的低位值。

所以 ctl 的高位值用于表征线程池情况,而低们值用于表征线程池的大小。

ctl的初始化,调用ctlOf方法,正是拿RUNNING与0作或操作,成果仍是RUNNING,所以可知,在线程池初始化的时分,默许ctl的情况值为RUNNING,而线程池大小为0。

ctlOf方法的意义也可知,正是组合情况值与线程池大小。

源码解析

一般来说,线程池添加任务有两种方法,一种是execute方法,另一种是submit方法,submit方法其实也是会调用execute,所以在此只研究execute方法即可。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取线程池情况
    int c = ctl.get();
    //假设线程池大小少于中心线程,则直接添加新的Worker并回来
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //假设中心线程数量已满,则考虑将任务添加进入缓存队伍,然后再次检查情况
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //最终中心线程数量已满,缓存队伍已满,那么直接交给回绝战略处理
    else if (!addWorker(command, false))
        reject(command);
}

再来看非常要害的addWorker方法:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        //获取线程池情况
        int rs = runStateOf(c);
        // 对情况进行检测,假设线程池现已被调用shutDown方法时,回来false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //获取线程池的大小
            int wc = workerCountOf(c);
            //假设线程池大小现已逾越中心线程池大小,那么也直接回来,这种情况应该把任务缓存到队伍中去
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //正常情况下,线程池大小加1即可
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //新建一个Worker,在实行Worker结构方法时,创建了新的线程,即下面的t,具体可拜见Worker的结构方法。
    Worker w = new Worker(firstTask);
    Thread t = w.thread;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int c = ctl.get();
        int rs = runStateOf(c);
        if (t == null ||
            (rs >= SHUTDOWN &&
             ! (rs == SHUTDOWN &&
                firstTask == null))) {
            decrementWorkerCount();
            tryTerminate();
            return false;
        }
        workers.add(w);
        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s;
    } finally {
        mainLock.unlock();
    }
    //发起线程
    t.start();
    // It is possible (but unlikely) for a thread to have been
    // added to workers, but not yet started, during transition to
    // STOP, which could result in a rare missed interrupt,
    // because Thread.interrupt is not guaranteed to have any effect
    // on a non-yet-started Thread (see Thread#interrupt).
    if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
        t.interrupt();
    return true;
}

在addWorker方法中,发起了一个新的线程,那我们来看看线程中的run方法,其实即是runWorker方法:

final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = true;
    try {
        //请注意,这是一个while循环,假设其时Worker的task不为空,则取其时Worker的task,实行任务
        //假设其时task为空,则从缓存队伍中取任务来实行,这便是线程池中最重要概念线程复用的体现
        while (task != null || (task = getTask()) != null) {
            w.lock();
            clearInterruptsForTaskRun();
            try {
                beforeExecute(w.thread, task);
                Throwable thrown = null;
                try {
                    //实行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

我们来看看getTask方法是否真的是从缓存队伍中取任务:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        boolean timed;      // Are workers subject to culling?
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //正常情况下,线程池数量应该少于最大值,并且也不会timedOut,
            //假设真的大于了最大值,则应该删去一个线程
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
        try {
            //从缓存队伍中取出任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

线程池分析到这,底子结束了,内中最多的细节,比如同步锁的运用,比如情况的判别,比如最终中止线程池等等,不再细说了,线程池介绍到这底子原理都清楚了,有点类似于android中的Looper,死循环中取消息,分发消息并实行对应操作等,线程池也是这样。

线程池中的位运算,刚开始或许看不懂,但在纸上写下各个数,底子就能理解各个方法的意义了。

最终再弥补一个小细节,负数在计算机中的表明,为了更便利完成2进制数的加减法,负数运用补码表明,具体则是正数部分取反加1,而正数的补码则是正数本身。正因为如此,RUNNING 前4位才是1110,具体的各位可以计算下,补码的意义可以自行搜索。