本文正在参与「金石计划 . 分割6万现金大奖」

前言

在上一篇文章中RocketMQ顾客怎么完成重平衡,咱们简略叙述了RocketMQ顾客是怎么完成重平衡的,咱们在源码中发现默认的重平衡战略是均匀分配战略AllocateMessageQueueAveragely,另外咱们还能够设置其他的重平衡战略,你知道有哪几种嘛?你用过其间的哪些分配战略呢?

AllocateMessageQueueAveragely【均匀分配】

在上一篇文章中,咱们简略阐明晰均匀分配的成果,下面咱们依据源码来看一下:

@Override
  public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
​
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    // 查看参数,不合理的参数就不做分配了
    if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
      return result;
     }
    // cidAll和mqAll已经排序好了,保证一切顾客实例拿到的次序一起;
    // 获取当时顾客ID在cidAll中的索引方位
    int index = cidAll.indexOf(currentCID);
    // 核算取模成果
    int mod = mqAll.size() % cidAll.size();
    // 核算每隔顾客均匀分配到的数量
    int averageSize =
      mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
        + 1 : mqAll.size() / cidAll.size());
    // 核算起始索引值
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    // 核算遍历此时
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
      // 核算方针MessageQueue,并添加到成果会集
      result.add(mqAll.get((startIndex + i) % mqAll.size()));
     }
    // 回来成果集
    return result;
   }

举个例子阐明一下:

假设有7个MessageQueue,3个顾客,分配成果如下:

1.顾客C1分配到的MessageQueue索引值为[0,1,2];

2.顾客C2分配到的MessageQueue索引值为[3,4];

3.顾客C3分配到的MessageQueue索引值为[5,6];

RocketMQ重平衡策略你用过几种?

假设说MessageQueue数量小于顾客的数量,比如当时MessageQueue数量为2,顾客数量为3,那么第三个顾客是不会被分配MessageQueue的,只要前两个顾客各1个MessageQueue

AllocateMessageQueueAveragelyByCircle【均匀交叉分配】

这个均匀交叉的分配战略完成方式更简略:

  @Override
  public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
​
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
      return result;
     }
    // 核算当时顾客ID地点索引方位
    int index = cidAll.indexOf(currentCID);
    for (int i = index; i < mqAll.size(); i++) {
      // 每次间隔cidAll.size个messageQueue就放进成果集
      if (i % cidAll.size() == index) {
        result.add(mqAll.get(i));
       }
     }
    // 回来成果集
    return result;
   }

仍然还是之前的实例:

假设有7个MessageQueue,3个顾客,分配成果如下:

1.顾客C1分配到的MessageQueue索引值为[0,3,6];

2.顾客C2分配到的MessageQueue索引值为[1,4];

3.顾客C3分配到的MessageQueue索引值为[2,5];

RocketMQ重平衡策略你用过几种?

AllocateMessageQueueConsistentHash【一起性哈希分配】

  @Override
  public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
​
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
      return result;
     }
    // 把每一个顾客ID放进调集,预备创立一起性哈希环
    Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
    for (String cid : cidAll) {
      cidNodes.add(new ClientNode(cid));
     }
​
    final ConsistentHashRouter<ClientNode> router; //for building hash ring
    // 创立一起性哈希环
    if (customHashFunction != null) {
      router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
     } else {
      router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
     }
​
    List<MessageQueue> results = new ArrayList<MessageQueue>();
    for (MessageQueue mq : mqAll) {
      // 判断MessageQueue落在那一个虚拟节点上面
      ClientNode clientNode = router.routeNode(mq.toString());
      // 比照当时顾客ID与节点的key,假设一起,阐明落在当时顾客ID节点上面
      if (clientNode != null && currentCID.equals(clientNode.getKey())) {
        // 放进成果集
        results.add(mq);
       }
     }
    // 回来成果集
    return results;
   }

上述源码的算法能够大约归纳为以下几点:

1.假设当时有3个顾客,那么把这三个顾客ID搜集好,放在一个调会集;

2.创立一个哈希环,上面遍及了若干个虚拟节点,一切的虚拟节点均匀地与3个顾客关联;

3.遍历一切的MessageQueue,把每一个MessageQueue放到哈希环中,看它落在哪个虚拟节点上面,对应的虚拟节点的key假设与当时顾客ID一样,那么就把MessageQueue分配给当时顾客;

一起性哈希算法其实便是为了让分配成果愈加均匀一点,这个算法的分配成果图示就不好画了;

AllocateMessageQueueByMachineRoom【机房编号分配】

看称号就知道这个分配战略是依据机房编号来履行的,所以里边需求咱们预备好机房编号:

private Set<String> consumeridcs;

consumeridcs里边是当时顾客需求拉取的Broker地点的机房编号调集,而且要求该Broker命名格式如下:机房编号@brokerName,一定要坚持机房编号consumeridcs里边的元素一起;

  @Override
  public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
​
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
      return result;
     }
    // 获取当时顾客地点的索引值
    int currentIndex = cidAll.indexOf(currentCID);
    if (currentIndex < 0) {
      return result;
     }
    // 预备搜集方针机房的MessageQueue
    List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
    for (MessageQueue mq : mqAll) {
      // brokerName按照@符号分割
      String[] temp = mq.getBrokerName().split("@");
      // 取出前面的【机房编号】,查看consumeridcs是否包含【机房编号】
      if (temp.length == 2 && consumeridcs.contains(temp[0])) {
        // 符合要求的MessageQueue就搜集起来
        premqAll.add(mq);
       }
     }
    // MessageQueue数量除顾客数量,7/3 = 2
    int mod = premqAll.size() / cidAll.size();
    // MessageQueue数量对顾客数量取模 7%3 = 1
    int rem = premqAll.size() % cidAll.size();
    // 核算起始方位索引 2*0 = 0
    int startIndex = mod * currentIndex;
    // 核算完毕方位索引 0+2 = 2
    int endIndex = startIndex + mod;
    // 取出指定的MessageQueue放进成果会集  0 1 6
    for (int i = startIndex; i < endIndex; i++) {
      result.add(premqAll.get(i));
     }
    // 假设还有多的MessageQueue,那么就按次序每个顾客分一个
    if (rem > currentIndex) {
      result.add(premqAll.get(currentIndex + mod * cidAll.size()));
     }
    return result;
   }

举例来剖析一下:

假设有两个机房,第一个机房3个MessageQueue,称号Shanghai-A@Broker-a,第二个机房4个MessageQueue,称号为Hangzhou-A@Broker-b,现在有3个顾客,每隔顾客中consumeridcs中的元素为Shanghai-AHangzhou-A,那么意味着这三个顾客来分配两个机房总共7个MessageQueue

咱们在上一篇文章中讲过,mqAll是排序好的,分配的成果如下:

RocketMQ重平衡策略你用过几种?

AllocateMachineRoomNearby【同机房优先分配】

这个分配战略需求传两个参数,第一个是AllocateMessageQueueStrategy分配战略,第二个是MachineRoomResolver;第二个参数用来解析MessageQueue地点机房编号和顾客地点机房编号,第一个参数用来履行分配任务;所以这个AllocateMachineRoomNearby其实是在更细粒度上履行的分配战略;

@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
  List<MessageQueue> result = new ArrayList<MessageQueue>();
  if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
    return result;
   }
​
  //group mq by machine room
  Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
  for (MessageQueue mq : mqAll) {
    // 获取messageQueue地点的机房编号
    String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
    if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
      if (mr2Mq.get(brokerMachineRoom) == null) {
        mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
       }
      // 按编号放进Map中,同编号的在一个ArrayList中
      mr2Mq.get(brokerMachineRoom).add(mq);
     } else {
      throw new IllegalArgumentException("Machine room is null for mq " + mq);
     }
   }
​
  //group consumer by machine room
  Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
  for (String cid : cidAll) {
    // 获取顾客地点机房编号
    String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
    if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
      if (mr2c.get(consumerMachineRoom) == null) {
        mr2c.put(consumerMachineRoom, new ArrayList<String>());
       }
      // 按编号放进Map中,同编号的在一个ArrayList中
      mr2c.get(consumerMachineRoom).add(cid);
     } else {
      throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
     }
   }
​
  List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
​
  // 获取当时顾客地点机房编号
  String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
  // 取出当时机房的一切MessageQueue
  List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
  // 取出当时机房的一切顾客
  List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
  if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
    // 按照设置的分配战略来分配同一机房内的MessageQueue
    allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
   }
​
  // 假设MessageQueue布置的机房中没有顾客,那么一切的顾客一起分配这些MessageQueue
  for (Entry<String, List<MessageQueue>> machineRoomEntry : mr2Mq.entrySet()) {
    if (!mr2c.containsKey(machineRoomEntry.getKey())) { // no alive consumer in the corresponding machine room, so all consumers share these queues
      allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, machineRoomEntry.getValue(), cidAll));
     }
   }
  // 回来成果集
  return allocateResults;
}

这个分配战略也好理解,便是同一个机房的MessageQueue由当时机房的顾客自己分配,假设MessageQueue地点的机房没有顾客,那么就由一切顾客一起分配;

仍然举例阐明:

假设当时有三个机房Hangzhou-AShanghai-AShenzhen-AHangzhou-A机房中有4个MessageQueue,2个顾客,Shanghai-A机房有3个MessageQueue,2个顾客,Shenzhen-A机房有2个MessageQueue,没有顾客;咱们选用选用AllocateMachineRoomNearby分配战略,而且子分配战略是AllocateMessageQueueAveragely均匀分配,那么终究的分配成果如下所示:

RocketMQ重平衡策略你用过几种?

AllocateMessageQueueByConfig【固定分配】

这个战略底子不必,因为它相当于用户自己写死了当时顾客消费哪一个MessageQueue,咱们能够看一下代码:

public class AllocateMessageQueueByConfig extends AbstractAllocateMessageQueueStrategy {
  private List<MessageQueue> messageQueueList;
​
  @Override
  public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    // 回来用户指定的MessageQueue列表
    return this.messageQueueList;
   }
​
  @Override
  public String getName() {
    return "CONFIG";
   }
​
  public List<MessageQueue> getMessageQueueList() {
    return messageQueueList;
   }
​
  public void setMessageQueueList(List<MessageQueue> messageQueueList) {
    this.messageQueueList = messageQueueList;
   }
}

使用这个战略底子无法完成重平衡,看看就行;

自定义重平衡战略

咱们也能够依据自己的实际情况来完成自定义的重平衡战略,只需求完成AllocateMessageQueueStrategy接口即可,或者继承AbstractAllocateMessageQueueStrategy也能够,然后在创立好顾客实例后把自定义的分配战略设置进去,如下:

DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group);
consumer.setNamesrvAddr(nameSrv);
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {
        @Override
        public List<MessageQueue> allocate(String s, String s1, List<MessageQueue> list, List<String> list1) {
     // TODO
          return null;
        }
​
        @Override
        public String getName() {
     // TODO
          return null;
        }
      });

小结

这篇文章带咱们认识了以下几种重平衡分配战略,并通过图示了解了它们的分配原理:

1.AllocateMessageQueueAveragely【均匀分配】;

2.AllocateMessageQueueAveragelyByCircle【均匀交叉分配】;

3.AllocateMessageQueueConsistentHash【一起性哈希分配】;

4.AllocateMessageQueueByMachineRoom【机房编号分配】;

5.AllocateMachineRoomNearby【同机房优先分配】;

6.AllocateMessageQueueByConfig【固定分配】;

7.自定义分配战略;