参阅文章:/post/698321…

一、Java线程池的根本运用

  在Java中能够经过如下两种方式运用线程池,不过终究都是运用到ThreadPoolExecutor。

1、直接运用ThreadPoolExecutor

  可经过ThreadPoolExecutor的结构函数创立出线程池的实例目标,并调用其execute函数进行使命添加。代码完成如下:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        10,//corePoolSize
        20,//maximumPoolSize
        120,//keepAliveTime
        TimeUnit.SECONDS,//keepAliveTime时刻单位
        new ArrayBlockingQueue<>(10)//等候行列
        );
//将使命添加到该线程池中
threadPoolExecutor.execute(() -> {
    //do something
});//留意这儿运用时lambda表达式

  这儿简单说一下lambda表达式,留意遵从如下几个准则即可,详细可参阅该篇博客。

1)需求函数式接口(即一个接口中只要一个函数);留意必定要是接口;
(2)lambada表达式->前面的是形参,形参可省掉掉参数类型;假如只要一个形参则能够省掉掉括号;
(3->后边的是函数体,假如只要一行代码则可省掉掉大括号,假如该一行代码是回来语句则省掉掉大括号的一起也需求省掉掉return关键词。

2、运用Executors

  经过该类供给的函数,能够创立如下几种类型的线程池;不过其底层依然运用的是ThreadPoolExecutor。

//该线程池只包含一个线程,一切使命顺序履行;适用于需求顺序履行使命的场景
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
//创立固定线程数量线程池,即该线程池中线程数量固定,假如线程池中线程都处于忙碌状况,则新进来的使命需求进入到堵塞行列中,直到线程闲暇停止。
//适用于需求约束线程数量的场景
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
//可缓存线程池是一个依据需求主动调整巨细的线程池;适用于履行大量的短期异步使命的场景
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//定时使命线程池,底层运用的是ScheduledThreadPoolExecutor,不过该类也继承至ThreadPoolExecutor;
//用于履行定时使命和周期性使命
ExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

  如下以函数newCachedThreadPool为例子,学习一下Executors创立线程池的底层完成。从其源码能够知道其也是经过ThreadPoolExecutor类创立的线程池目标,仅仅其中的各个参数现已定义好了而已。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

二、ThreadPoolExecutor源码

1、ThreadPoolExecutor结构函数

/**
* corePoolSize:线程池中中心线程数量,一般来说中心线程会一向存在,即便处于闲暇状况;
* maximumPoolSize:线程池中最大线程数量;当使命量超越中心线程数量并且使命行列已满,线程池还能创立新的线程来处理使命,可是线程数(包含中心线程数量)不会超越该值;
* keepAliveTime:非中心线程在闲暇状况下能够存活的时刻;
* unit:存活时刻单位(ms、s、h等);
* workQueue:等候履行的使命列表(先进先出);
* threadFactory:线程创立工厂,可运用默许的,也能够自定义;
* handler:使命超越 maximumPoolSize+使命行列巨细回调函数,即无法处理新的使命;也称回绝战略。
*/
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //各个参数合法性判别
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

2、execute函数

  经过调用该函数以提交新的使命到线程池中进行处理。因此以该函数的源码完成作为学习线程池源码的进口点。代码如下:

public void execute(Runnable command) {
    //不答应提交的使命为空
    if (command == null)
        throw new NullPointerException();
    //获取当时线程池状况+线程数量(详细可阅览开头的参阅文章)
    int c = ctl.get();
    //判别当时线程数是否小于中心线程数
    if (workerCountOf(c) < corePoolSize) {
        //假如小于中心线程数则创立新的线程并直接履行
        //这儿参数true表明当时需求创立的线程是中心线程(用于判别当时答应的最大线程数量)
        if (addWorker(command, true))
            return;
        //使命履行失利,则从头获取线程池状况+当时线程数
        c = ctl.get();
    }
    //首先判别线程池是否处于运转状况,假如是则将当时使命添加到使命行列中
    if (isRunning(c) && workQueue.offer(command)) {
        //从头获取线程池状况标识
        int recheck = ctl.get();
        //假如线程池没有运转,则从行列中移除该使命,并履行回绝战略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //当时正在运转线程为0
        //则需求发动一个中心线程以运转使命行列中的使命
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //使命行列已满,则发动非中心线程,false标识发动非中心线程
    //假如使命运转失利,则履行回绝战略
    else if (!addWorker(command, false))
        reject(command);
}

  如上代码可缩减为如下几个步骤:

  • 判别是否能够创立中心线程以履行使命
  • 将使命添加到使命行列
  • 创立非中心线程履行使命
  • 使命履行失利则履行回绝战略

3、addWorker函数

  该函数主要有两个功用:(1)经过CAS将当时线程数量+1;(2)创立新的线程并添加到作业行列中,一起立马开端履行当时传递进来的使命;

private boolean addWorker(Runnable firstTask, boolean core) {
    retry://类似于goto
    //获取当时线程池状况+线程数量
    for (int c = ctl.get();;) {
        //假如当时线程池状况是RUNNING则能够持续履行使命
        //不然假如当时线池状况是STOP、TIDYING或许TERMINATED或许待履行使命为空或许作业行列为空,则直接回来false
        if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        //经过CAS将线程数量+1,直到修正成功停止
        for (;;) {
            //依据当时是否是中心线程挑选当时答应的最大线程数量
            //假如超越答应的最大线程数量则直接回来false
            if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            //经过CAS将线程数量+1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            //CAS修正线程数量失利,再次判别线程池状况
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //该类为ThreadPoolExecutor内部类,继承了AbstractQueuedSynchronizer类并且完成了Runnable类
        //并且在其结构函数中会调用所供给的工厂创立新的线程
        w = new Worker(firstTask);
        //获取创立的新线程
        final Thread t = w.thread;
        if (t != null) {
            //加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                //判别线程池是否处于运转状况或许可履行使命状况
                if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                    //假如线程状况不是处于新创立状况则直接抛出反常
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    //将新创立的worker添加到作业行列中
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
             //释放锁
            } finally {
                mainLock.unlock();
            }
            //假如是新创立的线程则直接开端履行使命
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //没有发动新的使命或许发动失利,则将新的使命从使命行列移除,并将线程数量-1
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

3、Worker类源码

  该类比较简单,其结构函数中会存储当时需求运转的使命(可为空);并经过供给的工厂创立新的线程,一起将当时目标引证作为Runnable使命传递给该线程;当上述addWorker函数中调用t.start()就会调用到该类的run函数中以履行后续的逻辑。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;
    @SuppressWarnings("serial") // Unlikely to be serializable
    final Thread thread;
    @SuppressWarnings("serial") // Not statically typed as Serializable
            Runnable firstTask;
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        //当时需求履行的使命
        this.firstTask = firstTask;
        //经过供给的工厂创立新的线程,留意这儿将当时类的引证传递给了Thread作为Runnable使命
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        runWorker(this);
    }
    //省掉掉部分源码
    ......
}

4、runWorker函数

  如下代码便是当时线程开端履行使命了;(1)首先判别worker是否存在需求履行的使命,假如有则优先履行该使命;(2)不然从行列中获取使命进行履行;

final void runWorker(Worker w) {
    //获取当时线程
    Thread wt = Thread.currentThread();
    //获取当时worker中需求履行的使命(优先级最高)
    Runnable task = w.firstTask;
    //将worker中需求履行使命设置为空,避免后续重复履行
    w.firstTask = null;
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        //首先判别当时worker中需求履行的使命是否为空,假如不为空则履行该使命
        //不然从使命列表中获取使命进行履行,留意getTask会堵塞当时线程
        while (task != null || (task = getTask()) != null) {
            //加锁
            w.lock();
            //判别当时线程池以及线程是否能够持续履行使命
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                //该函数未完成
                beforeExecute(wt, task);
                try {
                    //履行使命
                    task.run();
                    //该函数未完成
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //完毕当时线程
        processWorkerExit(w, completedAbruptly);
    }
}

5、getTask函数

  在该函数中会依据是否答应超时收回线程以设置使命获取堵塞时刻,假如是中心线程则会一向堵塞,非中心线程假如超时依然没有获取到可履行使命则会被收回。

private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        //判别线程池是否处于可履行使命状况,假如不是则回来null
        //留意这儿SHUTDOWN状况是能够持续履行使命的
        if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //获取当时线程数量
        int wc = workerCountOf(c);
        //allowCoreThreadTimeOut表明是否答应闲暇的中心线程超时收回
        //或许假如当时线程数量大于中心线程数量,则超越中心线程数量的线程能够超时收回
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //假如某个线程超时获取task为null,则会履行到该处
        //留意这儿修正线程数量是经过CAS,也便是同一时刻只要一个线程能够修正成功,其他线程假如同步在修正则会失利
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            //经过CAS将线程数量-1,并直接回来null,即直接完毕当时线程
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //经过堵塞的方式等候行列使命
            //假如timed为true则说明能够超时收回线程,即假如在特定时刻内没有新的使命,那么当时线程就需求收回
            //take函数会一向等候直到有新的使命呈现需求处理
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

三、总结

  如上代码剖析下来,答复如下几个问题则没有什么难度了:

问题1:线程池使命处理根本流程
答复:经过execute函数将使命提交到线程池,假如中心线程数未超越则直接创立中心线程履行使命,不然添加到使命行列;
假如使命行列满了则创立非中心线程履行使命,不然履行回绝战略。
问题2:怎么复用现有线程
答复:从上述代码剖析能够知道,每个线程会优先履行当时worker中的task,假如当时worker的task为空,则会去使命行列中获取使命进行履行;
假如使命行列为空则会堵塞当时线程;即典型的出产者-消费者模型,上层事务会向使命行列中添加使命(出产),线程会从使命行列中获取使命进行履行(消费),这样线程就能够不断复用了。
问题3:中心线程与非中心线程是否存在差异
答复:没有啥差异,假如当时既有中心线程也有非中心线程,那么终究被保留下来的线程便是中心线程,详细的可参阅上述getTask函数源码。