- 作者简介:咱们好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
- 系列专栏:Java规划形式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列
- 假如感觉博主的文章还不错的话,请三连支持一下博主哦
- 博主正在尽力完成2023计划中:以梦为马,扬帆起航,2023追梦人
- 联系方式:hls1793929520,加我进群,咱们一同学习,一同进步,一同对立互联网寒冬
ScheduledExecutorService
一、布景
咱们好呀,上星期咱们公司因为守时线程池运用不当出了一个故障,几千万的单子可能没了

给兄弟们共享共享这个坑,期望兄弟们今后别踩!
事务中大量的运用守时线程池(ScheduledExecutorService
)履行使命,有时候会疏忽掉 Try/Catch
的反常判别
当使命履行报错时,会导致整个守时线程池挂掉,影响事务的正常需求

二、问题
咱们来仿照一个出产的例子:
-
合作方修改频率低且合作方允许最终一致性
-
咱们有一个守时使命每隔
60
秒去MySQL
拉取全量的合作方
数据放至 合作方缓存(本地缓存) 中 -
当客户恳求时,咱们去缓存中拿取合作方即可

这样的出产例子应该存在于绝大数公司,代码如下:
public class Demo {
// 创建守时线程池
private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private List<String> partnerCache = new ArrayList<>();
@PostConstruct
public void init() {
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 启动时每隔60秒履行一次数据库的改写
// 将数据缓存至本地
loadPartner();
}
}, 3, 60, TimeUnit.SECONDS);
}
public void loadPartner() {
// 查询数据库当时最新合作方数据
List<String> partnerList = queryPartners();
// 合作方数据放至缓存
partnerCache.clear();
partnerCache.addAll(partnerList);
}
public List<String> queryPartners() {
// 数据库挂了!
throw new RuntimeException();
}
}
运行上述样例,咱们会发现程序不停止,输出一遍 Load start!
,一直在运行,但后续不输出 Load start!
这个时候咱们能够确认:反常确实导致当时使命不再履行
1、为什么使命报错会影响守时线程池?
2、守时线程池是真的挂掉了嘛?
3、守时线程池内部是如何履行的?
跟着这三个问题,咱们一同来看一看 ScheduledExecutorService
的原理介绍
三、原理剖析
关于 ScheduledExecutorService
来说,本质上是 延时行列 + 线程池
1、延时行列介绍
DelayQueue
是一个无界的 BlockingQueue
,用于放置完成了Delayed接口的目标,只能在到期时才能从行列中取走。
这种行列是有序的,即队头目标的推迟到期时刻最长。
咱们看一下延时行列里目标的特点:
class MyDelayedTask implements Delayed{
// 当时使命创建时刻
private long start = System.currentTimeMillis();
// 延时时刻
private long time ;
// 初始化
public MyDelayedTask(long time) {
this.time = time;
}
/**
* 需求完成的接口,获得推迟时刻(用过期时刻-当时时刻)
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 用于推迟行列内部比较排序(当时时刻的推迟时刻 - 比较目标的推迟时刻)
*/
@Override
public int compareTo(Delayed o) {
MyDelayedTask o1 = (MyDelayedTask) o;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}

所以,延时行列的完成原理也很简单:
- 出产端:投递音讯时增加时刻戳(当时时刻+延时时刻)
- 消费端:用当时时刻与时刻戳进行比较,若小于则消费,反之则循环等候
2、线程池的原理介绍

- 当时的线程池个数低于中心线程数,直接增加中心线程即可
- 当时的线程池个数大于中心线程数,将使命增加至阻塞行列中
- 假如增加阻塞行列失利,则需求增加非中心线程数处理使命
- 假如增加非中心线程数失利(满了),履行回绝战略
3、守时线程的原理
咱们从守时线程池的创建看:scheduledExecutorService.scheduleAtFixedRate(myTask, 3L, 1L, TimeUnit.SECONDS);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
// 初始化咱们的使命
// triggerTime:延时的完成
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 将当时使命丢进延时行列
super.getQueue().add(task);
// 创建中心线程并启动
ensurePrestart();
}
// 时刻轮算法
private long triggerTime(long delay, TimeUnit unit) {
return now() + delay;
}
从这儿咱们能够得到定论:守时线程池通过延时行列来到达守时的目的
有一个问题:咱们仅仅向 Queue
里面放了一个使命,他是怎么保证履行屡次的呢?
带着这个问题,咱们看一下他拉取使命启动的代码:
for (;;) {
// 从延时行列中获取使命
Runnable r = workQueue.take();
}
public RunnableScheduledFuture<?> take(){
for (;;) {
// 获取行列第一个使命
RunnableScheduledFuture<?> first = queue[0];
// 【要点】假如当时行列使命为空,则等候
if (first == null){
available.await();
}
// 获取当时使命的时刻
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0){
// 弹出当时使命
return finishPoll(first);
}
}
}
// 时刻戳减去当时时刻
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
当拿到使命(ScheduledFutureTask)之后,会履行使命:task.run()
public void run() {
// 履行当时的使命
if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
protected boolean runAndReset() {
if (state != NEW){
return false;
}
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 履行使命
c.call();
// 【要点!!!】假如使命正常履行成功的话,这儿会将ran置为true
// 假如你的使命有问题,会被下面直接捕捉到,不会将此处的ran置为true
ran = true;
} catch (Throwable ex) {
// 出现反常会将state置为EXCEPTIONAL
// 标记当时使命履行失利并将反常赋值到结果
setException(ex);
}finally {
s = state;
}
}
}
// ran:当时使命是否履行成功
// s:当时使命状态
// ran为false:当时使命履行失利
// s == NEW = false:当时使命状态出现反常
return ran && s == NEW;
}
假如咱们的 runAndReset
回来 false
的话,那么进不去 setNextRunTime
该方法:
if (ScheduledFutureTask.super.runAndReset()) {
// 修改当时使命的Time
setNextRunTime();
// 将使命从头丢进行列
reExecutePeriodic(outerTask);
}
最终,使命没有办法被丢进行列,咱们的线程无法拿到使命履行,一直在等候。
四、定论
通过上面的剖析,咱们回头看一下开篇的三个问题:
1、为什么使命报错会影响守时线程池?
- 使命报错不会影响线程池,仅仅线程池将当时使命给丢掉,没有继续放到行列中
2、守时线程池是真的挂掉了嘛?
- 守时线程池没有挂,挂的仅仅报错的使命
3、守时线程池内部是如何履行的?
- 线程池 + 延时行列
所以,通过上述的解说,咱们应该认识到:守时使命一定要加Try Catch,不然一旦产生反常
不然,你就会和作者相同,背故障让公司损失几千万,血的经历!

五、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一同进步。彼此毫无保留的共享经历,才是对立互联网寒冬的最佳挑选。
其实很多时候,并不是咱们不行尽力,很可能便是自己尽力的方向不对,假如有一个人能稍微点拨你一下,你真的可能会少走几年弯路。
假如你也对 后端架构 和 中间件源码 有爱好,欢迎增加博主微信:hls1793929520,一同学习,一同成长
我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。
咱们下期再见。
我从清晨走过,也拥抱夜晚的星斗,人生没有捷径,你我皆普通,你好,陌生人,一同共勉。
往期文章引荐:
- 不愧是阿里三面,ConcurrentHashMap多线程扩容机制被面试官装到了
- 美团二面:聊聊ConcurrentHashMap的存储流程
- 从源码全面解析Java 线程池的来龙去脉
- 从源码全面解析LinkedBlockingQueue的来龙去脉
- 从源码全面解析 ArrayBlockingQueue 的来龙去脉
- 从源码全面解析ReentrantLock的来龙去脉
- 阅读完synchronized和ReentrantLock的源码后,我竟发现其十分相似
- 从源码全面解析 ThreadLocal 关键字的来龙去脉
- 从源码全面解析 synchronized 关键字的来龙去脉