我正在参加「启航方案」

1. 布景

某年某月某日,和我的卧龙搭档聊一个需求,说是有个数据查询的功用,由于涉及到多个第三方接口调用,想用线程池并行来做。

很正常的一个方案,可是上线后发现,每次服务发布的时分,这个数据查询的功用就会挂掉,后来发现是线程池没有做好封闭,这儿总结一下。

如何优雅的关闭线程池

关键字:线程池;shutdown;shutdownNow;interrupt

2. 线程中止 interrupt

先补一补根底的常识:线程中止。

线程中止的意义,并不是强制把运转中的线程给“咔嚓”中止,而是把线程的中止标志方位为true,这样等线程之后堵塞(wait、join、sleep)的时分,就会抛出 InterruptedException,程序通过捕获 InterruptedException 来做必定的善后处理,然后让线程退出。

来看个比如,下面这段代码是起一个线程,打印一百行文本,打印进程中,会把线程的中止标志方位为true

public static void test02() throws InterruptedException {
    Thread t = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        System.out.println("process i=" + i + ",interrupted:" + Thread.currentThread().isInterrupted());
    }
    });
    t.start();
    Thread.sleep(1);
    t.interrupt();
}

看看控制台的输出,发现在打印到 57 的时分,中止标志位现已成功置为true了,可是线程任然在打印,阐明仅仅设置了中止标志位,而不是直接粗暴的把线程中止。

process i=55,interrupted:false

process i=56,interrupted:false

process i=57,interrupted:true

process i=58,interrupted:true

process i=59,interrupted:true

再看看这个示例,同样是打印一百行文本,打印进程中会判别中止标志位,假如中止就自行退出。

public static void test02() throws InterruptedException {
    Thread t = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        if (Thread.interrupted()) {
            System.out.println("线程已中止,退出履行");
            break;
        }
        System.out.println("process i=" + i + ",interrupted:" + Thread.currentThread().isInterrupted());
    }
    });
    t.start();
    Thread.sleep(1);
    t.interrupt();
}

控制台输出如下,:

process i=49,interrupted:false

process i=50,interrupted:false

process i=51,interrupted:false

线程已中止,退出履行

3. 线程池的封闭 shutdown 办法

了解完线程中止,再来看看线程池的封闭办法。

封闭线程池有两个办法 shutdown() 和 shutdownNow(),具体有什么区别?咱们先来看看 shutdown() 办法

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); // 1. 把线程池的状况设置为 SHUTDOWN
            interruptIdleWorkers(); // 2. 把闲暇的作业线程置为中止
            onShutdown(); // 3. 一个空完成,暂不用重视
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

看源码先看注释,我用我英语四级的超高水准水平翻译下:

发动有序封闭会履行以前提交的使命,但不承受任何新使命。

假如现已封闭,则调用不会产生额外的影响。

此办法不等候活动履行的使命停止。假如需要,可运用 awaitTermination() 做到这一点。

3.1 第一步:advanceRunState(SHUTDOWN) 把线程池置为 SHUTDOWN

线程池状况流通如下。调用 shutdown() 办法会把线程池的状况置为 SHUTDOWN,后续再往线程池提交使命就会被拒绝(execute() 办法中做了判别)。

如何优雅的关闭线程池

3.2 第二步:interruptIdleWorkers() 把闲暇的作业线程置为中止

interruptIdleWorkers() 办法遍历一切的作业线程,假如 tryLock() 成功,就把线程置为中止。

这儿,假如 tryLock() 成功,阐明对应的 woker 是一个闲暇的,没有在履行使命的线程,假如没成功,阐明对应的 worker 正在履行使命。也便是说,这儿的中止,对正在履行中的使命并没有影响。

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

3.3 第三步:onShutdown() 一个空完成,暂不用重视

这个没啥,便是个留空的办法。

如何优雅的关闭线程池

3.4 总结

shutdown() 办法干两件事:

  1. 把线程池状况置为 SHUTDOWN 状况
  2. 中止闲暇线程

咱们来看个比如,加深下形象。

    public static void test01() throws InterruptedException {
        // corePoolSize 是 2,maximumPoolSize 是 2
        ThreadPoolExecutor es = new ThreadPoolExecutor(2, 2,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        es.prestartAllCoreThreads(); // 发动一切 worker
        es.execute(new Task()); // Task是一个访问某网站的 HTTP 恳求,跑的慢,后面会贴出来完好代码,这儿把他作为一个跑的慢的异步使命就行
        es.shutdown();
        es.execute(new Task()); // 在线程池 shutdown() 后 持续增加使命,这儿预期是抛出异常
    }

这个比如咱们首要观察两个现象。

一个是线程池会有两个woker( prestartAllCoreThreads() 办法的调用使得已发动就有两个 worker),其中一个正在履行,一个处于闲暇。 所以当调用shutdown() 办法,走进 interruptIdleWorkers() 的时分,只有那个闲暇的线程会调用 t.interrupt()。

如何优雅的关闭线程池

第二个是调用 shutdown() 办法后,再调用 execute() 时,会抛出异常,由于线程池的状况现已置为 SHUTDOWN,不再承受新的使命增加进来。

如何优雅的关闭线程池

4. 线程池的封闭 shutdownNow 方式

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP); // 1:把线程池设置为STOP
            interruptWorkers(); // 2.中止作业线程
            tasks = drainQueue(); // 3.把线程池中的使命都 drain 出来
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

注释的意思是:

测验中止一切正在履行的使命,暂停正在等候的使命的处理,并回来等候履行的使命列表。从该办法回来时,这些使命将从使命队列中清空(移除)。

此办法不等候活动履行的使命停止。假如需要,可运用 awaitTermination() 做到这一点。

除了尽最大努力测验中止处理主动履行的使命之外,没有其他确保。

此完成通过 Thread.Interrupt() 取消使命,因而任何无法呼应中止的使命都可能永久不会停止。

4.1 第一步:advanceRunState() 把线程池设置为STOP

和 shutdown() 办法不同的是,shutdownNow() 办法会把线程池的状况设置为 STOP。

如何优雅的关闭线程池

4.2 第二步:interruptWorkers() 中止作业线程

interruptWorkers() 如下,能够看到,和 shutdown() 办法不同的是,一切的作业线程都调用了 interrupt() 办法

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

4.3 第三步:drainQueue() 把线程池中的使命都 drain 出来

drainQueue() 办法如下,把堵塞队列里边等候的使命都拿出来,并回来。封闭线程池的时分,能够根据这个特性,把回来的使命都打印出来,做个记录。

    /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

4.4 总结

shutdownNow() 办法干三件事:

  1. 把线程池状况置为 STOP 状况
  2. 中止作业线程
  3. 把线程池中的使命都 drain 出来并回来

咱们来看个比如,代码合刚才的相同,仅仅封闭线程用的是shutdownNow()

    public static void test01() throws InterruptedException {
        // corePoolSize 是 1,maximumPoolSize 是 1,无限容量
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        es.prestartAllCoreThreads(); // 发动一切 worker
        es.execute(new Task()); // Task是一个访问某网站的 HTTP 恳求,跑的慢,后面会贴出来完好代码,这儿把他作为一个跑的慢的异步使命就行
        es.execute(new Task());
        List<Runnable> result = es.shutdownNow();
        System.out.println(result);
        es.execute(new Task()); // 在线程池 shutdownNow() 后 持续增加使命,这儿预期是抛出异常
    }

这个比如咱们首要观察三个现象。 一个是线程池有两个woker,所以当调用shutdownNow() 办法,走进 interruptWorkers() 的时分,一切的 woker 都会调用 t.interrupt()。

如何优雅的关闭线程池

第二个是 shutdownNow() 办法会回来还没来得及履行的task,并打印出来。

第三个是调用 shutdownNow() 办法后,再调用 execute() 时,会抛出异常,由于线程池的状况现已置为 STOP,不再承受新的使命增加

如何优雅的关闭线程池

5. 实战,与 JVM 钩子合作

实际作业中,咱们一般是运用 shutdown() 办法,由于它比较“温和”,会等候咱们把线程池中的使命都履行完,这儿也已 shutdown() 办法为例。

咱们回到最最初聊到的那个 case,机器从头发布,可是线程池中还有没履行完使命,机器一关,这些使命悉数被kill,怎么办呢?有什么机制能够堵塞一下,等候这个使命履行完再封闭吗?

有的,用 JVM 的钩子!(深入了解 JVM 钩子能够再看看这篇博文:扫盲 JVM 安全退出机制:shutdownHook,signalHandler)

实例代码如下,一个线程池,提交了三个使命去履行,履行完得半分钟。然后增加一个JVM的钩子,这个钩子能够简单理解为监听器,注册后,JVM在封闭的时分就会调用这个办法,调用完才会正式封闭JVM。

    public static void test01() throws InterruptedException {
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        es.execute(new Task());
        es.execute(new Task());
        es.execute(new Task());
        Thread shutdownHook = new Thread(() -> {
            es.shutdown();
            try {
                es.awaitTermination(3, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("等候超时,直接封闭");
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

在机器上履行,会发现,我运用 ctrl + c (注意不是ctrl + z )封闭进程,会发现进程并没有直接封闭,线程池任然履行,一向等到线程池的使命履行完,进程才会正式退出。

如何优雅的关闭线程池

怎么样,是不是很神奇。

本文中涉及的 Task 的源码如下。这个使命是对 stackoverflow 网站发起 10 次恳求,用来模仿跑的比较慢的使命,当然这不是要点,能够疏忽,有爱好动手试一下本文代码的同学能够参阅下。

    public static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println("task start");
            for (int i = 0; i < 10; i++) {
                httpGet();
                System.out.println("task execute " + i);
            }
            System.out.println("task finish");
        }
        private void httpGet() {
            String url = "https://stackoverflow.com/";
            String result = "";
            BufferedReader in = null;
            try {
                String urlName = url;
                URL realUrl = new URL(urlName);
                // 打开和URL之间的连接
                URLConnection conn = realUrl.openConnection();
                // 设置通用的恳求特点
                conn.setRequestProperty("accept", "*/*");
                conn.setRequestProperty("connection", "Keep-Alive");
                conn.setRequestProperty("user-agent",
                        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");
                // 建立实际的连接
                conn.connect();
                // 获取一切呼应头字段
                Map<String, List<String>> map = conn.getHeaderFields();
//                 遍历一切的呼应头字段
//                for (String key : map.keySet()) {
//                    System.out.println(key + "--->" + map.get(key));
//                }
                // 定义BufferedReader输入流来读取URL的呼应
                in = new BufferedReader(
                        new InputStreamReader(conn.getInputStream()));
                String line;
                while ((line = in.readLine()) != null) {
                    result += "/n" + line;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            // 运用finally块来封闭输入流
            finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
//            System.out.print(result);
        }
    }

6. 总结

想要高雅的封闭线程池,首先要理解线程中止的意义。

其次,封闭线程池有两种方式:shutdown() 和 shutdownNow(),二者最大的区别是 shutdown() 仅仅把闲暇的 woker 置为中止,不影响正在运转的woker,并且会持续把待履行的使命给处理完。shutdonwNow() 则是把一切的 woker 都置为中止,待履行的使命悉数抽出并回来,日常作业中更多是运用 shutdown()。

最终,单纯的运用 shutdown() 也不靠谱,还得运用 awaitTermination() 和 JVM 的钩子,才算高雅的封闭线程池。

都看到这了,给个赞吧

如何优雅的关闭线程池

7. Ref

  • 怎么高雅的封闭Java线程池 www.cnblogs.com/qingquanzi/…
  • java 线程池状况及状况转换 www.yht7.com/news/188264