SOFARegistry | 聊一聊服务发现的数据一致性

文|肖健(花名:昱恒)

蚂蚁集团技能专家

专注于服务发现范畴,现在首要从事蚂蚁注册中心 SOFARegistry 设计、研制工作。

本文 9492字 阅读 24 分钟

PART. 1 前语

1.1 什么是服务发现

微服务的体系中,多个运用程序之间将以 RPC 办法进行彼此通讯。这些运用程序的服务实例是动态改变的,咱们需求知道这些实例的准确列表,才干让运用程序之间按预期进行 RPC 通讯。这便是服务发现在微服务体系中的中心作用。

SOFARegistry | 聊一聊服务发现的数据一致性

SOFARegistry 是蚂蚁集团在生产大规模运用的服务注册中心,阅历了多年大促的检测,支撑蚂蚁巨大的服务集群;具有分布式可水平扩容、容量大、推送推迟低、高可用等特色。

1.2 服务发现的考量

设计和考量一个服务发现体系,能够从下面这些目标展开:

SOFARegistry | 聊一聊服务发现的数据一致性

各个目标之间并不是彼此独立的。例如关于数据共同性计划的选型也会影响到数据分区、数据仿制、集群容灾、多集群同步等计划的决议计划,也在很大程度上决定这个服务发现体系的整体架构。

这篇文章重点分析了各个服务发现体系的数据共同性计划,以及依据这个计划延伸出来的特性,帮助咱们开始了解服务发现体系。

PART. 2 开源产品分析

2.1 为什么需求数据共同性

依据上述描述,数据共同性在服务发现体系中如此重要,甚至会影响到整个服务发现体系的各方面架构考量,那咱们究竟为什么需求数据共同性呢?

要回答这个问题,让咱们从单点故障说起:早期咱们运用的服务,以及服务数据存储,它们往往是布置在单节点上的。可是单节点存在单点故障,一旦单节点宕机就整个服务不可用,对事务影响非常大。随后,为了处理单点问题,软件体系引入了数据仿制技能,完结多副本。

经过数据仿制计划,一方面咱们能够提高服务可用性,防止单点故障;另一方面,多副本能够提高读吞吐量、甚至就近布置在事务地点的地理方位,降低拜访推迟。

随着多副本的引入,就会涉及到多个副本之间的数据怎样保持共同的问题,于是数据共同性随之而来。

2.2 开源产品分析

关于多个副本之间进行数据同步,共同性关系从强到弱顺次是:

  • 线性共同性 (Linearizability consistency)

  • 次序共同性 (Sequential consistency)

  • 因果共同性 (Causal consistency)

  • 终究共同性 (Eventual consistency)

咱们比照一下现在开源的比较典型的服务发现产品,在数据共同性上的计划完结:

SOFARegistry | 聊一聊服务发现的数据一致性

PART. 3 Etcd 数据共同性

SOFARegistry | 聊一聊服务发现的数据一致性

3.1 Etcd 读数据流程

1. Client:Etcdctl 封装了操作 Etcd、KV Server、Cluster、Auth、Lease、Watch 等模块的 API;

2. KV Server: Client 发送 RPC 恳求到了 Server 后,KV Server 依据拦截器记载一切恳求的履行耗时及错误码、来源 IP 等,也可操控恳求是否允许经过;

3. Raft: Etcd 收到读恳求后,向 Etcd Raft 模块建议 Read Index 读数据恳求,回来最新的 ReadState 结构体数据;

4. MVCC:KV Server 获取到 Read State 数据后,从 MVCC 模块的 Tree Index 读取依据 Key-Version 的唯一标识 Revision;再以 Revision 作为 Key 从 Boltdb 中读取数据。

3.2 Etcd 写数据流程

1. Client: Etcdctl 封装了操作 Etcd、KV Server、Cluster、Auth、Lease、Watch 等模块的 API;

2. KV Server: 经过一系列查看之后,然后向 Raft 模块建议 (Propose) 一个提案 (Proposal) ,提案内容为存储的 value;

3. Raft:

a.向 Raft 模块建议提案后,KV Server 模块会等待此 put 恳求;假如一个恳求超时未回来成果,会呈现的 EtcdServer:request timed out 错误。

b.Raft 模块收到提案后,假如当时节点是 Follower,它会转发给 Leader,只要 Leader 才干处理写恳求。Leader 收到提案后,经过 Raft 模块将 put 提案音讯播送给集群各个节点,一起需求把集群 Leader 任期号、投票信息、已提交索引、提案内容持久化到一个 WAL (Write Ahead Log) 日志文件中,用于保证集群的共同性、可恢复性。

4. Raft 模块提交 Proposal 完结后,向MVCC模块提交写数据。

3.3 Raft 功用分解

共识算法的祖师爷是 Paxos, 可是由于它过于杂乱、难于理解,工程实践上也较难落地,导致在工程界落地较慢。

Standford 大学的 Diego 提出的 Raft 算法正是为了可理解性、易完结而诞生的,它经过问题分解,将杂乱的共识问题拆分红三个子问题,分别是:

  • Leader 推举: Leader 故障后集群能快速选出新 Leader;

  • 日志仿制:集群只要 Leader 能写入日志, Leader 担任仿制日志到 Follower 节点,并强制 Follower 节点与自己保持相同;

  • 安全性: 一个任期内集群只能产生一个 Leader、已提交的日志条目在产生 Leader 推举时,必定会存在更高任期的新 Leader 日志中、各个节点的状态机运用的任意方位的日志条目内容应相同等。

下面以实际场景为案例,分别深化讨论这三个子问题,看看 Raft 是怎么处理这三个问题,以及在 Etcd 中的运用完结。

关于 Raft 的 Leader 推举与日志仿制,能够从www.kailing.pub/raft/index.…动画中进一步了解。

3.4 Etcd 读写共同性

3.4.1 线性共同性写

一切的 Read/Write 都会来到 Leader,Write 会有 Oplog Leader 被序列化,顺次次序往后 commit,并 Apply 然后在回来,那么一旦一个 Write 被 committed,那么其前面的 Write 的 Oplog 必定就被 committed 了。一切的 Write 都是有严格的次序的,一旦被 committed 就可见了,所以 Raft 是线性共同性写。

3.4.2 线性共同性读

Etcd 默许的读数据流程是 Linearizability Read,那么怎么样才干读取到 Leader 已经完结提交的数据呢?

读恳求走一遍 Raft 协议

SOFARegistry | 聊一聊服务发现的数据一致性

每个 Read 都生成一个对应的 Oplog,和 Write 相同,都会走一遍共同性协议的流程,会在此 Read Oplog 被 Apply 的时分读,那么这个 Read Oplog 之前的 Write Oplog 肯定也被 Applied 了,那么必定能够被读取到,读到的也必定是最新的。

  • 有什么问题?

  • 不只有日志写盘开销,还有日志仿制的 RPC 开销,在读比重较大的体系中是无法接受的;

  • 还多了一堆的 Raft ‘读日志’。

Read Index

  • 这是 Raft 论文中提到过的一种优化计划,具体来说:

  • Leader 将当时自己 Log 的 Commit Index 记载到一个 local 变量 Read Index 里面;

  • 向其它节点建议一次 Heartbeat,假如大多数节点回来了对应的 Heartbeat Response,那么 Leader 就能够确定现在自己仍然是 Leader;

  • Leader 等待自己的状态机履行,直到 Apply Index 超过了 Read Index,这样就能够安全的供给 Linearizable Read 了;

  • Leader 履行 Read 恳求,将成果回来给 Client。

  • Read Index 小结:

  1. 相比较于走 Raft Log 的办法,Read Index 读省去了磁盘的开销,能大幅度提高吞吐,结合 JRaft 的 batch + pipeline ACK + 全异步机制,三副本的状况下 Leader 读的吞吐接近于 RPC 的上限;

  2. 推迟取决于多数派中最慢的一个 Heartbeat Response。

Lease Read

  • Lease Read 与 Read Index 类似,但更进一步,不只省去了 Log,还省去了网络交互;它能够大幅提高读的吞吐,也能明显降低延时;

  • 基本的思路是 Leader 取一个比 election timeout (1s) 小的租期 (最好小一个数量级,100ms) , 在租约期内不会产生推举,这就保证了 Leader 不会变,所以能够越过 Read Index 的第二步,也就降低了延时。

3.4.3 串行性读

直接读状态机数据回来、无需经过 Raft 协议与集群进行交互的模式,在 Etcd 里叫做串行 (Serializable) 读,能够经过 WithSerializable() 进行设置,它具有低延时、高吞吐量的特色,合适对数据共同性要求不高的场景。

SOFARegistry | 聊一聊服务发现的数据一致性

PART. 4 Eureka 数据共同性

4.1Eureka 数据读写流程

SOFARegistry | 聊一聊服务发现的数据一致性

  • Eureka 节点彻底对等布置,每台 Server 保存全量的数据:

  • Sub 会守时 (Eureka.client.registry-fetch-interval-seconds 界说,默许值为 30s) 向注册中心获取数据,更新本地缓存;

  • 服务实例会经过心跳 (Eureka.Instance.lease-renewal-interval-in-seconds 界说心跳的频率,默许值为 30s) 续约的办法向 Eureka Server 守时更新自己的状态。Eureka Server 收到心跳后,会告诉集群里的其它 Eureka Server 更新此实例的状态。Service Provider/Service Consumer 也会守时更新缓存的实例信息。

  • 服务的下线有两种状况:

  • 在 Service Provider 服务 shutdown 的时分,自动告诉 Eureka Server 把自己除掉,从而防止客户端调用已经下线的服务;

  • Eureka Server 会守时 (间隔值是 Eureka.server.eviction-interval-timer-in-ms,默许值为 0,默许状况不删去实例) 进行查看,假如发现实例在在必守时刻 (此值由 Eureka.Instance.lease-expiration-duration-in-seconds 界说,默许值为 90s) 内没有收到心跳,则会注销此实例。

4.2 发动全量拉取

private boolean fetchRegistry()
{
//Ifthedeltaisdisabledorifitisthefirsttime,getallapplications    
    if (serverConfig.shouldDisableDeltaForRemoteRegions()       
    || (getApplications() == null)
    ||(getApplications().getRegisteredApplications().size()==0)){       
    // 全量获取
    logger.info("Disabledeltaproperty:
    }
    ",serverConfig.shouldDisableDeltaForRemoteRegions());
    logger.info("Applicationisnull:{}",getApplications()==null);
    logger.info("RegisteredApplicationssizeiszero:{}",getApplications().getRegisteredApplications().isEmpty()); 
    success = storeFullRegistry();
    } 
    else {
    //增量获取        success = fetchAndStoreDelta();    
    }
    return success;
    }
  1. Eureka-Server 的仿制算法是依靠增量仿制+全量仿制完结的。区别于 ZooKeeper,这里没有 Leader 的概念,一切的结点都是平等的,因而数据并不保证共同性。

  2. 发动时调用 storeFullRegistry,选取 1 台 Eureke-Server 进行一次全量拉取,运用 EurekaHttpClient.getApplications();url=”/apps” ;

  3. Server 端获取本地 Cache 中的数据进行回来。

4.3 数据改变增量仿制

4.3.1 Client 建议仿制

  1. 此处的 Client 指的是 Eureka-1,当 Eureka-1 收到客户端的服务注册 (Registers) 、服务更新 (Renewals) 、服务撤销 (Cancels) 、服务超时 (Expirations) 和服务状态改变 (Status Changes) 后,改写本地注册信息;

  2. 遍历一切的节点 (会扫除自己) ,将音讯转发到其它节点;为了完结数据同步 (Eureka 保证的 AP 特性) ,每个节点需求维护一个节点列表,这个节点列表便是 PeerEurekaNodes,她担任管理一切的 PeerEurekaNodes;

  3. 转发恳求时,在 HTTPHeader 中携带 x-netflix-discovery-replication : true 的标识,则处理恳求的机器不会再将恳求持续转发,防止死循环。

/**
*
ReplicatesallinstancechangestopeerEurekanodesexceptfor
* replication traffic to this node. * 
*/privatevoidreplicateInstanceActionsToPeers(Actionaction,StringappName,
Stringid,InstanceInfoinfo,InstanceStatusnewStatus,
PeerEurekaNodenode){
    switch (action) {  
    case Cancel:            
    node.cancel(appName, id);
    break;
    case Heartbeat:
    InstanceStatusoverriddenStatus=overriddenInstanceStatusMap.get(id);
    infoFromRegistry=getInstanceByAppAndId(appName,id,false);
    node.heartbeat(appName,id,infoFromRegistry,overriddenStatus,false);  
    break;
    case Register: 
    node.register(info);   
    break;    
    case StatusUpdate:
    infoFromRegistry=getInstanceByAppAndId(appName,id,false);
    node.statusUpdate(appName,id,newStatus,infoFromRegistry);
    break;     
    case DeleteStatusOverride:
    infoFromRegistry=getInstanceByAppAndId(appName,id,false);
    node.deleteStatusOverride(appName,id,infoFromRegistry);   
    break; 
    }}
    @OverridepublicEurekaHttpResponse<Void>register(InstanceInfoinfo){
    String urlPath = "apps/" + 
    info.getAppName();
    BuilderresourceBuilder=jerseyClient.target(serviceUrl).path(urlPath).request();    
    addExtraProperties(resourceBuilder);  
    addExtraHeaders(resourceBuilder);  
    response = resourceBuilder      
    .accept(MediaType.APPLICATION_JSON)
    .acceptEncoding("gzip")       
    .post(Entity.json(info));
    returnanEurekaHttpResponse(response.getStatus()
    ).headers(headersOf(response)).build();
}

4.3.2 Server 处理增量仿制

1.Server 收到数据改变恳求后,依据 lastDirtyTimestamp 处理数据版别冲突,lastDirtyTimestamp 是注册中心里面服务实例 (Instance) 的一个属性,表示此服务实例最近一次改变时刻;

  1. Eureka Server A 把数据发送给 Eureka Server B,数据冲突有 2 种状况:
  • A 的数据比 B 的新,B 回来 404,A 从头把这个运用实例注册到 B;
  • A 的数据比 B 的旧,B 回来 409,要求 A 同步 B 的数据。
publicvoidregister
(InstanceInforegistrant,intleaseDurtion,booleanisReplication)
{
    // .... 获取 instance 实例对象
    Lease<InstanceInfo>existingLease=gMap.get(registrant.getId()); 
    //假如 Eureka Server 中该实例已经存在
    if(existingLease!=null&&(existingLease.getHolder()!=null))
    {     
    // 比较 lastDirtyTimestamp , 以 lastDirtyTimestamp 大的为准
    if(existingLastDirtyTimestamp>registrationLastDirtyTimestamp){    
    registrant = existingLease.getHolder();  
    }
    }}

4.4 Apps 守时增量同步与校验

4.4.1 Client

在 Eureka Server 发动完结初次全量同步后,考虑从增量数据仿制会有处理失利的状况,所以需求有一个守时使命每隔 30s 进行增量数据同步与校验:

@OverridepublicEurekaHttpResponse<Applications>
getDelta(String...regions)
{    
return getApplicationsInternal("apps/delta", regions);
}
private boolean fetchAndStoreDelta() throws Throwable 
{    
long currGeneration = fetchRegistryGeneration.get(); 
Applications delta = fetchRemoteRegistry(true);
   String reconcileHashCode = ""; 
   //加锁进行差量更新    
   if (fetchRegistryUpdateLock.tryLock())
   {        
   try 
   {            updateDelta(delta);
   reconcileHashCode=getApplications().getReconcileHashCode(); 
   } 
   finally {  
   fetchRegistryUpdateLock.unlock();
   }    
   } 
   else { logger.warn("Cannotacquireupdatelock,abortingudateDeltaoperationoffetchAndStoreDelta"); 
   }
//Thereisadiffinnumberofinstancesforsomereason
    if(!reconcileHashCode.equals(delta.getAppsHashCode()))
    {        deltaMismatches++;
    returnreconcileAndLogDifference(delta,reconcileHashCode); 
    } 
    else {
    deltaSuccesses++;
    }
    return delta != null;
    }
  1. 增量数据同步成功后加锁,进行 add、modify、 delete 等操作,url=”apps/delta”;

  2. 运用 updateDelta 更新数据后,运用 reconcileHashCode (依据 Client 和 Server 的全量 Applications 核算获得) 校验是否增量更新成功,reconcileHashCode 格局:UP_count1_DOWN_count2_STARTING_count3;

  3. 假如校验的 reconcileHashCode 不共同,再建议一次全量同步动作;

4.4.2 Server

privateConcurrentLinkedQueue<RecentlyChangedItem>
recentlyChangedQueue=newConcurrentLinkedQueue<RecentlyChangedItem>();@Deprecatedpublic Applications getApplicationDeltas() 
{    
        //从ecentlyChangedQueue获取增量同步的数据
Iterator<RecentlyChangedItem>iter=this.recentlyChangedQueue.iterator();
logger.debug("Thenumberofelementsinthedeltaqueueis:{}",  
this.recentlyChangedQueue.size()); 
while (iter.hasNext()) 
{
Lease<InstanceInfo>lease=iter.next().getLeaseInfo();    
InstanceInfo instanceInfo = lease.getHolder();        logger.debug(
"Theinstanceid{}isfoundwithstatus{}andactiontype{}",
instanceInfo.getId(),instanceInfo.getStatus().name(),instanceInfo.getActionType().name());Applicationapp=applicationInstancesMap.get(instaceInfo.getAppName());
if (app == null) 
{
app=newApplication(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(),app);   
apps.addApplication(app);        
}
app.addInstance(newInstanceInfo(decorateInstanceInfo(lease)));
}    
//核算本地全量数据的 hashcode    
apps.setAppsHashCode(allApps.getReconcileHashCode());
}
  1. 从 recentlyChangedQueue 行列中获取增量数据,依据办法的注释,recentlyChangedQueue 中寄存的是 getRetentionTimeInMSInDeltaQueue 时刻内 (默许 180s) 的 Client 注册信息;

  2. Client 建议 Delta 增量同步时,前后两次恳求或许获取到相同的 Delta Apps 信息,Client 需求兼容这种状况;

  3. Eureka Server 收到 Register、Cancel、StatusUp、Expirations 等操作时,会更新 recentlyChangedQueue 中的信息;

  4. 设置守时使命 (30s 运转一次) 整理行列中的过期数据 (180s)

4.5 点评

  1. Client 30s 向服务端获取一次数据,Service 改变生效时刻较长;

  2. 运用 recentlyChangedQueue 保存 180s 数据改变的办法进行增量同步,假如数据量大行列简单爆破;

  3. 假如 reconcileHashCode 在增量同步的时分核算不共同,建议全量同步,假如全量同步的次数太多,简单有功能瓶颈;

  4. reconcileHashCode 格局:UP_count1_DOWN_count2_STARTING_count3,仅仅保证 UP/DOWN 数量相等,无法保证数据是终究共同性。

PART. 5 Nacos 数据共同性

5.1 Nacos 数据读写流程

SOFARegistry | 聊一聊服务发现的数据一致性

  1. Nacos 运用的是单节点全量存储数据,Client 与单个 Nacos 节点进行服务的发布和订阅;

  2. 每个 Server 中有一个恳求处理的前置 Filter,依据 Server 列表的 Hash 分片,核算 Pub 数据归属于哪台 Nacos-Server,然后进行恳求转发;

  3. Nacos-1 中调用本地的 Register 办法,将服务信息存储到本地内存的服务注册列表,然后给 Client 回来成功;

  4. Nacos-1 依据 Distro 协议,将 Pub Register 恳求同步给全集群的 Nacos Server;

  5. Sub Client 连接到 Nacos-3 进行服务数据订阅,Nacos-3 将本地数据进行回来。

5.2 发动全量拉取

  1. 新加入的 Distro 节点会进行全量数据拉取,具体操作是轮询一切的 Distro 节点,经过向其它的机器发送恳求拉取全量数据;

  2. Nacos v1 依据 HTTP 协议进行通讯,v2 依据 gRPC 协议进行通讯;

  3. 发动期间需求向全量的 Distro 机器都建议全量拉取:

  • 关于新的机器,从处理读恳求的角度看,能够只拉取 1 台Distro 的机器数据,即便获取的部分数据是比较旧的,也仅仅与拉取的 Target Server 供给了相同的数据服务;

  • 从处理写恳求的角度看,只要从全量的机器拉取,才干保证本机器担任的 Hash 分片的数据最新,所以需求向一切的 Distro Server 做数据同步,保证本机担任的 Hash 分片的数据最新;

  • 在全量拉取操作完结之后,Nacos 的每台机器上都维护了当时的一切注册上来的非持久化实例数据,开始供给服务。

5.3 数据改变增量仿制

  • 关于 add、change、delete,在 Nacos-1 履行后,将数据改变与 action 播送到全集群的 Distro 服务器;
    有了上述两个机制之后,也不能彻底保证 Distro 服务器之间的数据彻底是相同的,例如存在 Notify 失利等场景。因而还需求有一个守时校验机制,比对全集群的 Server 之间的数据共同性,并进行修复。

5.4 v1 版别节点数据Verify

  • Nacos-1 每隔 5s 履行一次守时使命,核算本节点数据的 digest 摘要;

  • Verify 校验时,将本地的一切 Service,依据 Hash 规矩匹配本节点担任的 Service,并核算对应的 CheckSum,然后组装成恳求参数:Map<string< ne-text=””>,String> keyCheckSums 。

  • CheckSum 的核算规矩如下:

public synchronized void recalculateChecksum() {
List<Instance> ips = allIPs();
    StringBuilder ipsString = new StringBuilder(); 
    String serviceString = getServiceString();   
    ipsString.append(serviceString);
    for (Instance ip : ips) {   
    String string = ip.getIp() + ":" + ip.getPort() + "_" + ip.getWeight() + "_" + ip.isHealthy() + "_" + ip   
    .getClusterName();     
    ipsString.append(string); 
    ipsString.append(',');    }
    checksum = MD5Utils.md5Hex(ipsString.toString(), Constants.ENCODE);
    }
  • Nacos-2 Server 端收到 Verify 恳求后,将数据分红 3 种场景:不需求处理的、需求更新的、需求删去的;

  • 关于需求删去的 Service 数据,直接在内存中删去;

  • 关于需求更新的 Service,调用 Nacos-1 进行 Server 的全量数据获取,然后更新本地的数据。

// 关于有差异的 service 进行全量数据同步@Overridepublic DistroData getData(DistroKey key, String targetServer) 
{    
try {  
List<String> toUpdateKeys = null;    
if (key instanceof DistroHttpCombinedKey) {  
toUpdateKeys = ((DistroHttpCombinedKey) key).getActualResourceTypes(); 
} else {       
toUpdateKeys = new ArrayList<>(1);  
toUpdateKeys.add(key.getResourceKey());      
}      
byte[] queriedData = NamingProxy.getData(toUpdateKeys, key.getTargetServer()); 
return new DistroData(key, queriedData);
} catch (Exception e)
{        
throw new DistroException(String.format("Get data from %s failed.", key.getTargetServer()), e);    }
}

示意图:

  • 假定现在有 2 个节点,Nacos-A 是 A_SERVICE_XXX 服务的 Leader 节点,Nacos-B 是 B_SERVICE_XXX 服务的 Leader 节点;

  • Nacos-A 发送 CheckSum 恳求时,将自己作为 Leader 的 A_SERVICE_XXX 分别核算 md5code;

  • md5code 生成规矩:ip.getIp() + “:” + ip.getPort() + “” + ip.getWeight() + “” + ip.isHealthy() + “_” + ip.getClusterName();

  • 在 Nacos-B 中核算出有差异的 A_SERVICE_XXX,关于需求 Update 的从 Nacos-A 中进行全量数据拉取;关于需求 Remove 的从内存中删去。

5.5 v2 版别 Verify

  • 区别于 v1 版别的完结,v2 中以 ClientId 维度进行 CheckSum;

  • Nacos-1 关于本节点的一切 ClientId,每个 ClientId都包装成一个 Task 使命,运用 gRPC 发送给一切的 Distro 节点;

@Overridepublic List<DistroData> getVerifyData() {
List<DistroData> result = new LinkedList<>(); 
for (String each : clientManager.allClientId()) 
{
Client client = clientManager.getClient(each);  
if (null == client || !client.isEphemeral()) 
{           
continue;    
}       
if (clientManager.isResponsibleClient(client)) {   
// TODO add revision for client.   
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);   
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);   
DistroData data = new DistroData(distroKey,                                             ApplicationUtils.getBean(Serializer.class).serialize(verifyData));            data.setType(DataOperation.VERIFY);     
result.add(data);   
}   
}   
return result;
}
  • 每个 ClientId 发送的校验 Version=1,Version 作为保留的扩展特性;

  • 接纳 Verify 恳求的节点从恳求参数中获取 ClientId,并查看自身是否有这个 Client,若此 Client 存在,则更新 Client 下的一切 Instance、以及 Client 自身的最新活泼时刻为当时时刻。

5.6 小结

1. V1 Distro 终究数据共同性:

  • 核算每个 Service 的 CheckSum 时,运用的是 ip.getIp() + “:” + ip.getPort() + “” + ip.getWeight() + “” + ip.isHealthy() + “_” + ip.getClusterName() 进行 CheckSum 核算;

  • 关于需求更新的数据,向原节点全量拉取 Service 的数据;能够考虑优化成差量拉取。

2. V2 Distro 终究共同性: 每个节点以 ClientId 为维度进行集群播送,以 ClientId,Version=0 进行数据校验。

PART. 6 SOFARegistry

6.1 Registry 数据读写

SOFARegistry | 聊一聊服务发现的数据一致性

  1. Client 建议服务注册数据 Publisher 给 SessionServer,SessionServer 接纳成功;

  2. SessionServer 接纳到 Publisher 数据后,首先写入内存 (Client 发送过来的 Publisher 数据,SessionServer 都会存储到内存,用于后续能够跟 DataServer 做定时查看) ,然后将 Publisher 数据发送给 DataServer,DataServer收到 Session 的 Pub 之后,修正 Datum 的版别号;

  3. DataServer 先对 Notify 的恳求做 merge 操作 (等待 1000ms) ,然后将数据的改变事情告诉给一切 SessionServer (事情内容是 ID和版别号信息和版别号信息: 和 )

  4. SessionServer 接纳到改变事情告诉后,比照 SessionServer 内存中存储的 DataInfoId 的 Version,发现比 DataServer 发过来的小,所以自意向 DataServer 获取DataInfoId 的数据,即获取具体的 Publisher 列表数据,获取数据成功后,创立 pushTask;

  5. SessionServer 检测 pushTask 是否达到履行时刻 (T2+500MS) ,关于达到履行时刻的 pushTask,从行列中取出 Task,开始进行推送;

  6. SessionServer 将数据推送给相应的 Client、Client Callback、SeesionServer 收到 ACK。

6.2 v6 秒级数据共同性

详见 www.sofastack.tech/projects/so… 本文不再重复描述。

6.3 多机房数据共同性

在 6.2的同机房 Data-Leader 与 Data-Follower 数据同步的计划下,能够将这个计划进一步扩展到多机房之间的数据同步:

SOFARegistry | 聊一聊服务发现的数据一致性

Meta 跨机房同步 SlotTable:

  1. 数据:本机房 SlotTable 数据;

  2. 通讯:全量轮询;

  3. DataCenterB Meta Leader 守时拉取到 DataCenterA 集群的 SlotTable 数据更新后,保存到本地 Meta Leader 内存中,然后告诉给 DataCenterB 集群的Data 和 Session。

Data 跨机房同步 SlotData:

  1. 数据:每台 Data 同步自身 Slot Leader 的数据;

  2. 通讯:增量告诉+全量 DataInfoId 守时比对拉取;

  3. Data-A1 和 Data-B2 从 Meta 获取到完好的 SlotTable 数据后,能够解析到自己是 SlotId=1 的 Leader 节点,需求进行数据同步;

  4. 当 Data-B2 中收到本机房 Session 的 Pub、ubPub、Client_off 恳求后,完结本机房 Datum 数据处理;然后将 Datum.Version 告诉给本机房 Session,一起将具体的 Pub、ubPub、Client_off 恳求发送给 Data-A1;

  5. Data-A1守时将 SlotId=1 的摘要数据发送给 Data-B2,将 SlotId=2 的摘要数据发送给 Data-B3,回来有差异的 DataInfoId 列表;再将差异 DataInfoId 进行性细的 Pub 摘要比照,保证数据终究共同;

  6. Data-A1 将改变的 DataInfoId 以及 Datum Version 告诉给本集群一切的 Session,将 DataCenterB 的数据改变推送给 DataCenterA 的一切 Client。

PART. 7 总结

最终咱们对 SOFARegistry 和其它开源产品进行总结比照:

SOFARegistry | 聊一聊服务发现的数据一致性

了解更多…

SOFARegistry Star 一下✨:
github.com/sofastack/s…

本周引荐阅读

SOFARegistry | 大规模集群优化实践

SOFARegistry 源码|数据同步模块解析

SOFARegistry 源码|数据分片之中心-路由表 SlotTable 分析

SOFAServerless 体系助力事务极速研制