前语
在生活中太阳的东升西落,鸟类的南飞北归,四级的轮换,每天的上下班,海水的潮汐,每月的房租车贷等等,假设用程序员的视角看,这便是一个个的守时使命,在日常的开发作业中也有许多的守时使命场景:
- 数仓体系凌晨进行的数据同步
- 订单12小时未付出的状况校验
- rpc调用超时时刻的校验
- 缓存数据失效时刻的延长
- 守时开启的促销活动
- ……
假设现在有一个使命需求3s后履行,你会怎么完结?
简单点,直接一个线程的休眠,thread.sleep(3000),一行代码就能到达意图,可是功能嘛……,因为每个使命都需求一个独自的线程,当体系中存在许多使命,
使命调度
假设,现在有一个使命需求3s后履行,你会怎么完结呢?
简单点,直接一个休眠,让线程sleep 3s,不就到达意图了吗?可是功能嘛……,因为每个使命都需求一个独自的线程,在体系中存在许多使命时,这种计划的耗费是极端巨大的,那么怎么完结高效的调度呢?大佬们垂头看了一眼手表,一个算法呈现了
时刻轮的数据结构

如图所示,这便是时刻轮的一个根底结构,一个存储了守时使命的环形行列,能够理解为一个时刻钟,行列的每个节点称为时刻槽,每个槽位又运用列表存储着需求履行的守时使命。和生活中的钟表运转机制相同,每隔固定的单位时刻,就会从一个槽位跳到下一个槽位,就像秒针跳动了一次,再取出当时槽位的使命进行履行。假设固定单位时刻为1S,当时槽位位2,假设需求刺进一个3S后的使命,就会在槽位5的的列表里加上当时使命。等指针运转到第五个槽位时,取出使命履行就能够了。
时刻轮的最大优势是在时刻复杂度上的优势,一个使命简单的生命周期:
- 创立使命,刺进到数据结构中。
- 查询使命,找到满意条件的使命
- 履行使命。
- 使命归档,从使命调度的列表中删出。
其间第三步的履行时刻是固定的,所以1,2,4这三部就的时刻复杂度就决定了整个使命调度流程的复杂度,而时刻轮是链式存储结构,所以在增删和查询时,时刻复杂度都是0(1),其他常见的使命调度算法例如最小堆和红黑树以及跳表。
最小堆是一颗彻底二叉树而且子节点的值总是大于等于父节点的值,所以在刺进时分需求判断父节点的联系,它的时刻添加操作时刻复杂度是O(logn),在使命履行时,只需求判断最顶节点就行,所以它的查询时刻复杂度时哦O(1)。
依据红黑树的特性现已被归纳法证明它的添加的时刻复杂度是O(logn),查找最小节点的时刻复杂度位O(h)。
跳表的的实质是完结二分查找法的有序链表,可是他有多个层级,和红黑树的高度值类似,它的时刻复杂度也是O(logn)
高级时刻轮
如上图所示,假设一个刻度代表1S,那么一个周期便是1分钟,可是假设我一个使命是在3分钟后履行呢,假设是在一个12小时后履行呢?
当然假设是单纯的添加环形链表的长度也是能够的,直接扩大到3600*24,一天一个周期,直接放进来。可是还有更好的方法。
带次序符号的使命
使命履行次序的计算公式:((使命履行时刻-当时时刻)/固定单位时刻)%槽位数量
依据槽位计算公式能够算出当时使命需求刺进履行的次序,我在使命上面加一个字段round,当每次履行到该槽位时,就遍历该槽位的使命列表,每个使命的round-1,取出来round=0的使命履行就行。
for(Task task:taskList){
int round= task.getRound();
round=(round-1);
task.setRound(round);
if(round==0){
doTask(task);
}
}
假设使命距离不是很大,看起来也是不错的一种解决方式。
可是作业中有许多使命,延迟履行的时刻是好久今后的,例如延保履约服务成功之后会有一个7天自动完结的守时使命,甚至有一些几年后才会履行的使命,假设都用round来处理的话,那这个round将会变的非常大的一个数字,也会在使命列表中刺进许多当时不需求履行的使命,假设每次都履行上面的逻辑,显然会糟蹋许多的资源。
多层时刻轮

多层时刻轮的核心思维是:
就上上图的水表,有许多小的表盘,可是每个表盘的刻度其实是不相同,又或许手表里的时分秒或许日历上的年月日。
针对时刻复杂度的问题:不做遍历计算round,只需到了当时槽位,就把使命列表的一切使命拿出来履行。
针对空间复杂度的问题:分层,每个层级的时刻轮刻度不相同,多个时刻轮和谐作业。

如上图所示,第一次时刻轮,每个刻度是1ms,一轮是20ms,第二个层时刻轮的刻度是20ms,一轮便是400ms,第三层的刻度是400ms,一轮便是8000ms,每层的周期就等于 20ms *2的n次方。这要运用多层级时刻轮就能够很容易把使命区分开来。每逢高层次时刻轮到达当时节点,就把使命降级到低层级的时刻轮上。对于400ms的时刻轮来说,小于1ms和小于399ms的使命都是过期使命,只需不大于400ms,都认为是过期使命。
代码完结的话,往上也有许多,最近比较炽热的POWER-JOB的分布式调度结构便是才有的时刻轮算法,粘贴下核心代码大家看下:
1.首先界说了一个使命接口
public interface TimerTask extends Runnable {
}
2.调度中的使命对象
public interface TimerFuture {
/**
* 获取实际要履行的使命
* @return
*/
TimerTask getTask();
/**
* 撤销使命
* @return
*/
boolean cancel();
/**
* 使命是否撤销
* @return
*/
boolean isCancelled();
/**
* 使命是否完结
* @return
*/
boolean isDone();
}
3.调度器接口
public interface Timer {
/**
* 调度守时使命
*/
TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);
/**
* 中止一切调度使命
*/
Set<TimerTask> stop();
}
4.时刻轮的完结
public class HashedWheelTimer implements Timer {
private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final Indicator indicator;
private final long startTime;
private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();
private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();
private final ExecutorService taskProcessPool;
public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
this(tickDuration, ticksPerWheel, 0);
}
/**
* 新建时刻轮守时器
* @param tickDuration 时刻距离,单位毫秒(ms)
* @param ticksPerWheel 轮盘个数
* @param processThreadNum 处理使命的线程个数,0代表不启用新线程(假设守时使命需求耗时操作,请启用线程池)
*/
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
this.tickDuration = tickDuration;
// 初始化轮盘,巨细格式化为2的N次,能够运用 & 替代取余
int ticksNum = CommonUtils.formatSize(ticksPerWheel);
wheel = new HashedWheelBucket[ticksNum];
for (int i = 0; i < ticksNum; i++) {
wheel[i] = new HashedWheelBucket();
}
mask = wheel.length - 1;
// 初始化履行线程池
if (processThreadNum <= 0) {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
// 这儿需求调整一下行列巨细
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192);
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
// 基本都是 io 密集型使命
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
}
startTime = System.currentTimeMillis();
// 发动后台线程
indicator = new Indicator();
new Thread(indicator, "HashedWheelTimer-Indicator").start();
}
@Override
public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) {
long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime);
// 直接运转到期、过期使命
if (delay <= 0) {
runTask(timerFuture);
return timerFuture;
}
// 写入阻塞行列,保证并发安全(功能进一步优化能够考虑 Netty 的 Multi-Producer-Single-Consumer行列)
waitingTasks.add(timerFuture);
return timerFuture;
}
@Override
public Set<TimerTask> stop() {
indicator.stop.set(true);
taskProcessPool.shutdown();
while (!taskProcessPool.isTerminated()) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
}
return indicator.getUnprocessedTasks();
}
/**
* 包装 TimerTask,保护预期履行时刻、总圈数等数据
*/
private final class HashedWheelTimerFuture implements TimerFuture {
// 预期履行时刻
private final long targetTime;
private final TimerTask timerTask;
// 所属的时刻格,用于快速删去该使命
private HashedWheelBucket bucket;
// 总圈数
private long totalTicks;
// 当时状况 0 - 初始化等待中,1 - 运转中,2 - 完结,3 - 已撤销
private int status;
// 状况枚举值
private static final int WAITING = 0;
private static final int RUNNING = 1;
private static final int FINISHED = 2;
private static final int CANCELED = 3;
public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) {
this.targetTime = targetTime;
this.timerTask = timerTask;
this.status = WAITING;
}
@Override
public TimerTask getTask() {
return timerTask;
}
@Override
public boolean cancel() {
if (status == WAITING) {
status = CANCELED;
canceledTasks.add(this);
return true;
}
return false;
}
@Override
public boolean isCancelled() {
return status == CANCELED;
}
@Override
public boolean isDone() {
return status == FINISHED;
}
}
/**
* 时刻格(实质便是链表,保护了这个时刻或许需求履行的一切使命)
*/
private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> {
public void expireTimerTasks(long currentTick) {
removeIf(timerFuture -> {
// processCanceledTasks 后外部操作撤销使命会导致 BUCKET 中仍存在 CANCELED 使命的状况
if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
return true;
}
if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
log.warn("[HashedWheelTimer] impossible, please fix the bug");
return true;
}
// 本轮直接调度
if (timerFuture.totalTicks <= currentTick) {
if (timerFuture.totalTicks < currentTick) {
log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug");
}
try {
// 提交履行
runTask(timerFuture);
}catch (Exception ignore) {
} finally {
timerFuture.status = HashedWheelTimerFuture.FINISHED;
}
return true;
}
return false;
});
}
}
private void runTask(HashedWheelTimerFuture timerFuture) {
timerFuture.status = HashedWheelTimerFuture.RUNNING;
if (taskProcessPool == null) {
timerFuture.timerTask.run();
}else {
taskProcessPool.submit(timerFuture.timerTask);
}
}
/**
* 模仿时针转动的线程
*/
private class Indicator implements Runnable {
private long tick = 0;
private final AtomicBoolean stop = new AtomicBoolean(false);
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void run() {
while (!stop.get()) {
// 1. 将使命从行列推入时刻轮
pushTaskToBucket();
// 2. 处理撤销的使命
processCanceledTasks();
// 3. 等待指针跳向下一刻
tickTack();
// 4. 履行守时使命
int currentIndex = (int) (tick & mask);
HashedWheelBucket bucket = wheel[currentIndex];
bucket.expireTimerTasks(tick);
tick ++;
}
latch.countDown();
}
/**
* 模仿指针转动,当返回时指针现已转到了下一个刻度
*/
private void tickTack() {
// 下一次调度的绝对时刻
long nextTime = startTime + (tick + 1) * tickDuration;
long sleepTime = nextTime - System.currentTimeMillis();
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
}catch (Exception ignore) {
}
}
}
/**
* 处理被撤销的使命
*/
private void processCanceledTasks() {
while (true) {
HashedWheelTimerFuture canceledTask = canceledTasks.poll();
if (canceledTask == null) {
return;
}
// 从链表中删去该使命(bucket为null说明还没被正式推入时刻格中,不需求处理)
if (canceledTask.bucket != null) {
canceledTask.bucket.remove(canceledTask);
}
}
}
/**
* 将行列中的使命推入时刻轮中
*/
private void pushTaskToBucket() {
while (true) {
HashedWheelTimerFuture timerTask = waitingTasks.poll();
if (timerTask == null) {
return;
}
// 一共的偏移量
long offset = timerTask.targetTime - startTime;
// 一共需求走的指针步数
timerTask.totalTicks = offset / tickDuration;
// 取余计算 bucket index
int index = (int) (timerTask.totalTicks & mask);
HashedWheelBucket bucket = wheel[index];
// TimerTask 保护 Bucket 引证,用于删去该使命
timerTask.bucket = bucket;
if (timerTask.status == HashedWheelTimerFuture.WAITING) {
bucket.add(timerTask);
}
}
}
public Set<TimerTask> getUnprocessedTasks() {
try {
latch.await();
}catch (Exception ignore) {
}
Set<TimerTask> tasks = Sets.newHashSet();
Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
tasks.add(timerFuture.timerTask);
}
};
waitingTasks.forEach(consumer);
for (HashedWheelBucket bucket : wheel) {
bucket.forEach(consumer);
}
return tasks;
}
}
}
作者:京东保险陈建华
来历:京东云开发者社区