An interview question about keepAliveTime

A few days ago I ran into this interview question: how does a Java thread pool keep track of whether an idle thread has expired?

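I had honestly never thought about this in depth. My first guess was that the pool maintains some kind of timer that watches for threads whose idle time has run out. But wouldn't that be far too heavyweight? The people who designed this are extremely clever, so there is probably a better way. With that question in mind, let's look at the execute method.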

The Javadoc of execute

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.
Params:
command – the task to execute
Throws:
RejectedExecutionException – at discretion of RejectedExecutionHandler, if the task cannot be accepted for execution
NullPointerException – if command is null

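In other words, the task runs at some point in the future, either on a new thread or on a thread that is already in the pool. It may not be accepted, either because the executor has been shut down or because it has reached its capacity. In that case the current RejectedExecutionHandler decides what happens. With the default handler (AbortPolicy), that means a RejectedExecutionException. Passing a null command throws NullPointerException.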
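A quick way to see this contract is to trigger both exceptions. This is a small sketch: the class ExecuteContractDemo and its helper are made up for illustration. It relies on the default rejection policy (AbortPolicy), which throws RejectedExecutionException.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteContractDemo {
    // Calls execute and reports which exception (if any) came back.
    static String tryExecute(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.execute(task);
            return "none";
        } catch (NullPointerException | RejectedExecutionException e) {
            return e.getClass().getSimpleName();
        }
    }

    static String[] run() {
        // No handler argument, so the default AbortPolicy is used.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        String nullTask = tryExecute(pool, null);           // command == null
        String accepted = tryExecute(pool, () -> { });       // normal submission
        pool.shutdown();
        String afterShutdown = tryExecute(pool, () -> { });  // pool no longer accepts tasks
        return new String[] { nullTask, accepted, afterShutdown };
    }
}
```

Calling ExecuteContractDemo.run() should yield NullPointerException, none and RejectedExecutionException, in that order.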

The source of execute

public void execute(Runnable command) {
    // If command is null, throw a NullPointerException
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
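    /*
     * In short:
     * 1. Fewer than corePoolSize threads running: start a new (core) thread
     *    with this task as its first task. addWorker re-checks runState and
     *    workerCount atomically and returns false if it must not add one.
     * 2. Otherwise try to enqueue. After a successful offer, re-check: if the
     *    pool is no longer running, take the task back out and reject it; if
     *    no worker is left alive, start one.
     * 3. If the task cannot be queued, try to add a non-core thread; if that
     *    fails too, the pool is shut down or saturated, so reject the task.
     */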
    // Read ctl, the field that packs both the run state and the worker count
    int c = ctl.get();
    // If the worker count is below corePoolSize, try to start a new core thread
    if (workerCountOf(c) < corePoolSize) {
        // Added successfully: done
        if (addWorker(command, true))
            return;
        // Otherwise refresh c with the latest value of ctl
        c = ctl.get();
    }
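    // Reaching here means no thread was added, or workerCount >= corePoolSize: try to queue the task,
    // but only if the pool is RUNNING and the offer succeeds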
    if (isRunning(c) && workQueue.offer(command)) {
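        // Re-read the pool state. If the pool is no longer running and the task can still be
        // removed from the queue, reject it (no new tasks are accepted).
        // Otherwise, if no worker is alive, start one with no first task so the queued task still runs.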
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } 
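    // Not RUNNING, or the queue is full: try a non-core thread; if addWorker also returns false, reject.
    // (The second argument is false here, unlike the true in the first call. It selects the bound:
    // true means corePoolSize, false means maximumPoolSize. More on addWorker below.)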
    else if (!addWorker(command, false))
        reject(command);
}

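The key thing to understand here is when a thread is added and when the task is queued:

While fewer than corePoolSize threads are running, add a thread (via addWorker). Once the running count is at least corePoolSize, queue the task. If the queue is full, keep adding threads, which are now non-core threads, up to maximumPoolSize. If even that is not possible, the task is rejected.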
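These rules can be checked with a deliberately tiny pool: corePoolSize 1, maximumPoolSize 2, and an ArrayBlockingQueue of capacity 2. This is a sketch with a made-up class name. The tasks block on a latch so the pool stays busy while we inspect it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SizingRulesDemo {
    // Returns {poolSize, queuedTasks, rejectedCount} right after five submissions.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until we say so
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 2, default AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        pool.execute(blocker); // 1st: 0 < core -> addWorker(task, true), a core thread
        pool.execute(blocker); // 2nd: core is full -> workQueue.offer succeeds
        pool.execute(blocker); // 3rd: queued as well, the queue is now full
        pool.execute(blocker); // 4th: offer fails -> addWorker(task, false), a non-core thread
        int poolSize = pool.getPoolSize();
        int queued = pool.getQueue().size();
        int rejected = 0;
        try {
            pool.execute(blocker); // 5th: queue full and maximumPoolSize reached
        } catch (RejectedExecutionException e) {
            rejected = 1;
        }
        release.countDown();
        pool.shutdown(); // the queued tasks still finish
        return new int[] { poolSize, queued, rejected };
    }
}
```

The four accepted tasks end up as follows:

- 1 on a core thread.
- 2 in the queue.
- 1 on a non-core thread.

The fifth is rejected, so run() should return {2, 2, 1}.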

addWorker

Let's look at addWorker to see how a new thread is added:

/*
 * Methods for creating, running and cleaning up after workers
 */

The comment above this group of methods sums it up: creating, running, and cleaning up workers.

Checks if a new worker can be added with respect to current pool state and the given bound (either core or maximum). If so, the worker count is adjusted accordingly, and, if possible, a new worker is created and started, running firstTask as its first task. This method returns false if the pool is stopped or eligible to shut down. It also returns false if the thread factory fails to create a thread when asked. If the thread creation fails, either due to the thread factory returning null, or due to an exception (typically OutOfMemoryError in Thread.start()), we roll back cleanly.
Params:
firstTask – the task the new thread should run first (or null if none). Workers are created with an initial first task (in method execute()) to bypass queuing when there are fewer than corePoolSize threads (in which case we always start one), or when the queue is full (in which case we must bypass queue). Initially idle threads are usually created via prestartCoreThread or to replace other dying workers.
core – if true use corePoolSize as bound, else maximumPoolSize. (A boolean indicator is used here rather than a value to ensure reads of fresh values after checking other pool state).
Returns:
true if successful

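In other words, addWorker checks the pool state and the given bound (core or maximum). If a worker may be added, it bumps the worker count, creates a worker, and starts it with firstTask as its first task.

It returns false in these cases:

- The pool is stopped or eligible to shut down.
- The thread factory fails to produce a thread.

A failed creation is rolled back cleanly. That covers both a null from the factory and an exception such as OutOfMemoryError in Thread.start().

Parameters:
firstTask: the task the new thread runs first, or null. execute() passes a first task to bypass the queue in two cases: when fewer than corePoolSize threads are running, and when the queue is full. Threads created idle (firstTask == null) usually come from prestartCoreThread, or replace dying workers.

core: if true, corePoolSize is the bound; otherwise maximumPoolSize.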
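The Javadoc says idle workers (firstTask == null) are usually created by prestartCoreThread. Both prestart methods are public on ThreadPoolExecutor, so the effect is easy to observe. The demo class name is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {
    // Returns {sizeBefore, prestartCoreThread result (1/0), prestartAllCoreThreads result, sizeAfter}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        int before = pool.getPoolSize();              // threads are created lazily: none yet
        boolean one = pool.prestartCoreThread();      // starts one idle core thread
        int started = pool.prestartAllCoreThreads();  // starts the remaining core threads
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] { before, one ? 1 : 0, started, after };
    }
}
```

With corePoolSize 3, run() should return {0, 1, 2, 3}. The threads exist before any task is submitted, so they must have started with no first task.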


private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

The first for loop in addWorker

First, the outer for loop labeled retry.

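The first if returns false immediately, meaning adding the worker fails, when both of these hold:

- The state is not RUNNING.
- It is not the case that SHUTDOWN && firstTask == null && workQueue is not empty.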

Succeeding in the RUNNING state needs no explanation. But why is a worker still allowed when SHUTDOWN && firstTask == null && workQueue is not empty?

To answer that, recall what the SHUTDOWN state means:

SHUTDOWN: Don’t accept new tasks, but process queued tasks

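It accepts no new tasks but still processes the tasks already in the queue. So suppose the pool is already SHUTDOWN and the number of working threads has dropped to 0. What happens to the tasks still in the queue? A new thread has to be started to process them, which is done by calling addWorker(null, false). That is why addWorker contains this check.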
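The difference between SHUTDOWN and STOP is easy to observe from outside. After shutdown(), queued tasks still run; shutdownNow() hands them back unrun. The sketch below queues three counting tasks behind one blocking task and tries both; the class and helper names are made up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDrainDemo {
    // A one-thread pool whose first task blocks until 'gate' opens,
    // with three counting tasks waiting in the queue behind it.
    static ThreadPoolExecutor busyPool(CountDownLatch gate, AtomicInteger counter) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(counter::incrementAndGet);
        }
        return pool;
    }

    // Returns {tasks run after shutdown(), tasks returned by shutdownNow(), tasks run after shutdownNow()}.
    static int[] run() {
        try {
            AtomicInteger ran = new AtomicInteger();
            CountDownLatch gate = new CountDownLatch(1);
            ThreadPoolExecutor pool = busyPool(gate, ran);
            pool.shutdown();                 // SHUTDOWN: no new tasks, but the queue is drained
            gate.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);

            AtomicInteger ranNow = new AtomicInteger();
            ThreadPoolExecutor pool2 = busyPool(new CountDownLatch(1), ranNow);
            int handedBack = pool2.shutdownNow().size(); // STOP: queued tasks are returned unrun
            pool2.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { ran.get(), handedBack, ranNow.get() };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

run() should return {3, 3, 0}. After shutdown() all three queued tasks ran. After shutdownNow() they were handed back instead.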

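Next comes another infinite for loop. It reads the worker count and returns false in either of two cases:

- The count has exceeded the pool's capacity (CAPACITY).
- The count has reached the configured bound (corePoolSize or maximumPoolSize).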

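Otherwise it calls compareAndIncrementWorkerCount(c) to bump the worker count and leaves both loops with break retry.

The CAS can fail because another thread changed ctl after it was read. In that case the loop re-reads ctl. If the run state rs has changed, it starts over from retry; otherwise it only repeats the inner loop.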
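Both loops lean on ctl, a single AtomicInteger that packs runState and workerCount together. The following standalone sketch of that encoding is modeled on JDK 8's ThreadPoolExecutor. The class name CtlSketch and the method tryReserveWorker are hypothetical. For brevity, the sketch also leaves out the SHUTDOWN-with-null-task exception discussed above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // low 29 bits: workerCount
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // largest possible worker count
    // runState lives in the high 3 bits; as signed ints RUNNING < SHUTDOWN < STOP,
    // which is why "rs >= SHUTDOWN" means "not running".
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // Same shape as addWorker's retry loop (simplified to RUNNING only):
    // read ctl, check state and bound, CAS workerCount + 1, re-read on failure.
    boolean tryReserveWorker(int bound) {
        for (;;) {
            int c = ctl.get();
            if (runStateOf(c) != RUNNING) return false;       // state changed: give up
            if (workerCountOf(c) >= bound) return false;      // core/max bound reached
            if (ctl.compareAndSet(c, c + 1)) return true;     // slot reserved
            // CAS lost a race with another thread: loop and re-read ctl
        }
    }
}
```

Because both fields live in one int, one CAS updates the count only if the state has not changed in between. That is what the rs re-check in the real loop relies on.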

retry:
    for (;;) {
        // Read ctl
        int c = ctl.get();
        // Extract the pool's runState
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // If the state is SHUTDOWN or beyond (anything but RUNNING), and it is NOT the case that
        // (state == SHUTDOWN and firstTask == null and workQueue is not empty),
        // return false immediately
        if (rs >= SHUTDOWN &&
            ! (
                rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()
               )
           )
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

The body of addWorker

Once workerCount has been incremented, we reach the core logic of addWorker.

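It first declares the booleans workerStarted and workerAdded, plus a local Worker w. A Worker is the object that gets added to the pool's set of workers. It is not a Thread itself but wraps one; we will look at it shortly.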

Here is the structure of addWorker in outline:

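try {
    1. create the Worker (and, through the thread factory, its thread)
    lock {
        2. read the current pool state
        3. if RUNNING, or (SHUTDOWN and firstTask == null)   // a thread may be added
           4. add w to workers, a HashSet that holds the Worker objects
           5. update largestPoolSize
           6. mark the worker as added: workerAdded = true
    }
    if it was added, start the thread
} finally {
    if the worker failed to start
        call addWorkerFailed(w)
}
return whether the worker was started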

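To summarize, addWorker mainly does this:

  1. In the RUNNING state, it creates a new thread and starts it.
  2. In the SHUTDOWN state, it creates and starts a thread only when firstTask == null and there are still queued tasks to process.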
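Workers with firstTask == null are not only for SHUTDOWN. execute() takes the same addWorker(null, false) path when a task has been queued but no worker is alive. This is easiest to see with corePoolSize = 0. A sketch, with a hypothetical class name:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroCoreDemo {
    // Returns {taskRan ? 1 : 0, largestPoolSize}.
    static int[] run() {
        // corePoolSize = 0: execute() skips the core-thread branch, queues the task,
        // and on re-check sees workerCount == 0, so it calls addWorker(null, false).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(done::countDown);
        try {
            boolean ran = done.await(5, TimeUnit.SECONDS);
            int largest = pool.getLargestPoolSize(); // the single worker that drained the queue
            return new int[] { ran ? 1 : 0, largest };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

run() should return {1, 1}. Without that re-check in execute(), the task would sit in the queue with no thread to run it.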

What exactly is a Worker

Worker extends AQS (AbstractQueuedSynchronizer) and implements Runnable.

First, what the comment says:

Class Worker mainly maintains interrupt control state for threads running tasks, along with other minor bookkeeping. This class opportunistically extends AbstractQueuedSynchronizer to simplify acquiring and releasing a lock surrounding each task execution. This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run. We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize. Additionally, to suppress interrupts until the thread actually starts running tasks, we initialize lock state to a negative value, and clear it upon start (in runWorker)

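In other words, Worker mainly maintains the interrupt-control state of the thread that runs tasks, plus some minor bookkeeping. It extends AQS so that acquiring and releasing a lock around each task is simple. That lock stops interrupts meant to wake an idle worker (one waiting for a task) from interrupting a task that is actually running.

It is a simple non-reentrant mutex rather than a ReentrantLock. The reason is that a task calling pool control methods such as setCorePoolSize must not be able to reacquire it. And to suppress interrupts until the thread really starts running tasks, the lock state starts at a negative value (-1) and is cleared when runWorker starts.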

So besides Worker's lock design, the most important thing to look at is the runWorker method.

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
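The lock part of Worker can be copied into a standalone class to see why it is non-reentrant and why the state starts at -1. This WorkerLockSketch is a hypothetical reduction of Worker, with no thread and no task. It uses only the AbstractQueuedSynchronizer methods that Worker itself uses.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class WorkerLockSketch extends AbstractQueuedSynchronizer {
    WorkerLockSketch() {
        setState(-1); // like Worker: not started yet, interrupts inhibited
    }

    @Override
    protected boolean isHeldExclusively() { return getState() != 0; }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only 0 -> 1 succeeds, so a second acquire by the same thread fails
        // (non-reentrant), and nothing can lock while the state is still -1.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0); // also turns the initial -1 into 0, as runWorker's w.unlock() does
        return true;
    }

    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }
    // Mirrors the getState() >= 0 check in Worker.interruptIfStarted()
    boolean started() { return getState() >= 0; }
}
```

Before the first unlock() the lock cannot be taken at all, and it counts as not started. Once unlocked, the same thread can take it once, but a second tryLock() fails, whereas a ReentrantLock would allow it.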

runWorker: the Worker's run loop

First, the Javadoc of runWorker.

Roughly, the comment says this is the worker's main loop. It runs the initial task if there is one, then keeps taking tasks from getTask while the pool is running. The worker exits when getTask returns null, because the pool state or configuration changed.

- Before each task it acquires the worker lock, so other pool interrupts cannot hit the running task. It also makes sure the thread is not left interrupted unless the pool is stopping.
- beforeExecute runs first; if it throws, the thread dies without running the task.
- Any exception from task.run() is collected and passed to afterExecute. RuntimeException and Error are rethrown as they are. Other Throwables are wrapped in an Error on the way out to the thread's UncaughtExceptionHandler. Any thrown exception kills the thread.
- An exception thrown by afterExecute also kills the thread. Per JLS 14.20, it is the one that takes effect even if task.run() threw.

In all of these abrupt cases completedAbruptly stays true, which usually makes processWorkerExit replace the thread. The full Javadoc reads:

Main worker run loop. Repeatedly gets tasks from queue and executes them, while coping with a number of issues:

  1. We may start out with an initial task, in which case we don’t need to get the first one. Otherwise, as long as pool is running, we get tasks from getTask. If it returns null then the worker exits due to changed pool state or configuration parameters. Other exits result from exception throws in external code, in which case completedAbruptly holds, which usually leads processWorkerExit to replace this thread.
  2. Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and then we ensure that unless pool is stopping, this thread does not have its interrupt set.
  3. Each task run is preceded by a call to beforeExecute, which might throw an exception, in which case we cause thread to die (breaking loop with completedAbruptly true) without processing the task.
  4. Assuming beforeExecute completes normally, we run the task, gathering any of its thrown exceptions to send to afterExecute. We separately handle RuntimeException, Error (both of which the specs guarantee that we trap) and arbitrary Throwables. Because we cannot rethrow Throwables within Runnable.run, we wrap them within Errors on the way out (to the thread’s UncaughtExceptionHandler). Any thrown exception also conservatively causes thread to die.
  5. After task.run completes, we call afterExecute, which may also throw an exception, which will also cause thread to die. According to JLS Sec 14.20, this exception is the one that will be in effect even if task.run throws.

The net effect of the exception mechanics is that afterExecute and the thread’s UncaughtExceptionHandler have as accurate information as we can provide about any problems encountered by user code.
Params:
w – the worker

Now the source of runWorker. From the comment we know that beforeExecute runs before task.run() and afterExecute runs after it. The final finally block calls processWorkerExit, which handles the thread's exit.

Now the main logic. The important part is the while loop condition:

while (task != null || (task = getTask()) != null)

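The task comes from one of two sources:

- The firstTask passed in through addWorker, which runs right away.
- task = getTask(), which takes a task from the blocking queue. When the queue is empty it waits rather than returning.

So the loop keeps going as long as there is a first task or getTask() hands back a task. It ends only when getTask() returns null. The thread therefore keeps picking up tasks instead of being created and destroyed for each one. That is the whole point of a pool: it avoids the cost of creating and destroying threads by reusing them.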
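Thread reuse through this loop is visible with a single-thread pool: five tasks, one thread name. The class name is made up for the example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ReuseDemo {
    // Returns {number of distinct threads that ran tasks, completed task count}.
    static long[] run() {
        // One worker; after its first task, every task reaches it through getTask().
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new long[] { names.size(), pool.getCompletedTaskCount() };
    }
}
```

run() should return {1, 5}: five tasks completed, all on the same thread.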

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
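beforeExecute and afterExecute are protected hooks that a subclass can override. The sketch below captures the exception passed to afterExecute. It then shows that the next task runs on a different thread, because the failing worker left runWorker with completedAbruptly == true. The class names ErrorCapturingPool and HookDemo are hypothetical. One caveat: this applies to execute(). With submit(), the exception is stored in the Future and afterExecute receives null.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class ErrorCapturingPool extends ThreadPoolExecutor {
    final AtomicReference<Throwable> lastError = new AtomicReference<>();
    final CountDownLatch failureSeen = new CountDownLatch(1);

    ErrorCapturingPool(ThreadFactory factory) {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {              // set for a task passed to execute() that threw
            lastError.set(t);
            failureSeen.countDown();
        }
    }
}

class HookDemo {
    // Returns {captured message, failing thread's name, next task's thread name}.
    static String[] run() {
        AtomicInteger ids = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "worker-" + ids.incrementAndGet());
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // keep the demo quiet
            return t;
        };
        ErrorCapturingPool pool = new ErrorCapturingPool(factory);
        AtomicReference<String> firstThread = new AtomicReference<>();
        try {
            pool.execute(() -> {
                firstThread.set(Thread.currentThread().getName());
                throw new IllegalStateException("boom");
            });
            pool.failureSeen.await();
            // The failing worker exits via processWorkerExit(w, true), which adds a replacement.
            Future<String> next = pool.submit(() -> Thread.currentThread().getName());
            String secondThread = next.get();
            return new String[] {
                pool.lastError.get().getMessage(), firstThread.get(), secondThread };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```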

processWorkerExit(w, completedAbruptly)

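Now the exit method. The final finally block of runWorker calls processWorkerExit directly.

Its main job is to remove the Worker from the HashSet<Worker> that holds the workers. It also does the following:

- Adds the worker's completedTasks to the pool's total.
- Calls tryTerminate().
- While the pool is below STOP, starts a replacement via addWorker(null, false) if the worker died abruptly or too few workers remain.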

Performs cleanup and bookkeeping for a dying worker. Called only from worker threads. Unless completedAbruptly is set, assumes that workerCount has already been adjusted to account for exit. This method removes thread from worker set, and possibly terminates the pool or replaces the worker if either it exited due to user task exception or if fewer than corePoolSize workers are running or queue is non-empty but there are no workers.

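In other words, it does the cleanup and bookkeeping for a dying worker and is called only from worker threads. Unless completedAbruptly is true, workerCount has already been decremented before this point (getTask does it before returning null). It removes the worker from the set. It may then terminate the pool, or replace the worker in any of these cases:

- The worker died from a user task exception.
- Fewer than corePoolSize workers are running.
- The queue is non-empty but no workers are left.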

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

Where keepAliveTime is enforced and idle threads are closed

Back to the original question: we set keepAliveTime, but where does it take effect?

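Searching all of runWorker turns up no special handling of keepAliveTime, so it is time to rethink. In runWorker, once the loop ends the thread goes straight to the exit logic, so where is the timing done? A thread is either working or idle, and it is reclaimed once it has been idle for longer than keepAliveTime. Being idle means it has finished its task and the queue is empty. So the place to look is task = getTask().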

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

The key is this line:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

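workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) is where keepAliveTime is used; the constructor stores it in nanoseconds via unit.toNanos. workQueue is a blocking queue, so if it is empty, poll waits up to keepAliveTime and then returns null. That sets timedOut to true.

On the next pass through the loop, (timed && timedOut) holds, so getTask decrements the worker count with a CAS and returns null. Back in runWorker, the while loop ends and processWorkerExit cleans the worker up.

The timed poll is only used when timed is true, that is, when allowCoreThreadTimeOut is set or there are more than corePoolSize workers. Otherwise the thread blocks in workQueue.take() indefinitely.

So the answer to the interview question is that there is no separate timer: the blocking queue's timed poll is the timer.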
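Stripped of the bookkeeping, the idea is that a timed poll doubles as an idle timer. The following is a hypothetical minimal worker, not the JDK code, that exits after keepAlive of idleness. Unlike getTask, it always uses the timed poll and does not maintain a worker count.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class IdleExitWorker implements Runnable {
    private final BlockingQueue<Runnable> queue;
    private final long keepAliveNanos;
    private volatile int tasksRun;

    IdleExitWorker(BlockingQueue<Runnable> queue, long keepAlive, TimeUnit unit) {
        this.queue = queue;
        this.keepAliveNanos = unit.toNanos(keepAlive);
    }

    int tasksRun() { return tasksRun; }

    @Override
    public void run() {
        try {
            for (;;) {
                // Wait at most keepAliveNanos for work; null means we sat idle that long.
                Runnable task = queue.poll(keepAliveNanos, TimeUnit.NANOSECONDS);
                if (task == null) {
                    return;      // idle timeout: simply let the thread end
                }
                task.run();
                tasksRun++;      // only this thread writes the counter
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Queues two tasks, starts a worker, waits for it to time out.
    // Returns the number of tasks run, or -1 if the thread is still alive.
    static int demo() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> { });
        queue.add(() -> { });
        IdleExitWorker worker = new IdleExitWorker(queue, 100, TimeUnit.MILLISECONDS);
        Thread thread = new Thread(worker);
        thread.start();
        try {
            thread.join(5_000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return thread.isAlive() ? -1 : worker.tasksRun();
    }
}
```

IdleExitWorker.demo() should return 2: the worker runs both queued tasks, then its next poll times out after about 100 ms and the thread ends. No scheduler or timer thread is involved.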

/**
 * Retrieves and removes the head of this queue, waiting up to the
 * specified wait time if necessary for an element to become available.
 *
 * @param timeout how long to wait before giving up, in units of
 *        {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the
 *        {@code timeout} parameter
 * @return the head of this queue, or {@code null} if the
 *         specified waiting time elapses before an element is available
 * @throws InterruptedException if interrupted while waiting
 */
E poll(long timeout, TimeUnit unit)
    throws InterruptedException;
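The whole mechanism can also be watched on a real ThreadPoolExecutor. Non-core threads disappear after keepAliveTime, and with allowCoreThreadTimeOut(true) the core thread goes too. The sketch relies on sleeps much longer than keepAliveTime, so it is timing-dependent. The class name is made up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {pool size while busy, size after idling, size after allowCoreThreadTimeOut}.
    static int[] run() {
        try {
            // core 1, max 3, keepAliveTime 100 ms; a SynchronousQueue never holds tasks,
            // so the 2nd and 3rd tasks each get a non-core thread.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
            CountDownLatch release = new CountDownLatch(1);
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int busy = pool.getPoolSize();
            release.countDown();
            Thread.sleep(1000);                 // far longer than keepAliveTime
            int afterIdle = pool.getPoolSize(); // timed poll returned null for the extras
            pool.allowCoreThreadTimeOut(true);  // the core thread now uses the timed poll too
            Thread.sleep(1000);
            int afterCoreTimeout = pool.getPoolSize();
            pool.shutdown();
            return new int[] { busy, afterIdle, afterCoreTimeout };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

run() should return {3, 1, 0}. While the pool has more than corePoolSize workers, timed is true and the extra threads time out. The last core thread blocks in take() until allowCoreThreadTimeOut(true) wakes it and switches it to the timed poll.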