本文源码阅览的主角是JobTriggerPoolHelper,作业触发线程池助手,经过阅览此部分源码,咱们能够学习到:

  1. 线程池阻隔技能
  2. 多线程常识

快慢线程池

JobTriggerPoolHelper类中,有这么两个变量,界说了两个线程池

// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;

两个线程池在启动的时候会被初始化

public void start() {
    fastTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), // 最大线程数,最小为200
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000), // 行列数1000
        r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));
    slowTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), // 最大线程数,最小为100
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(2000), // 行列数2000
        r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
}

经过源码,咱们发现,快慢线程池除了最大线程数和行列数不一致外,其他是相同的,这一开始让我很困惑,莫非修改最大线程数和使命行列数,能够让线程跑的更快吗?

查阅其他资料后,茅塞顿开,还是功力不够。

实际上不是线程池自身的快慢,而是线程池里面的使命履行有快慢一说,只要履行快的,才有资历参加fastTriggerPool,履行慢的就只能被打入slowTriggerPool

总的来讲,运用线程池阻隔技能,将快使命和慢使命分开履行,能够避免某些使命履行过慢,然后拖累原本履行快的使命,优化整体的使命履行功能。

这一招在Hystrix里面好像见过。

慢使命的界说

既然要将慢使命打入冷宫,那么咱们怎么界说这个使命履行的慢呢?

XXL-Job是这么界说的:使命履行时刻大于500ms,那么认为是慢使命(超时),此刻记为一次;假如1分钟内超时次数大于10次,就会被打入冷宫。

具体做法:

  1. 界说变量minTim,记载当时使命履行完毕时刻和60000(1分钟的毫秒值)的比值
private volatile long minTim = System.currentTimeMillis() / 60000;     // ms > min
  1. 界说变量jobTimeoutCountMap,记载每个使命的履行超时次数,该变量间隔大于等于1分钟会重置
// 此map用来记载使命超时次数,不过实际上并不是超时,只是履行时刻大于500ms,应该叫做履行慢的次数才对
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
// check timeout-count-map
long minTim_now = System.currentTimeMillis() / 60000;
// 当时时刻戳和60000相除的结果,假如和既定变量不一致,那么肯定已经曩昔1分钟乃至更久,那么清空jobTimeoutCountMap
if (minTim != minTim_now) {
    minTim = minTim_now;
    jobTimeoutCountMap.clear();
}
  1. 在每次使命履行完毕后,判别使命履行时长是否大于500ms,假如是,那么记为超时一次
// incr timeout-count-map
long cost = System.currentTimeMillis() - start;
// 假如使命履行时刻大于500ms,那么记为超时一次
if (cost > 500) {       // ob-timeout threshold 500ms
    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
    if (timeoutCount != null) {
        timeoutCount.incrementAndGet();
    }
}
  1. 在每次使命调度前,选择合适的线程池来履行
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// 作业-1分钟内超时10次,将会运用慢线程池来履行
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
    triggerPool_ = slowTriggerPool;
}

看到这儿,感觉这个慢使命线程池我司好像用不上,由于守时使命都是批处理使命,耗时都挺长的,永远也达不成1分钟内超时10次的硬指标,由于使命自身耗时就大于1分钟。

其他多线程常识

volatile的运用

开始前,先温习下volatile的语义:所有线程都能看到共享内存的最新状态。也就是说,线程总是能拿到该变量最新的值。

minTimjobTimeoutCountMap都运用了volatile进行润饰。

这儿要注意一下,使命调度的部分,是无锁并发,没有运用任何锁。

// trigger
triggerPool_.execute(() -> {
    long start = System.currentTimeMillis();
    try {
        // do trigger
        XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    } finally {
        // check timeout-count-map
        ...
        // incr timeout-count-map
        ...
    }
});

minTim好了解,由于要和当时计算的值进行比较,所以运用volatile,取最新值进行比较就行。

但是jobTimeoutCountMap的用法存疑,由于自身类型就是ConcurrentMap,读写是原子性的,而且volatile润饰的是容器类,实际上容器类咱们能够运用final来润饰更好一点,由于容器自身不需要改动,改动的只是其中的元素罢了。

结合Do we need to make ConcurrentHashMap volatile?,我感觉这边应该运用final润饰更好一点,有没有懂哥赐教下的,欢迎谈论留言。

线程池初始化

咱们来看一下快线程池的初始化

fastTriggerPool = new ThreadPoolExecutor(
    10,
    XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), // 最大线程数,最小为200
    60L,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000), // 行列数1000
    r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));
  1. 核心线程数:10
  2. 最大线程数:可配置,假如配置数小于200,那么设置为200,意味着最小是200
  3. 线程闲暇时刻:60秒
  4. 使命行列:容量为1000的有界堵塞行列
  5. 线程工厂:界说了下线程名称

回顾一下线程池的常识,什么状况下会抵达最大线程数呢?

答:当增加一个使命时,核心线程数已满,线程池还没抵达最大线程数,而且没有闲暇线程,工作行列已满的状况下,创立一个新线程并履行。

也就是说,10个使命在运转,1000个使命待运转,然后此刻增加使命,就会创立线程,而且短时刻内疯狂的调度200个使命,才能抵达最大线程数。

此刻应该是极端状况了,一般是不会积压这么多待运转使命的。

算是稳固了下对线程池的了解,在实际事务场景遇到也可作为参考。

附录

  • XXL-Job 2.4.0源码学习库房