本文是ROCKETMQ的第二篇,主要将介绍ROCKETMQ音讯发送和消费相关关注点

一. 音讯发送

中心关注点:如何可靠发送音讯、如何将音讯发送到broker(负载)

1.1 中心类

ROCKETMQ-音讯发送与消费(二)

  • MQAdmin:MQ 根本的办理接口,供给对 MQ 供给根底的办理能力
  • MQProducer:音讯发送者接口
  • ClientConfig:客户端装备相关
  • DefaultMQProducer:音讯发送者默许完成类
  • TransactionMQProducer:事务音讯发送者默许完成类

1.2 音讯发送类型

  • 同步发送
/**
同步发送,参数列表说明如下:
-   Message msg:待发送的音讯目标
-   long timeout:超时时刻,默许为 3s
**/
SendResult send(Message msg, long timeout)
  • 异步发送
/**
异步音讯发送,参数列表说明如下:
-   Message msg:待发送的音讯
-   SendCallback sendCallback:异步发送回调接口
-   long timeout:发送超时时刻,默许为 3s
**/
void send(Message msg, SendCallback sendCallback, long timeout)
  • Oneway 音讯发送
void sendOneway(Message msg)
  • 批量发送音讯(同步)
SendResult send(Collection<Message> msgs, MessageQueue mq, long timeout)

1.3 行列挑选

1.3.1 音讯发送轮询战略

  • RoundRobin形式
    • 运用范围:对于非次序音讯(一般音讯、定时/延时音讯、事务音讯)场景,默许且只能运用RoundRobin形式的负载均衡战略。
    • 战略原理:轮询办法

ROCKETMQ-音讯发送与消费(二)

  • MessageGroupHash形式
    • 运用范围:次序音讯,默许且只能运用MessageGroupHash形式的负载均衡战略
    • 战略原理:Hash算法,生产者发送音讯时,以音讯组为粒度,依照内置的Hash算法,将相同音讯组的音讯分配到同一行列中,保证同一音讯组的音讯依照发送的先后次序存储。

ROCKETMQ-音讯发送与消费(二)

1.3.2 音讯发送高可用设计与毛病躲避机制

针对非次序音讯
保证音讯发送的高可用性,在内部引进了重试机制,默许重试 2 次。RocketMQ 音讯发送端采取的行列负载均衡默许采用轮循。

ROCKETMQ-音讯发送与消费(二)
topicA 在 broker-a、broker-b 上别离创立了 4 个行列,例如一个线程运用 Producer 发送音讯时,通过对 sendWhichQueue getAndIncrement() 办法获取下一个行列。

例如在发送之前 sendWhichQueue 该值为 broker-a 的 q1,假如由于此刻 broker-a 的突发流量反常大导致音讯发送失利,会触发重试,依照轮循机制,下一个挑选的行列为 broker-a 的 q2 行列,此次音讯发送大概率仍是会失利。
为此引进毛病躲避机制:在音讯重试的时分,会尽量躲避上一次发送的 Broker,回到上述示例,当音讯发往 broker-a q1 行列时回来发送失利,那重试的时分,会先排除 broker-a 中一切行列,即这次会挑选 broker-b q1 行列,增大音讯发送的成功率。

供给两种躲避战略,该参数由sendLatencyFaultEnable操控

1.3 事务音讯

音讯发送与数据库事务的不一致性带来的事务出错

ROCKETMQ-音讯发送与消费(二)

简略的登录实践,原文

ROCKETMQ-音讯发送与消费(二)

ROCKETMQ-音讯发送与消费(二)

其他思路: 假如一定需求发送可靠音讯,也可采用本地事务表办法:

  1. 本地事务发送MQ音讯前,先记录事务表,与本地事务一起
  2. 发送MQ,如发送失利不处理,发送成功,删除本地事务表(或许修改状态)
  3. 定时使命轮询本地事务表,将未发送的音讯捞取出来发送

上述可保证音讯发送的最终一致性

二. 音讯消费

2.1 中心类

ROCKETMQ-音讯发送与消费(二)

  • MQConsumer:MQ顾客
  • MQPushConsumer推形式顾客
  • MQPullConsumer拉形式顾客
  • DefaultMQPushConsumer:推形式默许完成类
  • DefaultMQPullConsumer:取形式默许完成类

2.2 推拉形式运用示例

2.2.1 PULL拉形式

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class PullConsumerTest {
    public static void main(String[] args) throws Exception {
        Semaphore semaphore = new Semaphore();
        Thread t = new Thread(new Task(semaphore));
        t.start();
        CountDownLatch cdh = new CountDownLatch(1);
        try {
            //程序运转 120s 后介绍
            cdh.await(120 * 1000, TimeUnit.MILLISECONDS);
        } finally {
            semaphore.running = false;
        }
    }
    /**
     * 音讯拉取中心完成逻辑
     */
    static class Task implements Runnable {
        Semaphore s = new Semaphore();
        public Task(Semaphore s ) {
            this.s = s;
        }
        public void run() {
            try {
                DefaultMQPullConsumer consumer = new 
                    DefaultMQPullConsumer("dw_pull_consumer");
                consumer.setNamesrvAddr("127.0.01:9876");
                consumer.start();
                Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
                Set<MessageQueue> msgQueueList = consumer.
                    fetchSubscribeMessageQueues("TOPIC_TEST"); // 获取该 Topic 的一切行列
                if(msgQueueList != null && !msgQueueList.isEmpty()) {
                    boolean noFoundFlag = false;
                    while(this.s.running) {
                        if(noFoundFlag) { // 没有找到音讯,暂停一下消费
                            Thread.sleep(1000);
                        }
                        for( MessageQueue q : msgQueueList ) {
                            PullResult pullResult = consumer.pull(q, "*",                                          decivedPulloffset(offsetTable
                             , q, consumer) , 3000);
                            System.out.println("pullStatus:"   
                                               pullResult.getPullStatus());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    doSomething(pullResult.getMsgFoundList());
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                case OFFSET_ILLEGAL:
                                    noFoundFlag = true;
                                    break;
                                default:
                                    continue ;
                            }
                            //提交位点
                            consumer.updateConsumeOffset(q, 
                                 pullResult.getNextBeginOffset());
                        }
                        System.out.println("balacne queue is empty: "   consumer.
                              fetchMessageQueuesInBalance("TOPIC_TEST").isEmpty());
                    }
                } else {
                    System.out.println("end,because queue is enmpty");
                }
                consumer.shutdown();
                System.out.println("consumer shutdown");
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }
    /** 拉取到音讯后详细的处理逻辑 */
    private static void doSomething(List<MessageExt> msgs) {
        System.out.println("本次拉取到的音讯条数:"   msgs.size());
    }
    public static long decivedPulloffset(Map<MessageQueue, Long> offsetTable, 
             MessageQueue queue, DefaultMQPullConsumer consumer) throws Exception {
        long offset = consumer.fetchConsumeOffset(queue, false);
        if(offset < 0 ) {
            offset = 0;
        }
        System.out.println("offset:"   offset);
        return offset;
    }
    static class Semaphore {
        public volatile boolean running = true;
    }
}

上述针对单顾客场景,多顾客针对PULL形式就比较复杂

2.2.2 PUSH推形式

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new 
            DefaultMQPushConsumer("dw_test_consumer_6");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TOPIC_TEST", "*");
        consumer.setAllocateMessageQueueStrategy(new 
               AllocateMessageQueueAveragelyByCircle());
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                try {
                    System.out.printf("%s Receive New Messages: %s %n", 
                          Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Throwable e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

2.3 DefaultMQPushConsumer中心概念

PUSH 形式是对 PULL 形式的封装,类似于一个高级 API,用户运用起来将非常简略,根本将音讯消费所需求处理的问题都封装好了,故运用起来将变得简略。

2.3.1 setConsumeFromWhere

包含三个枚举类:

  1. CONSUME_FROM_LAST_OFFSET

    • 当一个新的订阅组(Consumer Group)第一次发动时,从这个行列的最后一个偏移量(Offset)开端消费。
    • 在后续的发动中,顾客会持续早年次消费的进展(即前次消费的偏移量)开端消费。
    • 这种形式适用于那些只关怀最新音讯,而不需求处理前史音讯的场景。
  2. CONSUME_FROM_FIRST_OFFSET

    • 当一个新的订阅组第一次发动时,从这个行列的初始方位(即第一个偏移量)开端消费。
    • 在后续的发动中,顾客会持续早年次消费的进展开端消费。
    • 这种形式通常用于那些需求处理一切音讯,包含前史音讯的场景。
  3. CONSUME_FROM_TIMESTAMP

    • 当一个新的订阅组第一次发动时,从指定的时刻戳方位开端消费。
    • RocketMQ 会根据这个时刻戳找到对应的音讯方位,并从那个方位开端消费。
    • 在后续的发动中,顾客会持续早年次消费的进展开端消费。
    • 这种形式答应顾客根据特定的时刻条件来开端消费,适用于那些需求根据时刻进行音讯回溯的场景。

ConsumeFromWhere 这个参数的含义是,初次发动从何处开端消费。更精确的表述是,假如查询不到音讯消费进展时,从什么地方开端消费

2.3.2 AllocateMessageQueueStrategy 音讯行列负载算法

RocketMQ 默许供给了如下负载均衡算法:

  • AllocateMessageQueueAveragely:均匀接连分配算法。
  • AllocateMessageQueueAveragelyByCircle:均匀轮流分配算法。
  • AllocateMachineRoomNearby:机房内优先就近分配。
  • AllocateMessageQueueByConfig:手动指定,这个通常需求合作装备中心,在顾客发动时,首要先创立 AllocateMessageQueueByConfig 目标,然后根据装备中心的装备,再根据当前的行列信息,进行分配,即该办法不具备行列的主动负载,在 Broker 端进行行列扩容时,无法主动感知,需求手动改变装备。
  • AllocateMessageQueueByMachineRoom:消费指定机房中的行列,该分配算法首要需求调用该战略的setConsumeridcs(Set<String> consumerIdCs)办法,用于设置需求消费的机房,将刷选出来的音讯按均匀接连分配算法进行行列负载。

2.3.3 OffsetStore 音讯进展存储办理器

RocketMQ 在播送音讯、集群消费两种形式下音讯消费进展的存储战略会有所不同。

  • 集群形式:RocketMQ 会将音讯消费进展存储在 Broker 服务器,存储途径为${ROCKET_HOME}/store/config/ consumerOffset.json文件中。
  • 播送形式:RocketMQ 会将音讯消费进存储在消费端地点的机器上,存储途径为${user.home}/.rocketmq_offsets中。

2.3.4 其他参数

  • consumeThreadMin:顾客每一个消费组线程池中最小的线程数量,默许为 20。
  • consumeThreadMax:最大线程数,由于行列是无界行列,该参数没啥含义
  • consumeConcurrentlyMaxSpan:并发音讯消费时处理行列中最大偏移量与最小偏移量的差值的阔值,如差值超越该值,触发消费端限流。默许值2000
  • pullThresholdForQueue:消费端答应消费端端单行列积压的音讯数量,假如处理行列中超越该值,会触发音讯消费端的限流。默许值1000
  • pullThresholdSizeForQueue:消费端答应消费端但行列中揉捏的音讯体巨细,默许为 100MB
  • pullThresholdForTopic:按 Topic 等级进行音讯数量限流,默许不敞开,为 -1
  • pullThresholdSizeForTopic:按 Topic 等级进行音讯音讯体巨细进行限流,默许不敞开
  • pullInterval:音讯拉取的距离,默许 0 表示,音讯客户端在拉取一批音讯提交到线程池后当即向服务端拉取下一批
  • pullBatchSize:一次音讯拉取恳求最多从 Broker 回来的音讯条数,默许为 32
  • consumeMessageBatchMaxSize:音讯消费一次最大消费的音讯条数
  • maxReconsumeTimes:音讯消费重试次数,并发消费形式下默许重试 16 次后进入到死信行列,假如是次序消费,重试次数为 Integer.MAX_VALUE。
  • suspendCurrentQueueTimeMillis:消费形式为次序消费时设置每一次重试的距离时刻,提高重试成功率
  • consumeTimeout:音讯消费超时时刻

2.3.5 音讯消费进展提交

ROCKETMQ-音讯发送与消费(二)
向broker提交音讯进展时,提交的是取 ProceeQueue 中最小的偏移量为音讯消费进展,那样或许会导致重复消费

原因是 msg 1~5,msg3 msg4和msg1消费完,msg2 msg5没有消费完,此刻往broker提交msg2,假如此刻client挂掉,重启下次从msg2开端消费,msg3 msg4会重复消费

2.4 DefaultLitePullConsumer 中心概念

DefaultMQPullConsumer(PULL 形式)的 API 太底层,运用起来及其不方便,RocketMQ 官方设计者也留意到这个问题,为此在 RocketMQ 4.6.0 版别中引进了 PULL 形式的另外一个完成类 DefaultLitePullConsumer

2.4.1 中心UML图

ROCKETMQ-音讯发送与消费(二)

2.4.2 运用举例

public class LitePullConsumerSubscribe02 {
    public static volatile boolean running = true;
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new 
            DefaultLitePullConsumer("dw_lite_pull_consumer_test");
        litePullConsumer.setNamesrvAddr("192.168.3.166:9876");
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.setAutoCommit(true); //该值默许为 true
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                doConsumeSomething(messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
    private static void doConsumeSomething(List<MessageExt> messageExts) {
        // 真实的事务处理
        System.out.printf("%s%n", messageExts);
    }
}

留意参数pullThreadNums,音讯拉取线程数量,默许为 20 个,留意这个是每一个顾客默许 20 个线程往 Broker 拉取音讯。这个应该是 Lite PULL 形式对比 PUSH 形式一个非常大的优势。

Lite Pull流程如下:

ROCKETMQ-音讯发送与消费(二)

运用场景参阅:12 结合实际场景再聊 DefaultLitePullConsumer 的运用

三. 次序音讯

3.1 发送音讯发往同一个行列

ROCKETMQ-音讯发送与消费(二)

3.2 消费音讯,同一行列需求次序消费

ROCKETMQ-音讯发送与消费(二)

3.3. 呈现问题

保证消费端对单行列中的音讯次序处理,故多线程处理,需求依照音讯消费行列进行加锁。

消费端的横向扩容或 Broker 端行列个数的改变都会触发音讯消费行列的从头负载,在并发音讯时在行列负载的时分一个消费行列有或许被多个顾客一起音讯,但次序消费时并不会呈现这种情况,由于次序音讯不只仅在消费音讯时会确定音讯消费行列,在分配到音讯行列时,能从该行列拉取音讯还需求在 Broker 端请求该消费行列的锁,即同一个时刻只要一个顾客能拉取该行列中的音讯,保证次序消费的语义。

早年面的文章中也介绍到并发消费形式在消费失利时有重试机制,默许重试 16 次,而且重试时是先将音讯发送到 Broker,然后再次拉取到音讯,这种机制就会丧失其消费的次序性,故假如是次序消费形式,音讯重试时在消费端不停的重试,重试次数为 Integer.MAX_VALUE,即假如一条音讯假如一向不能消费成功,其音讯消费进展就会一向无法向前推动,即会造成音讯积压现象。

参阅资料
生产者负载均衡