大家好,我是小趴菜,在面试中,咱们经常会被面试官问到线程池的一些问题,比如

  • 1:线程池履行流程
  • 2:线程池是怎么收回非中心线程
  • 3:线程池怎么维护中心线程
  • 4:………

今天咱们来剖析一下线程池的全体中心流程,协助大家完全了解线程池的底层中心原理

关于线程池的用法如下,咱们要剖析的进口便是submit()办法,这是将使命提交到线程池的办法

ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(1,2,3000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(120);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(120);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
threadPoolExecutor.shutdown();
public Future<?> submit(Runnable task) {
    //判别提交的使命是否为空,为空就直接抛出空指针反常
    if (task == null) throw new NullPointerException();
    //将咱们的使命封装成一个RunnableFuture,后续能够经过Future.get()办法获取咱们使命的回来成果
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //中心办法,将使命提交给线程池履行
    execute(ftask);
    return ftask;
}

execute(ftask);最终进入ThreadPoolExecutor这个类中

public void execute(Runnable command) {
    //对咱们的使命再次进行判空处理
    if (command == null)
        throw new NullPointerException();
    //拿到咱们线程池中此刻存活的线程数
    int c = ctl.get();
    //假如存活的线程数小于中心线程数
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //假如此刻线程池存活的线程数不小于中心线程数
    //isRunning(c):判别此刻线程池的状况,假如是Running,就能够接纳新的使命
    //workQueue.offer(command): 将使命放入行列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            //线程池的拒绝策略,也便是拒绝了这个使命
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //假如行列满了,也便是存活线程数不小于中心线程数而且行列也现已满了
    //也便是判别线程数是否大于线程池最大的线程数
    else if (!addWorker(command, false))
        //线程池的拒绝策略,也便是拒绝了这个使命
        reject(command);
}

在execute(ftask);办法中,呈现最多的便是addWorker()这个办法,接下来咱们看下这个办法的效果是什么

//firstTask:是咱们要提交给线程池的使命
//core:true表明的是这个线程是中心线程,也便是线程数小于中心线程数时分创立的线程,
        false,表明是能够收回的线程,表明此刻线程数现已不小于中心线程数了
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    //关于咱们要了解线程池中心原理,这个for(;;)其实没必要去深化了解,咱们能够直接忽略
    //咱们先把全体的流程搞理解,关于一些不重要的步骤咱们能够直接忽略
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 判别线程池的状况,有RUNNING,STOP,SHUTDOWN等一些状况,只需不是RUNNING状况
        //那么线程池就不会接纳新的使命,所以这儿直接会回来false,可是咱们此刻的
        //线程池状况是RUNNING,所以不会回来进入这个if分支
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //拿到此刻线程池存活的线程数
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //咱们直接从这儿开始剖析
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //将咱们的使命封装成一个Worker目标,此刻这儿就创立了一个新的线程
        w = new Worker(firstTask);
        //从Worker目标中拿到履行当前使命的线程
        final Thread t = w.thread;
        //判别线程是否为空
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                //这儿仍是判别线程池的状况
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //将咱们创立的新的线程放入到workers中
                    //private final HashSet<Worker> workers = new HashSet<Worker>();
                    //其实workers便是一个Set调集
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                //开释锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //调用线程的start()办法去真实履行咱们的事务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

由于咱们的使命被封装成一个Worker的目标,而Worker又完成了Runnable接口,所以在调用start()办法时分,就会履行它的run()办法

ThreadPoolExecutor线程池核心原理

// Worker中的run()办法
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //拿到咱们提交给线程池的使命,注意:此刻task不为空!!!!!!
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //由于此刻的task不为空,所以会直接进入到while循环中去
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //判别线程是否被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //履行使命的前置事务逻辑,由于咱们没有完成,所以这儿不做使命处理
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //真实去调用咱们的事务办法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //履行使命的后置事务逻辑,由于咱们没有完成,所以这儿不做使命处理
                    afterExecute(task, thrown);
                }
            } finally {
                //此刻将task设置为null
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

第一次进入while循环就完毕了,完毕之后将task设置为null,由于这是个while循环,所以会再次进入这个while循环中去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //再次进入到这个while循环中来,此刻的task现已为null了,所以第一个判别为false
        //接下来就会履行task = getTask()办法
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //判别线程是否被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //履行使命的前置事务逻辑,由于咱们没有完成,所以这儿不做使命处理
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //真实去调用咱们的事务办法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //履行使命的后置事务逻辑,由于咱们没有完成,所以这儿不做使命处理
                    afterExecute(task, thrown);
                }
            } finally {
                //此刻将task设置为null
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    //这儿是个死循环
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 假如此刻线程池状况等于RUNING,而且行列现已为空了,就直接回来null
        //可是此刻咱们的线程池状况是RUNNING,所以不会进入到这个if分支
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //将线程池的线程数减1
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //allowCoreThreadTimeOut:false 表明假如没有使命,那么中心线程在超时时刻到达之后就会收回
        // true:表明中心线程不会被收回
        // 咱们能够经过 threadPoolExecutor.allowCoreThreadTimeOut(false);来设置
        // allowCoreThreadTimeOut默认值便是false,所以timed的值便是由wc > corePoolSize来操控
        //假如此刻存活的线程数大于中心线程数,那么便是true,不然便是false
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //   假如此刻存活的线程数大于最大线程数而且行列为空
        //   就履行compareAndDecrementWorkerCount(c),将线程数减1,直接回来null,
        //   回来null之后,上层while循环就直接退出,然后将这个线程收回
        //   这个if的效果便是假如此刻线程数大于最大线程数了,此刻行列又没有使命,这时分就将
        //   那么就直接将这些线程收回
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        //假如timed = true:表明此刻的线程数大于中心线程数,可是小于最大线程数,此刻直接调用
        // 堵塞行列的poll办法,并在咱们设置的存活时刻之后假如还没有获取到元素,就直接纳回这条线程
        // 假如timed = false:表明此刻的线程数小于或等于中心线程数,那么此刻就调用堵塞行列的
        // take()堵塞办法,知道行列中有使命
        //所以中心线程之所以能存活,便是由于调用了take()办法一直堵塞在这儿,
        //非中心线程是有堵塞时刻的,超过这个时刻没有使命就会被收回
        try {
            Runnable r = timed ?
                //非中心线程会收回的中心原理
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                //中心线程存活的中心原理
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}