java结构中例如Tomcat、Dubbo等都离不开线程池,这些结构用到线程的当地,都会用线程池来负责。咱们在运用这些结构的时分,会设置线程池参数,用于进步功用。那么开多少线程合适?今天咱们将围绕这个问题来学习一下线程池。

为什么运用线程池

往常咱们运用java线程的时分,都是直接创立一个Thread目标,java线程的创立和毁掉都会涉及到Thread目标的创立和毁掉,线程切换等问题。创立Thread目标,仅仅是在 JVM 的堆里分配一块内存而已;而创立一个线程,却需求调用操作体系内核的 API,然后操作体系要为线程分配一系列的资源,这个成本就很高了。所以线程是一个重量级的目标,应该防止频频创立和毁掉。

一般能够经过“池化”思想来处理上述的问题,而JDK中供给的线程池完结是根据ThreadPoolExecutor

运用线程池能够带来一系列优点:

  • 降低资源耗费:经过池化技术重复运用已创立的线程,降低线程创立和毁掉造成的损耗。
  • 进步响应速度:使命抵达时,无需等候线程创当即可当即履行。
  • 进步线程的可管理性:线程是稀缺资源,假如无限制创立,不只会耗费体系资源,还会因为线程的不合理散布导致资源调度失衡,降低体系的稳定性。运用线程池能够进行共同的分配、调优和监控。
  • 供给更多更强大的功用:线程池具有可拓展性,答应开发人员向其中添加更多的功用。比方延时守时线程池ScheduledThreadPoolExecutor,就答应使命延期履行或定期履行。

线程池中心设计与完结

总体设计

java线程-如何正确使用java线程池

  • 顶层接口是Executorjava.util.concurrent.Executor#execute,用户只需供给Runnable目标,将使命的运转逻辑提交到履行器(Executor)中,由Executor结构完结线程的调配和使命的履行部分。

  • ExecutorService接口扩展了Executor并添加了一些能力:

    • 扩充履行使命的能力,经过调用submit()或许invokeAll()办法能够为一个或一批异步使命生成Future的办法;
    • 供给了管控线程池的办法,比方调用shutdown()等办法中止线程池的运转。
  • AbstractExecutorService则是上层的抽象类,将履行使命的流程串联了起来,确保下层的完结只需重视一个履行使命的办法即可。

  • 详细完结类是ThreadPoolExecutorThreadPoolExecutor将会一方面保护本身的生命周期,另一方面一起管理线程和使命,使两者杰出的结合从而履行并行使命。

  • ScheduledThreadPoolExecutor又扩展了ThreadPoolExecutorScheduledExecutorService接口,添加了调度能力,使使命能够延时守时履行。

  • 别的还有一个供给了线程池创立的工厂办法的类Executors,用来创立线程池。

本章首要阐明ThreadPoolExecutor的完结原理,ScheduledThreadPoolExecutor下篇会讨论。

ThreadPoolExecutor完结原理

ThreadPoolExecutor结构参数阐明

ThreadPoolExecutor(
 int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 ThreadFactory threadFactory,
 RejectedExecutionHandler handler) 
​
  • corePoolSize:表明线程池保有的最小线程数。中心线程数,这些中心线程一旦被创立,就不会被毁掉。相反,假如对错中心线程,等使命履行完并长期未被运用则会被毁掉。

  • maximumPoolSize:表明线程池创立的最大线程数。

  • keepAliveTime&unit:一个线程假如在一段时间内,都没有履行使命,阐明很闲,keepAliveTimeunit便是用来界说这个一段时间的参数。也便是说,假如线程现已闲暇了keepAliveTimeunit这么久了,而且线程数大于corePoolSize,那么这个闲暇线程就要被收回。

  • workQueue:用来存储使命,当有新的使命恳求线程处理时,假如中心线程池已满,那么新来的使命会加入workQueue行列中,workQueue是一个堵塞行列。

  • threadFactory:经过这个参数能够自界说怎么创立线程。

  • handler:经过这个参数能够自界说使命的回绝战略。假如线程池中一切的线程都在繁忙,而且作业行列也满了(前提是作业行列是有界行列),那么此时提交使命,线程池就会回绝接纳。至于回绝的战略,能够经过这个参数来指定

    ThreadPoolExecutor现已供给了四种战略。

    1. CallerRunsPolicy:提交使命的线程自己去履行该使命。
    2. AbortPolicy:默许的回绝战略,会throws RejectedExecutionException.
    3. DiscardPolicy:直接丢掉使命,没有任何反常输出。
    4. DiscardOldestPolicy:丢掉最老的使命,其实便是把最早进入作业行列的使命丢掉,然后把新使命加入到作业行列。

ThreadPoolExecutor履行流程

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
   }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
   }
  else if (!addWorker(command, false))
    reject(command);
}
  1. 首要检测线程池运转状况,假如不是RUNNING,则直接回绝,线程池要确保在RUNNING的状况下履行使命。
  2. 假如workerCount < corePoolSize,则创立并发动一个线程来履行新提交的使命。
  3. 假如workerCount >= corePoolSize,且线程池内的堵塞行列未满,则将使命添加到该堵塞行列中。
  4. 假如workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的堵塞行列已满,则创立并发动一个线程来履行新提交的使命。
  5. 假如workerCount >= maximumPoolSize,而且线程池内的堵塞行列已满, 则依据回绝战略来处理该使命, 默许的处理方式是直接抛反常。

java线程-如何正确使用java线程池

线程池运转状况

线程池的运转状况,由线程池内部保护,线程池内部运用AtomicInteger变量,用于保护运转状况runState和作业线程数workerCount,高3位保存runState,低29位保存workerCount,两个变量之间互不搅扰。用一个变量去存储两个值,可防止在做相关决策时,出现不共同的状况,不用为了保护两者的共同,而占用锁资源。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
​
// COUNT_BITS=29,(对于int长度为32来说)表明线程数量的字节位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 状况掩码,高三位是1,低29位满是0,能够经过 ctl&COUNT_MASK 运算来获取线程池状况
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
​
​
private static final int RUNNING  = -1 << COUNT_BITS; // 111 00000 00000000 00000000 00000000;
private static final int SHUTDOWN  = 0 << COUNT_BITS; // 000 00000 00000000 00000000 00000000; 
private static final int STOP    = 1 << COUNT_BITS; // 001 00000 00000000 00000000 00000000;
private static final int TIDYING  = 2 << COUNT_BITS; // 010 00000 00000000 00000000 00000000;
private static final int TERMINATED = 3 << COUNT_BITS; // 011 00000 00000000 00000000 00000000;// 核算当时运转状况
private static int runStateOf(int c)   { return c & ~COUNT_MASK; }
// 核算当时线程数量
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
//经过状况和线程数生成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
状况 描绘
RUNNING 能接受新的使命,也能处理堵塞行列中的使命
SHUTDOWN 封闭状况,不能接受新的使命,只能处理堵塞行列中的使命
STOP 不能接受新的使命,也不能处理堵塞行列中的使命,会中止正在处理使命的线程
TIDYING 一切使命都中止了,workerCount为0
TERMINATED 在履行terminated()办法会进入到这个状况

状况搬运:

java线程-如何正确使用java线程池

堵塞行列

再介绍线程池总体设计的时分,说过线程池的设计,采用的都是生产者 – 顾客形式,其完结首要便是经过BlockingQueue来完结的,意图是将使命和线程两者解耦,堵塞行列缓存使命,作业线程从堵塞行列中获取使命。

运用不同的行列能够完结不一样的使命存取战略。在这里,咱们能够再介绍下堵塞行列的成员:

堵塞行列 描绘
ArrayBlockingQueue 根据数组完结的有界行列,支撑公正锁和非公正锁
LinkedBlockingQueue 根据链表完结的有界行列,行列巨细默许为Integer.MAX_VALUE,所以默许创立该行列会有容量危险
PriorityBlockingQueue 支撑优先级排序的无界行列,不能确保同优先级的顺序
DelayQueue 根据PriorityBlockingQueue完结的延期行列,只有当延时期满了,才能从中取出元素
SynchronousQueue 同步行列,不存储任何元素,调用一次put()就必须等候take()调用完。支撑公正锁和非公正锁
LinkedTransferQueue 根据链表完结的无界行列,多了transfer()tryTransfer()办法
LinkedBlockingDeque 根据双向链表完结的行列,多线程并发时,能够将锁的竞赛最多降到一半

Worker

Worker整体设计

  • Worker继承了AQS,运用AQS来完结独占锁这个功用。没有运用可重入锁ReentrantLock,而是运用AQS,为的便是完结不可重入的特性去反应线程现在的履行状况。
  • Worker完结了Runnable接口,持有一个线程thread,一个初始化的使命firstTaskthread是在调用结构办法时经过ThreadFactory来创立的线程,能够用来履行使命;
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
  final Thread thread;//Worker持有的线程
  Runnable firstTask;//初始化的使命,能够为null
 
  Worker(Runnable firstTask) {
   setState(-1); // inhibit interrupts until runWorker
   this.firstTask = firstTask;
   this.thread = getThreadFactory().newThread(this);
   }
 
  public void run() {
   runWorker(this);
   }
 
 // ...省略其他代码
}

Worker怎么添加使命

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (int c = ctl.get();;) {
    // Check if queue empty only if necessary.
    if (runStateAtLeast(c, SHUTDOWN)
      && (runStateAtLeast(c, STOP)
        || firstTask != null
        || workQueue.isEmpty()))
      return false;
​
    for (;;) {
      if (workerCountOf(c)
        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
        return false;
      if (compareAndIncrementWorkerCount(c))
        break retry;
      c = ctl.get(); // Re-read ctl
      if (runStateAtLeast(c, SHUTDOWN))
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
     }
   }
​
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int c = ctl.get();
​
        if (isRunning(c) ||
           (runStateLessThan(c, STOP) && firstTask == null)) {
          if (t.getState() != Thread.State.NEW)
            throw new IllegalThreadStateException();
          workers.add(w);
          workerAdded = true;
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;
         }
       } finally {
        mainLock.unlock();
       }
      if (workerAdded) {
        t.start();
        workerStarted = true;
       }
     }
   } finally {
    if (! workerStarted)
      addWorkerFailed(w);
   }
  return workerStarted;
}

addWorker()办法有两个参数:

  • firstTask用它来保存传入的第一个使命,这个使命能够有也能够为null。假如这个值对错空的,那么线程就会在发动初期当即履行这个使命,也就对应中心线程创立时的状况;假如这个值是null,那么就需求创立一个线程去履行workQueue中的使命,也就对错中心线程的创立。
  • core参数为true表明在新增线程时会判断当时活动线程数是否少于corePoolSizefalse表明新增线程前需求判断当时活动线程数是否少于maximumPoolSize

详细流程如下:

java线程-如何正确使用java线程池

Worker怎么获取使命

使命的履行有两种可能:一种是使命直接由新创立的线程履行。另一种是线程从使命行列中获取使命然后履行,履行完使命的闲暇线程会再次去从行列中恳求使命再去履行。

第一种在上述addWorker()办法中,假如firstTask不为空的话,会直接运转。第二种firstTask为空,使命将从workQueue中获取,调用getTask()办法

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?for (;;) {
      int c = ctl.get();
      // Check if queue empty only if necessary.
      if (runStateAtLeast(c, SHUTDOWN)
        && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
       }
      int wc = workerCountOf(c);
      // Are workers subject to culling?
      boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
​
      if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        if (compareAndDecrementWorkerCount(c))
          return null;
        continue;
       }
      try {
        Runnable r = timed ?
          workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
          workQueue.take();
        if (r != null)
          return r;
        timedOut = true;
       } catch (InterruptedException retry) {
        timedOut = false;
       }
     }
   }

详细流程:

java线程-如何正确使用java线程池

Worker怎么运转使命

// java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
 Thread wt = Thread.currentThread();
 Runnable task = w.firstTask;
 w.firstTask = null;
 w.unlock(); // allow interrupts
 boolean completedAbruptly = true;
 try {
  while (task != null || (task = getTask()) != null) {
   w.lock();
   // If pool is stopping, ensure thread is interrupted;
   // if not, ensure thread is not interrupted.  This
   // requires a recheck in second case to deal with
   // shutdownNow race while clearing interrupt
   if ((runStateAtLeast(ctl.get(), STOP) ||
      (Thread.interrupted() &&
      runStateAtLeast(ctl.get(), STOP))) &&
     !wt.isInterrupted())
    wt.interrupt();
   try {
    beforeExecute(wt, task);
    try {
     task.run();
     afterExecute(task, null);
     } catch (Throwable ex) {
     afterExecute(task, ex);
     throw ex;
     }
    } finally {
    task = null;
    w.completedTasks++;
    w.unlock();
    }
   }
  completedAbruptly = false;
  } finally {
  processWorkerExit(w, completedAbruptly);
  }
}

详细流程:

java线程-如何正确使用java线程池

  1. while循环不断地经过getTask()办法获取使命。
  2. 假如线程池正在中止,那么要确保当时线程是中止状况,否则要确保当时线程不是中止状况。
  3. 履行使命。
  4. 假如getTask结果为null则跳出循环,履行processWorkerExit()办法,毁掉线程。

Worker线程怎么收回

线程的毁掉依靠JVM自动的收回,但线程池中中心线程是不能被jvm收回的,所以当线程池决议哪些线程需求收回时,只需求将其引证消除即可。Worker被创立出来后,就会不断地进行轮询,然后获取使命去履行,中心线程能够无限等候获取使命,非中心线程要限时获取使命。当Worker无法获取到使命,也便是获取的使命为空时,循环会结束,Worker会主动消除本身在线程池内的引证。

其首要逻辑在processWorkerExit()办法中

private void processWorkerExit(Worker w, boolean completedAbruptly) {
  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();
​
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
   } finally {
    mainLock.unlock();
   }
​
  tryTerminate();
​
  int c = ctl.get();
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      if (workerCountOf(c) >= min)
        return; // replacement not needed
     }
    addWorker(null, false);
   }
}

详细流程:

java线程-如何正确使用java线程池

运用线程池最佳实践

Executors

考虑到ThreadPoolExecutor的结构函数完结有些复杂,所以java供给了一个线程池的静态工厂类,Executors,运用Executors能够快速创立线程池。但是大厂都不主张运用Executors,原因:Executors的很多办法默许运用的是无参结构的LinkedBlockQueue,默许巨细为Integer.MAX_VALUE,高负载状况下,行列很简单导致OOM。而OOM了就会导致一切恳求都无法处理。强烈主张运用ArrayBlockingQueue有界行列。

运用有界行列,当使命过多时,线程池会触发履行回绝战略,线程池默许的回绝战略会throw RejectedExecutionException这个运转时反常,所以开发人员很简单疏忽,因此默许回绝战略需求慎重运用。假如线程处理的使命非常重要,主张自界说回绝战略,实际开发中,自界说回绝战略往往和降级战略配合运用。

下面介绍常用的办法:

newFixedThreadPool()

  • newFixedThreadPool()函数用来创立巨细固定的线程池。
  • ThreadPoolExecutor中的maximumPoolSizecorePoolSize相等,因此,线程池中的线程都是中心线程,一旦创立便不会毁掉。
  • workQueue为LinkedBlockingQueue,默许巨细为Integer.MAX_VALUE,巨细非常大,相当于无界堵塞行列。使命能够无限的往workQueue中提交,永久都不会触发回绝战略。
public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor()

  • newSingleThreadExecutor()函数用来创立单线程履行器。
  • ThreadPoolExecutor中的maximumPoolSizecorePoolSize都等于1。
  • workQueue同样是巨细为Integer.MAX_VALUELinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
     (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool()

  • newCachedThreadPool()函数创立的线程池只包括非中心线程,线程闲暇60秒以上便会毁掉。

  • workQueueSynchronousQueue类型的,而SynchronousQueue是长度为0的堵塞行列,所以,workQueue不存储任何等候履行的使命。

    • 假如线程池内存在闲暇线程,那么新提交的使命会被闲暇线程履行
    • 假如线程池内没有闲暇线程,那么线程池会创立新的线程来履行新提交的使命。
  • 线程池巨细为Integer.MAX_VALUE,因此,线程池中创立的线程个数能够非常多。

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}

反常捕获

运用线程池,还需求注意反常处理的问题,经过ThreadPoolExecutor目标的execute()办法履行使命时,假如在使命履行期间出现运转时反常,会导致使命的线程终止,但是你却获取不到任何告诉,这会让你误认为使命都履行得很正常。虽说线程池供给了很多用于反常处理的办法,但是最保险和简单的方案还是捕获反常信息,并按需处理。

配置线程池参数

从使命的优先级,使命的履行时间长短,使命的性质(CPU密布/ IO密布),使命的依靠关系这四个视点来剖析。而且近可能地运用有界的作业行列。

性质不同的使命可用运用不同规模的线程池分开处理:

  • CPU密布型: 尽可能少的线程,Ncpu+1
  • IO密布型: 尽可能多的线程, Ncpu*2,比方数据库连接池
  • 混合型: CPU密布型的使命与IO密布型使命的履行时间差别较小,拆分为两个线程池;否则没有必要拆分。

参阅动态参数化github.com/shawn-happy…

参阅文档

美团线程池