同程艺龙作为对 raft 研究较早的公司,早在14年算法的 paper 刚发布的时分,便已经对其进行了调研,一同也与 paxos 、zab 等算法进行了详细的对比,并在公司内部的计数器、使命调度元信息存储等场景进行试点。不过前期关于 raft 的测验较多的是在 cs++ 技能栈试水,在 java 技能栈里却很罕见涉及。近期刚好有依据 etcd 的老项目因为需求自界说的数据结构需求重构,原有的 etcd 无法在底层数据结构层面满足需求,因而决定选用 java 技能栈结合开源项目 sofa-jraft 进行底层数据存储的开发,以期处理多节点数据强共同的问题。本文假设读者对 raft 及强共同的概念已经有了较深的了解,详细介绍了公司内部怎么运用 jraft 的进行老体系的改造以及运用过程中遇到的工程问题,期望其他对 raft 有兴趣的同学能够一同讨论。

一、背景

公司内部本来存在一个体系 mdb (metadata database),go 言语编写,用于管理一切的实例元数据信息,元数据的内容便是一个 map。该组件供给对元数据增修改查的接口,而且运用 go 言语编写,在检索数据时引入了 k8s selector 的包,运用 k8s selector 的解析规则挑选特定标签的元数据信息。数据耐久化则是实用了强共同组件 etcd 进行存储,key 为元数据的 id,保证仅有,value 为详细的元信息,包括各个标签以及对应的值。

该体系大体架构如图-1所示:

sofa-jraft在同程旅游中的实践
图-1:原来的架构

该架构的弊端:

  1. 每隔一段时刻需求去拉取 etcd 的全量数据,忧虑单次恳求数据量太大的状况,对数据id进行了 hash 分片,顺次获取每个分片下个 key,再经过 key 去获取 value,导致 etcd 的查询频率十分高
  2. 非 id 查询条件的共同性查询,和上面的问题相同,也需求做拉取全量数据的操作
  3. 更新删去操作也是相同,需求拉取全量数据再操作

剖析以上问题能够发现,运用 etcd 作为强共同存储,但 etcd 是依据 KV 存储的组件,而且解析组件 mdb 和 etcd 是别离的,在需求保证数据最新的状况下,必须得去 etcd 拿最新的数据到本地再进行操作。而 etcd 依据 KV,就得拿到 etcd 的全量数据都给拉到本地后再做操作。

假如有一个组件,供给强共同的存储才能,又能直接去解析 k8s selector 的规则,存储的数据结构和元数据信息更加亲和,那么中心的那一层 mdb 就能够直接去掉了,由这个组件来解析对应的 crud 规则,将解析往后的规则直接在本地查询,那么以上问题就能够直接处理了。

二、改造

依据以上问题,咱们预备自己开发一个强共同存储的组件,能够自己解析 k8s selector 的规则,而且将数据保存在自己本地。因为个人对 java 言语比较了解,而且之前运用 nacos 时,对 sofa-jraft 也有必定了解,终究挑选了 sofa-jraft 来构建强共同存储组件,将它命名为 mdb-store。

首要改造点:
  1. 运用 sofa-jraft 编程模型构建事务状况机,事务状况机中依据 raft log 中的 data 类型,进行 crud 的操作。
  2. mdb-store 供给与原来 mdb 相同的 api,让改造对用户通明,改造完结后只需求切换域名对应的实例即可。
  3. 迁移 k8s selector 的解析逻辑,这儿用 java 写了一套和 go 版别 k8s selector 相同解析逻辑的组件 k8s-selector-java,目前该组件已开源 k8s-selector-java

改造往后的架构如图-2所示:

sofa-jraft在同程旅游中的实践

图-2:重构后的架构

经过改造往后,将 mdb 移除,让用户直接和 mdb-store 进行通讯,中心的通讯少了一跳,加快了访问功率。将 mdb 和 etcd 也进行了移除,减少了整个体系的组件,下降运维本钱。

三、sofa-jraft的详细运用

3.1 将写操作转化成 raft log

在 sofa-jraft 中,编程模型和一般的 Spring MVC 的编程形式不太相同。在 Spring MVC 中,一个恳求到达了后端,一般会经过 Controller -> Service -> Processor 这么几层。Controller 担任本次 http 恳求的资源映射, 再由 Controller 去调用特定的 Service 办法,在 Service 层中,对参数进行一些处理和转化,再交由 Processor 层去对恳求做真实的处理。

大体逻辑如图-3所示

sofa-jraft在同程旅游中的实践

图-3:一般的编程模型

在 sofa-jraft 中,一切的写操作都要经过状况机去履行(读操作不需求经过状况机)。需求将写操作转化成 task,状况机去运用 task 然后再进行事务处理。task 中包括两个特点是需求重视的,一个是 done,一个是 data。done 便是本次 task 被状况机处理完结后的回调,比方在 done 的回调逻辑中,将 response flush 到客户端。data 便是 raft log 中的详细数据,比方要履行一条插入元数据的指令。data 就会包括本次操作的类型(插入),以及本次操作的详细数据。

java public class Task implements Serializable { private ByteBuffer data = LogEntry.EMPTY_DATA; private Closure done; /// 省略部分代码 }

大体逻辑如图-4所示

sofa-jraft在同程旅游中的实践

图-4:sofa-jraft的编程模型

一切的操作都笼统成一个实体 Operation,在 Service 层,就依据事务把参数转化成不同的 Operation,然后再将 Operation 序列化,转化成 Task 实体,再由 Node 将 task 提交。这儿能够将 task 看成是一个 raft log,一旦 raft log 被对折的机器给提交。状况机就会运用 raft log,运用完结之后就会触发 done 中的回调。

class Operation<T> {
  	//操作类型,比方增修改查
    int type;
		//操作的哪个表,某些类型不需求此字段
    String table;
    //详细的操作数据,依据type的不同,数据类型也会不同
    T params;
}
  • 构建task并经过node提交给状况机
final Task task = new Task();
//界说回调的逻辑,当该 raft log 被状况机运用后,会进行回调
task.setDone(new StoreClosure(operation, status -> {
    StoreStatus storageStatus = (StoreStatus) status;
    closure.setThrowable(storageStatus.getThrowable());
    closure.setResponse(storageStatus.getResponse());
    closure.run(storageStatus);
}));
//将operation进行序列化,在状况机中会将该值反序列化复原,再交给processor处理
task.setData(ByteBuffer.wrap(serializer.serialize(operation)));
node.apply(task);
3.2 状况机的完成
  • onApply

    onApply 是状况机的中心功能,其目的便是接收入参中的 raft log 以及 done,然后将 raft log 中的数据反序列化,交由自己的事务处理器去进行处理。处理完结之后,触发 done 的回调,这儿就和 Node.apply(task) 关联上了。

                while (iter.hasNext()) {
                    Status status = Status.OK();
                    try {
                        if (iter.done() != null) {
                        		// 阐明当时状况机是 Leader,能够直接从 closure 中获取操作数据
                            closure = (MdbStoreClosure) iter.done();
                            operation = closure.getOperation();
                        } else {
                            // 当时状况机不是 Leader,经过对 raft log 中的数据进行反序列化操作,复原操作数据
                            ByteBuffer data = iter.getData();
                            operation = serializer.deserialize(data.array(), Operation.class);
                        }
                      	//事务处理器进行事务处理,事务处理器中会判别operation的类型,挑选不同的处理逻辑
                        OperationResponse result = mdbStoreProcessor.processOperation(operation);
                        //将result序列化
                        GrpcResponse response = GrpcResponse.newBuilder().setSuccess(true)
                                .setData(ByteString.copyFrom(serializer.serialize(result))).build();
                        Optional.ofNullable(closure)
                                .ifPresent(closure1 -> closure1.setResponse(response));
                    } catch (Throwable e) {
                        status.setError(RaftError.UNKNOWN, e.toString());
                        Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
                        throw e;
                    } finally {
                        //对task中的done进行回调
                        Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
                    }
                  	//将raft log的消费方位+1,表明当时这个raft log已经被成功运用了
                    iter.next();
                }
    
  • onSnapshotSave

初始化 sofa-jraft node 时,存在一个参数,NodeOptions#snapshotUri。该参数设置后就会敞开snapshot 机制,一般是推荐设置。敞开完结之后,每隔30min就会进行一次 snapshot(这儿需求留意的是,30分钟内有raft log提交时,才会进行snapshot)。在进行 snapshot 的时分,需求把当时的数据进行耐久化操作。在 snapshot 完结后,就会将 snapshot 中终究一条 raft log 之前的 raft-loig全部删去。其含义便是避免 raft log 一直添加,导致磁盘占用飙高。

snapshot 机制能够这么去了解,在 sofa-jraft 中,事务 processor 中的操作都是状况机驱动的,而状况机又是由 raft log 驱动。那么 processor 中数据的终究形状其实便是一切的 raft log 运用的总和。比方存在一个 raft log,其事务含义是 i++。10条 raft log 被状况机运用后,驱动 processor 进行10次i++操作,终究的值便是为10。运用就算溃散重启后,重启时,他会去运用之前的10条i++的raft log,processor 中的值也还是10。运用 snapshot 机制,在进行 snapshot 时,把 processor 中的10进行耐久化,耐久化完结往后,将前10条 raft log 进行删去,后续再来2条 i++的 raft log,processor的值变为12,存在2条i++的 raft log。运用就算溃散重启,那么它首要也会读取 snapshot 中的数据10,再去运用2条i++的 raft log,终究数据也是12,和溃散之前保持共同。

Processor的终究态 = snapshot + raft log

				MdbStoreStoreSnapshotFileImpl mdbStoreStoreSnapshotFile = (MdbStoreStoreSnapshotFileImpl) snapshotFile;
        String tempPath = snapshotPath + "_temp";
        File tempFile = new File(tempPath);
        FileUtils.deleteDirectory(tempFile);
        FileUtils.forceMkdir(tempFile);
				//记录总共的table数量
        mdbStoreStoreSnapshotFile
                .writeToFile(tempPath, "tailIndex", new TailIndex(persistData.size()));
				//将每一个table中的数据都进行耐久化
        for (int i = 0; i < persistData.size(); i++) {
            mdbStoreStoreSnapshotFile.writeToFile(tempPath, combineDataFile(i),
                    new TablePersistence(persistData.get(i)));
        }
        File destinationPath = new File(snapshotPath);
        FileUtils.deleteDirectory(destinationPath);
        FileUtils.moveDirectory(tempFile, destinationPath);
  • onSnapshotLoad

    onSnapshotLoad 的几个触发场景。

    1. 当一个节点从头发动时。

    2. 当 Follower 中的 commit-index 都小于 Leader 中 snapshot 的终究一条 raft log 时(Follower太落后了,Follower 需求的 raft log 已经被 Leader 的 snapshot 机制删去了)

    onSnapshotLoad 和上面的 onSnapshotSave 是成对的,这儿只需求把之前保存的文件中的内存读取,然后再进行反序列化,添加到 processor 中的数据容器即可。

    				MdbStoreStoreSnapshotFileImpl mdbStoreStoreSnapshotFile = (MdbStoreStoreSnapshotFileImpl) snapshotFile;
    				//读取总共的文件数
            TailIndex tailIndex = mdbStoreStoreSnapshotFile
                    .readFromFile(snapshotPath, TAIL_INDEX, TailIndex.class);
            int size = tailIndex.data();
            for (int i = 0; i < size; i++) 
              	//挨个读取文件,将文件内容进行反序列化
                TablePersistence tablePersistence = mdbStoreStoreSnapshotFile
                        .readFromFile(snapshotPath, combineDataFile(i), TablePersistence.class);
                TableDataDTO data = tablePersistence.data();
                Table table = new Table(data.getName(), new HashSet<>(data.getIndexNames()),
                        data.getRetryCount());
                for (Record dataData : data.getDatas()) {
                    table.addRecord(dataData);
                }
              	//将数据丢给 processor 中的数据容器
                dataComponent.putData(table.getName(), table);
            }
    
  • 状况机的其他状况变更的办法

    一般来说,节点的状况是不会产生变化的,一旦产生变化,就需求去剖析运用的状况了,调查节点是否正常。StateMachine 供给了状况回调的接口,咱们在回调中对接内部的监控体系,当状况机的节点状况产生变化时,会实时告诉到保护人员,保护人员再对运用进行剖析排查。

3.3 运用 read-index read 进行读操作

依照 raft 论文正常来说,读写操作都只能由 Leader 进行处理,这样能够保证读取的数据都是共同的。这样的话,读恳求的吞吐就没办法添加。关于这个 case,sofa-jraft 供给了 read-index read,能够在 Follower 中进行读取操作,而且能保证在 Follower 中读的成果和在 Leader 中读的成果共同。关于read-index read 能够参阅 pingcap 的这篇博客 pingcap.com/zh/blog/lea…

com.alipay.sofa.jraft.Node#readIndex(final byte[] requestContext, final ReadIndexClosure done) ,第一个参数是建议 read-index read时的上下文,能够在回调中运用。第二个参数便是详细的回调逻辑,需求在 run 办法中完成读取逻辑。

  • read-index read 编程模型
        CompletableFuture future = new CompletableFuture<>();
				node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
                @Override
                public void run(Status status, long index, byte[] reqCtx) {
                  	//状况ok,阐明能够经过 read-index 去进行读取
                    if (status.isOk()) {
                        try {
                            //直接运用 processor 查询数据,不经过状况机
                            OperationResponse<T> res = (OperationResponse<T>) mdbStoreProcessor
                                    .processOperation(operation);
                            future.complete(res);
                        } catch (Throwable t) {
                            future.completeExceptionally(
                                    new IllegalStateException("Fail to read data from processor",
                                            t));
                        }
                    } else {
                      	//状况不ok,可能是超时,也可能是状况机反常等其他原因
                        if (Operation.ALL_DATA == operation.getType()) {
                          	//这儿判别是不是读取全量的数据,读取全量数据的话,需求快速失利,不能转到 leader 走 raft log读取,														//原因见 4.3
                            future.completeExceptionally(new IllegalStateException(
                                    "Fail to get all data by read-index read, status: " + status
                                            .getErrorMsg()));
                        } else {
                          	//经过将本次恳求转发到 Leader 中,走 raft log,在 Leader 的状况机中把本条 raft log 运用后,再														 //回来数据给 Follower
                            LOGGER.warn("ReadIndex read failed, status: {}, go to Leader read.",
                                    status.getErrorMsg());
                            readFromLeader(operation, future);
                        }
                    }
                }
            }
        Object o = future.get(5_000L, TimeUnit.MILLISECONDS);
        if (o instanceof GrpcResponse) {
            //回来类型的GrpcResponse,阐明本次恳求是经过 raft log 转到 Leader 处理并回来的,需求将数据反序列化
            return serializer
                    .deserialize(((GrpcResponse) o).getData().toByteArray(), OperationResponse.class);
        } else {
          	//直接在本地经过 read-index read 读本地内存
            return (OperationResponse<T>) o;
        }
##### 3.4 Follower 恳求转发
在 sofa-jraft 中,一切的写恳求都只能由 Leader 节点进行处理,当写恳求落到了 Follower 中,有两种方法进行处理。
1. 直接拒绝该恳求,并将 Leader 节点的地址回来给客户端,让客户端从头建议恳求。
2. 将当时恳求 hold 在服务端,并将该恳求转发到 Leader 节点,Leader 节点处理完结后,将 response 回来给 Follower,Follower 再将之前 hold 住的恳求回来给客户端。
这儿运用第一种时,需求客户端也进行呼应的改造,为了对客户端通明,咱们挑选了第二种,经过转发的方法,将恳求转给 Leader
在 sofa-jraft 中,各个节点需求经过 Rpc 来进行通讯,比方发送心跳,投票等。sofa-jraft 默许供给了两种通讯方法,一种是 sofa-bolt,还有一种是 grpc,考虑到组件的流行性,挑选了grpc来作为通讯方法。在构建 server 时,运用 GrpcRaftRpcFactory 在创立 RpcServer 。然后将 sofa-jraft 中自带的处理器(心跳处理器,投票处理器等)注册到 RpcServer中。这些处理器都是完成了 RpcProcessor 接口,该接口的 handleRequest 办法会处理收到的恳求。
*运用 GrpcRaftRpcFactory 需求留意的是,需求引入依靠*
```java
<dependency>
    <groupId>com.alipay.sofa</groupId>
    <artifactId>rpc-grpc-impl</artifactId>
    <version>${jraft.grpc.version}</version>
</dependency>

而且需求经过 spi 指定运用 GrpcRaftRpcFactory。文件路径 /resources/META-INF.services/com.alipay.sofa.jraft.rpc.RaftRpcFactory,文件内容 com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory

这儿,能够界说一个自己的处理器,完成 RpcProcessor 接口,将该 Processor 也注册到 RpcServer 中,复用同一个 RpcServer。

  • 创立 RpcServer 并注册处理器
				//获取GrpcRaftRpcFactory
				GrpcRaftRpcFactory raftRpcFactory = (GrpcRaftRpcFactory) RpcFactoryHelper.rpcFactory();
				//GrpcRequest 是自己的 Processor 通讯运用,这儿运用 proto 去生成 GrpcRequest 和 GrpcResponse
        raftRpcFactory.registerProtobufSerializer(GrpcRequest.class.getName(),
                GrpcRequest.getDefaultInstance());
        raftRpcFactory.registerProtobufSerializer(GrpcResponse.class.getName(),
                GrpcResponse.getDefaultInstance());
        MarshallerRegistry registry = raftRpcFactory.getMarshallerRegistry();
 				//注册GrpcRequest 对应的 response 的默许目标
        registry.registerResponseInstance(GrpcRequest.class.getName(),
                GrpcResponse.getDefaultInstance());
        //创立 GrpcServer
        final RpcServer rpcServer = raftRpcFactory.createRpcServer(peerId.getEndpoint());
 				//注册sofa-jraft中自带的处理器
        RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(),
                RaftExecutor.getRaftCliServiceExecutor());
				//注册自己事务的处理器
        rpcServer.registerProcessor(new GrpcRequestProcessor(server));
        return rpcServer;
  • proto file
syntax = "proto3";
option java_multiple_files = true;
package com.xxx.mdb.store.raft.entity;
message GrpcRequest {
  //这儿的data保存的便是 Operation 序列化往后的二进制流
  bytes data =1;
}
message GrpcResponse {
  //这儿的data保存的是事务 Processor 处理完 Operation往后,而且经过序列化后的二进制流
  bytes data = 1;
  //反常信息
  string errMsg = 2;
  //标志位,恳求是否ok
  bool success = 3;
}
  • 自己的处理器,用于接收 Follower 过来的转发恳求
				//假如当时节点不是 Leader,不进行处理
        if (!jRaftServer.getNode().isLeader()) {
            return;
        }
        //界说 done,状况机运用 raft log 后,会回调这个done
        FailoverClosure done = new FailoverClosure() {
            GrpcResponse data;
            Throwable ex;
            @Override
            public void setResponse(GrpcResponse data) {
              	//Follwer在状况机中履行成功后,会将 result 封装成 GrpcResponse,然后在这儿设置
                this.data = data;
            }
            @Override
            public void setThrowable(Throwable throwable) {
                //在反常时,会进行调用
                this.ex = throwable;
            }
            @Override
            public void run(Status status) {
                if (Objects.nonNull(ex)) {
                    LOGGER.error("execute has error", ex);
                  	//ex不为null,阐明产生了反常,将反常回来给 Follower
                    rpcCtx.sendResponse(
                            GrpcResponse.newBuilder().setErrMsg(ex.toString()).setSuccess(false)
                                    .build());
                } else {
                  	//将恳求回来 Follower
                    rpcCtx.sendResponse(data);
                }
            }
        };
        //将从 Follower 过来的恳求提交给状况机,在内部会把 request 的 data 字段给反序列化为 Operation
        jRaftServer.applyOperation(jRaftServer.getNode(), request, done);
  • Follower 中的转发逻辑
					try {
          	//将 operation 序列化成byte数组,然后构建GrpcRequest.
            GrpcRequest request = GrpcRequest.newBuilder()
                    .setData(ByteString.copyFrom(serializer.serialize(operation))).build();
            //从缓存获取当时 Leader 节点的地址,假如 Leader 为空,抛出反常。这儿的 Leader 需求动态改写,每隔5秒中就去改写一次 								//Leader,保证 Leader 是最新的。能够经过 RouteTable#refreshLeader 去守时改写。
            final Endpoint leaderIp = Optional.ofNullable(getLeader())
                    .orElseThrow(() -> new IllegalStateException("Not find leader")).getEndpoint();
          	//经过 grpc 将恳求发送给自己的处理器
            cliClientService.getRpcClient().invokeAsync(leaderIp, request, new InvokeCallback() {
                @Override
                public void complete(Object o, Throwable ex) {
                    if (Objects.nonNull(ex)) {
                      	//存在反常,将反常进行回调
                        closure.setThrowable(ex);
                      	//进行 fail 的回调,回调中会将 exception 回来给客户端
                        closure.run(new Status(RaftError.UNKNOWN, ex.getMessage()));
                        return;
                    }
                  	//将 grpc response 设置给回调类
                    closure.setResponse((GrpcResponse) o);
                    //进行 success 的回调,回调中会将数据回来给客户端
                    closure.run(Status.OK());
                }
                @Override
                public Executor executor() {
                    return RaftExecutor.getRaftCliServiceExecutor();
                }
            }, timeoutMillis);
        } catch (Exception e) {
            closure.setThrowable(e);
            closure.run(new Status(RaftError.UNKNOWN, e.toString()));
        }

四、sofa-jraft 的一些实践

4.1 read-index read 回来数据量过大导致 oom

在咱们的事务场景中,有一个获取全量数据的接口,而且是经过 read-index read 去进行读数据的。在对这个接口进行压测时,会发现 CPU 飙高的状况,经过排查,是因为堆内存占满了,GC线程一直在 work 导致的。经过 dump 堆内存后发现,是因为内部运用 Disruptor 导致的问题,该问题目前已被咱们修复,而且也已反馈给社区,在1.3.8版别中进行了处理。详细问题见 issue#618

4.2 read-index read 呼应时刻较长

在测验同学进行压测,发现读取接口的最大耗时偶然会跑到 500ms,均匀呼应耗时大约在100ms左右。经过反复排查以及阅览代码,终究发现这个问题和 election timeout 有关。在 sofa-jraft 中,election timeout 便是选举超时的时刻,一旦超过了 election timeout,Follwer 还没有收到 Leader 的心跳,Follower 认为当时集群中没有 Leader,自己建议投票来测验中选 Leader。正常状况下,Leader 给 Follower 发心跳的频率是 election timeout / 10,也便是说在 election timeout 期间内,Leader 会给 Follower 发10次心跳,Follower 10次都没有收到心跳的状况下,才会产生选举。而恰巧的是,我设置的 election timeout 刚好便是 5s,5s / 10 刚好便是 500ms。

所以进一步剖析 read-index read 的机制,当 Follower 运用 read-index read 时,首要要去 Leader 获取 Leader 当时的 commit index,然后需求等候 Follower 自己的状况机的 apply index 超过从 Leader 那边获取到的 commit index,然后才会进行 read-index read 的回调。而 Follower 的状况机的 apply 操作是经过 Leader 的心跳恳求驱动的,Leader 中能够知道 raft log 是否被对折提交了,一旦某一条 raft log 被对折提交,Leader 鄙人一次的心跳恳求中就会发最新的 commit index 同步给 Follower,Follower 收到新的 commit index 后,才会驱动自己的状况机去 apply raft log。而心跳恳求的频率又是 election timeout / 10,一切会存在 read-index read 偶然的呼应时刻会是 election timeout / 10.

**怎么处理:**依据以上剖析,将 election timeout 的时刻调整为了1s,心跳频率也就变成了 100ms,最大的呼应耗时也就变低了,均匀呼应耗时也下降到了4ms左右。

read-index read 大约逻辑如图-5所示

sofa-jraft在同程旅游中的实践
图-5:read-index read 处理逻辑

4.3 read-index read 大呼应接口失利后转发恳求到 leader 导致状况机堵塞

在一次排查问题的过程中,置疑网络存在问题。所以联络运维同学,运维同学对履行 tcpdump 指令,对网络进行了抓包。整个集群分为3个机房,2+2+1的形式进行布置,1这个节点的网络偶然会存在动摇。在当时履行 tcpdump 往后4分钟,到1这个节点的读恳求就开始产生 read-index timeout 了,而当时的逻辑是,只要 read-index read 回调状况不ok,就将该恳求转发到 Leader,走 raft log 来进行处理。

这儿存在一个接口,是去读一切的数据,数据量比较大。当 read-index read 超时时,会将这个恳求转发到了 Leader 节点,走 raft log 去读数据,走 raft log 就会在状况机中去进行处理,而这个恳求的 response 比较大,导致在获取完数据后,去序列化数据时比较耗时,大约需求耗费1500ms,状况机中处理 raft log 的吞吐就下降了。而且 raft log 是会从 Leader 仿制给 Follower 的,也便是说,Follower 的状况机也会去履行这个耗时 1500 ms的 raft log,仅仅 Follower 不对 response 做处理罢了。

在上面描绘了 read-index read 的逻辑,Follower 要履行 read-index read,需求状况机的 apply-index 追上 Leader 的 commit index,当产生上述网络动摇时,这个大接口走 raft log 的方法,下降了状况机处理 raft log 的吞吐,导致 Follwer 的 apply index 更难追上 Leader 的 commit index 了。因而陷入了恶性循环,这个大接口共同经过 raft log 转向 Leader 去读取数据,而这个 raft log 处理十分耗时。终究导致状况机的 apply index 远远小于 commit index,一切的客户端的读操作和写操作全部都超时。

**怎么处理:**将这个大接口的读取操作改成快速失利,一旦 read-index read 的回调不成功,不把恳求经过 raft log 转到 Leader 去,直接回来反常给客户端,让客户端重试。

4.4 snapshot 操作时,堵塞状况机运用 raft log,导致呼应超时

体系在压测时,跑着跑着客户端会偶然超时。经过反复排查,发现超时的时刻点和 snapshot 的时刻点重合,依据阅览代码发现,状况机的 apply 操作和 snapshot 操作默许是同步的,而 snapshot 比较耗时,导致了状况机 apply raft log 时刻被延长了,从而客户端恳求超时。

**怎么处理:**在 snapshot 时,将 snapshot 的操作变为异步操作,运用copy on write 把 snapshot 时的内存数据 copy 了一份,再异步进行耐久化处理。这儿需求留意的是,copy on write 会耗费2倍的内存,这儿需求保证不要导致OOM了。不同的场景需求考虑不同的异步 snapshot 的方法。

4.5 raft中存在 raft log 和 snapshot file,需求文件体系保证有状况

sofa-jraft 需求保存 raft log 以及 snapshot file。在容器布置时,需求保证运用运用的 raft 目录是耐久化的。

4.6 敞开 metrics 以及运用 kill -s SIGUSR2 协助问题剖析

在 sofa-jraft 中,存在 node 参数 enableMetrics,是否敞开 metrics 统计指标数据。咱们将它翻开,而且将指标数据输出到一个单独的日志文件,归档的日志能够在剖析问题时供给线索。比方:有时分的读取恳求呼应时刻增大了,就能够经过调查指标数据 read-index 来协助剖析是否是线性读的机制导致恳求呼应飙升。

将指标输出到日志文件:

Node node = ...
NodeOptions nodeOpts =  ...
//翻开监控指标
nodeOpts.setEnableMetrics(true);
node.init(nodeOpts);
Slf4jReporter reporter = Slf4jReporter
         .forRegistry(node.getNodeMetrics().getMetricRegistry())
         //获取到日志的输出目标
         .outputTo(LoggerFactory.getLogger("com.jraft.metrics"))
         .convertRatesTo(TimeUnit.SECONDS)
         .convertDurationsTo(TimeUnit.MILLISECONDS)
         .build();
reporter.start(30, TimeUnit.SECONDS);

除此之外,还能够运用 kill -s SIGUSR2 pid 给 jraft 进程发送信号量,进程收到信号量后,会在进程的发动目录中生成指标数据数据文件。这儿我个人比较重视 node_describe.log 中 log manager 的 diskId 和 appliedId,前者是 raft log 写到磁盘中的方位,后者是状况机当时运用到 raft log 的方位,能够经过对比这两个数据,用来调查状况机的吞吐是否正常一旦两者相差很多,阐明状况机出问题了。

五、后续演进

  • 引入 Learner 节点,添加整个集群的读吞吐量。
  • 继续重视社区,和社区共同发展。

六、结语

以上便是 sofa-jraft 在咱们公司内的运用共享,有问题的小伙伴能够找到我的 github 直接邮箱和我沟通。感谢 sofa-stack 供给的一个如此优秀的 java 框架。 respect!!!

作者
  • 赵延: Github:horizonzy,同程艺龙高级开发,担任服务管理相关作业。重视RPC、服务管理和分布式等领域。

  • 董春明: 同程艺龙架构师,担任服务管理及云原生规划演进相关作业,分布式领域专家,Paper 爱好者。

参阅
  • alibaba nacos 中关于sofa-jraft的运用 nacos
  • jraft-rheakv 中关于sofa-jraft的运用 jraft