文|程征征(诨名:泽睿)

高德软件开发工程师

担任高德新场景事务探究开发与保护 对领域驱动、网络通讯、数据一致性有一定的研究与实践

本文 23009字 阅读约25 分钟

第一次重视 SOFA 社区是在开发一个毛病除去组件时,发现 SOFARPC 中也有相似的组件。在 SOFARPC 的规划中,进口选用了一种无缝刺进的规划办法,使得在不破坏敞开封闭准则前提下,引入单机毛病除去能力。而且是依据内核规划和总线规划,做到可插拔、零侵入,整个毛病除去模块是经过 SPI 动态加载的。核算信息的收集也是经过事情驱动的办法,在 RPC 同步或异步调用完结后,会向事情总线 EventBus 发送对应事情。事情总线接收到对应的事情,以履行后续的毛病除去逻辑。

依据以上优异的规划,我也将其纳为己用,也因此开启了在SOFA 社区的开源探究之路。陆续研究了SOFABoot、SOFARPC 以及 MOSN 等,自我感觉每一个项目的代码水平都很高,对我自己的代码提高有很大的协助。

SOFARegistry 是一个开源的注册中心供给了服务的发布注册订阅等功用,支持海量的服务注册订阅恳求。作为一个名源码爱好者,虽然看过 SOFA 的架构文章大致了解其间的规划哲学,可是由于没有从代码中了解过细节,实际上也是一知半解。恰好借助 SOFARegistry 拓荒的源码分析活动,依据自己的爱好挑选了 SlotTable 这个使命。

SOFARegistry 关于服务数据是分片进行存储的,因此每一个 data server 只会承当一部分的服务数据,详细哪份数据存储在哪个 data server 是有一个称为 SlotTable 的路由表供给的,session 能够经过 SlotTable 对对应的 data derver 进行读写服务数据, slot 对应的 data follower 能够经过 SlotTable 寻址 leader 进行数据同步。

保护 SlotTable 是由 Meta 的 leader 担任的,Meta 会保护 data 的列表,会运用这份列表以及 data 上报的监控数据创立 SlotTable,后续 data 的上下线会触发 Meta 修正 SlotTable, SlotTable 会经过心跳分发给集群中各个节点。

贡献者前言

SOFARegistry 关于服务数据是分片进行存储的,因此每一个 data server 只会承当一部分的服务数据,详细哪份数据存储在哪个 data server 是有一个称为 SlotTable 的路由表供给的,session 能够经过 SlotTable 对对应的 data derver 进行读写服务数据, slot 对应的 data follower 能够经过 SlotTable 寻址 leader 进行数据同步。

保护 SlotTable 是由 Meta 的 leader 担任的,Meta 会保护 data 的列表,会运用这份列表以及 data 上报的监控数据创立 SlotTable,后续 data 的上下线会触发 Meta 修正 SlotTable, SlotTable 会经过心跳分发给集群中各个节点。

SlotTable 在 SOFARegistry 是十分中心的概念,简称路由表。简略来说 SOFARegistry 是需求将发布订阅数据存储在不同的机器节点上,才能确保数据存储的横向扩展。那么不同机器上究竟存储哪些数据,便是 SlotTable 来保存的。

SlotTable 保存了 Slot 和机器节点之间的映射联系,数据经过 Hash 定位到某一个 Slot 上,经过 Slot 找到对应的机器 node 节点,将数据存储到对应的机器上。在这进程中有很多细节需求咱们了解。比如说每一个 Slot 对应 leader 和 follow 节点是怎么分配的。假如机器负载不平衡该怎么平衡,SlotTable 的更新是怎么进行的呢? 这些都是很风趣的细节完结。

1. DataServer 更新SlotTable 路由表进程。

SOFARegistry 源码|数据分片之核心-路由表 SlotTable 剖析

如上图所示 session 和 data 节点定时会向 Meta 节点上报心跳、Meta节点保护了 data 以及 session 节点列表信息、而且在心跳恳求中将回来 SlotTable 路由表信息、data 节点将路由表 SlotTable 保存在本地中。

2. SlotTable 更新平衡算法

由前文可知、SOFARegistry 选用了数据分片存储在 DataServer 节点之上、那么随之而来的问题便是数据怎么分片呢?

SOFARegistry 选用预分配的办法。

传统的一致性 Hash 算法有数据散布规模不固定的特性,该特性使得服务注册数据在服务器节点宕机、下线、扩容之后,需求从头存储排布,这为数据的同步带来了困难。大多数的数据同步操作是运用操作日志记载的内容来进行的,传统的一致性 Hash 算法中,数据的操作日志是以节点分片来划分的,节点改变导致数据散布规模的改变。

在核算机领域,大多数难题都能够经过增加一个中间层来处理,那么关于数据散布规模不固定所导致的数据同步难题,也能够经过同样的思路来处理。

这儿的问题在于,当节点下线后,若再以当时存活节点 ID 一致性 Hash 值去同步数据,就会导致已失效节点的数据操作日志无法获取到,既然数据存储在会改变的地方无法进行数据同步,那么假如把数据存储在不会改变的地方是否就能确保数据同步的可行性呢?答案是必定的,这个中间层便是预分片层,经过把数据与预分片这个不会改变的层相互对应就能处理这个数据同步的难题。

现在业界主要代表项目如 Dynamo、Casandra、Tair、Codis、Redis cluster 等,都选用了预分片机制来完结这个不会改变的层。

事先将数据存储规模等分为 N 个 slot 槽位,数据直接与 slot 相对应,数据的操作日志与相应的 solt 对应,slot 的数目不会由于节点的上下线而产生改变,由此确保了数据同步的可行性。除此之外,还需求引入“路由表”的概念,如图 13,“路由表”担任存放每个节点和 N 个 slot 的映射联系,并确保尽量把一切 slot 均匀地分配给每个节点。这样,当节点上下线时,只需求修正路由表内容即可。保持 slot 不变,即确保了弹性扩缩容,也大大降低了数据同步的难度。

SOFARegistry 源码|数据分片之核心-路由表 SlotTable 剖析

实际上上述 Slot 和 **节点 **的映射联系在源码中以 **SlotTable 和 Slot **的办法进行表达。源码如下代码块所示。


public final class SlotTable implements Serializable {
  public static final SlotTable INIT = new SlotTable(-1, Collections.emptyList());
  // 终究一次更新的时刻 epoch
  private final long epoch;
  //保存了 一切的 slot 信息; slotId ---> slot 目标的映射
  private final Map<Integer, Slot> slots;
}
public final class Slot implements Serializable, Cloneable {
  public enum Role {
    Leader,
    Follower,
  }
  private final int id;
  //当时slot的leader节点
  private final String leader;
  //最近更新时刻
  private final long leaderEpoch;
  //当时slot的follow节点
  private final Set<String> followers;
}

由于节点在动态改变中、所以 Slot 和 节点的映射也在时刻改变中、那么咱们接下来的要点便是 SlotTable 的改变进程。SlotTable 的改变是在 Meta 节点中触发、当有服务上下线的时分会触发SlotTable 的改变、除此之外也会定时执履行 SlotTable的改变。

SlotTable的整个同步更新进程如图所示。

代码参阅 com.alipay.sofa.registry.server.Meta.slot.arrange.ScheduledSlotArranger#arrangeSync.

SlotTable 的定时改变是经过在初始化 ScheduledSlotArranger 时分实例化守护线程不断的 定时履行 内部使命 Arranger 的 arrangeSync 办法来完结 SlotTable 改变的。大致流程如下所示。

SOFARegistry 源码|数据分片之核心-路由表 SlotTable 剖析

由于担任 SlotTable 的更新是在 MetaServer 中的主节点更新的。

所以更新 SlotTable的第一步便是判别是否是主节点。主节点才担任真实的 SlotTable 改变进程。

第二步是获取最新的 DataServer 节点,由于 从头分配 SlotTable 本质上是 对 DataServer 节点和 slot 槽位之间的映射联系进行从头分配。所以必定需求获取到当时正在存活的 DataServer 节点信息,然后方便的对之进行 slot 分配。

(这儿获取正在存活的 DataServer 也便是有和 MetaServer 保持心跳的 DataServer, 底层是从 com.alipay.sofa.registry.server.Meta.lease.impl.SimpleLeaseManager中获取,感爱好能够检查相关源码) 。

第三部是分配前置校验,实际上一些边界条件的判别、例如 DataServer 是否为空、 DataServer 的巨细是否大于配置的 minDataNodeNum,只有满足这些条件才进行改变。

第四步 履行 trayArrageSlot 办法、进入到该办法内部之中。

首要获取进程内部锁、实际上是一个 ReentrantLock,这儿主要是为了避免定时使命屡次一同履行 SlotTable 的分配工作。

private final Lock lock = new ReentrantLock();

随后便是依据当时的 Data 节点信息创立 SlotTableBuilder、这儿的 SlotTableBuilder 又是何方神圣呢?回到 SlotTable 更新的办法、一般是创立一个新的 SlotTable 目标、然后用这个新创立的目标去替代老的 SlotTable 目标、然后完结改变 SlotTable 操作、一般不会直接对老的SlotTable直接进行增删该 操作、这样并发导致的一致性问题很难操控。所以依据此、SlotTableBuilder 从它的称号就能够看出 它是 SlotTable 的创立者、内部聚合了SlotBuilder 目标。其实和 SlotTable 相似的、SlotTable 内部聚合了 Slot 信息。

在检查 SlotTable 改变算法之前、咱们先了解一下 SlotTableBuilder 的创立进程。SlotBuilder 的结构如下所示。

public class SlotTableBuilder {
  //当时正在创立的 Slot 信息
  private final Map<Integer, SlotBuilder> buildingSlots = Maps.newHashMapWithExpectedSize(256);
  // 反向查询索引数据、经过 节点查询该节点现在担任哪些 slot 的数据的办理。
  private final Map<String, DataNodeSlot> reverseMap = Maps.newHashMap();
  //slot 槽的个数
  private final int slotNums;
  //follow 节点的数量
  private final int followerNums;
  //最近一次更新的时刻
  private long epoch;
}

SlotTableBuilder 能够看出内部聚合了一个 buildingSlots 、标识正在创立的 Slot。由于 SlotTable 是由 Slot 构成的、这点也很简单了解。除此之外 SlotTableBuilder 内部也聚合了一个 reverseMap,代表反向查询索引,这个映射的 key是 dataServer、value是 DataNodeSlot 目标. DataNodeSlot 源码如下。

/**
    经过 Slot 找 leader和follows.
    本质上是经过节点找 Slot,当时节点作为leaders的slot、和以当时节点作为 follower 的节点.
    也便是说 当时我这个节点、我在那些 slot 中作为 leader, 对应的是 Set<Integer> leaders.
    以及我当时这个节点在哪些 slot 中作为 follow,对应存储在 Set<Integer> follows.
**/
public final class DataNodeSlot  {
  private final String dataNode;
  private final Set<Integer> leaders = Sets.newTreeSet();
  private final Set<Integer> followers = Sets.newTreeSet();
}

用一张图来表达 DataNodeSlot 如下所示。可见它和图1是刚好相反的映射。经过节点查找 与该节点有相关的 slot信息、由于后边要经常用到这一层查询、所以直接将这种联系保存下来。为了后边陈述方便、这儿核算几种陈述办法。

  1. 节点被作为leader 的slot调集咱们称为 : 节点 leader 的slot调集。
  2. 节点被作为follow 的slot调集咱们称为 : 节点 follow 的slot调集。
  3. SlotTable 相关的一切节点统称为: SlotTable 的节点列表

SOFARegistry 源码|数据分片之核心-路由表 SlotTable 剖析

再回到 SlotTableBuilder 创立

private SlotTableBuilder createSlotTableBuilder(SlotTable slotTable,
                                                List<String> currentDataNodeIps,
                                                int slotNum,int replicas) {
    //经过 NodeComparator 包装当时新增的、删去的的节点.
    NodeComparator comparator = new NodeComparator(slotTable.getDataServers(), currentDataNodeIps);
    SlotTableBuilder slotTableBuilder = new SlotTableBuilder(slotTable, slotNum, replicas);
    //履行 slotTableBuilder 的初始化
    slotTableBuilder.init(currentDataNodeIps);
    //在这儿将现已下线的 data 节点删去掉、
    //其间现已删去的 是经过 NodeComparator 内部的getDiff办法 完结的。
    comparator.getRemoved().forEach(slotTableBuilder::removeDataServerSlots);
    return slotTableBuilder;
}

办法参数 SlotTable 是经过 SlotManager 目标获取到旧的的 SlotTable 目标。currentDataNodeIps 代表当时存活的 dataServer (经过心跳保持和 MetaServer 的衔接) 然后传入 createSlotTableBuilder 办法内部。createSlotTableBuilder 办法内部经过 NodeComparator 目标核算而且包装了 旧的 “SlotTable 的节点列表” 与 传入的 currentDataNodeIps 之前的差异值。包括当时 CurrentDataNodeIps 中 新增和删去的 DataServer。随之调用 SlotTableBuilder 的 init 办法 。履行 SlotTableBuilder 的初始化。

SlotTableBuilder 的 init 源码如下。

 public void init(List<String> dataServers) {
    for (int slotId = 0; slotId < slotNums; slotId++) {
      Slot slot = initSlotTable == null ? null : initSlotTable.getSlot(slotId);
      if (slot == null) {
        getOrCreate(slotId);
        continue;
      }
     //1. 从头新建一个 SlotBuilder 将本来的 Slot 里面的数据复制过来
     //2. 复制 leader节点
      SlotBuilder slotBuilder =
          new SlotBuilder(slotId, followerNums, slot.getLeader(), slot.getLeaderEpoch());
     //3. 复制 follow 节点。
      slotBuilder.addFollower(initSlotTable.getSlot(slotId).getFollowers())
      buildingSlots.put(slotId, slotBuilder);
    }
     //4. 初始化反向查询索引数据、经过 节点查询该节点现在办理哪些 slot
    initReverseMap(dataServers);
  }

由上面的代码能够看出实际上init做了这么一件事情: 初始化 SlotBuilder 内部的 slotBuilder目标、而且将本来旧的 SlotTable 的 leader和follow节点悉数复制过去了。注意在实例化 SlotTableBuilder 的时分传入了旧的 SlotTable也便是这儿的 initSlotTable 目标。

init 办法终究一步的 initReverseMap 从称号能够看出构建了一个实例化反向路由表、反向查找表、从Node节点到Slot的查找功用、由于在之后的处理傍边经常会用到 某一个 data节点担任了那些slot的leader角色、以及哪些slot的follow角色. 所以这儿做了一层索引处理。

再回到 ScheduledSlotArranger 类中 createSlotTableBuilder 办法终究一步,此时 SlotTableBulder 内部现已完结了 旧的 SlotTable 的数据复制。

comparator.getRemoved().forEach(slotTableBuilder::removeDataServerSlots);

上文咱们说过 comparator 目标内部保存了 新的 dataServer 和旧的 ‘SlotTable 的节点列表’ 比较信息。

所以在新的 dataServer 中现已删去的节点、咱们需求从 SlotTableBuilder 中删去。内部的删去逻辑也是迭代一切的 SlotBuilder 比较 leader 和当时节点是否相同、相同则删去、follow同理。

public void removeDataServerSlots(String dataServer) {
    for (SlotBuilder slotBuilder : buildingSlots.values()) {
      //删去该 SlotBuilder  follow 节点中的 dataServer
      slotBuilder.removeFollower(dataServer)
      //假如该 SlotBuilder 的 leader 节点是 dataServer ,
      //那么设置该 slotBuilder 的leader节点为空、需求从头进行分配
      if (dataServer.equals(slotBuilder.getLeader())) {
        slotBuilder.setLeader(null);
      }
    }
    reverseMap.remove(dataServer);
}

总结来说创立 SlotTableBuilder 的进程便是依据旧的 SlotTable 实例化 SlotTableBuilder (内部的 SlotBuilder)、核算 旧的 ‘SlotTable 的节点列表’ 和当时最新的 dataServer的差异值、更新 SlotTableBuilder 内部的 SlotBuilder 相关的 leader 和follow值。

SOFARegistry 源码|数据分片之核心-路由表 SlotTable 剖析

到这一步实际上现已做完了 SlotTableBuilder 的构建进程。到这儿想想接下来该做什么呢? 能够想想,假如咱们触发 SlotTable 从头分配的是某一个 dataA 节点下线了,那么在 slotTableBuilder::removeDataServerSlots 这一步会将咱们正在创立的 SlotTableBuilder 中的 dataA 所办理的 Slot 的 leader 或许 follow 删去掉,那么该 Slot 的 leader 或许 follow 很或许就会变成空。也便是说该 Slot 没有 data 节点处理恳求。所以咱们依据当时 SlotBuilder 中是否有为完结分配的 Slot 来决议是否进行从头分配操作, 是否有未完结分配的Slot代码块如下。

  public boolean hasNoAssignedSlots() {
    for (SlotBuilder slotBuilder : buildingSlots.values()) {
      if (StringUtils.isEmpty(slotBuilder.getLeader())) {
        //当时 Slot的leader节点为空
        return true;
      }
      if (slotBuilder.getFollowerSize() < followerNums) {
        //当时 Slot的follow节点的个数小于配置的 followerNums
        return true;
      }
    }
    return false;
  }

创立完结 SlotTableBuilder 而且有没有完结分配的 Slot, 履行真实的分配进程,如下图所示。

SOFARegistry 源码|数据分片之核心-路由表 SlotTable 剖析

由图可知分配进程终究托付给 DefaultSlotAssigner ,DefaultSlotAssigner 在构造办法中实例化了 当时正在创立的 SlotTableBuilder /currentDataServers 的视图/MigrateSlotGroup, 其间 MigrateSlotGroup

内部保存的是那些短少 **leader **以及 followSlot

public class MigrateSlotGroup {
   //哪些 Slot 短少 leader
  private final Set<Integer> leaders = Sets.newHashSet();
  //哪些Slot 短少 follow 以及短少的个数
  private final Map<Integer, Integer> lackFollowers = Maps.newHashMap();
}

assign 代码如下. 代码中先分配 短少leader的 slot、随后分配短少 follow 的 slot

public SlotTable assign() {
    BalancePolicy balancePolicy = new NaiveBalancePolicy();
    final int ceilAvg =
        MathUtils.divideCeil(slotTableBuilder.getSlotNums(), currentDataServers.size());
    final int high = balancePolicy.getHighWaterMarkSlotLeaderNums(ceilAvg);
    //分配短少leader的slot
    if (tryAssignLeaderSlots(high)) {
      slotTableBuilder.incrEpoch();
    } 
    //分配短少 follow 的 slot
    if (assignFollowerSlots()) {
      slotTableBuilder.incrEpoch();
    } 
    return slotTableBuilder.build();
}

leader 节点分配

进入 tryAssignLeaderSlots 办法内部检查详细分配算法细节。经过代码注释的办法来解释详细完结。

private boolean tryAssignLeaderSlots(int highWatermark) {
    //依照 follows 节点的数量 从大到小排序 0比较特别排在终究边,0 为什么比较特别呢、由于无论怎么分配、
    //终究挑选出来的leader一定不是该slot的follow、由于该slot的follow为空
    //优先组织 follow节点比较少的 Slot
    //其实这点也能够想理解的。这些没有 leader 的 slot 分配顺序必定是要依据 follow节点越少的优先分配最好
    //以避免这个 follow 也挂了、那么数据就有或许会丢掉了。
    List<Integer> leaders =
        migrateSlotGroup.getLeadersByScore(new FewerFollowerFirstStrategy(slotTableBuilder));
    for (int slotId : leaders) {
      List<String> currentDataNodes = Lists.newArrayList(currentDataServers);
       //挑选 nextLeader 节点算法?
      String nextLeader =
          Selectors.slotLeaderSelector(highWatermark, slotTableBuilder, slotId)
              .select(currentDataNodes);
      //判别nextLeader是否是当时slot的follow节点 将follow节点提高为主节点的。 
      boolean nextLeaderWasFollower = isNextLeaderFollowerOfSlot(slotId, nextLeader);
      // 将当时 slot 的 leader 节点用挑选出来的 nextLeader 替换
      slotTableBuilder.replaceLeader(slotId, nextLeader);
      if (nextLeaderWasFollower) {
        //由于当时 Slot 将 follow节点提高为leader节点了、那么该 Slot 必定 follows 个数又不够了、需求再次分配 follow 节点
        migrateSlotGroup.addFollower(slotId);
      }
    }
    return true;
  }

上面分配 leader 代码中中心挑选 nextLeader 办法。

 String nextLeader =
          Selectors.slotLeaderSelector(highWatermark, slotTableBuilder, slotId)
              .select(currentDataNodes);

经过 Selectors 挑选一个 合适的 leader节点。

继续追踪 DefaultSlotLeaderSelector.select 办法内部。同理咱们选用代码注释的办法来解释详细完结。

public String select(Collection<String> candidates) {
  //candidates: 当时一切的候选节点,也是 tryAssignLeaderSlots 办法传入的 currentDataServers
  Set<String> currentFollowers = slotTableBuilder.getOrCreate(slotId).getFollowers();
  Collection<String> followerCandidates = Lists.newArrayList(candidates);
  followerCandidates.retainAll(currentFollowers);
  //经过 followerCandidates.retainAll(currentFollowers)) 之后 followerCandidates 
  //只是保存 当时 Slot 的 follow 节点
  //而且采取了一个战略是 当时 follow 节点作为其他 Slot 的leader最少的优先、
  //用直白的话来说。
  //当时 follower 越是没有被作为其他 Slot 的leader节点、那么
  //证明他便是越 '闲' 的。必然优先考虑挑选它作为leader 节点。
  String leader = new LeastLeaderFirstSelector(slotTableBuilder).select(followerCandidates);
  if (leader != null) {
    DataNodeSlot dataNodeSlot = slotTableBuilder.getDataNodeSlot(leader);
    if (dataNodeSlot.getLeaders().size() < highWaterMark) {
      return leader;
    }
  }
  //从其他的机器中挑选一个,优先挑选充任 leader 的 slot 个数最少的那一个 DataServer
  return new LeastLeaderFirstSelector(slotTableBuilder).select(candidates);
}

经过上面 select 办法源码注释相信能够很简单了解 SOFARegistry 的做法。总结来说,便是首要从 当时 slot 的 follow 节点中找出 leader,由于在此情况下不需求做数据搬迁,相当于主节点挂了,提高备份节点为主节点完结高可用。可是详细挑选哪一个,SOFARegistry 采取的战略是 在一切的 follow 节点中找出最 “闲”的那一个,可是假如它一切的 follow 节点作为 leader 节点办理的 Slot 个数大于 highWaterMark,那么证明该 Slot 的一切 follow 节点都太”忙”了,那么就会从悉数存活的机器中挑选一个 “当作为 leader 节点办理的 Slot个数”最少的那一个,可是这种情况其实有数据同步开销的。

follow 节点分配

同理经过源码注解办法来胪陈

  private boolean assignFollowerSlots() {
    //运用 FollowerEmergentScoreJury 排序得分战略表明
    // 某一个 slot 短少越多 follow、排序越靠前。  
    List<MigrateSlotGroup.FollowerToAssign> followerToAssigns =
        migrateSlotGroup.getFollowersByScore(new FollowerEmergentScoreJury());
    int assignCount = 0;
    for (MigrateSlotGroup.FollowerToAssign followerToAssign : followerToAssigns) {
      // 当时待分配的 slotId
      final int slotId = followerToAssign.getSlotId();
      // 当时 slotId 槽中还有多少待分配的 follow 从节点。依次迭代分配。
      for (int i = 0; i < followerToAssign.getAssigneeNums(); i++) {
        final List<String> candidates = Lists.newArrayList(currentDataServers);
        // 依据上文中的 DataNodeSlot 结构、依据 节点被作为follow 的slot的个数从小到大排序。
        // follows 个数相同、依照最少作为 leader 节点进行排序。
        // 其实终究目的便是找到最 "闲" 的那一台机器。
        candidates.sort(Comparators.leastFollowersFirst(slotTableBuilder));
        boolean assigned = false;
        for (String candidate : candidates) {
          DataNodeSlot dataNodeSlot = slotTableBuilder.getDataNodeSlot(candidate);
          //跳过现已是它的 follow 或许 leader 节点的Node节点
          if (dataNodeSlot.containsFollower(slotId) || dataNodeSlot.containsLeader(slotId)) {
            continue;
          }
          //给当时 slotId 增加候选 follow 节点。
          slotTableBuilder.addFollower(slotId, candidate);
          assigned = true;
          assignCount++;
          break;
        }
      }
    }
    return assignCount != 0;
  }

如之前所述、MigrateSlotGroup 保存了 需求进行从头分配 leader 以及 follow 的 Slot 信息。算法的主要进程如下。

  1. 找到一切没有足够 follow 的 Slot 信息
  2. 依据 短少 follow 个数越多越优先准则排序
  3. 迭代一切短少 follow 的 Slot 信息 这儿是 被 MigrateSlotGroup.FollowerToAssign 包装
  4. 内部循环迭代短少 follow 巨细、增加给该 Slot 所需的 follow
  5. 对候选 dataServer 进行排序、依照 “闲、忙“成都进行排序
  6. 履行增加 follow 节点

到此、我么现已给短少 leader 或许 follow 的 Slot 完结了节点分配。

SlotTable 平衡算法

了解完 SlotTable 的改变进程以及算法之后、相信我们对此有了自己的了解。那么SlotTable 的平衡进程其实也是相似的。概况能够参阅源码com.alipay.sofa.registry.server.Meta.slot.balance.DefaultSlotBalancer。

由于在节点的频频上下线进程中、势必会导致某一些节点的负载(担任的 slot 办理数量)过高、某些节点的负载又很低、这样需求一种动态平衡机制来确保节点的相对负载均衡

进口在 DefaultSlotBalancer.balance办法内部

public SlotTable balance() {
    //平衡 leader 节点
    if (balanceLeaderSlots()) {
      LOGGER.info("[balanceLeaderSlots] end");
      slotTableBuilder.incrEpoch();
      return slotTableBuilder.build();
    }
    if (balanceHighFollowerSlots()) {
      LOGGER.info("[balanceHighFollowerSlots] end");
      slotTableBuilder.incrEpoch();
      return slotTableBuilder.build();
    }
    if (balanceLowFollowerSlots()) {
      LOGGER.info("[balanceLowFollowerSlots] end");
      slotTableBuilder.incrEpoch();
      return slotTableBuilder.build();
    }
    // check the low watermark leader, the follower has balanced
    // just upgrade the followers in low data server
    if (balanceLowLeaders()) {
      LOGGER.info("[balanceLowLeaders] end");
      slotTableBuilder.incrEpoch();
      return slotTableBuilder.build();
    }
    return null;
}

由于篇幅约束、这儿只分析 leader 节点平衡进程。如上源码中的 balanceLeaderSlots() 其他进程和 它相似、感爱好的读者也能够自己查找源码分析。

进入 balanceLeaderSlots 办法内部。

  private boolean balanceLeaderSlots() {
    //这儿便是找到每一个节点 dataServer 作为leader的 slot 个数的最大天花板值----> 
    //简单想到的计划必定是平均办法、一共有 slotNum 个slot、
    //将这些slot的leader归属平均分配给 currentDataServer
    final int leaderCeilAvg = MathUtils.divideCeil(slotNum, currentDataServers.size());
    if (upgradeHighLeaders(leaderCeilAvg)) {
      //假如有替换过 leader、那么就直接回来、不用进行 migrateHighLeaders 操作
      return true;
    }
    if (migrateHighLeaders(leaderCeilAvg)) {
      //经过上面的 upgradeHighLeaders 操作
      //不能找到 follow 进行搬迁、由于一切的follow也都很忙、在 exclude 傍边、
      //所以没法找到一个follow进行搬迁。那么咱们尝试搬迁 follow。
      //由于 highLeader 的一切 follower 都是比较忙、所以需求将这些忙的节点进行搬迁、期待给这些 highLeader 所担任的 slot 替换一些比较悠闲的 follow
      return true;
    }
    return false;
  }

咱们要点重视 upgradeHighLeaders 办法、同理选用源码注解的办法

 private boolean upgradeHighLeaders(int ceilAvg) {
    //"假如一个节点的leader的slot个数大于阈值、那么就会用目标slot的follow节点来替换当时leader"  最多移动 maxMove次数
    final int maxMove = balancePolicy.getMaxMoveLeaderSlots();
    //了解来说这块能够直接将节点的 leader 个数大于 ceilAvg 的 节点用其他节点替换就能够了、为什么还要再次向上取整呢?
    //主要是避免slotTable出现颤动,所以设定了触发改变的上下阈值 这儿向上取整、是作为一个不平衡阈值来运用、
    // 便是只针关于不平衡多少(这个多少能够操控)的进行再平衡处理
    final int threshold = balancePolicy.getHighWaterMarkSlotLeaderNums(ceilAvg);
    int balanced = 0;
    Set<String> notSatisfies = Sets.newHashSet();
    //循环履行替换操作、默认履行 maxMove 次
    while (balanced < maxMove) {
      int last = balanced;
      //1. 找到 哪些节点的 leader 个数 超越 threshold 、并对这些节点依照leader 的个数的从大到小摆放。
      final List<String> highDataServers = findDataServersLeaderHighWaterMark(threshold);
      if (highDataServers.isEmpty()) {
        break;
      }
      // 没有任何 follow 节点能用来晋升到 leader 节点
      if (notSatisfies.containsAll(highDataServers)) {
        break;
      }
      //2. 找到能够作为新的leader的 节点,可是不包括现已不能增加任何leader的节点、由于这些节点的leader现已超越阈值了。
      final Set<String> excludes = Sets.newHashSet(highDataServers);
      excludes.addAll(findDataServersLeaderHighWaterMark(threshold - 1));
      for (String highDataServer : highDataServers) {
        if (notSatisfies.contains(highDataServer)) {
          //假如该节点现已在不满足替换条件行列中、则不在进行查找可替换节点操作
          continue;
        }
        //找到能够作为新的leader的 节点,可是不包括现已不能增加任何leader的节点、由于这些节点的leader现已超越阈值了。
        //算法进程是: 
        //1. 从 highDataServer 所担任的一切 slot 中找到某一个 slot、这个 slot 满足一个条件便是: 该 slot 的 follow 节点中有一个最闲(也便是 节点的leader的最小)
        //2. 找到这个 slot、咱们只需求替换该 slot 的leader为找到的follow
        //其实站在宏观的视点来说便是将 highDataServer 节点leader 的一切slot的follow节点依照闲忙程度进行排序、
        //找到那个最闲的、然后让他当leader。这样就替换了 highDataServer 当leader了
        Tuple<String, Integer> selected = selectFollower4LeaderUpgradeOut(highDataServer, excludes);
        if (selected == null) {
          //没有找到任何 follow节点用来替代 highDataServer节点、所以该节点不满足可替换条件、参加到 notSatisfies 不行替换行列中. 以便于外层循环直接过滤。
          notSatisfies.add(highDataServer);
          continue;
        }
        //找到 highDataServer 节点的某一个可替换的 slotId
        final int slotId = selected.o2;
        // 找到 slotId 替换 highDataServer 作为leader 的节点 newLeaderDataServer
        final String newLeaderDataServer = selected.o1; 
        // 用 newLeaderDataServer 替换 slotId 旧的 leader 节点。
        slotTableBuilder.replaceLeader(slotId, newLeaderDataServer); 
        balanced++;
      }
      if (last == balanced) break;
    }
    return balanced != 0;
  }

进入关键查找可替换的 slotId 和新的 leader 节点的进程,同理选用源码注解的办法。

  /*
    从 leaderDataServer 所leader的一切slot中、挑选一个能够替换的slotId
    和新的leader来替换leaderDataServer
   */
  private Tuple<String, Integer> selectFollower4LeaderUpgradeOut(
      String leaderDataServer, Set<String> excludes) {
    //获取当时 leaderDataServer 节点 leader 或许 follow 的slotId 视图。DataNodeSlot 结构咱们上文有说过。
    final DataNodeSlot dataNodeSlot = slotTableBuilder.getDataNodeSlot(leaderDataServer);
    Set<Integer> leaderSlots = dataNodeSlot.getLeaders();
    Map<String, List<Integer>> dataServers2Followers = Maps.newHashMap();
    //1. 从 dataNodeSlot 获取 leaderDataServer 节点leader的一切slotId: leaderSlots
    for (int slot : leaderSlots) {
      //2. 从slotTableBuilder 中找出当时 slot 的follow
      List<String> followerDataServers = slotTableBuilder.getDataServersOwnsFollower(slot);
      //3. 去掉excludes ,得到候选节点,由于 excludes 必定不会是新的 leader 节点
      followerDataServers = getCandidateDataServers(excludes, null, followerDataServers);
      //4. 构建 候选节点到 slotId 调集的映射联系。
      for (String followerDataServer : followerDataServers) {
        List<Integer> followerSlots =
            dataServers2Followers.computeIfAbsent(followerDataServer, k -> Lists.newArrayList());
        followerSlots.add(slot);
      }
    }
    if (dataServers2Followers.isEmpty()) {
      //当 leaderDataServer 节点的follow 都是 excludes 中的成员时分、那么就有或许是空的。
      return null;
    }
    List<String> dataServers = Lists.newArrayList(dataServers2Followers.keySet());
    //依照 候选节点的 leader的 slot 个数升序排序、也便是也便是找到那个最不忙的,感爱好能够检查 leastLeadersFirst 办法内部完结。
    dataServers.sort(Comparators.leastLeadersFirst(slotTableBuilder));
    final String selectedDataServer = dataServers.get(0);
    List<Integer> followers = dataServers2Followers.get(selectedDataServer);
    return Tuple.of(selectedDataServer, followers.get(0));
  }

至此咱们完结了 高负载 leader 节点的替换、在此进程中假如有替换过、那么直接回来、假如没有替换过、咱们会继续履行 DefaultSlotBalancer 中的 migrateHighLeaders 操作。由于假如经过 DefaultSlotBalancer 中的 upgradeHighLeaders 操作之后没有进行过任何leader的替换、那么证明 高负载的 leader 节点同样它的 follow 节点也很忙、所以需求做得便是对这些忙的 follow 节点也要进行搬迁。咱们继续经过源码注释的办法来检查详细的进程。

private boolean migrateHighLeaders(int ceilAvg) {
    final int maxMove = balancePolicy.getMaxMoveFollowerSlots();
    final int threshold = balancePolicy.getHighWaterMarkSlotLeaderNums(ceilAvg);
    int balanced = 0;
    while (balanced < maxMove) {
      int last = balanced;
      // 1. find the dataNode which has leaders more than high water mark
      //    and sorted by leaders.num desc
      final List<String> highDataServers = findDataServersLeaderHighWaterMark(threshold);
      if (highDataServers.isEmpty()) {
        return false;
      }
      // 2. find the dataNode which could own a new leader
      // exclude the high
      final Set<String> excludes = Sets.newHashSet(highDataServers);
      // exclude the dataNode which could not add any leader
      excludes.addAll(findDataServersLeaderHighWaterMark(threshold - 1));
      final Set<String> newFollowerDataServers = Sets.newHashSet();
      // only balance highDataServer once at one round, avoid the follower moves multi times
      for (String highDataServer : highDataServers) {
        Triple<String, Integer, String> selected =
            selectFollower4LeaderMigrate(highDataServer, excludes, newFollowerDataServers);
        if (selected == null) {
          continue;
        }
        final String oldFollower = selected.getFirst();
        final int slotId = selected.getMiddle();
        final String newFollower = selected.getLast();
        slotTableBuilder.removeFollower(slotId, oldFollower);
        slotTableBuilder.addFollower(slotId, newFollower);
        newFollowerDataServers.add(newFollower);
        balanced++;
      }
      if (last == balanced) break;
    }
    return balanced != 0;
  }

3. session 和 data 节点怎么运用路由表

上文咱们 了解了 SlotTable 路由表在心跳中从 Meta 节点获取而且更新到本地中、那么 session 和 data 节点怎么运用路由表呢。首要咱们先看看 session 节点怎么运用 SlotTable 路由表。 session 节点承当着客户端的发布订阅恳求,而且经过 SlotTable 路由表对 data 节点的数据进行读写; session 节点本地 SlotTable 路由表保存在 SlotTableCacheImpl 。

public final class SlotTableCacheImpl implements SlotTableCache {
  // 不同核算 slot 方位的算法抽象
  private final SlotFunction slotFunction = SlotFunctionRegistry.getFunc();
  //本地路由表、心跳中从 Meta 节点获取到。
  private volatile SlotTable slotTable = SlotTable.INIT;
  //依据 dataInfoId 获取 slotId
  @Override
  public int slotOf(String dataInfoId) {
    return slotFunction.slotOf(dataInfoId);
  }
}

源码中的 SlotFunctionRegistry 注册了两种算法完结。分别是 crc32 和 md5 完结、源码如下所示。

public final class SlotFunctionRegistry {
  private static final Map<String, SlotFunction> funcs = Maps.newConcurrentMap();
  static {
    register(Crc32cSlotFunction.INSTANCE);
    register(MD5SlotFunction.INSTANCE);
  }
  public static void register(SlotFunction func) {
    funcs.put(func.name(), func);
  }
  public static SlotFunction getFunc() {
    return funcs.get(SlotConfig.FUNC);
  }
}

随意挑选某一个算法、例如 MD5SlotFunction、依据 dataInfoId 核算 slotId 的完结如下。

public final class MD5SlotFunction implements SlotFunction {
  public static final MD5SlotFunction INSTANCE = new MD5SlotFunction();
  private final int maxSlots;
  private final MD5HashFunction md5HashFunction = new MD5HashFunction();
  private MD5SlotFunction() {
    this.maxSlots = SlotConfig.SLOT_NUM;
  }
  //核算 slotId的最底层逻辑。可见也是经过取hash然后对 slot槽个数取余
  @Override
  public int slotOf(Object o) {
    // make sure >=0
    final int hash = Math.abs(md5HashFunction.hash(o));
    return hash % maxSlots;
  }
}

了解了详细依据 DataInfoId 来经过 SlotTable 获取详细的数据 slotId,咱们来看看在 session 节点中何时触发核算 datInfoId 的 slotId。咱们能够想想,一般 session 节点运用来处理客户端的发布订阅恳求,那么当有发布恳求的时分,发布的数据一同也会向 data 节点写入发布的元数据,那么必定需求知道该数据保存在哪一台机器上,此时就需求依据 dataInfoId 找到对应的 slotId,进而找到对应的 leader 节点,经过网络通讯工具将发布恳求转发给该节点处理,session 数据接收发布恳求处理 handler 为 PublisherHandler。

SOFARegistry 源码|数据分片之核心-路由表 SlotTable 剖析

如上面时序图所示、在 DataNodeServiceImpl 终究的 commitReq 办法中会将发布恳求增加到内部的 BlockingQueue 傍边去,DataNodeServiceImpl 内部的 Worker 目标会消费 BlockingQueue 中内部履行真实的数据写入进程。详细源码请参阅 :

private final class Worker implements Runnable {
    final BlockingQueue<Req> queue;
    Worker(BlockingQueue<Req> queue) {
      this.queue = queue;
    }
    @Override
    public void run() {
      for (; ; ) {
          final Req firstReq = queue.poll(200, TimeUnit.MILLISECONDS);
          if (firstReq != null) {
            Map<Integer, LinkedList<Object>> reqs =
                drainReq(queue, sessionServerConfig.getDataNodeMaxBatchSize());
            //由于 slot 的个数有或许大于 work/blockingQueue 的个数、所以
            //并不是一个 slot 对应一个 work、那么一个blockQueue 中或许存在发往多个slot的数据、这儿
            //有或许一次发送不完这么多数据、需求分批发送、将首要进入行列的优先发送了。
            LinkedList<Object> firstBatch = reqs.remove(firstReq.slotId);
            if (firstBatch == null) {
              firstBatch = Lists.newLinkedList();
            }
            firstBatch.addFirst(firstReq.req);
            request(firstReq.slotId, firstBatch);
            for (Map.Entry<Integer, LinkedList<Object>> batch : reqs.entrySet()) {
              request(batch.getKey(), batch.getValue());
            }
          }
        }
     }
 }
private boolean request(BatchRequest batch) {
  final Slot slot = getSlot(batch.getSlotId());
  batch.setSlotTableEpoch(slotTableCache.getEpoch());
  batch.setSlotLeaderEpoch(slot.getLeaderEpoch());
  sendRequest(
      new Request() {
        @Override
        public Object getRequestBody() {
          return batch;
        }
        @Override
        public URL getRequestUrl() {
          //经过 slot 路由表找到对应的 leader data节点,这
          //个路由表是 心跳中从 Meta 节点获取来的。
          return getUrl(slot);
        }
      });
  return true;
}

借此源码解析的使命让我细心的研究了 SlotTable 数据预分片机制,真实了解了一个工业等级的数据分片是怎么完结的,高可用是怎么做的,以及在功用完结中的各种取舍。

因此和我们共享这篇对 SOFARegistry SlotTable 的分析,欢迎我们留言辅导。

也有幸参加了SOFARegistry的社区会议,在社区前辈们的辅导下仔细了解了详细细节的规划考量,一同评论 SOFARegistry 的未来发展。

欢迎参加,参加 SOFA 社区的源码解析

现在SOFARegistry源码解析使命已发完, Layotto 源码解析还有 1 个使命待招领,有爱好的朋友能够试试看。

SOFARegistry 源码|数据分片之核心-路由表 SlotTable 剖析

Layotto

待招领使命:WebAssembly 相关

github.com/mosn/layott…

之后也会推出其他项目的源码解析活动,敬请期待…