前言

大家好,我是小郭,在业务研发的过程中,咱们会涉及到非常多的业务场景与音讯行列相关,一般咱们会考虑使用音讯行列来做异步解耦的作业,结合一些实际的场景咱们考虑到音讯的次序性,假如没有严厉依照次序去处理音讯,轻则给用户带来不好的体会,严重的话可能会导更多问题的发生,今日咱们首要从实战、发送次序音讯流程到次序音讯的消费,以及如何确保次序消费为重心进行一些扩展。

一、实战场景

用户更新钱包金额

->用户向钱包中转入100元,短信告诉用户A现在剩下金额100元

->用户下单产品消费50元,短信告诉用户A现在剩下金额50元。

一般音讯:不次序发送余额短信,则用户可能存在先收到余额50元,再收到余额100元的信息,带来不好的用户体会。

代码环节

为了愈加体现音讯的次序性差异,咱们在一次调用中循环发送10次

发送一般音讯

public Boolean updateUser(UserUpdateReqDTO userUpdateReqDTO) {
        String userTopic = rocketMqConfig.getSyncUserTopic();
        IntStream.range(0, 10).forEach(i ->{
            MessageWrapper messageSend = MessageWrapper.builder()
                    .keys(userTopic).message("用户向钱包中转入100元,短信告诉用户现在剩下金额100元:"+ i)
                    .timestamp(System.currentTimeMillis()).build();
            MessageWrapper messageSend1 = MessageWrapper.builder()
                    .keys(userTopic).message("用户下单产品消费50元,短信告诉用户现在剩下金额50元:"+ i)
                    .timestamp(System.currentTimeMillis()).build();
            rocketMQTemplate.syncSend(userTopic, messageSend);
            rocketMQTemplate.syncSend(userTopic, messageSend1);
        });
        return Boolean.TRUE;
    }

发送次序音讯

public Boolean updateUser(UserUpdateReqDTO userUpdateReqDTO) {
        String userTopic = rocketMqConfig.getSyncUserTopic();
        IntStream.range(0, 10).forEach(i ->{
            MessageWrapper messageSend = MessageWrapper.builder()
                    .keys(userTopic).message("用户向钱包中转入100元,短信告诉用户现在剩下金额100元:"+ i)
                    .timestamp(System.currentTimeMillis()).build();
            MessageWrapper messageSend1 = MessageWrapper.builder()
                    .keys(userTopic).message("用户下单产品消费50元,短信告诉用户现在剩下金额50元:"+ i)
                    .timestamp(System.currentTimeMillis()).build();
        rocketMQTemplate.syncSendOrderly(userTopic, messageSend, "11111");
        rocketMQTemplate.syncSendOrderly(userTopic, messageSend1, "11111");
        });
        return Boolean.TRUE;
    }

顾客服务

@Service
@RocketMQMessageListener(topic = "${rocketmq.sync.user-topic}", consumerGroup = "user_consumer", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class syncUserConsumer implements RocketMQListener<MessageWrapper> {
    @Override
    public void onMessage(MessageWrapper mes) {
        log.info("user consumer message : {}", JSON.toJSONString(mes));
    }
}

发送一般音讯成果:

RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?

二、发送次序音讯流程

sequenceDiagram
autoNumber
DefaultMQProducer ->> DefaultMQProducerImpl: send()
DefaultMQProducerImpl ->> DefaultMQProducerImpl:send() >> sendSelectImpl()
DefaultMQProducerImpl ->> DefaultMQProducerImpl:makeSureStateOK() >> 检查服务状况
DefaultMQProducerImpl ->> Validators:checkMessage() >> 校验信息
Validators -->> DefaultMQProducerImpl: 校验成果
DefaultMQProducerImpl ->> DefaultMQProducerImpl:tryToFindTopicPublishInfo() >> 获取主题信息
DefaultMQProducerImpl ->> MQClientInstance:getMQAdminImpl()
MQClientInstance ->> MQAdminImpl:parsePublishMessageQueues()
MQAdminImpl -->> DefaultMQProducerImpl: 回来音讯行列列表
DefaultMQProducerImpl ->> MessageAccessor: cloneMessage >> 复制一份音讯
MessageAccessor -->> DefaultMQProducerImpl: 回来Message
DefaultMQProducerImpl ->> MQClientInstance: getClientConfig
MQClientInstance ->> ClientConfig:queueWithNamespace >> 获取行列
ClientConfig -->> DefaultMQProducerImpl: 回来音讯行列
DefaultMQProducerImpl ->> MessageQueueSelector:select 依据传入的选择器规则获取行列
MessageQueueSelector -->> DefaultMQProducerImpl: 回来MessageQueue
DefaultMQProducerImpl ->> DefaultMQProducerImpl:sendKernelImpl() >> 向MessageQueue投递音讯
DefaultMQProducerImpl -->> DefaultMQProducer:回来SendResult成果

投递音讯行列战略

Hash战略

在次序音讯中,咱们使用Hash战略,将同一个HashKey分配到同一个行列中。

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode() % mqs.size();
        if (value < 0) {
            value = Math.abs(value);
        }
        return mqs.get(value);
    }

获取音讯消费行列

// 查询主题下音讯行列列表
List<MessageQueue> messageQueueList = this.mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
// 获取指定行列
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
                userMessage.setTopic(userTopic);
 mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));

三、确保次序消费的机制

  1. 依据不同的音讯监听器初始化消费音讯线程池、定时线程池、扫描过期音讯铲除线程池。
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    // 次序音讯形式,不初始化扫描过期音讯铲除线程池
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
  1. 发动次序音讯顾客服务。
this.consumeMessageService.start();
  1. 默许每隔20s履行一次确定分配给自己的音讯消费行列。
public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                }
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
  1. 音讯行列负载

集群形式下,同一个主题内的顾客组内,顾客们共同承当订阅音讯行列的消费。

为了确保音讯的次序性,咱们有必要确保同一个音讯行列在同一时间只能被顾客组内一个顾客消费。

获取到音讯行列之后向Broker建议确定该音讯行列的恳求。

sequenceDiagram
autoNumber
DefaultMQPushConsumerImpl ->> MQClientInstance:start() >> 发动实例
MQClientInstance ->> RebalanceService:start()
RebalanceService ->> RebalanceService:run()
MQClientInstance ->> RebalanceService:doRebalance() >> 做负载均衡
RebalanceService ->> RebalanceService:rebalanceByTopic 
RebalanceService ->> AllocateMessageQueueStrategy:allocate
AllocateMessageQueueStrategy -->>RebalanceService:回来成果
RebalanceService ->> RebalanceService:updateProcessQueueTableInRebalance >> 重新负载
RebalanceService -->> MQClientInstance: 回来成果
MQClientInstance -->> DefaultMQPushConsumerImpl: 回来成果

updateProcessQueueTableInRebalance逻辑

首要目的是为了将消费音讯行列上锁,而且创立该音讯行列的拉取使命。

  1. 向Broker建议确定该音讯行列的恳求。
if (isOrder && !this.lock(mq)) {
    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
    continue;
}
  1. 拉取消费方位。
long nextOffset = -1L;
try {
	nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
	log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
	continue;
}
  1. 加锁成功则创立该音讯行列的拉取使命,否则等候其他顾客开释该音讯行列的锁。
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
  1. 音讯拉取
sequenceDiagram
autoNumber
PullMessageService ->> DefaultMQPushConsumerImpl:pullMessage() >>拉取音讯
DefaultMQPushConsumerImpl ->> PullRequest:getProcessQueue() >>获取消费行列快照
PullRequest -->> DefaultMQPushConsumerImpl:回来消费行列快照
DefaultMQPushConsumerImpl ->> DefaultMQPushConsumerImpl:makeSureStateOK() >>查验状况
opt 已暂停
DefaultMQPushConsumerImpl ->> DefaultMQPushConsumerImpl:executePullRequestLater() >>提交拉取恳求延后,放入其他线程
end
alt 音讯行列快照已上锁
DefaultMQPushConsumerImpl ->> RebalanceImpl:computePullFromWhereWithException >>获取偏移方位
RebalanceImpl -->> DefaultMQPushConsumerImpl:回来偏移方位
else 未上锁
DefaultMQPushConsumerImpl ->> DefaultMQPushConsumerImpl:executePullRequestLater() >>提交拉取恳求延后,放入其他线程
end

假如音讯处理行列没有被上锁,则延后一瞬间延迟3s将pullRequest目标放入拉取拉取使命中。

音讯消费

  1. 提交消费恳求,音讯提交到内部的线程池。
// 提交消费恳求,音讯提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
  1. ConsumeMessageOrderlyService#submitConsumeRequest() 履行方法。
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
  1. 提交消费使命核心逻辑

进口:ConsumeMessageService#ConsumeRequest#run()

第一步,假如音讯行列现已下线,则跳出本次消费。

if (this.processQueue.isDropped()) {
    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
    return;
}

第二步,依据前面得到的消费行列,获取目标而且申请一个锁

目的是确保同一时间,消费行列只会被一个线程池中的一个线程消费。

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    //...
}

第三步,进入核心逻辑处理

集群形式:前提条件是音讯行列上锁成功且锁未过期。

(this.processQueue.isLocked() && !this.processQueue.isLockExpired())

当消费市场大于MAX_TIME_CONSUME_CONTINUOUSLY设置值,则跳出本次使命,交给线程池其他线程处理。

long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
    break;
}

获取音讯默许每次拉取一条信息,在之前咱们现已循环读取音讯list,存入msgTreeMap。

现在从msgTreeMap中获取数据,假如数据为空则continueConsume设为false,跳出当前使命。

final int consumeBatchSize =
    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);

第四步,向ConsumeMessageContext目标填充数据,履行消费的钩子函数。

ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext = new ConsumeMessageContext();
    consumeMessageContext
        .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
    consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
    consumeMessageContext.setMq(messageQueue);
    consumeMessageContext.setMsgList(msgs);
    consumeMessageContext.setSuccess(false);
    // init the consume context type
    consumeMessageContext.setProps(new HashMap<String, String>());
    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

第五步,申请消费锁

this.processQueue.getConsumeLock().lock();

第六步,履行消费注册的音讯消费监听器业务逻辑,回来 ConsumeOrderlyStatus 成果。

status = messageListener.consumeMessage(
    Collections.unmodifiableList(msgs), context);

第七步,假如一切正常则回来 ConsumeOrderlyStatus.SUCCESS 值

continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
// 履行commit提交音讯消费进展
case SUCCESS:commitOffset = consumeRequest.getProcessQueue().commit();
// 读取旧音讯进展,并更新回来
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
    msgSize.addAndGet(0 - msg.getBody().length);
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
    return offset + 1;
}

最后,假如音讯进展偏移量大于0且消费行列没有中止,则更新音讯消费进展。

if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

音讯行列重试失利:假如重试达到最大次数重试次数而且向Broker服务器发送ACK音讯回来成功,将音讯存入DLQ行列,被确定音讯消费成功,继续履行后边的音讯。

RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?

总结:

为了确保音讯的次序性,咱们有必要确保同一个音讯行列在同一时间只能被顾客组内一个顾客消费,

从负载均衡方面,向Broker建议确定该音讯行列的恳求,上锁成功则新建一个拉取使命PullRequest,

从音讯消费方面,批量拉取音讯成功后,进行提交消费恳求,音讯提交到内部的线程池,为了确保音讯的次序性,

咱们有必要为消费行列上锁,来确保同一时间消费行列只会被线程池中的一个线程消费。

四、音讯消费时保持次序性

上面的经过源码的阅览,咱们知道消费失利是有重试机制,默许重试 16 次,重试的次数达到最大之后,将音讯存入DLQ行列,即被确定音讯消费成功,这里就会中止重试音讯与下一跳音讯的次序性。

例:发送音讯次序为 音讯A -> 音讯B ->音讯C

graph LR;
  A-->B
  B-->C

因为音讯B进行最大次数的重试后依然没有成功,音讯存入了DLQ行列中,

终究咱们的音讯次序变成了 音讯A ->音讯B,破坏了咱们的次序性。

graph LR;
  A-->C
解决方案:在消费音讯前,添加一些前置条件,查询同一个订单号下,上一个音讯是否被成功消费或许存入DLQ行列中,可以引进音讯辅佐表,来进行记录。

五、如何提高次序消费的消费速度?

依据上面的源码,咱们了解到为了满足次序消费,所以对消费行列进行了加锁,

所以消费端的并发度并不取决消费端线程池的大小,而是取决于分给给顾客的行列数量。

解决方案:提高顾客的行列数量。

六、扩容需要注意什么?

次序音讯在消费音讯时会确定音讯消费行列,在分配到音讯行列时,能从该行列拉取音讯还需要在 Broker 端申请该消费行列的锁。

在进行横向扩容的时候会进行重新负载,为了确保音讯能够进入同一个行列,就需要确保在扩容的时候行列中没有滞留的音讯。

RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?