Timer和ScheduledExecutorService是JDK内置的守时使命计划,而业内还有一个经典的守时使命的规划叫时刻轮(Timing Wheel), Netty内部基于时刻轮完成了一个HashedWheelTimer来优化百万量级I/O超时的检测,它是一个高性能,低消耗的数据结构,它适合用非准实时,推迟的短平快使命,例如心跳检测。本文首要介绍时刻轮(Timing Wheel)及其运用。@pdai

知识预备

需求对时刻轮(Timing Wheel),以及Netty的HashedWheelTimer要处理什么问题有开始的知道。

什么是时刻轮(Timing Wheel)

时刻轮(Timing Wheel)是George Varghese和Tony Lauck在1996年的论文’ Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility ‘完成的,它在Linux内核中运用广泛,是Linux内核守时器的完成方法和基础之一。

时刻轮(Timing Wheel)是一种环形的数据结构,就像一个时钟能够分成很多格子(Tick),每个格子代表时刻的距离,它指向存储的具体使命(timerTask)的一个链表

SpringBoot定时任务 - 定时任务设计:时间轮案例和原理

以上述在论文中的图片比如,这儿一个轮子包括8个格子(Tick), 每个tick是一秒钟;

使命的添加:假如一个使命要在17秒后履行,那么它需求转2轮,最终加到Tick=1方位的链表中。

使命的履行:在时钟转2Round到Tick=1的方位,开始履行这个方位指向的链表中的这个使命。(# 这儿表示剩下需求转几轮再履行这个使命)

Netty的HashedWheelTimer要处理什么问题

HashedWheelTimer是Netty依据时刻轮(Timing Wheel)开发的东西类,它要处理什么问题呢?这儿面有两个要点:推迟使命 + 低时效性。@pdai

在Netty中的一个典型应用场景是判别某个衔接是否idle,假如idle(如客户端由于网络原因导致到服务器的心跳无法送达),则服务器会主动断开衔接,开释资源。判别衔接是否idle是经过守时使命完成的,可是Netty可能保持数百万级别的长衔接,对每个衔接去定义一个守时使命是不可行的,所以如何提升I/O超时调度的效率呢?

Netty依据时刻轮(Timing Wheel)开发了HashedWheelTimer东西类,用来优化I/O超时调度(本质上是推迟使命);之所以选用时刻轮(Timing Wheel)的结构还有一个很重要的原因是I/O超时这种类型的使命对时效性不需求十分精准。

HashedWheelTimer的运用方法

在了解时刻轮(Timing Wheel)和Netty的HashedWheelTimer要处理的问题后,咱们看下HashedWheelTimer的运用方法

经过结构函数看首要参数

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts, Executor taskExecutor) {
}

具体参数说明如下:

threadFactory
tickDuration
unit
ticksPerWheel
leakDetection
maxPendingTimeouts

完成事例

这儿展示下HashedWheelTimer的基本运用事例。@pdai

Pom依赖

引进pom的依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.77.Final</version>
</dependency>

2个简略比如

比如1:5秒后履行TimerTask

@SneakyThrows
public static void simpleHashedWheelTimer() {
    log.info("init task 1...");
    HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8);
    // add a new timeout
    timer.newTimeout(timeout -> {
        log.info("running task 1...");
    }, 5, TimeUnit.SECONDS);
}

履行成果如下:

23:32:21.364 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 1...
...
23:32:27.454 [pool-1-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 1...

比如2:使命失效后cancel并让它重新在3秒后履行。

@SneakyThrows
public static void reScheduleHashedWheelTimer() {
    log.info("init task 2...");
    HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8);
    Thread.sleep(5000);
    // add a new timeout
    Timeout tm = timer.newTimeout(timeout -> {
        log.info("running task 2...");
    }, 5, TimeUnit.SECONDS);
    // cancel
    if (!tm.isExpired()) {
        log.info("cancel task 2...");
        tm.cancel();
    }
    // reschedule
    timer.newTimeout(tm.task(), 3, TimeUnit.SECONDS);
}
23:28:36.408 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 2...
23:28:41.412 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - cancel task 2...
23:28:45.414 [pool-2-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 2...

进一步了解

咱们经过如下问题进一步了解HashedWheelTimer。@pdai

HashedWheelTimer是如何完成的?

简略看下HashedWheelTimer是如何完成的

SpringBoot定时任务 - 定时任务设计:时间轮案例和原理

Worker
HashedWheelBucket
HashedWheelTimeout

结构函数

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts, Executor taskExecutor) {
    checkNotNull(threadFactory, "threadFactory");
    checkNotNull(unit, "unit");
    checkPositive(tickDuration, "tickDuration");
    checkPositive(ticksPerWheel, "ticksPerWheel");
    this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;
    // Convert tickDuration to nanos.
    long duration = unit.toNanos(tickDuration);
    // Prevent overflow.
    if (duration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }
    if (duration < MILLISECOND_NANOS) {
        logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
                    tickDuration, MILLISECOND_NANOS);
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        this.tickDuration = duration;
    }
    workerThread = threadFactory.newThread(worker);
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
    this.maxPendingTimeouts = maxPendingTimeouts;
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

创立wheel

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    //ticksPerWheel may not be greater than 2^30
    checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i ++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = 1;
    while (normalizedTicksPerWheel < ticksPerWheel) {
        normalizedTicksPerWheel <<= 1;
    }
    return normalizedTicksPerWheel;
}

使命的添加

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    checkNotNull(task, "task");
    checkNotNull(unit, "unit");
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }
    start();
    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

履行方法

/**
    * Starts the background thread explicitly.  The background thread will
    * start automatically on demand even if you did not call this method.
    *
    * @throws IllegalStateException if this timer has been
    *                               {@linkplain #stop() stopped} already
    */
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

中止方法

@Override
public Set<Timeout> stop() {
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
                HashedWheelTimer.class.getSimpleName() +
                        ".stop() cannot be called from " +
                        TimerTask.class.getSimpleName());
    }
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        // workerState can be 0 or 2 at this moment - let it always be 2.
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        return Collections.emptySet();
    }
    try {
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt();
            try {
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    } finally {
        INSTANCE_COUNTER.decrementAndGet();
        if (leak != null) {
            boolean closed = leak.close(this);
            assert closed;
        }
    }
    return worker.unprocessedTimeouts();
}

什么是多级Timing Wheel?

多级的时刻轮是比较好了解的,时钟是有小时,分钟,秒的,秒转一圈(Round)分钟就转一个格(Tick), 分钟转一圈(Round)小时就转一格(Tick)。

SpringBoot定时任务 - 定时任务设计:时间轮案例和原理

PS:明显HashedWheelTimer是一层时刻轮。