RocketMQ实战一:先写库还是先发消息?

先问一个问题:RocketMQ是怎么确保音讯与数据库业务的一致性

第一时间或许会想到RocketMQ的业务音讯

咱们以日常开发中的案例来进行剖析:下单送积分。用户在下单后,订单体系保存订单数据,然后发送音讯到MQ,积分体系订阅这个音讯,然后给用户加积分。这就引出了一个问题,从生产者订单体系视点看,到底是先写库仍是先发音讯 呢?那咱们接下来就别离看下这两种情况。

1. 先写库后发音讯

咱们先经过一段伪代码来剖析下:

public void createOrder(final Order order) throws Exception {
    //模仿spirng的tx模板
    transactionTemplate.execute(new TransactionCallback<Boolean>() {
        @Override
        public Boolean doInTransaction(TransactionStatus status) {
            //本地数据刺进
            orderMapper.save(order);
            orderDetailMapper.save(order.getOrderDetail());
            //模仿 mq 发送音讯
            SendResult send = producer.send(orderMessage);;
            if (send.getSendStatus() == SendStatus.SEND_OK) {
                status.setRollbackOnly();
            }
            return Boolean.TRUE;
        }
    });
}

咱们来剖析下它的进程:

首要,履行本地数据库业务,刺进数据,注意此刻还没有commit, 紧接着发送音讯到MQ, 这中心或许因为网络波动等原因,导致生产者迟迟没有收到broker的响应成果,比方5s内都没有回来SendResult给生产者,这也就意味着这5s内本地数据库业务是无法commit的,假如在高并发的场景下,数据库连接资源很快就会被耗尽,后续的请求则无法处理,最终体系将会崩溃。

既然咱们知道了先写库后发音讯有这样的问题,那么假如是先发音讯后写库呢?

2.先发音讯后写库

咱们仍是先看下代码:

public void createOrder(Order order) {
    try {
        //先发送音讯
        SendResult send = producer.send(orderMessage);
        if (send.getSendStatus() == SendStatus.SEND_OK) {
            orderMapper.save(order);
            orderDetailMapper.save(order.getOrderDetail());
            //提交业务
            connection.commit();
        }
    } catch (Exception e) {
        //回滚
        connection.rollback();
    }
}    

这样也是有问题的:

  1. 首要他也存在先写库后发音讯的问题,一旦MQ因为网络等原因长期没有回来SendResult给生产者,将会导致本地业务无法被提交或回滚,高并发下资源将会被快速耗尽。
  2. 其次,生产者将音讯发送出去并快速响应了,可是履行本地数据库业务时呈现了错误,比方上述代码中的orderMapper.save(order)履行出错了,这也就意味着音讯现已发送出去,顾客能够消费了,可是此刻本地业务失利了,为了弥补错误,此刻或许需求“回滚”之前发送的音讯,可是此刻这条音讯或许现已被消费了,就算没有被消费,每次我都在发送音讯后判别是否呈现了反常,假如呈现了反常在发送条”回滚“的音讯,这无疑是增加了开发的复杂度,也显得冗余。

那么有没有什么更好的办法,既能够不堵塞本地数据库业务,还能确保最终一致性呢?

这就是接下来咱们要说的RocketMQ的业务音讯,它能够确保本地业务与MQ音讯的最终一致性。

业务音讯咱们之前有剖析过它的源码和流程,这儿咱们简略看下

RocketMQ实战一:先写库还是先发消息?

知道了业务音讯的大致流程后,接下来咱们仍是经过伪代码来看下它的实现进程。

  1. 发送业务音讯

发送的topic是 “tx_order_topic”,顾客订阅的也是这个,可是在发送到broker时,他会在内部将咱们的topic做一次修正,这样对顾客就不行见了。

@Slf4j
@Controller
public class OrderCreateController {
    //rocketmq 发送音讯的模板
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @ResponseBody
    @GetMapping("/order/{buyer}")
    public String createOrder(@PathVariable String buyer) {
        //@Accessors(chain = true)
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setPhone("18883858508").setAddress("上海外滩xxxxx").setOrderDetailId(UUID.randomUUID().toString());
        Order order = new Order();
        order.setOrderId(UUID.randomUUID().toString()).setBuyer(buyer).setOrderDetail(orderDetail);
        Message<Order> message = MessageBuilder.withPayload(order).build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx_order_topic", message, null);
        if (SendStatus.SEND_OK == result.getSendStatus()) {
            log.info("发送音讯成功, result: {}", result);
        }
        //回查订单表
        return "order create success";
    }
}

rocketMQTemplate.sendMessageInTransaction(…)要等本地业务履行完毕,才会回来 TransactionSendResult

  1. 履行本地业务
@Slf4j
@RocketMQTransactionListener
public class CreateOrderCheckerListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private OrderDetailMapper orderDetailMapper;
    @Autowired
    private TransactionTemplate transactionTemplate;
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("message: {}, args: {}", msg, arg);
        String orderMsg = new String((byte[]) msg.getPayload());
        final Order order = JSON.parseObject(orderMsg, Order.class);
        log.info("order info : {}", order);
        try {
            //放到同一个本地业务中
            this.transactionTemplate.executeWithoutResult(status -> {
                this.orderMapper.saveOrder(order);
               // int x = 1 / 0;
                this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
            });
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("保存订单失利", e);
            //触发回查
            return RocketMQLocalTransactionState.UNKNOWN;
            //假如是ROLLBACK,则回滚音讯,rocketmq将抛弃这条音讯
        }
    }
    //先忽略回查的逻辑
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {}
}

假如本地业务履行成功(订单正常入库),producer将给Broker发送一个COMMIT的标识,此刻broker会将之前被替换了的topic给替换回去,这样顾客就能够消费了。

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "qiuguan_test_consumer_group", topic = "tx_order_topic")
public class RewardsPoints implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order message) {
        log.info("积分体系根据订单增加积分 : {}", message);
    }
}

假如本地履行进程中发生了反常,比方网络抖动等,没有正常入库,此刻给Broker发送一个UNKNOW的标识,broker收到UNKNOW标识后,默认按照每分钟一次的频率发起回查。

  1. 音讯回查
@Slf4j
@RocketMQTransactionListener
public class CreateOrderCheckerListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private OrderDetailMapper orderDetailMapper;
    @Autowired
    private TransactionTemplate transactionTemplate;
    //履行本地业务逻辑
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {}
    //回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("履行本地业务回查:{}", LocalDateTime.now());
        String orderMsg = new String((byte[]) msg.getPayload());
        final Order order = JSON.parseObject(orderMsg, Order.class);
        log.info("回查order: {}", order);
        //回查次数
        //int checkTimes = msg.getHeaders().get("TRANSACTION_CHECK_TIMES", Integer.class);
        Order o = this.orderMapper.getOrder(order.getOrderId());
        if (o == null) {
            try {
                this.transactionTemplate.executeWithoutResult(status -> {
                    this.orderMapper.saveOrder(order);
                    this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
                });
            } catch (Exception e) {
                log.error("保存订单失利", e);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

在回查的时分咱们能够检查数据库是否刺进了订单,假如没有,此刻咱们能够再次测验入库,假如入库成功,则响应给Broker一个COMMIT标识,此刻该音讯就能够被顾客消费了,假如仍然入库失利,能够等候再次回查,或许回滚。假如是回滚,则Broker将丢掉该消费,顾客也将无法消费。

接下来咱们剖析下运用RocketMQ的业务音讯有哪些问题:

  1. 生产者发送业务音讯失利

RocketMQ实战一:先写库还是先发消息?

这种情况就直接抛出反常即可,本地业务也不会履行,更不会存在数据不一致的问题。

  1. 生产者发送音讯成功,可是本地业务履行失利
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    log.info("message: {}, args: {}", msg, arg);
    try {
        this.transactionTemplate.executeWithoutResult(status -> {
           this.orderMapper.saveOrder(order);
           int x = 1 / 0;
           this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
        });
        return RocketMQLocalTransactionState.COMMIT;
    } catch (Exception e) {
        log.error("保存订单失利", e);
        //回滚音讯
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

一旦本地业务履行失利,则数据库将会回滚,一起给broker发送ROLLBACK标识,broker收到该标识后,将抛弃掉这条音讯,顾客也无法消费这条音讯,这样也不会呈现数据不一致的问题。

  1. 生产者发送音讯成功,本地业务也履行成功,可是在生产者将COMMIT标识发送给broker时,发生了网络抖动,没有及时收到COMMIT指令
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    log.info("message: {}, args: {}", msg, arg);
    try {
        this.transactionTemplate.executeWithoutResult(status -> {
           this.orderMapper.saveOrder(order);
           this.orderDetailMapper.saveOrderDetail(order.getOrderDetail());
        });
        //网络抖动...
        return RocketMQLocalTransactionState.COMMIT;
    } catch (Exception e) {
        log.error("保存订单失利", e);
        //回滚音讯
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

本地数据库业务履行成功,订单数据保存到表中,broker因为网络抖动没有及时收到COMMIT指令,此刻音讯仍是一条半业务音讯,顾客仍是无法消费,这样本地业务与RocketMQ音讯的一致性就被破坏了。

RocketMQ为了处理这个问题,引入了音讯回查机制,关于半业务音讯,假如没有及时收到COMMIT/ROLLBACK指令,它会测验主动与broker进行通信,调用监听器的 checkLocalTransaction(..) 办法再次承认之前的本地业务是否成功。

public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    log.info("履行本地业务回查:{}", LocalDateTime.now());
    final Order order = JSON.parseObject(new String((byte[]) msg.getPayload()), Order.class);
    log.info("回查order: {}", order);
    /**
     * 因为之前本地业务现已履行成功,数据刺进了表中,只是在给broker发送COMMIT标识时发生了网络闪断
     * 所以这儿回查的时分,是能够从数据库表中查询到订单数据的,此刻就能够给broker发送一个COMMIT标识
     * 这样broker就会把这对顾客不行见的音讯修正为可见,此刻就能够消费了。
     */
    Order o = this.orderMapper.getOrder(order.getOrderId());
    /**
     * 假如数据库中没有订单数据,说明之前的刺进就是失利的,此刻这儿测验再次刺进或许直接回滚就能够了
     */
    return o == null ? RocketMQLocalTransactionState.ROLLBACK : RocketMQLocalTransactionState.COMMIT;
}

不难发现,运用RocketMQ的业务音讯具有以下好处:

将发送音讯和本地业务分脱离,假如发送音讯失利,则整个流程失利,不会堵塞本地业务,假如本地业务履行失利,则能够直接回滚或许回查,不会影响顾客。

好了,关于RocketMQ的业务音讯的实战就介绍到这儿,欢迎我们批评指正。