本文源码阅览的主角是JobTriggerPoolHelper
,作业触发线程池助手,经过阅览此部分源码,咱们能够学习到:
- 线程池阻隔技能
- 多线程常识
快慢线程池
在JobTriggerPoolHelper
类中,有这么两个变量,界说了两个线程池
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
两个线程池在启动的时候会被初始化
public void start() {
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), // 最大线程数,最小为200
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 行列数1000
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), // 最大线程数,最小为100
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2000), // 行列数2000
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
}
经过源码,咱们发现,快慢线程池除了最大线程数和行列数不一致外,其他是相同的,这一开始让我很困惑,莫非修改最大线程数和使命行列数,能够让线程跑的更快吗?
查阅其他资料后,茅塞顿开,还是功力不够。
实际上不是线程池自身的快慢,而是线程池里面的使命履行有快慢一说,只要履行快的,才有资历参加fastTriggerPool
,履行慢的就只能被打入slowTriggerPool
。
总的来讲,运用线程池阻隔技能,将快使命和慢使命分开履行,能够避免某些使命履行过慢,然后拖累原本履行快的使命,优化整体的使命履行功能。
这一招在Hystrix
里面好像见过。
慢使命的界说
既然要将慢使命打入冷宫,那么咱们怎么界说这个使命履行的慢呢?
XXL-Job是这么界说的:使命履行时刻大于500ms,那么认为是慢使命(超时),此刻记为一次;假如1分钟内超时次数大于10次,就会被打入冷宫。
具体做法:
- 界说变量
minTim
,记载当时使命履行完毕时刻和60000(1分钟的毫秒值)的比值
private volatile long minTim = System.currentTimeMillis() / 60000; // ms > min
- 界说变量
jobTimeoutCountMap
,记载每个使命的履行超时次数,该变量间隔大于等于1分钟会重置
// 此map用来记载使命超时次数,不过实际上并不是超时,只是履行时刻大于500ms,应该叫做履行慢的次数才对
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
// check timeout-count-map
long minTim_now = System.currentTimeMillis() / 60000;
// 当时时刻戳和60000相除的结果,假如和既定变量不一致,那么肯定已经曩昔1分钟乃至更久,那么清空jobTimeoutCountMap
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
- 在每次使命履行完毕后,判别使命履行时长是否大于500ms,假如是,那么记为超时一次
// incr timeout-count-map
long cost = System.currentTimeMillis() - start;
// 假如使命履行时刻大于500ms,那么记为超时一次
if (cost > 500) { // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
- 在每次使命调度前,选择合适的线程池来履行
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// 作业-1分钟内超时10次,将会运用慢线程池来履行
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
看到这儿,感觉这个慢使命线程池我司好像用不上,由于守时使命都是批处理使命,耗时都挺长的,永远也达不成1分钟内超时10次的硬指标,由于使命自身耗时就大于1分钟。
其他多线程常识
volatile
的运用
开始前,先温习下volatile
的语义:所有线程都能看到共享内存的最新状态。也就是说,线程总是能拿到该变量最新的值。
minTim
和jobTimeoutCountMap
都运用了volatile
进行润饰。
这儿要注意一下,使命调度的部分,是无锁并发,没有运用任何锁。
// trigger
triggerPool_.execute(() -> {
long start = System.currentTimeMillis();
try {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
// check timeout-count-map
...
// incr timeout-count-map
...
}
});
minTim
好了解,由于要和当时计算的值进行比较,所以运用volatile
,取最新值进行比较就行。
但是jobTimeoutCountMap
的用法存疑,由于自身类型就是ConcurrentMap
,读写是原子性的,而且volatile
润饰的是容器类,实际上容器类咱们能够运用final
来润饰更好一点,由于容器自身不需要改动,改动的只是其中的元素罢了。
结合Do we need to make ConcurrentHashMap volatile?,我感觉这边应该运用final
润饰更好一点,有没有懂哥赐教下的,欢迎谈论留言。
线程池初始化
咱们来看一下快线程池的初始化
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), // 最大线程数,最小为200
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 行列数1000
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));
- 核心线程数:10
- 最大线程数:可配置,假如配置数小于200,那么设置为200,意味着最小是200
- 线程闲暇时刻:60秒
- 使命行列:容量为1000的有界堵塞行列
- 线程工厂:界说了下线程名称
回顾一下线程池的常识,什么状况下会抵达最大线程数呢?
答:当增加一个使命时,核心线程数已满,线程池还没抵达最大线程数,而且没有闲暇线程,工作行列已满的状况下,创立一个新线程并履行。
也就是说,10个使命在运转,1000个使命待运转,然后此刻增加使命,就会创立线程,而且短时刻内疯狂的调度200个使命,才能抵达最大线程数。
此刻应该是极端状况了,一般是不会积压这么多待运转使命的。
算是稳固了下对线程池的了解,在实际事务场景遇到也可作为参考。
附录
- XXL-Job 2.4.0源码学习库房