一、死信行列

文档地址:www.rabbitmq.com/dlx.html

死信行列是RabbitMQ中非常重要的一个特性。简单理解,他是RabbitMQ关于未能正常消费的音讯进行的一种弥补机制。死信行列也是一个一般的行列,相同能够在行列上声明顾客,继续对音讯进行消费处理。

​ 关于死信行列,在RabbitMQ中首要涉及到几个参数。

x-dead-letter-exchange:	mirror.dlExchange   对应的死信交换机
x-dead-letter-routing-key:	mirror.messageExchange1.messageQueue1  死信交换机routing-key
x-message-ttl:	3000  音讯过期时刻
durable:	true  耐久化,这个是有必要的。

在这里,x-dead-letter-exchange指定一个交换机作为死信交换机,然后x-dead-letter-routing-key指定交换机的RoutingKey。而接下来,死信交换机就能够像一般交换机相同,经过RoutingKey将音讯转发到对应的死信行列中。

1-1、何时会产生死信

有以下三种情况,RabbitMQ会将一个正常音讯转成死信

  • 音讯被顾客承认回绝。顾客把requeue参数设置为true(false),并且在消费后,向RabbitMQ回来回绝。channel.basicReject或许channel.basicNack。
  • 音讯到达预设的TTL时限还一向没有被消费。
  • 音讯因为行列已经到达最长长度约束而被丢掉

TTL即最长存活时刻 Time-To-Live 。音讯在行列中保存时刻超越这个TTL,即会被认为逝世。逝世的音讯会被丢入死信行列,假如没有装备死信行列的话,RabbitMQ会确保死了的音讯不会再次被投递,并且在未来版别中,会主动删去去这些死掉的音讯。

设置TTL有两种方法,一是经过装备战略指定,另一种是给行列独自声明TTL

战略装备方法 – Web办理渠道装备 或许 运用指令装备 60000为毫秒单位

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

在声明行列时指定 – 相同能够在Web办理渠道装备,也能够在代码中装备:

Map<String, Object> args =newHashMap<String, Object>();
args.put("x-message-ttl",60000);
channel.queueDeclare("myqueue",false,false,false, args);

1-2、死信行列的装备方法

RabbitMQ中有两种方法能够声明死信行列,一种是针对某个独自行列指定对应的死信行列。另一种便是以战略的方法进行批量死信行列的装备。

针对多个行列,能够运用战略方法,装备一致的死信行列。、

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

针对行列独自指定死信行列的方法首要是之前说到的三个特点。

channel.exchangeDeclare("some.exchange.name","direct");
Map<String, Object> args =newHashMap<String, Object>();  
args.put("x-dead-letter-exchange","some.exchange.name");  
channel.queueDeclare("myqueue",false,false,false, args);

这些参数,也能够在RabbitMQ的办理页面进行装备。例如装备战略时:

RabbitMQ的死信队列实现延迟消息

别的,在对行列进行装备时,只有Classic经典行列和Quorum裁定行列才能装备死信行列,而目前Stream流式行列,并不支撑装备死信行列。

1-3、关于参数x-dead-letter-routing-key

死信在转移到死信行列时,他的Routing key 也会保存下来。但是假如装备了x-dead-letter-routing-key这个参数的话,routingkey就会被替换为装备的这个值。

​ 别的,死信在转移到死信行列的过程中,是没有经过音讯发送者承认的,所以并不能确保音讯的安全性

1-4、怎么确定一个音讯是不是死信

音讯被作为死信转移到死信行列后,会在Header傍边增加一些音讯。在官网的详细介绍中,能够看到许多内容,比如时刻、原因(rejected,expired,maxlen)、行列等。然后header中还会加上第一次成为死信的三个特点,并且这三个特点在以后的传递过程中都不会更改。

  • x-first-death-reason
  • x-first-death-queue
  • x-first-death-exchange

1-5、死信行列怎么消费

其实早年面的装备过程能够看到,所谓死信交换机或许死信行列,不过是在交换机或许行列之间建立一种死信对应联系,而死信行列能够像正常行列相同被消费。他与一般行列相同具有FIFO的特性。对死信行列的消费逻辑一般是对这些失效音讯进行一些事务上的补偿。

RabbitMQ中,是不存在推迟行列的功能的,而一般假如要用到推迟行列,就会采用TTL+死信行列的方法来处理。

RabbitMQ供给了一个rabbitmq_delayed_message_exchange插件,能够完成推迟行列的功能,但是并没有集成到官方的发布包傍边,需求独自去下载。

二、运用私信行列完成推迟音讯

2-1、经过springboot的rabbitTemplate完成

在电商系统中,假如要完成订单超时15分钟未付出就将该订单交易状态改为已取消,就能够运用RabbitMQ的死信行列来完成

首先设定两个行列一个是订单在有效期内的行列orderQueue,别的一个便是死信行列deadQueue。上面说到音讯变为死信的几种方法,我这边运用TTL过期时刻来进行处理,设置orderQueue音讯的有效期为15分钟,当音讯过期的时分就会转入到deadQueue中。

2-1-1、创立死信行列寄存过期的订单

死信行列和一般行列创立的方法相同,如下

@Configuration
public class DeadConfig {
    @Bean
    //创立死信交换机
    public DirectExchange deadExchange(){
        return new DirectExchange("dead.direct.exchange");
    }
    //创立死信行列
    @Bean
    public Queue deadQueue(){
        return new Queue("dead.queue");
    }
    //绑定
    @Bean
    public Binding bindQueue(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }
}

2-1-2、创立订单交换机和行列

经过TTL+死信行列完成对过期订单的处理,核心便是如下内容,创立行列的时分需求设置如下三个参数:

(有必要)x-message-ttl:当前行列音讯多久未消费进入死信行列

(有必要)x-dead-letter-exchange:音讯过期后进入的死信行列交换机

(非有必要)x-dead-letter-routing-key:音讯的routingKey

@Configuration
public class OrderConfig {
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("order.direct.exchange",true,false);
    }
    @Bean
    public Queue orderQueue(){
        HashMap<String, Object> args = new HashMap<>();
        //10秒
        args.put("x-message-ttl",10000);
        args.put("x-dead-letter-exchange","dead.direct.exchange");
        args.put("x-dead-letter-routing-key","dead");
        return new Queue("order.queue",true,false,false,args);
    }
    @Bean
    public Binding bindingQueue(){
        return BindingBuilder.bind(orderQueue()).to(directExchange()).with("dead");
    }
}

2-1-3、订单音讯

@Component
public class OrderProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //  模拟订单
    public void makeTest(String a,String b){
        String ExchangeName = "order.direct.exchange";
        String luYouKey = "dead";
        String message = UUID.randomUUID().toString();
        System.out.println("订单生成----"+message);
        rabbitTemplate.convertAndSend(ExchangeName,luYouKey,message);
    }
}

2-1-4、测试

@SpringBootTest
class SpringBootDemoApplicationTests {
    @Autowired
    private OrderProducer orderProducer;
    @Test
    void contextLoads() {
        orderProducer.makeTest("下单","...");
    }
}

2-1-5、检查成果

调用完订单的音讯之后,发送的音讯就会进入到order.direct.exchange交换机,然后又发布到和order.direct.exchange绑定的order.queue行列中,10s后音讯未消费,就会进入到dead.direct.exchange,然后再发布到和dead.direct.exchange绑定的dead.queue行列中

RabbitMQ的死信队列实现延迟消息

后边的事务如取消或许删去订单只有消费dead.queue行列的音讯即可