本文主要有以下内容

  • 介绍Java中内置的线程池
  • 线程池的七大参数
  • 线程池的生命周期
  • 线程池的作业原理
  • 自界说线程池

Java内置的线程池

线程池:为了节约系统资源,减少创立线程的开支以及更好的办理线程,Java供给了一套Executor结构,封装了对多线程的控制,其体系结构如下图所示:

【多线程03】谈谈Java中的线程池

Executor自身是一个接口,其代码如下:

public interface Executor {
  void execute(Runnable command);
}

ExecutorService接口对该接口进行了扩展,添加许多办法

  • shutdown()
  • shutdowmNow()
  • isShutdown()
  • isTerminated()
  • awaitTermination()
  • submit(Callable<T>)
  • submit(Runnable,T)
  • submit(Runnable)
  • invokeAll()等重载办法

重点关注前五个办法:

  • shutdown(): 调用此办法告诉线程池 shutdown,调用此办法后,线程池不再承受新的使命,已经提交的使命不会受到影响,会按照顺序履行完毕。不会堵塞调用此办法的线程。
  • shutdowmNow(),当即测验中止一切正在运转的使命,回来一个待履行的使命列表。不会堵塞调用此办法的线程。该办法除了尽力去测验中止线程外,没有任何保证,任何呼应中止失利的线程可能永久不会中止(如:经过thread.interrupted()中止线程时)。
  • isShutdown():回来一个boolean值,假如已经 shutdown 回来true,反之false。
  • awaitTermination(timeout,timeUnit):堵塞直到一切使命全部完结,或者等候 timeout ,或者在等候timeout期间当时线程抛出InterruptedException
  • isTerminated(): 回来 true 假如一切的使命已经完结且关闭,不然回来false除非在从前已经调用过shutdown()/shutdownNow()

AbstractExecutorService是一个抽象类,完成了ExecutorService,其子类ThreadPoolExecutor进一步扩展了相关功能,在Java中,贴心的Doug Lea供给了一个东西类供咱们去运用ThreadPoolExecutor,在Executors中供给了如下几种线程池

办法名 描绘
newCachedThreadPool() 必要时创立新的线程,闲暇线程保存60s
newFixedThreadPool() 创立固定数目的线程;闲暇线程会一向保存
newWorkStealThreadExecutor() 一种适合fork-join使命的线程池,杂乱使命拆分为简单的使命,闲暇线程会来帮助
newSingleThreadExecutor() 只有一个线程的线程池,按顺序履行所提交的使命
newScheduledThreadPool() 用于调度履行的固定线程池
newSingleThreadScheduledExecutor() 用于调度履行的单线程

虽然有这么多的线程池,但都是给ThreadPoolExecutor的结构函数传递不同的参数算了!

上面所说到的线程池中,需求留意的一个线程池为newScheduledThreadPool(),他的源码如下

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

回来的是一个ScheduledThreadPoolExecutor对象,在这个类中咱们需求留意这三个办法的运用

  • public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
  • public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
  • public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);

schedule()会在给定的时间进行一次调度,后面的两办法会周期性的对使命进行调用,可是还有些许差异,scheduleWithFixedDelay()会在上一次使命履行完毕后等候给定的delay时间后再履行,可是假如代码运转的时长大于delay,则会在运转完毕后当即运转。scheduleAtFixedRate()则是在上次使命履行的开端时间之后的period后就履行。

scheduleAtFixedRate()的运用:

代码示例:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
AtomicInteger i = new AtomicInteger(1);
System.out.println("当时时间是:"+LocalDateTime.now());
executorService.scheduleAtFixedRate(() -> {
  System.out.println(LocalDateTime.now());
  System.out.println("得到" + i + "次履行拉");
  i.getAndIncrement();
    // try {
  //      Thread.sleep(3000);
  //     } catch (InterruptedException e) {
  //      e.printStackTrace();
  //     }
}, 3, 2, TimeUnit.SECONDS);

运转成果如下图:

【多线程03】谈谈Java中的线程池

从上面的运转成果能够知道,基本上每一次调度都是在上一次开端之后的 2s 之后,

留意:假如代码运转的时间超越了等候时间,则上一次调度完毕后,立马履行。

翻开注释的代码,得到的运转成果如下:

【多线程03】谈谈Java中的线程池

能够看到,每一次运转的成果时间距离并不是之前的 2s,而是 3s!!

小结: scheduleAtFixedRate的调度流程

  • 先等候 delay 时间后运转
  • 此时假如代码运转的时间 < period,则下次运转的时间是上一次开端调度的时间的period时间后。
  • 假如代码运转时间 > period,则下次运转的时间是在上一次完毕之后立马运转。

scheduleWithFixedDelay()的用法:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
AtomicInteger i = new AtomicInteger(1);
System.out.println("当时时间是:"+LocalDateTime.now());
executorService.scheduleWithFixedDelay(()->{
  System.out.println("运转开端: "+LocalDateTime.now());
  System.out.println("得到"+i+"履行拉");
  i.getAndIncrement();
  // try {
  //   Thread.sleep(4000);
  // } catch (InterruptedException e) {
  //   e.printStackTrace();
  // }
  System.out.println("运转完毕: "+ LocalDateTime.now());
},3,2,TimeUnit.SECONDS);

【多线程03】谈谈Java中的线程池

翻开注释,得到如下运转成果:

【多线程03】谈谈Java中的线程池

从上面两次运转的成果能够看到 scheduleWithFixedDelay()的调度距离和其代码的运转时间没有关系,相邻的距离时间固定。

线程池的七大参数

ThreadPoolExecutor供给了如下的结构办法:

public ThreadPoolExecutor(int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
             TimeUnit unit,
             BlockingQueue<Runnable> workQueue,
             ThreadFactory threadFactory,
             RejectedExecutionHandler handler) {
  if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}
  • corePoolSize:中心线程数,即一向存活的线程数量,

  • maximumPoolSize:线程池允许的最大线程数量

  • keepAliveTime:当线程池中的线程数量超越中心线程数量时,闲暇线程的最大的存活时间

  • unit:keepAliveTime的时间单位

  • workQueue:作业行列,用于存放未履行的使命的行列

  • threadFactory:线程工厂,创立新线程的当地

  • Handler:回绝战略,当线程池不承受使命时采取的战略

    • DiscardPolicy:直接丢掉
    • DiscardOldestPolicy:丢掉等候最长时间的使命u
    • AbortPolicy:默许的回绝战略,抛出RejectedExecutionException异常
    • CallerRunsPolicy:只需线程池不处有shutdown,则将使命交给调用者线程履行,即调用execute()办法的线程,假如处于shutdown,则会被丢掉履行。

在上面的表中,列出了Executors东西类中所供给的创立线程的办法,本质上便是7大参数的不同值。

newFixedThreadPool ,中心线程等于最大线程则为固定线程

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>());
}

在比如newCachedThreadPool(),假如当时线程池有闲暇线程可用,则当即履行,假如没有闲暇线程,则当即创立新的线程履行,不把他放入作业行列中去。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),threadFactory);
}

SynchronousQueueBlockingQueue的一种完成,他没有任何的内部容量,往这个行列中进行插入操作有必要等候另一个线程的remove操作,鄙人一篇并发容器的相关文章中做详细介绍。

线程池的生命周期

ThreadPoolExecutor类中界说了如下几种线程池的状态

  • RUNNING:接纳新使命和处理行列中的使命
  • SHUTDOWN:不在接纳新的使命,可是会处理等候行列中的使命
  • STOP:不在接纳新使命且不会处理等候行列中的使命,还要中止正在运转的使命
  • TIDYING:一切使命完结且作业线程数为0,调用terminate()会进入到此状态
  • TERMINATED:terminate()运转完毕之后会进入这种状态

【多线程03】谈谈Java中的线程池

线程池的作业原理

ThreadPoolExecutor这个类中,其execute()办法,展示了线程池的作业原理

源码如下:

public void execute(Runnable command) {
    if (command == null) // 1
      throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // 2
      if (addWorker(command, true))
        return;
      c = ctl.get();
     }
    if (isRunning(c) && workQueue.offer(command)) { //3
      int recheck = ctl.get();
      if (! isRunning(recheck) && remove(command))
        reject(command);
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
     }
    else if (!addWorker(command, false)) // 4
      reject(command);
   }

第一:查看传进来的对象是否为null,假如是则抛出空指针异常。

第二:假如作业线程数小于中心线程数则经过addWoker()将使命交给线程池处理,即立马履行当时的使命

第三:假如作业线程数大于等于中心线程数,则将使命加入到作业行列

第四:假如作业行列已满,则交给线程池处理,假如当时线程数小于最大线程数,则创立新的线程运转使命,反之回绝该使命

【多线程03】谈谈Java中的线程池

自界说线程池

在思考如何自界说线程池之前,需求首要回顾下线程池的七大参数:

  • 中心线程数:常驻线程池的线程
  • 最大线程数:线程池的最多的线程容量
  • 存活时间和时间单位:闲暇线程的最大存活时间
  • 作业行列:存储还没来得及处理的使命的容器
  • 线程工厂:创立新线程的当地
  • 回绝战略:说 “No” 的办法

在这7大参数中:中心线程数,最大线程数,以及存活时间和时间单位在我个人看来不是那么重要!毕竟在生产中这些参数在详细场景下都会得到确认,不会有什么特别能够定制的当地,(他如同没那么重要.jpg)

而作业行列的挑选和回绝战略则能够有较多的挑选只需是完成了BlockedQueue接口的容器都能够当作作业行列,换句话说便是只需完成了该接口,都能够充当作业行列。相同的,在 jdk 中默许完成的4种回绝战略,他们都完成了RejectedExecutionHandler接口!而这个接口的作用便是界说哪些不能够被线程池处理的使命,这个接口里面只有一个办法rejectedExecution(Runnable r, ThreadPoolExecutor executor);这个办法的调用时机便是线程池无法接纳新的使命时!

线程工厂:ThreadFactory接口中,界说了newThread(Runnable r);办法进行创立新线程。因而只需完成了改接口,也能够依据自己的意愿去创立新线程!

因而在创立自界说线程池的时分,咱们能够进行如下挑选

  • 完成ThreadFactory:界说创立线程的办法,在Executors中有默许完成,但也能够自己去完成!
  • 完成RejectedExecutionHandler:自界说回绝战略
  • 完成BlockingQueue:就能够自界说作业行列

除此之外,在Executors东西类中,其创立线程的办法如以下几种:

// Executors
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  return new DelegatedScheduledExecutorService
     (new ScheduledThreadPoolExecutor(1));
}
// ThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,
                  ThreadFactory threadFactory) {
  super(corePoolSize, Integer.MAX_VALUE,
     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
     new DelayedWorkQueue(), threadFactory);
}
// Executors
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>(),
                 threadFactory);
}

大多都是依据不同的作业行列,创立了不同特性的线程池!

创立自界说回绝战略的线程池

参照 netty 的代码,创立如下的回绝战略:

public class MyRejectHandler implements RejectedExecutionHandler {
  @Override
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
      System.out.println("给你时机,你不中用啊");
      final Thread t = new Thread(r, "new thread execute new task");
      t.start();
     } catch (Throwable e) {
      throw new RejectedExecutionException(
          "Failed to start a new thread", e);
     }
​
   }
}

这里创立的线程池是凭借了ThreadPoolExecutor, 假如对自己的能力自信,能够自己去完成一个自己的"ThreadPoolExecutor",来给人们一点小小的震慑!

ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
                           2, 
                           0, 
                           TimeUnit.SECONDS, 
                           new ArrayBlockingQueue<Runnable>(1), 
                           new MyRejectHandler());
​
for (int i = 0; i < 10; i++) {
  executor.execute(() -> {
    System.out.println("new Task submit " + LocalDateTime.now());
   });
  // try {
  //   Thread.sleep(1000);
  // } catch (InterruptedException e) {
  //   e.printStackTrace();
  // }
}

先翻开注释,怠慢使命提交,此时没有触发回绝战略

【多线程03】谈谈Java中的线程池

注释掉这段线程延时代码,能够得到如下输出,打印了七次给你时机,你不中用啊,这是由于常驻线程为2,能够处理2个使命,作业行列的容量为1,能够保存一个,因而,能够处理3个使命,而经过下面打印的时间,就能够发现在同一时间,提交了9个使命,因而处理不过来所以打印了 10- 3 = 7次

【多线程03】谈谈Java中的线程池

参考资料:

  • Java中心技术卷1
  • Java高并发程序设计
  • Executors,ThreadPoolExecutor等源码