• 作者简介:咱们好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
  • 系列专栏:Java规划形式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列
  • 假如感觉博主的文章还不错的话,请三连支持一下博主哦
  • 博主正在尽力完成2023计划中:以梦为马,扬帆起航,2023追梦人
  • 联系方式:hls1793929520,加我进群,咱们一同学习,一同进步,一同对立互联网寒冬

ScheduledExecutorService

一、布景

咱们好呀,上星期咱们公司因为守时线程池运用不当出了一个故障,几千万的单子可能没了

踩了定时线程池的坑,导致公司损失几千万,血的教训

给兄弟们共享共享这个坑,期望兄弟们今后别踩!

事务中大量的运用守时线程池(ScheduledExecutorService)履行使命,有时候会疏忽掉 Try/Catch 的反常判别

当使命履行报错时,会导致整个守时线程池挂掉,影响事务的正常需求

踩了定时线程池的坑,导致公司损失几千万,血的教训

二、问题

咱们来仿照一个出产的例子:

  • 合作方修改频率低且合作方允许最终一致性

  • 咱们有一个守时使命每隔 60 秒去 MySQL 拉取全量的 合作方 数据放至 合作方缓存(本地缓存)

  • 当客户恳求时,咱们去缓存中拿取合作方即可

踩了定时线程池的坑,导致公司损失几千万,血的教训

这样的出产例子应该存在于绝大数公司,代码如下:

public class Demo {
    // 创建守时线程池
    private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private List<String> partnerCache = new ArrayList<>();
    @PostConstruct
    public void init() {
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // 启动时每隔60秒履行一次数据库的改写
                // 将数据缓存至本地
                loadPartner();
            }
        }, 3, 60, TimeUnit.SECONDS);
    }
    public void loadPartner() {
        // 查询数据库当时最新合作方数据
        List<String> partnerList = queryPartners();
        // 合作方数据放至缓存
        partnerCache.clear();
        partnerCache.addAll(partnerList);
    }
    public List<String> queryPartners() {
        // 数据库挂了!
        throw new RuntimeException();
    }
}

运行上述样例,咱们会发现程序不停止,输出一遍 Load start!,一直在运行,但后续不输出 Load start!

这个时候咱们能够确认:反常确实导致当时使命不再履行

1、为什么使命报错会影响守时线程池?

2、守时线程池是真的挂掉了嘛?

3、守时线程池内部是如何履行的?

跟着这三个问题,咱们一同来看一看 ScheduledExecutorService 的原理介绍

三、原理剖析

关于 ScheduledExecutorService 来说,本质上是 延时行列 + 线程池

1、延时行列介绍

DelayQueue 是一个无界的 BlockingQueue,用于放置完成了Delayed接口的目标,只能在到期时才能从行列中取走。

这种行列是有序的,即队头目标的推迟到期时刻最长。

咱们看一下延时行列里目标的特点:

class MyDelayedTask implements Delayed{
    // 当时使命创建时刻
    private long start = System.currentTimeMillis();
    // 延时时刻
    private long time ;
    // 初始化
    public MyDelayedTask(long time) {
        this.time = time;
    }
    /**
     * 需求完成的接口,获得推迟时刻(用过期时刻-当时时刻)
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }
    /**
     * 用于推迟行列内部比较排序(当时时刻的推迟时刻 - 比较目标的推迟时刻)
     */
    @Override
    public int compareTo(Delayed o) {
        MyDelayedTask o1 = (MyDelayedTask) o;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

踩了定时线程池的坑,导致公司损失几千万,血的教训

所以,延时行列的完成原理也很简单:

  • 出产端:投递音讯时增加时刻戳(当时时刻+延时时刻
  • 消费端:用当时时刻与时刻戳进行比较,若小于则消费,反之则循环等候

2、线程池的原理介绍

踩了定时线程池的坑,导致公司损失几千万,血的教训

  • 当时的线程池个数低于中心线程数,直接增加中心线程即可
  • 当时的线程池个数大于中心线程数,将使命增加至阻塞行列中
  • 假如增加阻塞行列失利,则需求增加非中心线程数处理使命
  • 假如增加非中心线程数失利(满了),履行回绝战略

3、守时线程的原理

咱们从守时线程池的创建看:scheduledExecutorService.scheduleAtFixedRate(myTask, 3L, 1L, TimeUnit.SECONDS);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    // 初始化咱们的使命
    // triggerTime:延时的完成
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 将当时使命丢进延时行列
    super.getQueue().add(task);
    // 创建中心线程并启动 
    ensurePrestart();
}
// 时刻轮算法
private long triggerTime(long delay, TimeUnit unit) {
    return now() + delay;
}

从这儿咱们能够得到定论:守时线程池通过延时行列来到达守时的目的

有一个问题:咱们仅仅向 Queue 里面放了一个使命,他是怎么保证履行屡次的呢?

带着这个问题,咱们看一下他拉取使命启动的代码:

for (;;) {
    // 从延时行列中获取使命
    Runnable r = workQueue.take();
}
public RunnableScheduledFuture<?> take(){
    for (;;) {
        // 获取行列第一个使命
        RunnableScheduledFuture<?> first = queue[0];
        // 【要点】假如当时行列使命为空,则等候
        if (first == null){
            available.await();
        }
        // 获取当时使命的时刻
        long delay = first.getDelay(NANOSECONDS);
        if (delay <= 0){
            // 弹出当时使命
            return finishPoll(first);
        }
    }
}
// 时刻戳减去当时时刻
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

当拿到使命(ScheduledFutureTask)之后,会履行使命:task.run()

public void run() {
   // 履行当时的使命
   if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}
protected boolean runAndReset() {
    if (state != NEW){
        return false;
    }
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                // 履行使命
                c.call(); 
                // 【要点!!!】假如使命正常履行成功的话,这儿会将ran置为true
                // 假如你的使命有问题,会被下面直接捕捉到,不会将此处的ran置为true
                ran = true;
            } catch (Throwable ex) {
                // 出现反常会将state置为EXCEPTIONAL
                // 标记当时使命履行失利并将反常赋值到结果
                setException(ex);
            }finally {
                 s = state;
            }
        }
    }
    // ran:当时使命是否履行成功
    // s:当时使命状态
    // ran为false:当时使命履行失利
    // s == NEW = false:当时使命状态出现反常
    return ran && s == NEW;
}

假如咱们的 runAndReset 回来 false 的话,那么进不去 setNextRunTime 该方法:

if (ScheduledFutureTask.super.runAndReset()) {
    // 修改当时使命的Time
    setNextRunTime();
    // 将使命从头丢进行列
    reExecutePeriodic(outerTask);
}

最终,使命没有办法被丢进行列,咱们的线程无法拿到使命履行,一直在等候。

四、定论

通过上面的剖析,咱们回头看一下开篇的三个问题:

1、为什么使命报错会影响守时线程池?

  • 使命报错不会影响线程池,仅仅线程池将当时使命给丢掉,没有继续放到行列中

2、守时线程池是真的挂掉了嘛?

  • 守时线程池没有挂,挂的仅仅报错的使命

3、守时线程池内部是如何履行的?

  • 线程池 + 延时行列

所以,通过上述的解说,咱们应该认识到:守时使命一定要加Try Catch,不然一旦产生反常

不然,你就会和作者相同,背故障让公司损失几千万,血的经历!

踩了定时线程池的坑,导致公司损失几千万,血的教训

五、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一同进步。彼此毫无保留的共享经历,才是对立互联网寒冬的最佳挑选。

其实很多时候,并不是咱们不行尽力,很可能便是自己尽力的方向不对,假如有一个人能稍微点拨你一下,你真的可能会少走几年弯路。

假如你也对 后端架构中间件源码 有爱好,欢迎增加博主微信:hls1793929520,一同学习,一同成长

我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。

咱们下期再见。

我从清晨走过,也拥抱夜晚的星斗,人生没有捷径,你我皆普通,你好,陌生人,一同共勉。

往期文章引荐:

  • 不愧是阿里三面,ConcurrentHashMap多线程扩容机制被面试官装到了
  • 美团二面:聊聊ConcurrentHashMap的存储流程
  • 从源码全面解析Java 线程池的来龙去脉
  • 从源码全面解析LinkedBlockingQueue的来龙去脉
  • 从源码全面解析 ArrayBlockingQueue 的来龙去脉
  • 从源码全面解析ReentrantLock的来龙去脉
  • 阅读完synchronized和ReentrantLock的源码后,我竟发现其十分相似
  • 从源码全面解析 ThreadLocal 关键字的来龙去脉
  • 从源码全面解析 synchronized 关键字的来龙去脉
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。