本文共享自华为云社区《JUC线程池: ThreadPoolExecutor详解》,作者:龙哥手记 。
带着BAT大厂的面试问题去了解
提示
请带着这些问题持续后文,会很大程度上帮助你更好的了解相关知识点。@pdai
- 为什么要有线程池?
- Java是完结和管理线程池有哪些办法? 请简单举例怎样运用。
- 为什么许多公司不答应运用Executors去创立线程池? 那么引荐怎样运用呢?
- ThreadPoolExecutor有哪些中心的装备参数? 请扼要阐明
- ThreadPoolExecutor能够创立哪是哪三种线程池呢?
- 当行列满了并且worker的数量到达maxSize的时分,会怎样样?
- 说说ThreadPoolExecutor有哪些RejectedExecutionHandler战略? 默许是什么战略?
- 扼要说下线程池的使命履行机制? execute –> addWorker –>runworker (getTask)
- 线程池中使命是怎样提交的?
- 线程池中使命是怎样封闭的?
- 在装备线程池的时分需求考虑哪些装备要素?
- 怎样监控线程池的状况?
为什么要有线程池
线程池能够对线程进行统一分配,调优和监控:
- 下降资源消耗(线程无限制地创立,然后运用完毕后毁掉)
- 进步响应速度(无须创立线程)
- 进步线程的可管理性
ThreadPoolExecutor比如
Java是怎样完结和管理线程池的?
从JDK 5开始,把作业单元与履行机制分离开来,作业单元包含Runnable和Callable,而履行机制由Executor结构供给。
-
WorkerThread
public class WorkerThread implements Runnable {
private String command; public WorkerThread(String s){ this.command=s; } @Override public void run() { System.out.println(Thread.currentThread().getName()+" Start. Command = "+command); processCommand(); System.out.println(Thread.currentThread().getName()+" End."); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString(){ return this.command; }
}
SimpleThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SimpleThreadPool {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
Runnable worker = new WorkerThread("" + i);
executor.execute(worker);
}
executor.shutdown(); // This will make the executor accept no new threads and finish all existing threads in the queue
while (!executor.isTerminated()) { // Wait until all threads are finish,and also you can use "executor.awaitTermination();" to wait
}
System.out.println("Finished all threads");
}
}
程序中咱们创立了固定巨细为五个作业线程的线程池。然后分配给线程池十个作业,因为线程池巨细为五,它将发动五个作业线程先处理五个作业,其他的作业则处于等候状况,一旦有作业完结,闲暇下来作业线程就会捡取等候行列里的其他作业进行履行。
这儿是以上程序的输出。
pool-1-thread-2 Start. Command = 1
pool-1-thread-4 Start. Command = 3
pool-1-thread-1 Start. Command = 0
pool-1-thread-3 Start. Command = 2
pool-1-thread-5 Start. Command = 4
pool-1-thread-4 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
pool-1-thread-3 End.
pool-1-thread-3 Start. Command = 8
pool-1-thread-2 End.
pool-1-thread-2 Start. Command = 9
pool-1-thread-1 Start. Command = 7
pool-1-thread-5 Start. Command = 6
pool-1-thread-4 Start. Command = 5
pool-1-thread-2 End.
pool-1-thread-4 End.
pool-1-thread-3 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
Finished all threads
输出表明线程池中至始至终只需五个名为 “pool-1-thread-1” 到 “pool-1-thread-5” 的五个线程,这五个线程不随着作业的完结而消亡,会一向存在,并担任履行分配给线程池的使命,直到线程池消亡。
Executors 类供给了运用了 ThreadPoolExecutor 的简单的 ExecutorService 完结,可是 ThreadPoolExecutor 供给的功用远不止于此。咱们能够在创立 ThreadPoolExecutor 实例时指定活动线程的数量,咱们也能够限制线程池的巨细并且创立咱们自己的 RejectedExecutionHandler 完结来处理不能适应作业行列的作业。
这儿是咱们自界说的 RejectedExecutionHandler 接口的完结。
-
RejectedExecutionHandlerImpl.java
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected"); }
}
ThreadPoolExecutor 供给了一些办法,咱们能够运用这些办法来查询 executor 的当时状况,线程池巨细,活动线程数量以及使命数量。因而我是用来一个监控线程在特定的时刻距离内打印 executor 信息。
-
MyMonitorThread.java
import java.util.concurrent.ThreadPoolExecutor;
public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor;
private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown(){ this.run=false; } @Override public void run() { while(run){ System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
}
这儿是运用 ThreadPoolExecutor 的线程池完结比如。
-
WorkerPool.java
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
public class WorkerPool {
public static void main(String args[]) throws InterruptedException{ //RejectedExecutionHandler implementation RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); //start the monitoring thread MyMonitorThread monitor = new MyMonitorThread(executorPool, 3); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for(int i=0; i<10; i++){ executorPool.execute(new WorkerThread("cmd"+i)); } Thread.sleep(30000); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep(5000); monitor.shutdown(); }
}
留意在初始化 ThreadPoolExecutor 时,咱们保持初始池巨细为 2,最大池巨细为 4 而作业行列巨细为 2。因而假如现已有四个正在履行的使命而此刻分配来更多使命的话,作业行列将只是保留他们(新使命)中的两个,其他的将会被 RejectedExecutionHandlerImpl 处理。
上面程序的输出能够证实以上观念。
pool-1-thread-1 Start. Command = cmd0
pool-1-thread-4 Start. Command = cmd5
cmd6 is rejected
pool-1-thread-3 Start. Command = cmd4
pool-1-thread-2 Start. Command = cmd1
cmd7 is rejected
cmd8 is rejected
cmd9 is rejected
[monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-4 End.
pool-1-thread-1 End.
pool-1-thread-2 End.
pool-1-thread-3 End.
pool-1-thread-1 Start. Command = cmd3
pool-1-thread-4 Start. Command = cmd2
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-1 End.
pool-1-thread-4 End.
[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
留意 executor 的活动使命、完结使命以及一切完结使命,这些数量上的变化。咱们能够调用 shutdown() 办法来完毕一切提交的使命并停止线程池。
ThreadPoolExecutor运用详解
其实java线程池的完结原理很简单,说白了便是一个线程调集workerSet和一个堵塞行列workQueue。当用户向线程池提交一个使命(也便是线程)时,线程池会先将使命放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后履行。当workQueue中没有使命的时分,worker就会堵塞,直到行列中有使命了就取出来持续履行。
Execute原理
当一个使命提交至线程池之后:
- 线程池首要当时运转的线程数量是否少于corePoolSize。假如是,则创立一个新的作业线程来履行使命。假如都在履行使命,则进入2.
- 判别BlockingQueue是否现已满了,假使还没有满,则将线程放入BlockingQueue。不然进入3.
- 假如创立一个新的作业线程将使当时运转的线程数量超越maximumPoolSize,则交给RejectedExecutionHandler来处理使命。
当ThreadPoolExecutor创立新线程时,经过CAS来更新线程池的状况ctl.
参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
-
corePoolSize
线程池中的中心线程数,当提交一个使命时,线程池创立一个新线程履行使命,直到当时线程数等于corePoolSize, 即便有其他闲暇线程能够履行新来的使命, 也会持续创立线程;假如当时线程数为corePoolSize,持续提交的使命被保存到堵塞行列中,等候被履行;假如履行了线程池的prestartAllCoreThreads()办法,线程池会提前创立并发动一切中心线程。 -
workQueue
用来保存等候被履行的使命的堵塞行列. 在JDK中供给了如下堵塞行列: 详细能够参阅JUC 调集: BlockQueue详解-
ArrayBlockingQueue
: 根据数组结构的有界堵塞行列,按FIFO排序使命; -
LinkedBlockingQueue
: 根据链表结构的堵塞行列,按FIFO排序使命,吞吐量一般要高于ArrayBlockingQueue; -
SynchronousQueue
: 一个不存储元素的堵塞行列,每个刺进操作有必要比及另一个线程调用移除操作,不然刺进操作一向处于堵塞状况,吞吐量一般要高于LinkedBlockingQueue; -
PriorityBlockingQueue
: 具有优先级的无界堵塞行列;
-
LinkedBlockingQueue
比ArrayBlockingQueue
在刺进删去节点功用方面更优,可是二者在put()
, take()
使命的时均需求加锁,SynchronousQueue
运用无锁算法,根据节点的状况判别履行,而不需求用到锁,其间心是Transfer.transfer()
.
-
maximumPoolSize
线程池中答应的最大线程数。假如当时堵塞行列满了,且持续提交使命,则创立新的线程履行使命,条件是当时线程数小于maximumPoolSize;当堵塞行列是无界行列, 则maximumPoolSize则不起作用, 因为无法提交至中心线程池的线程会一向持续地放入workQueue. -
keepAliveTime
线程闲暇时的存活时刻,即当线程没有使命履行时,该线程持续存活的时刻;默许情况下,该参数只在线程数大于corePoolSize时才有用, 超越这个时刻的闲暇线程将被停止; -
unit
keepAliveTime的单位 -
threadFactory
创立线程的工厂,经过自界说的线程工厂能够给每个新建的线程设置一个具有辨认度的线程名。默许为DefaultThreadFactory -
handler
线程池的饱满战略,当堵塞行列满了,且没有闲暇的作业线程,假如持续提交使命,有必要采纳一种战略处理该使命,线程池供给了4种战略:-
AbortPolicy
: 直接抛出反常,默许战略; -
CallerRunsPolicy
: 用调用者地点的线程来履行使命; -
DiscardOldestPolicy
: 丢掉堵塞行列中靠最前的使命,并履行当时使命; -
DiscardPolicy
: 直接丢掉使命;
-
当然也能够根据应用场景完结RejectedExecutionHandler接口,自界说饱满战略,如记载日志或持久化存储不能处理的使命。
三种类型
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
线程池的线程数量达corePoolSize后,即便线程池没有可履行使命时,也不会开释线程。
FixedThreadPool的作业行列为无界行列LinkedBlockingQueue(行列容量为Integer.MAX_VALUE), 这会导致以下问题:
- 线程池里的线程数量不超越corePoolSize,这导致了maximumPoolSize和keepAliveTime将会是个无用参数
- 因为运用了无界行列, 所以FixedThreadPool永久不会回绝, 即饱满战略失效
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
初始化的线程池中只需一个线程,假如该线程反常完毕,会从头创立一个新的线程持续履行使命,唯一的线程能够确保所提交使命的顺序履行.
因为运用了无界行列, 所以SingleThreadPool永久不会回绝, 即饱满战略失效
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
线程池的线程数可到达Integer.MAX_VALUE,即2147483647,内部运用SynchronousQueue作为堵塞行列; 和newFixedThreadPool创立的线程池不同,newCachedThreadPool在没有使命履行时,当线程的闲暇时刻超越keepAliveTime,会自动开释线程资源,当提交新使命时,假如没有闲暇线程,则创立新线程履行使命,会导致一定的系统开支; 履行过程与前两种略微不同:
- 主线程调用SynchronousQueue的offer()办法放入task, 假使此刻线程池中有闲暇的线程测验读取 SynchronousQueue的task, 即调用了SynchronousQueue的poll(), 那么主线程将该task交给闲暇线程. 不然履行(2)
- 当线程池为空或许没有闲暇的线程, 则创立新的线程履行使命.
- 履行完使命的线程假使在60s内仍闲暇, 则会被停止. 因而长时刻闲暇的CachedThreadPool不会持有任何线程资源.
封闭线程池
遍历线程池中的一切线程,然后逐个调用线程的interrupt办法来中止线程.
封闭办法 – shutdown
将线程池里的线程状况设置成SHUTDOWN状况, 然后中止一切没有正在履行使命的线程.
封闭办法 – shutdownNow
将线程池里的线程状况设置成STOP状况, 然后停止一切正在履行或暂停使命的线程. 只需调用这两个封闭办法中的任意一个, isShutDown() 回来true. 当一切使命都成功封闭了, isTerminated()回来true.
ThreadPoolExecutor源码详解
几个要害特点
//这个特点是用来寄存 当时运转的worker数量以及线程池状况的
//int是32位的,这儿把int的高3位拿来充当线程池状况的标志位,后29位拿来充当当时运转worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//寄存使命的堵塞行列
private final BlockingQueue<Runnable> workQueue;
//worker的调集,用set来寄存
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史到达的worker数最大值
private int largestPoolSize;
//当行列满了并且worker的数量到达maxSize的时分,履行详细的回绝战略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生计时刻
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;
内部状况
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其间AtomicInteger变量ctl的功用十分强壮: 利用低29位表明线程池中线程数,经过高3位表明线程池的运转状况:
- RUNNING: -1 << COUNT_BITS,即高3位为111,该状况的线程池会接纳新使命,并处理堵塞行列中的使命;
- SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状况的线程池不会接纳新使命,但会处理堵塞行列中的使命;
- STOP : 1 << COUNT_BITS,即高3位为001,该状况的线程不会接纳新使命,也不会处理堵塞行列中的使命,而且会中止正在运转的使命;
- TIDYING : 2 << COUNT_BITS,即高3位为010, 一切的使命都现已停止;
- TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()办法现已履行完结
使命的履行
execute –> addWorker –>runworker (getTask)
线程池的作业线程经过Woker类完结,在ReentrantLock锁的确保下,把Woker实例刺进到HashSet后,并发动Woker中的线程。 从Woker类的结构办法完结能够发现: 线程工厂在创立线程thread时,将Woker实例自身this作为参数传入,当履行start办法发动线程thread时,本质是履行了Worker的runWorker办法。 firstTask履行完结之后,经过getTask办法从堵塞行列中获取等候的使命,假如行列中没有使命,getTask办法会被堵塞并挂起,不会占用cpu资源;
execute()办法
ThreadPoolExecutor.execute(task)完结了Executor.execute(task)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//workerCountOf获取线程池的当时线程数;小于corePoolSize,履行addWorker创立新线程履行command使命
if (addWorker(command, true))
return;
c = ctl.get();
}
// double check: c, recheck
// 线程池处于RUNNING状况,把提交的使命成功放入堵塞行列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// recheck and if necessary 回滚到入队操作前,即假使线程池shutdown状况,就remove(command)
//假如线程池没有RUNNING,成功从堵塞行列中删去使命,履行reject办法处理使命
if (! isRunning(recheck) && remove(command))
reject(command);
//线程池处于running状况,可是没有线程,则创立线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 往线程池中创立新的线程失败,则reject使命
else if (!addWorker(command, false))
reject(command);
}
- 为什么需求double check线程池的状况?
在多线程环境下,线程池的状况时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状况后线程池状况就改变了。判别是否将command加入workque是线程池之前的状况。假使没有double check,假如线程池处于非running状况(在多线程环境下很有可能产生),那么command永久不会履行。
addWorker办法
从办法execute的完结能够看出: addWorker首要担任创立新的线程并履行使命 线程池创立新线程履行使命时,需求 获取大局锁:
private final ReentrantLock mainLock = new ReentrantLock();
private boolean addWorker(Runnable firstTask, boolean core) {
// CAS更新线程池数量
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 线程池重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 线程发动,履行使命(Worker.thread(firstTask).start());
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker类的runworker办法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创立线程
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}
- 继承了AQS类,能够便利的完结作业线程的中止操作;
- 完结了Runnable接口,能够将自身作为一个使命在作业线程中履行;
- 当时提交的使命firstTask作为参数传入Worker的结构办法;
一些特点还有结构办法:
//运转的线程,前面addWorker办法中便是直接经过发动这个线程来发动这个worker
final Thread thread;
//当一个worker刚创立的时分,就先测验履行这个使命
Runnable firstTask;
//记载完结使命的数量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创立一个Thread,将自己设置给他,后边这个thread发动的时分,也便是履行worker的run办法
this.thread = getThreadFactory().newThread(this);
}
runWorker办法是线程池的中心:
- 线程发动之后,经过unlock办法开释锁,设置AQS的state为0,表明运转可中止;
- Worker履行firstTask或从workQueue中获取使命:
- 进行加锁操作,确保thread不被其他线程中止(除非线程池被中止)
- 查看线程池状况,假使线程池处于中止状况,当时线程将中止。
- 履行beforeExecute
- 履行使命的run办法
- 履行afterExecute办法
- 解锁操作
经过getTask办法从堵塞行列中获取等候的使命,假如行列中没有使命,getTask办法会被堵塞并挂起,不会占用cpu资源;
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 先履行firstTask,再从workerQueue中取task(getTask())
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask办法
下面来看一下getTask()办法,这儿边涉及到keepAliveTime的运用,从这个办法咱们能够看出线程池是怎样让超越corePoolSize的那部分worker毁掉的。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
留意这儿一段代码是keepAliveTime起作用的要害:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
allowCoreThreadTimeOut为false,线程即便闲暇也不会被毁掉;假使为ture,在keepAliveTime内仍闲暇则会被毁掉。
假如线程答应闲暇等候而不被毁掉timed == false,workQueue.take使命: 假如堵塞行列为空,当时线程会被挂起等候;当行列中有使命加入时,线程被唤醒,take办法回来使命,并履行;
假如线程不答应无休止闲暇timed == true, workQueue.poll使命: 假如在keepAliveTime时刻内,堵塞行列还是没有使命,则回来null;
使命的提交
-
submit使命,等候线程池execute
-
履行FutureTask类的get办法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并堵塞等候运转成果;
-
FutureTask使命履行完结后,经过UNSAFE设置waiters相应的waitNode为null,并经过LockSupport类unpark办法唤醒主线程;
public class Test{ public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool(); Future<String> future = es.submit(new Callable<String>() { @Override public String call() throws Exception { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "future result"; } }); try { String result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } }
}
在实际业务场景中,Future和Callable基本是成对呈现的,Callable担任产生成果,Future担任获取成果。
- Callable接口相似于Runnable,只是Runnable没有回来值。
- Callable使命除了回来正常成果之外,假如产生反常,该反常也会被回来,即Future能够拿到异步履行使命各种成果;
- Future.get办法会导致主线程堵塞,直到Callable使命履行完结;
submit办法
AbstractExecutorService.submit()完结了ExecutorService.submit() 能够获取履行完的回来值, 而ThreadPoolExecutor 是AbstractExecutorService.submit()的子类,所以submit办法也是ThreadPoolExecutor`的办法。
// submit()在ExecutorService中的界说
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// submit办法在AbstractExecutorService中的完结
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 经过submit办法提交的Callable使命会被封装成了一个FutureTask目标。
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
经过submit办法提交的Callable使命会被封装成了一个FutureTask目标。经过Executor.execute办法提交FutureTask到线程池中等候被履行,终究履行的是FutureTask的run办法;
FutureTask目标
public class FutureTask<V> implements RunnableFuture<V>
能够将FutureTask提交至线程池中等候被履行(经过FutureTask的run办法来履行)
-
内部状况
/* The run state of this task, initially NEW. * … * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
内部状况的修改经过sun.misc.Unsafe修改
-
get办法
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
内部经过awaitDone办法对主线程进行堵塞,详细完结如下:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
假如主线程被中止,则抛出中止反常;
- 判别FutureTask当时的state,假如大于COMPLETING,阐明使命现已履行完结,则直接回来;
- 假如当时state等于COMPLETING,阐明使命现已履行完,这时主线程只需经过yield办法让出cpu资源,等候state变成NORMAL;
- 经过WaitNode类封装当时线程,并经过UNSAFE添加到waiters链表;
- 终究经过LockSupport的park或parkNanos挂起线程;
run办法
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
FutureTask.run办法是在线程池中被履行的,而非主线程
- 经过履行Callable使命的call办法;
- 假如call履行成功,则经过set办法保存成果;
- 假如call履行有反常,则经过setException保存反常;
使命的封闭
shutdown办法会将线程池的状况设置为SHUTDOWN,线程池进入这个状况后,就回绝再接受使命,然后会将剩下的使命全部履行完
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//查看是否能够封闭线程
checkShutdownAccess();
//设置线程池状况
advanceRunState(SHUTDOWN);
//测验中止worker
interruptIdleWorkers();
//预留办法,留给子类完结
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历一切的worker
for (Worker w : workers) {
Thread t = w.thread;
//先测验调用w.tryLock(),假如获取到锁,就阐明worker是闲暇的,就能够直接中止它
//留意的是,worker自己自身完结了AQS同步结构,然后完结的相似锁的功用
//它完结的锁是不行重入的,所以假如worker在履行使命的时分,会先进行加锁,这儿tryLock()就会回来false
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow做的比较绝,它先将线程池状况设置为STOP,然后回绝一切提交的使命。最后中止左右正在运转中的worker,然后清空使命行列。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//检测权限
advanceRunState(STOP);
//中止一切的worker
interruptWorkers();
//清空使命行列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历一切worker,然后调用中止办法
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
更深入了解
为什么线程池不答应运用Executors去创立? 引荐办法是什么?
线程池不答应运用Executors去创立,而是经过ThreadPoolExecutor的办法,这样的处理办法让写的同学愈加清晰线程池的运转规矩,躲避资源耗尽的风险。 阐明:Executors各个办法的坏处:
- newFixedThreadPool和newSingleThreadExecutor: 首要问题是堆积的请求处理行列可能会消耗十分大的内存,甚至OOM。
- newCachedThreadPool和newScheduledThreadPool: 首要问题是线程数最大数是Integer.MAX_VALUE,可能会创立数量十分多的线程,甚至OOM。
引荐办法 1
首要引进:commons-lang3包
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
引荐办法 2
首要引进:com.google.guava包
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
// excute
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
//gracefully shutdown
pool.shutdown();
引荐办法 3
spring装备线程池办法:自界说线程工厂bean需求完结ThreadFactory,可参阅该接口的其它默许完结类,运用办法直接注入bean调用execute(Runnable task)办法即可
<bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);
装备线程池需求考虑要素
从使命的优先级,使命的履行时刻长短,使命的性质(CPU密布/ IO密布),使命的依靠关系这四个角度来分析。并且近可能地运用有界的作业行列。
性质不同的使命可用运用不同规模的线程池分开处理:
- CPU密布型: 尽可能少的线程,Ncpu+1
- IO密布型: 尽可能多的线程, Ncpu*2,比如数据库连接池
- 混合型: CPU密布型的使命与IO密布型使命的履行时刻不同较小,拆分为两个线程池;不然没有必要拆分。
监控线程池的状况
能够运用ThreadPoolExecutor以下办法:
-
getTaskCount()
Returns the approximate total number of tasks that have ever been scheduled for execution. -
getCompletedTaskCount()
Returns the approximate total number of tasks that have completed execution. 回来成果少于getTaskCount()。 -
getLargestPoolSize()
Returns the largest number of threads that have ever simultaneously been in the pool. 回来成果小于等于maximumPoolSize -
getPoolSize()
Returns the current number of threads in the pool. -
getActiveCount()
Returns the approximate number of threads that are actively executing tasks.
参阅文章
- 《Java并发编程艺术》
- www.jianshu.com/p/87bff5cc8…
- blog.csdn.net/programmer\…
- blog.csdn.net/u013332124/…
- www.journaldev.com/1069/thread…
点击重视,第一时刻了解华为云新鲜技术~