一口气说出 6种 延时队列的实现方案,面试稳稳的


本文demo 已全部上传 github 地址:https://github.com/chengxy-nds/delayqueue,WX查找【程序员内点事】,回复【666】妙趣横生。

五一期间原计划是写两篇文章,看一本技术类书籍,效果这五天因为自律性过于差,不由得各种引诱,我连电脑都没打开过,计划完美H { J l宣告失利。所以在这能看出和大佬之间的间隔,人家没白没夜的更文,比你优异的人比你更尽力,难以望其项背,真是让我自0 { 9惭形秽。

知耻然后勇,这不逼着自己又学起来了,个人比较喜爱一些实践类的东西,既学习到知识又能让技术落地,能搞出个demo最好,本来不知道该同享2 w ^什么主题,好在最近项目紧急招人中,而我有幸做了回面试官,就给我们收拾同享一道面试题:“怎样完结延时, Z A队伍?”。

下边会介绍多种完结延时队伍的思路,文末供应有几种完结方法的 github地址。其实哪种方法都没有肯定的好与坏,只是看把它用在什么业务场景中,技术这东西没有最好的只需最合适的。

一、延时V f P D队伍的运用

什么是延时队伍?断章取义:首要它要具K . / _ 2有队伍的特性,再给它附加一个C C n d * ^推延B 9 o ^ k ~消费队伍消息的功用,也就是说可以指定队伍中的消息在哪个时间点被消费。

延时队伍在项目中的运用仍是比较多的,特别像电商类渠道:

1、订单成功后k 5 p Y s = 0 Q V,在30分钟内没y { q有付出,自动吊销订单

2、外卖渠道发送订餐告诉,下单成功后60s给用户推o : }送短信。

3、假设订单一贯处于某一个未完结情况时,及时处理关单,; 6 (并交还库存

4、淘宝新建商户一个月内还没上传商品q 4 [ N H信息,将冻住商铺等

。。。。

上边的这些场景都可以运用延时队伍处理。

二、延时队伍的完结

我个人一贯秉承的观念:作业上能用JDK自带API完结的功用,就不要简略自己重复造轮子,或许引进三方中间件。一方面自己封装很简略出问题(大佬在外),再加上调试验证发生许多不用` c k ; Y o ] Z要的作业量;另一方面一旦接入三方的中间件就会让体系复杂度成倍b P I H N +的添加,维护本钱也大大的添加。

1、DelayQueue 延时队伍

JDK 中供应了一组完结推延队伍的API,位于Jag ! fva.util.concurrent包下DelayQX 2 Z | 8 =ueue

Delay8 A o p jQueue是一个BlockingQueue(无界堵塞)队伍,它本质就是封装了一个PriorityQue} / ` A ( Y B iue(优先队伍),PriorityQueue内部运用完全二叉堆(不知道的自行了解哈)来完结队伍元素排序,我们在向Del0 [ 7 @ C J sayQueue队伍中添加元素时,会给元素一个Delay(推延时间)作为排序条件,队伍中最小的元素会优先放在队首。队伍中的元素只需到了Delay时间o T 4 ; 2 s – e才答应从队伍中取出。队伍中可以放底子数据类型或自界说实体类,在寄{ v : t存底子数据类型时,优先队伍中元素默许升序摆放,自界说实体类就需要我们根据类属I z v 8 ` ~ ~性值比较核算了。

先简略完结一下看看效果,添加三个order_ / | U入队Delay| 9 R 0Q} b :ueue,分别设置订单在当时时t G H a g刻的5秒10秒15秒后吊销。

一口气说出 6种 延时队伍的完结计划,面试稳稳的

要完结DelayQueue延时队伍,队中元素要implements Delayed 接口,这哥接口里只需一个i | d * ) ] o wgetDelay方法,用于设置延期时间。Order类中compareTo方法H 2 H w T C担任对队伍中的元素进行排序。

public class Order implementss , X D n Delayed {
/**
* 推延时间
*/
@JsonForm{ R ;at(locale = "zh", timezone = "G_ X F 4 b ~MT+8", pattern = "yyyy-MM-dd HH:mm:sj ^ w u } !s")
private long time;
Strin L * j 7 : w |ng name;
public Orderl ` t U 6(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis()n 5 { o ` K;
}
@Override
public inS ` 1 ( p F } ht compareTo(Delayed o) {
Order Orde% 9 ] ) d 6r = (Order) o;
loH J c K B ) Eng diff = this.timeT H , # - Order.time;
if (diff <=P p p 0) {
return -1;
} else {
returng { a % [ @ 1;
}
}
}

DelayQueueput方法是线程安全的,因为put方法内部运用了ReentrantLock锁进行线程同步。DelayQueue还供应了两种出队的方法 poll()take()poll() 为非堵塞获取,没有到期的元素直接回来null;take() 堵塞方法获取,没有到期的K j G q = / o元素线程将会等候。

public class Delay| p d ? K ] u &QueueDE t R ) Gemo {
pubZ E Vlic static void main(String[] args) throws InterruptedException {
Order Order1 = new Order("Order1", 5, TimeUJ P & _ N N A nit.SECONDS);
Order Order2/ u W ` 4 9 e R v = new OrdeN x + Y c Rr("Order2", 10, TimeUnit.SECONDSG S G / e ~ q ] !);
Order Order3 = ne3 h ` } U q 5 Sw Order("Order3", 15h P 1, TimeUnit.SECONDS);
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Orz c c n i D K d Dder1);
delayQueue.put(Orders z K % $ T w [ q2);
delayQueue.put(Order3);
System.out.println("订单推延队伍初步时间:" + Local2 ] k ] j C ( N aDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.sl c : -ize() != 0) {
/**
* 取队伍头部元素是否过期
*/
Order task = delayQueue.poll();
if (task != null) {
System.out.format("订单:{%s}被吊销, 吊销时间:{%s}n", task.name, LocaU ] # J E v X $ #lDateTime.now().format(DateTimeForx 0 * { g Omatter.ofPattern("yyyy-MM-dd HH:mm:ssY U  g $  g ( @")));
}
Thread.sleep(1000);
}
}
}

上边只是简略的完结入队与出队的操作,实践开发中会d o | w @ W F有专门的线程,担任消息的入队与消费。

实行后看到效果如下,Order1Orde3 v ( d K { 3 Er2Order3 分别在 5秒10秒15秒后被实行,至此就用Delaf o C B 9 a 6 oyQueue完结了延时u h 2 8 队伍。

订单推延队伍初步时间:F  n q G b2020-05-06 14:59:09
订单:{Order1}被吊销, 吊销时间:h j ]{2020-05-06 14:59:14}
订单:{Order2}被吊销, 吊销时间:{2020-05-06 14:59:19}
订单:{Order3}i x j ? } ) 0 p被吊销, 吊销时间:{2020-05-06 14:59:24}
2、Quartz 守时任务

Quartz一款十分经典任务调度结构,在RedisRabbitMQ还未广C !泛运用时,超时未付出吊销订单功用@ , b ; [ D S g :都是由守时任务完结的。守时任务它有必定的周期性,或许许多单子现已超时,但还没抵达触发实行的时间点,那么就会造成订单处理的不可及时。s 1 ( F # 4 D a

引进quartz结构依托包

<dependency&gN L N S p D @ ( %t;
<groupId>org.springframework.boot</groupId>
<artifactId>j N 6 C 8 F yspring-boot-starter-quartz</artifactId>
</dependX & P . D ency>

在发起类中运用@EnableScheduling注解敞V r m ) s v D开守时任务功用。

@EnableScheduling
@SpringBc B { ; x JootApplication
public class DelayqueueApplication {
public static void main(String[] args) {
SpringApplication.run(DelayqE y d / n y ^ - ~ueueApplication.class, args);
}
}

编写一个守时任务,每个5秒实行一次e R l

@Compop ] nent
pubu P r ) C P Nlic class QuartzDemo {
//每隔五秒
@Scheduled(cron = "0/5 * * * * ? ")
public void process(){
System.out.println("我是守时任务!");
}
}
3、O J 8 BRedis sorted set

Redis的数据结构Zset,相同可以完结推延队伍~ . ^ Q K 3 _ 0 M的效果,首要运用它的score特点,redis通过score来为调会集的成员进行从小到大的r y I G e w M排序。

一口气说出 6种 延时队伍的完结计划,面试稳稳的

通过zadd指令向队伍delayqueue 中添加元素,并设置score值表明元素过期的时间;向delayqueue 添加三个order1order2order3,分别是n ; I i10秒20秒30秒后过期。

 zadd delayqueue 3 order3

消费端轮询队伍delayqueue, 将元素排序后取最小时间与当时时间比对,如小于当时时间代表现已过期r A ^ U b 3 D p移除key

    /*B N { m 0 G }*
* 消费消息
*/
public void pollOrderQueue() {
w- R yhi+ & h tle (true) {
Set<Tuple> set = jedis.zra. j ) S A | 2ngeWithScores(DELAY_QUEUE, 0, 0);
String value = ((Tuple) set.toArray()[0])* . E ,.getElement();
int score = (int) ((Tuple) set.toArray()[0])B R j c ?  T.getScore();
Ca* J $ ? o ) a Qlendar cal = Calendar# X ! o M s L.g8 h h 0etInstance();
int nowSecond = (int) (cal.getTimeIn B iMillis() / 1000);
if (nowSecond >= score) {
jedis.zrem(DELAY_QUEUE,H , | 7  J 2 { q value);
System.out.println(sdf.format(new Date()) + " removed key:" +] T h  . - } R value);
}
if (9 D / w x J S [jedis.zca` V g krd(DQ D G w V ; f Z $ELAY_QUEUE) <= 0) {N F j |
Syst{ z Q s 8 2 mem.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
Thread.sleep(1000);
}
}

我们看到实行效果符合预期

2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:orda P r 3 $ @er1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed kQ e ? [ dey:order3
2020-05-07 13:24:39 zset empty
4、Redis 过期回调? @ ) 7

Rediskey过期回调事件,也能抵达推延n A 7 + ^ 2 t e队伍的效果,简略来R p 3 w ; n g L /说我们打开监听key是否过期的事件,一旦keY H G 8y过期会触发s 9 b x h 0 N一个callback事件。

修改redis.conf文件打开notify-keyspace-events Ex

notify-keyspace-events Ex

Redis监听配备,注入Bean RedisMessageListC f ^ k k 9eneP ( D C A c P urContaiU 5 s / [ b V xner

@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(Rv ~ # P s 2 % DedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new_ * V 5 4 RedisMessageListenerContainer();
containe+ k 8 I 6r.setConnectionFac` , Ntory(conneE / 5 5 , = LctionFactory);y ~ H v c ` w Z 
return contM f F J 7 ? lai; f @ner;
}
}

编写Redis过期回 O m调监听办i Z Q ? I .法,必须继_ r C S NKeys O - 7 B q - D )ExpirationEventMessage ; I - S J | 1eLis# g p , ~ y ; /tener ,有点类似于MQ的消息监听。

@CompoR B $ c { O $ & Tnent
public class RedisKeyExp+ 7 _ O nirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContU o $ _ J 0 h ainer) {
suj w t Qper(listenerContaiu ? * ~ner);
}
@Overrip V k | W @de
public void onMessage(Message message, byte[] pattern) {
String eT o L i z IxpiredKey = message.toString();
System.out.println("监听到keyB 5 b V y $:" + expiredKey + "已过期");
}
}

到这代码就编写完结,十分的简略,接下来测验一下效果,在redis-clip / , ?客户端添加一个key 并给定3s的过期时间。

 set xiaofu 123 ex 3

在控制台成功监听到了这个过期的keyB # F

监听到过期的key为:xiao& R M 4 u # c Kfu
5、RabbitMQ 延时队伍

运用 RabbitMQ 做延时队伍是比较常见的一种方法,而实践上RabbitMQ 本身并没有直接支持供应推延队伍功用,而是通过Y T Y 3 F A | RabbitMQ; C s 9 f 消息队伍的 TTLDXL这两个特点间接完结的。

先来认识一下 TT. q & ` 7 a HLDXL两个概念:

Time To Live(TTL) :

TTL 断章取义:指的是消息的存活时间,RabbitMQ可以通过x-mesE / / J r . gsage-tt参数来设置指定Queue(队伍)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ 可以从两种维度设置消息过期时间,分别是队伍消息本身

  • 设置队伍过期时间,那么队伍中所有消息都具有相同的过期时间。
  • 设置消息过期时w t @ , 3 3 Z 7 0刻,对队伍中的某一条消息设置过期时间,每条消息TTL都可以不同。

假设一起设置队伍和队伍中消息的! v OTT} ` i b ] B bL,则TTL值以两者中较小的值为准。而队伍中的消息存在队伍中的时间,一旦超过TTL过期时间则成~ # 1 dDead Letter(死信)。

Dead Letter ExchangesDLX

DLX即死信沟通机,绑定在死信沟通机上的即死信队伍。RabbitMQQueue(队伍)可以配备两个参数x-dead-letter-exchangex-deam Q A ^ *d-letter-routing-5 : .key(可选),一旦队伍内出现了Dead Letter(死信),则依照这两个参数可以将消息从头路由到另一个Exchange(沟通机),让消息从头被消费。

x-dead-letter-exchange:队伍中出现Dead Letter后将Dead Letter从头路由转发到指定 exchange(沟通机)。

x-d& ; } Z h S ? uead-letter-rou= 1 w j f # (ting-key:指定routing-key发送,一般为要指定转发的队伍O H ! – A ` q #

队伍出现DI ^ Q W & 7 = V Vead Letter的情况有T + z S 4

  • 消息或许队伍的TTL过期
  • l j Z ] Q / l列抵达最大长度
  • 消息被消j & @ q $ j S N费端拒绝(basic.reject or basic.nack)

下边结合一张图看看怎样完结超30分钟未付出关单功用,我们将订单消息A0001发送到推延队伍ordeM 8 ^ K #r.delay.queue,并设置x-message-tt消息存活时间为30分钟,当抵达3= ( _ B ?0分钟后订单消息A0001成为了Dead Letter(死信),推延队伍检测到有死信,通过配备x-dead-letter-exchange,将死信从头转发到能正常消费的关单队伍,直接监听关单队伍处理关单逻辑即可。

一口气说出 6种 延时队伍的完结计划,面试稳稳的

发送消息时指定消息推延的时间

public void send(String delayTimesn R O B # ^ : ;) {
amqpTemplate.convertAndSend("order.pay.exchan4 t & S , Y $ge", "order.pay.queue","咱! 3 . Z ; y们好我k f 2是推延数据", message -> {
// 设置推延毫秒值
message.gZ u e a f 2 = @etMessageProperties().setExpiration(Strig l w - : ( Y I .ng.valueOf(delayTimes));
return message;
});
}
}

设置推延队伍出现死信后的转发规则

/**
* 延时队伍
*/
@Bean(name = "order.delay.queue")
public Queue getMessageQueue() {
return QueueBuilder
.durable(RabU w U c ( D k S ibitConstant.DEAD_LETTER_QUEUE)
// 配备到期后转发的沟通
.withArgument("x-d1 R q ! Kead-ler Y [ 8 W @tter-exchange", "order.close.exchange")
// 装) * ` g备到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "order.close.queue")
.bI S p 8 Z c ] uild();
}
6、时间轮

前边几种延时队伍的完结方法相对简略,比较简略了解,时间轮算法就稍微有点抽象了。kafkanetta w 7y都有根据时间轮算法完结延时队伍,下边首要实践Netty的延 E v Y $ J 3 w时行M | i y j 9 . 9 N列讲一下时间轮是[ 6 1什么原P o F a Q B 6 M C理。

U ` H Y a i 5来看一张时间轮的原理图,解读一下时间轮的几个底子概念

一口气说出 6种 延时队伍的完结计划,面试稳稳的

wheel :时间轮,图中的圆盘可以看作是挂钟的刻度。比方一圈round 长度为24秒,刻度数为 8,那么每一个刻度表V _ z L | . f l3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越& ~ I J W大,精度越大。

当添加一个守时 / z H E / A、延时使6 , 6 y m h命A,假设会推延25秒后才会实行 * ,可时间轮一圈round 的长度才24秒,那么此刻会根据时间轮长度和刻度得到一个圈数 round和对应的指针方位 index,也是就使O U t | J h N p e命Au J w R n绕一圈指向0格子上,此刻时间轮会记载该任务的roundindex信息。当round=0,index=0 ,指针指向0格子 任务A并不会实行,因为 round=0不满足要求。

所以每一个格子代表的是一些时间,比方1秒25秒 都会指向0格子上,而任务则放在k j ` – V k . I每个格子对应的链表中,这点和HashMap的数据有些类似。

Netty构建延时队伍首要用HashedWheelTimerHashedWheelTimer底层数据结构依$ # 4然是运用D_ H r O 9 l U J ;elayedQueue,只是选用时间轮的算z Q @ e P y F z法来完结。

下面我们用Netty 简略完结延时队伍,HashedWheelW v G ; ! ! N !Timer构造函数比较多,解释N Q $ ]一下各参数的含义。

  • ThreadFactory :表明用于生成作业线程,一般选用线程池;
  • tiB U ^ ckDurationunit:每格的时间间隔,默许100ms; V % o [ $ l
  • tic- k B 1 qksPerWheel:一圈下来有几格,默许512,而假设传% m } Q 1 %入数值的不是2的N次方,则会调整为大6 1 ~ g ] %于等于该参数的一个2的N次方M & y 9数值,有利于优化q 3 @ / G R F 2 Chash值的核算。
public HashedWheelTimw j $ ! (er(ThreadFactory threH & _adFactory, long tickDuration, TimeUnit unit, inp 9 g Pt ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
  • TimerTask:一个e $ p j守时任务的完结接口,其中run方法包装了守? q p 1 e时任务的逻辑: N y % | Y Q [
  • Timeout:一个守时任务提交到Timer之后回来的句; w X柄,通过这个句柄外部可以吊销这个守时任务,并对守时任务的情况7 J r P M进行一些底子的判别。
  • Timer:是HashedWheelTimer完结的父接口,仅界说了怎样提交守时任务和怎样停止整个守时机制。
public class NettyDelayQueue {
public staticg ^ x 6 N X void main(String[] args) {
final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, Time@ * C u Q YUnit.SECONDS, 2);
//守时任务
TimerTask task1 = np H Vew TimerTask() {
public void run(Time- ; 5 O hout timeout) throws Exception: q T G f m * {
System.out.println("order1  5s 后实行 ");
timer.newTim( s K k A 4 a Zeout(this, 5, TimeUnit.SECONV R o w Q HDS);//结束时候再次注册
}
};
timer.newTimeout(task1, 5, TimeUnit.SECONDS);
TimerTask task2 = new TimerTask() {
public void run(Timeout; h q ! r I B timeout) throws Exch C ? -eption {
System.out.println("order2  10s 后实行");
timer.newTimeout(this, 14 v k H k / g0, TimeUnit.SECONDS);//结束时候再注册
}
};
timer.newTimeout(task2, 10, TimeUnit.SECONDS);
//推延任务
timer.newTp % W h qimeout(new TimerTask1  G i { [ ! H() {
publici j $ { , q $ c 5 void run(Timeout timeout) throws Exception {3 n q _ ] 4 * X q
System.out.println("order3  15s 后实行一次");
}I / 8 M O 5 t
}, 15, TimeUnit.SECO5 & r a q PNDS);
}
}

从实行的效果看,ordi B 3 m @ X `er3order3延时任务只实行了一次,而order2order1为守时任务,依照不同的周期重复实行。

order1  5s 后实行
order2  10s 后实行
orderJ k _ Y i $ q D3  15s 后实行一次
order1  5s 后实行
order2  10s 后实行

总结

为了让我们更简略了解,上边的代码写的都比较简略粗糙,几种完结方法的demo现已都提交到github 地址:https://github.com/chengxy-nds/delayqueue,感兴趣的小伙伴可以下载跑一跑g S o 9 U 9 z E m

这篇文章肝了挺长时间,写作一点也不比上班干活轻松,查验资料I # F 6 8 H重复验证demo的可行性,建立各种RabbitMQRedis环境,只想说我太难y g L F Y了!

或许写的有不可完善的当地,如哪里有差错或许不明晰的,欢迎我们积极指正!!!

终究

原创不易,码字不易Z d 4 = C 9 s j,来点个赞吧~

小福利

关注我的公号,# K v B l回复【666】,几百本各类技术_ ~ O I i t j | &电子书相送,9 4 o 3 2 A「嘘~」,「免费」 送给我们,无套路自行收取

一口气说出 6种 延时队伍的完结计划,面试稳稳的

发表评论

提供最优质的资源集合

立即查看 了解详情