The flow by which data takes effect and lands in etcd's database consists mainly of the Leader replicating a log entry to a majority of the raft nodes, and the entry then being applied to the application-layer database. Restarting the raft state machine only loses the in-memory log entries, and after the restart the node can recover them by syncing from the Leader. Applying the transaction data carried by log entries to the database, however, is more delicate: an entry must neither be missed nor be applied twice. etcd's crash safety is therefore mostly about making the apply flow of log entries tolerate node crashes.

The apply flow of etcd log entries involves interaction among multiple components, such as the raft state machine, mvcc, and the WAL. These components have already been introduced in earlier articles of this column; here we focus on how etcd orchestrates them, and how it recovers data and preserves consistency after a crash and restart. The log apply flow is shown in the figure [apply flow diagram].

Analysis of etcd's crash-safe implementation

etcd's crash safety can be summarized as: persist data in multiple components at runtime, and compensate across those components' data during recovery. The key questions are how the components interact, how they persist data, and how the compensation works after a restart. This article therefore splits the apply flow into several stages, producing Ready data, writing the WAL, applying normal log entries, and recovering data after a restart, and highlights the crash-safe steps that keep etcd's data consistent across a restart.

Producing Ready data

Ready is the data object through which the raft state machine hands ready-to-process data to the upper application layer: log entries and snapshots that can be applied, entries that need to be written to the WAL, and so on. We only care about the fields related to applying log entries; the definition is as follows:

// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState
	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry
	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot
	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry
	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
        // ...  
}

The key fields of Ready relevant to the apply flow are:

**pb.HardState:** the key raft state that must be persisted, such as the current Term, the Vote target, and the Commit index.

**Entries:** newly produced log entries.

**Snapshot:** a snapshot sent by the Leader to a Follower to help it catch up with the Leader's log quickly.

**CommittedEntries:** log entries that have already been committed and are waiting to be applied to the state machine.

**MustSync:** a flag telling the upper layer whether it must call the system sync API to flush the data to disk immediately.
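
Before diving into etcd's concrete code, here is a minimal sketch of the consumer loop the raft library expects from the application layer, roughly following the usage pattern documented in the raft package. The saveToWAL, saveSnapshot, send and applyEntry callbacks are hypothetical placeholders, and the import paths assume the etcd v3.5 module layout:

package readyloop

import (
	"time"

	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
)

// Run is an illustrative consumer of raft.Ready. The four callbacks are
// hypothetical hooks the application would provide; they are not etcd APIs.
func Run(
	n raft.Node,
	done <-chan struct{},
	saveToWAL func(st raftpb.HardState, ents []raftpb.Entry, mustSync bool),
	saveSnapshot func(sn raftpb.Snapshot),
	send func(msgs []raftpb.Message),
	applyEntry func(ent raftpb.Entry),
) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			n.Tick()
		case rd := <-n.Ready():
			// Persist HardState and Entries before sending messages; fsync only
			// when raft marks the write as MustSync.
			saveToWAL(rd.HardState, rd.Entries, rd.MustSync)
			if !raft.IsEmptySnap(rd.Snapshot) {
				saveSnapshot(rd.Snapshot)
			}
			send(rd.Messages)
			// Apply the entries raft has marked as committed.
			for _, ent := range rd.CommittedEntries {
				applyEntry(ent)
			}
			// Tell raft this Ready has been fully processed.
			n.Advance()
		case <-done:
			return
		}
	}
}

etcd's actual consumer, shown in the sections below, follows the same skeleton but splits the work between the raftNode goroutine and the applyAll goroutine.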

After the raft state machine produces a Ready, it sends it out over the ready channel and waits on advancec for the upper layer to finish processing it. The code is as follows:

func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready
	r := n.rn.raft
	// other code ...
	for {
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			// Populate a Ready. Note that this Ready is not guaranteed to
			// actually be handled. We will arm readyc, but there's no guarantee
			// that we will actually send on it. It's possible that we will
			// service another channel instead, loop around, and then populate
			// the Ready again. We could instead force the previous Ready to be
			// handled first, but it's generally good to emit larger Readys plus
			// it simplifies testing (by emitting less frequently and more
			// predictably).
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}
		select {
		// other case ...
		case <-n.tickc:
			n.rn.Tick()
		case readyc <- rd:
			n.rn.acceptReady(rd)
			advancec = n.advancec
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case <-n.stop:
			close(n.done)
			return
		}
	}
}

Writing the WAL

After the raft state machine sends out the Ready data, the application layer receives it by listening on the ready channel. The relevant flow lives in the following file:

https://github.com/etcd-io/etcd/blob/main/server/etcdserver/raft.go

Because raftNode's handling of the Ready struct in etcdserver/raft is critical, the complete apply-related code flow is reproduced here, with explanations added as comments:

func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second
	go func() {
		defer r.onStop()
		islead := false
		for {
			select {
			case rd := <-r.Ready():
				// The data is actually applied to the database in the applyAll function.
				// notifyc is created so that this goroutine can coordinate progress with the
				// applyAll goroutine.
				notifyc := make(chan struct{}, 1)
				ap := apply{
					entries:  rd.CommittedEntries,
					snapshot: rd.Snapshot,
					notifyc:  notifyc,
				}
				// Hand the apply payload to the applyAll goroutine.
				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}
				// The next two steps persist data:
				// (1) If the Ready contains a snapshot, persist it and then write the snapshot
				//     metadata to the WAL, mainly in func (st *storage) SaveSnap(snap raftpb.Snapshot) error;
				//     step 3 in the [apply flow diagram].
				// (2) Write the HardState and the newly produced entries to the WAL, calling sync
				//     at the end to flush them to disk immediately; steps 4 and 5 in the [apply flow diagram].
				// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
				// ensure that recovery after a snapshot restore is possible.
				if !raft.IsEmptySnap(rd.Snapshot) {
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						if r.lg != nil {
							r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
						} else {
							plog.Fatalf("failed to save Raft snapshot %v", err)
						}
					}
				}
				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
					if r.lg != nil {
						r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
					} else {
						plog.Fatalf("failed to save state and entries error: %v", err)
					}
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}
				if !raft.IsEmptySnap(rd.Snapshot) {
					// Force WAL to fsync its hard state before Release() releases
					// old data from the WAL. Otherwise could get an error like:
					// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
					// See https://github.com/etcd-io/etcd/issues/10219 for more details.
					if err := r.storage.Sync(); err != nil {
						if r.lg != nil {
							r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
						} else {
							plog.Fatalf("failed to sync Raft snapshot %v", err)
						}
					}
					// If a snapshot is present, etcdserver's applyAll calls applySnapshot first.
					// applySnapshot waits for the notifyc signal: only after the snapshot has been
					// flushed to disk does the apply-snapshot flow start.
					// etcdserver now claim the snapshot has been persisted onto the disk
					notifyc <- struct{}{}
					// Save the snapshot into raftStorage, step 8.2 in the [apply flow diagram].
					r.raftStorage.ApplySnapshot(rd.Snapshot)
					if r.lg != nil {
						r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
					} else {
						plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
					}
					// Release snapshot resources that are no longer needed.
					if err := r.storage.Release(rd.Snapshot); err != nil {
						if r.lg != nil {
							r.lg.Fatal("failed to release Raft wal", zap.Error(err))
						} else {
							plog.Fatalf("failed to release Raft wal %v", err)
						}
					}
				}
				// Save the log entries into raftStorage, step 8.1 in the [apply flow diagram].
				r.raftStorage.Append(rd.Entries)
				// Coordinate with applyAll before sending messages:
				// (1) A Follower or Candidate not only notifies applyAll that the raft log,
				//     HardState and snapshot have all been flushed to disk, but also waits for
				//     the apply flow to finish when the batch contains EntryConfChange entries.
				//     This keeps messages that affect cluster stability (e.g. election traffic)
				//     from being sent to a member that is about to be removed: once the removal
				//     is applied, that peer is removed from the transport, so no further
				//     messages reach it.
				// (2) A Leader only needs to notify applyAll that the raft log, HardState and
				//     snapshot have all been flushed to disk.
				if !islead {
					// finish processing incoming messages before we signal raftdone chan
					msgs := r.processMessages(rd.Messages)
					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}
					// Candidate or follower needs to wait for all pending configuration
					// changes to be applied before sending messages.
					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
					// Also slow machine's follower raft-layer could proceed to become the leader
					// on its own single-node cluster, before apply-layer applies the config change.
					// We simply wait for ALL pending entries to be applied for now.
					// We might improve this later on if it causes unnecessary long blocking issues.
					waitApply := false
					for _, ent := range rd.CommittedEntries {
						if ent.Type == raftpb.EntryConfChange {
							waitApply = true
							break
						}
					}
					if waitApply {
						// blocks until 'applyAll' calls 'applyWait.Trigger'
						// to be in sync with scheduled config-change job
						// (assume notifyc has cap of 1)
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}
					r.transport.Send(msgs)
				} else {
					// leader already processed 'MsgSnap' and signaled
					notifyc <- struct{}{}
				}
				// Tell the raft state machine that this Ready has been fully processed.
				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}
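
The notifyc handshake in the code above is the heart of the coordination with applyAll. The stripped-down sketch below (hypothetical structure, not etcd code) shows the same pattern: a buffered channel of capacity 1 lets the disk-writing side signal "persisted" without blocking, while a second send on the same channel blocks until the applying side has drained the first signal, i.e. until the pending config change has actually been applied:

package main

import (
	"fmt"
	"time"
)

// A minimal illustration of the notifyc pattern between etcd's raftNode
// goroutine and the applyAll goroutine (names and flow are simplified).
func main() {
	notifyc := make(chan struct{}, 1) // capacity 1, as in etcd
	done := make(chan struct{})

	// "applyAll" side: apply the committed entries, then consume one signal to
	// get in sync with the raft goroutine's disk writes.
	go func() {
		defer close(done)
		fmt.Println("applyAll: applying committed entries (incl. config change)")
		time.Sleep(50 * time.Millisecond) // pretend applying takes a while
		<-notifyc                         // drain the "disk write done" signal
		fmt.Println("applyAll: in sync with the raft goroutine")
	}()

	// "raftNode" side: persist HardState/entries/snapshot, then signal.
	fmt.Println("raftNode: WAL and snapshot persisted")
	notifyc <- struct{}{} // fits in the buffer, does not block

	// When the batch contains an EntryConfChange, send once more: the buffer is
	// already full, so this send blocks until applyAll drains it, i.e. until the
	// config change has been applied and it is safe to send raft messages.
	notifyc <- struct{}{}
	fmt.Println("raftNode: config change applied, sending messages now")

	<-done
}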

Applying log entries

The goroutine that handles Ready mainly interacts with etcdserver's applyAll flow, using the notify channel to coordinate progress. Let's walk through applyAll; the explanations are all in the code comments:

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
	// If a snapshot is present, apply it to the store, step 7.1 in the [apply flow diagram].
	s.applySnapshot(ep, apply)
	// If there are committed entries, apply them to the store, step 7.2 in the [apply flow diagram].
	s.applyEntries(ep, apply)
	proposalsApplied.Set(float64(ep.appliedi))
	s.applyWait.Trigger(ep.appliedi)
	// Wait for the Ready-handling goroutine to persist the snapshot, HardState and raft log to disk.
	// wait for the raft routine to finish the disk writes before triggering a
	// snapshot. or applied index might be greater than the last index in raft
	// storage, since the raft routine might be slower than apply routine.
	<-apply.notifyc
	// New entries have been applied, so check whether the conditions for creating a
	// new snapshot are met; if so, create one.
	s.triggerSnapshot(ep)
  // other code ...
}
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
	if raft.IsEmptySnap(apply.snapshot) {
		return
	}
	// Check that the snapshot being applied this time is valid.
	if apply.snapshot.Metadata.Index <= ep.appliedi {
		if lg != nil {
			lg.Panic(
				"unexpected leader snapshot from outdated index",
				zap.Uint64("current-snapshot-index", ep.snapi),
				zap.Uint64("current-applied-index", ep.appliedi),
				zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
				zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
			)
		} else {
			plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
				apply.snapshot.Metadata.Index, ep.appliedi)
		}
	}
	// Wait until the Ready-handling goroutine has persisted the snapshot to disk. Persisting
	// the snapshot relies on the snapshotter, and openSnapshotBackend below relies on the
	// snapshotter too, so the snapshot must be saved there before openSnapshotBackend is called.
	// wait for raftNode to persist snapshot onto the disk
	<-apply.notifyc
	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
	if err != nil {
		if lg != nil {
			lg.Panic("failed to open snapshot backend", zap.Error(err))
		} else {
			plog.Panic(err)
		}
	}
	// (1) Recover the lessor-related state from newbe (backend.Backend).
	// (2) Recover the current store's index and data from newbe (backend.Backend).
	if err := s.kv.Restore(newbe); err != nil {
		if lg != nil {
			lg.Panic("failed to restore mvcc store", zap.Error(err))
		} else {
			plog.Panicf("restore KV error: %v", err)
		}
	}
	// Set the ConsistentWatchableKV's index of the last applied log entry.
	s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
	if lg != nil {
		lg.Info("restored mvcc store")
	} else {
		plog.Info("finished restoring mvcc store")
	}
	// Use the new backend to recover the rest of the runtime state, e.g. the auth store,
	// cluster membership, and rebuilding the transport.
	ep.appliedt = apply.snapshot.Metadata.Term
	ep.appliedi = apply.snapshot.Metadata.Index
	ep.snapi = ep.appliedi
	ep.confState = apply.snapshot.Metadata.ConfState
}
func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
	if len(apply.entries) == 0 {
		return
	}
	// Check that the entries to be applied this time are valid.
	firsti := apply.entries[0].Index
	if firsti > ep.appliedi+1 {
		if lg := s.getLogger(); lg != nil {
			lg.Panic(
				"unexpected committed entry index",
				zap.Uint64("current-applied-index", ep.appliedi),
				zap.Uint64("first-committed-entry-index", firsti),
			)
		} else {
			plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
		}
	}
	// Drop the entries that have already been applied.
	var ents []raftpb.Entry
	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
		ents = apply.entries[ep.appliedi+1-firsti:]
	}
	if len(ents) == 0 {
		return
	}
	// Call apply to perform the real apply of the log entries.
	var shouldstop bool
	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
	}
}
// Iterate over all entries and apply each one according to its type.
func (s *EtcdServer) apply(
	es []raftpb.Entry,
	confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
	for i := range es {
		e := es[i]
		switch e.Type {
		case raftpb.EntryNormal:
			s.applyEntryNormal(&e)
			s.setAppliedIndex(e.Index)
			s.setTerm(e.Term)
		case raftpb.EntryConfChange:
			// set the consistent index of current executing entry
			if e.Index > s.consistIndex.ConsistentIndex() {
				s.consistIndex.setConsistentIndex(e.Index)
			}
			var cc raftpb.ConfChange
			pbutil.MustUnmarshal(&cc, e.Data)
			removedSelf, err := s.applyConfChange(cc, confState)
			s.setAppliedIndex(e.Index)
			s.setTerm(e.Term)
			shouldStop = shouldStop || removedSelf
			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
		default:
			if lg := s.getLogger(); lg != nil {
				lg.Panic(
					"unknown entry type; must be either EntryNormal or EntryConfChange",
					zap.String("type", e.Type.String()),
				)
			} else {
				plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
			}
		}
		appliedi, appliedt = e.Index, e.Term
	}
	return appliedt, appliedi, shouldStop
}
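
As an aside, the slice arithmetic in applyEntries above (ents = apply.entries[ep.appliedi+1-firsti:]) is easy to misread, so here is a small self-contained illustration with a simplified entry type and made-up indexes: if appliedi is 10 and the committed batch covers indexes 8 through 15, only entries 11..15 get applied.

package main

import "fmt"

// entry is a simplified stand-in for raftpb.Entry; only Index matters here.
type entry struct {
	Index uint64
}

// entriesToApply mirrors the slice arithmetic in applyEntries: drop the prefix
// of the committed batch that has already been applied.
func entriesToApply(appliedi uint64, batch []entry) []entry {
	if len(batch) == 0 {
		return nil
	}
	firsti := batch[0].Index
	if firsti > appliedi+1 {
		// etcd panics in this case: there is a gap between applied and committed entries.
		return nil
	}
	if appliedi+1-firsti >= uint64(len(batch)) {
		// The whole batch was already applied.
		return nil
	}
	return batch[appliedi+1-firsti:]
}

func main() {
	var batch []entry
	for i := uint64(8); i <= 15; i++ {
		batch = append(batch, entry{Index: i})
	}
	for _, e := range entriesToApply(10, batch) {
		fmt.Println("apply index", e.Index) // prints 11 through 15
	}
}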

When the actual apply is executed, different entry types are handled differently. There are two main types here: read/write transaction entries and _raft configuration-change entries_. This section does not cover raft configuration changes yet; below we focus on the apply flow of normal log entries:

Applying read/write transaction entries

Normal transaction entries are mostly key-value read/write operations, so they interact mainly with the mvcc/key-value storage components. The key code is explained in the comments:

func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
	// etcd v3 ships two generations of storage components:
	// (1) the v2 store, a simple key-value store where re-applying a log entry does not
	//     break store consistency;
	// (2) the v3 store, an mvcc store with transaction support where re-applying a log
	//     entry does break store consistency.
	// As described in the mvcc articles, a ConsistentIndexGetter is passed in when the mvcc
	// object is constructed; etcdserver passes s.consistIndex as that constructor argument,
	// so s.consistIndex.setConsistentIndex(e.Index) lets the mvcc transaction's End function
	// read the entry index associated with the current transaction.
	shouldApplyV3 := false
	if e.Index > s.consistIndex.ConsistentIndex() {
		// set the consistent index of current executing entry
		s.consistIndex.setConsistentIndex(e.Index)
		shouldApplyV3 = true
	}
        // other code ...
	// Handle v2 requests: store the data into the v2 store.
	var raftReq pb.InternalRaftRequest
	if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
		var r pb.Request
		rp := &r
		pbutil.MustUnmarshal(rp, e.Data)
		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
		return
	}
	if raftReq.V2 != nil {
		req := (*RequestV2)(raftReq.V2)
		s.w.Trigger(req.ID, s.applyV2Request(req))
		return
	}
	// If the entry has not yet been applied to the v3 store, run the corresponding
	// transaction through the v3 applier. When the transaction finishes, its End function
	// reads, via the ConsistentIndexGetter, the entry index set above by
	// s.consistIndex.setConsistentIndex(e.Index), and stores that index together with the
	// transaction's changes, persisting both to boltdb at an appropriate time.
	if !shouldApplyV3 {
		return
	}
	id := raftReq.ID
	if id == 0 {
		id = raftReq.Header.ID
	}
	var ar *applyResult
	needResult := s.w.IsRegistered(id)
	if needResult || !noSideEffect(&raftReq) {
		if !needResult && raftReq.Txn != nil {
			removeNeedlessRangeReqs(raftReq.Txn)
		}
		ar = s.applyV3.Apply(&raftReq)
	}
        // other code ...
}
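
The crash-safe core of the comments above is that the entry index is written in the same boltdb transaction as the key-value change, so after a restart the backend itself reveals the last applied entry index and duplicate applies can be skipped. The sketch below illustrates this idea directly with bbolt; the bucket and key names ("meta", "consistent_index") mirror the ones etcd uses, but the code is a simplified illustration, not etcd's implementation:

package main

import (
	"encoding/binary"
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

// applyPut writes the key-value change and the entry index in ONE boltdb
// transaction, so after a crash either both are durable or neither is.
func applyPut(db *bolt.DB, entryIndex uint64, key, value []byte) error {
	return db.Update(func(tx *bolt.Tx) error {
		kv, err := tx.CreateBucketIfNotExists([]byte("key"))
		if err != nil {
			return err
		}
		meta, err := tx.CreateBucketIfNotExists([]byte("meta"))
		if err != nil {
			return err
		}
		if err := kv.Put(key, value); err != nil {
			return err
		}
		idx := make([]byte, 8)
		binary.BigEndian.PutUint64(idx, entryIndex)
		return meta.Put([]byte("consistent_index"), idx)
	})
}

// consistentIndex reads back the last applied entry index after a restart.
func consistentIndex(db *bolt.DB) uint64 {
	var ci uint64
	_ = db.View(func(tx *bolt.Tx) error {
		if b := tx.Bucket([]byte("meta")); b != nil {
			if v := b.Get([]byte("consistent_index")); v != nil {
				ci = binary.BigEndian.Uint64(v)
			}
		}
		return nil
	})
	return ci
}

func main() {
	db, err := bolt.Open("demo.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := applyPut(db, 42, []byte("foo"), []byte("bar")); err != nil {
		log.Fatal(err)
	}
	// On restart, skip any entry whose Index <= consistentIndex(db).
	fmt.Println("last applied entry index:", consistentIndex(db))
}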

Recovering data after a restart

The sections above deliberately skipped the apply of raft configuration-change entries, since a dedicated article will cover them later; here we focus on restart recovery for normal log entries. When analyzing the restart flow there is little need to distinguish normal entries from configuration-change entries anyway, because the flow is largely the same for both; you may simply assume the cluster has never gone through a configuration change in its history.

From the description above, data is persisted in three places; in persistence order they are: the snapshot, the WAL, and the transaction data:

snapshot: a complete copy of the data is persisted as a snapshot.

WAL: log entries, the raft HardState, snapshot metadata, and so on are persisted.

transaction data: as covered by the mvcc component's put/range/del operations earlier, the user's transaction data is persisted together with information about the applied log entries.

There are several etcdserver restart scenarios; here we focus on a normal restart with an existing WAL. The main flow lives in the following file:

https://github.com/etcd-io/etcd/blob/v3.4.9/etcdserver/server.go#L410

First it checks whether a usable snapshot exists; if so, it builds a snapshot object:

// First validate the WAL entries in the WAL files and extract the snapshot metadata from them.
walSnaps, serr := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
if serr != nil {
	return nil, serr
}
// Use the snapshot metadata to find the newest usable snapshot data and build the snapshot object.
snapshot, err = ss.LoadNewestAvailable(walSnaps)
if err != nil && err != snap.ErrNoSnapshot {
	return nil, err
}
// The WAL is written sequentially, so later entries are newer: the snapshot matching the
// latest WAL snapshot-entry metadata is the newest available one.
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
	return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
		m := snapshot.Metadata
		for i := len(walSnaps) - 1; i >= 0; i-- {
			if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
				return true
			}
		}
		return false
	})
}

With the snapshot object in hand, the storage objects can be rebuilt from it:

// Use the snapshot to recover the v2 key-value store and the v3 backend object.
if snapshot != nil {
	if err = st.Recovery(snapshot.Data); err != nil {
		if cfg.Logger != nil {
			cfg.Logger.Panic("failed to recover from snapshot")
		} else {
			plog.Panicf("recovered store from snapshot error: %v", err)
		}
	}
	// other code ...
	if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
		if cfg.Logger != nil {
			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
		} else {
			plog.Panicf("recovering backend from snapshot error: %v", err)
		}
	}
	// other code ...
}

After the snapshot object is constructed, the flow moves on to restarting the raft node; the details are explained in the code below:

func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	// Extract the metadata from the snapshot.
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	// Use the walsnap metadata to locate the right WAL files, read them, and return the
	// node and cluster IDs, the HardState, the log entries, etc.
	w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
	// Here we only describe the normal raft-node restart flow and ignore the case where
	// the cluster is re-created after a restart.
	// Set the ID of the cluster this raft object belongs to.
	cl := membership.NewCluster(cfg.Logger, "")
	cl.SetID(id, cid)
	// Store the snapshot into the MemoryStorage.
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	// Store the HardState and log entries into the MemoryStorage.
	s.SetHardState(st)
	s.Append(ents)
	// Build the raft state machine object. At this point the raft object can see:
	// (1) the log entries;
	// (2) which of those entries have been committed.
	// Note: by now essentially all of etcd's pre-restart data has been loaded; the two
	// data-inconsistency scenarios a sudden crash could cause are analyzed right after
	// this code block.
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
		PreVote:         cfg.PreVote,
	}
	// other code ...
	// Restart the raft node.
	n := raft.RestartNode(c)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	return id, cl, n, s, w
}

By this point essentially all of etcd's pre-restart data has been loaded, so we can analyze the two data-inconsistency scenarios a sudden crash could cause:
(1) An entry is applied twice: when the raft object is built, the Applied field of raft.Config is left unset, so raft does not know which entries have already been applied. When it builds a Ready it therefore returns every entry of the raft log in the range (max(applied, firstIndex), committed] for applying. If the Storage holds entries that were already applied, they may be packed into a Ready again, but they will not actually be re-applied: they are filtered out, because the store exposes an interface that returns the index of the last applied log entry.
(2) A crash happens before an applied entry is persisted: as long as the WAL is cleaned up sensibly, so that the entries still kept in the WAL overlap with or directly follow the entries that have already been applied, etcd will neither lose transaction entries on a crash nor apply an entry twice. etcd's snapshot and WAL mechanisms do guarantee this: old WAL files are only purged after a snapshot has been created, only entries already packed into that snapshot are purged, entries covered by a snapshot have all been applied, and the purge is conservative, keeping one extra WAL file. Even if entries that have been applied are still sitting in the boltdb tx batch buffer, not yet persisted, when etcd crashes before the next snapshot, those entries are already persisted in the WAL together with the fact that they were committed, so after the restart the raft state machine simply hands them out again, and no log entry is ever lost.
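
To make the WAL retention argument in (2) concrete, here is a simplified model of the purge rule (made-up types and file names, not etcd's WAL code): a WAL segment may only be deleted if it is strictly older than the segment containing the latest snapshot index, so the surviving WAL always overlaps with, or directly follows, what has already been applied:

package main

import "fmt"

// walFile is a simplified model of a WAL segment; firstIndex is the index of
// the first entry recorded in that segment.
type walFile struct {
	name       string
	firstIndex uint64
}

// purgeableFiles returns the segments that are safe to delete after a snapshot
// at snapIndex: only segments strictly older than the one containing snapIndex.
// The segment that contains snapIndex is kept, which is why the remaining WAL
// always overlaps with the entries that have already been applied.
func purgeableFiles(files []walFile, snapIndex uint64) []walFile {
	// files are assumed to be sorted by firstIndex in ascending order.
	keepFrom := 0
	for i, f := range files {
		if f.firstIndex <= snapIndex {
			keepFrom = i
		}
	}
	return files[:keepFrom]
}

func main() {
	files := []walFile{
		{"0000.wal", 1},
		{"0001.wal", 100},
		{"0002.wal", 200},
		{"0003.wal", 300},
	}
	// Snapshot taken at index 250: only the first two segments may go;
	// 0002.wal still covers index 250 and must stay.
	for _, f := range purgeableFiles(files, 250) {
		fmt.Println("safe to purge:", f.name)
	}
}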