目录

深入浅出etcd/raft —— 0x04 Raft日志

本文为原创文章,转载请严格遵守CC BY-NC-SA协议

0. 引言

本文会对etcd/raft中Raft日志复制算法的实现与优化进行分析。这里假定读者阅读过Diego Ongaro的《In Search of an Understandable Consensus Algorithm (Extended Version)》(这里有笔者的翻译,笔者英语水平一般,欢迎指正。),其中提到的部分,本文中不会做详细的解释。对etcd/raft的总体结构不熟悉的读者,可以先阅读《深入浅出etcd/raft —— 0x02 etcd/raft总体设计》

本文首先介绍了etcd/raft中日志复制部分的优化。由于etcd/raft中对日志复制的优化大部分属于实现上的优化,这些优化是在系统中很常见的优化,因此本文会一笔带过其理论部分,而着重于讲解etcd/raft中日志复制的实现。

因为日志复制的逻辑涉及到的方面多、逻辑复杂、经过数年的版本演进部分逻辑难以理解,因此本文后半部分详细地分析了etcd/raft中与日志复制相关的几乎所有逻辑,以供读者参考。这里不建议读者通读本文讲解实现的部分,而是按照自己的节奏阅读源码,在遇到难以理解的部分时可以将本文作为补充参考。

1. etcd/raft日志复制优化

本节将介绍etcd/raft中日志复制部分采用的优化。

1.1 快速回退

在《In Search of an Understandable Consensus Algorithm (Extended Version)》和《CONSENSUS: BRIDGING THEORY AND PRACTICE》介绍Raft算法基本概念时,提到了一种快速回退next index的方法。当follower拒绝leader的AppendEntries RPC(MsgApp)请求时,follower会通过响应消息(MsgAppResp)的一个字段(RejectHint)告知leader日志冲突的位置与当前term的第一条日志的index。这样,leader可以直接将该follower的next index回退到该位置,然后继续以“一次回退一条”的方式检查冲突。

etcd/raft中也实现了类似的优化,但是其将follower的最后一条日志作为该字段的值。正如Diego Ongaro所说,故障不会经常发生,因此出现很多不一致的日志条目的可能性不大(etcd/raft该部分的作者也是这样想的,详见pull#2021),所以回退到follower的最后一条日志后继续检查冲突即可。

1.2 并行写入

《CONSENSUS: BRIDGING THEORY AND PRACTICE》的10.2.1 Writing to the leader’s disk in parallel介绍了一种减少Raft算法关键路径上的磁盘写入的优化。在朴素的实现方式中,leader需要先将新日志写入本地稳定存储之后再为follower复制这些日志,这会大大增加处理的延迟。

事实上,这次关键路径上的磁盘写入是可以优化的。leader可以在为follower复制日志的同时将这些日志写入其本地稳定存储。为了简化实现,leader自己的match index表示其写入到了稳定存储的最后一条日志的索引。当当前term的某日志被大多数的match index覆盖时,leader便可以使commit index前进到该节点处。这种优化是安全的,通过这种优化,leader甚至可以在日志写入本地稳定存储完成之前提交日志。

1.3 批处理与流水线

《CONSENSUS: BRIDGING THEORY AND PRACTICE》的10.2.2 Batching and pipelining介绍了Raft算法实现时的批处理与流水线优化。批处理与流水线是各种系统提高吞吐量的常用方式,etcd/raft也不例外。

1.3.1 批处理

简而言之,批处理(batch)就是在消息到来时先推迟对消息的处理,等到消息积累到一定数量或者经过一段时间后一起处理这批消息,在损失可接受的延迟的情况下提高系统吞吐量。在etcd/raft的实现中主要有两处使用了批处理的设计,一处是网络,一处是存储:

  • 网络:leader在为稳定的follower复制日志时,会用一条消息复制多条日志,且每次可能同时发送多条消息。后文会介绍相关实现。
  • 存储:在前文中笔者介绍过数据的存储时etcd/raft的使用者的责任,使用者需要将Ready结构体中的HardStatesEntriesSnapshot保存到稳定存储,然后在处理完所有字段后调用Advance方法以接收下一批数据。ReadyAdvance的设计即体现了微批处理的思想。

1.3.2 流水线

流水线(pipeline)同样是各种系统常用的提高吞吐量的方式。在etcd/raft的实现中,leader在向follower发送完日志复制请求后,不会等待follower响应,而是立即更新其nextIndex,并继续处理,以提高吞吐量。

在正常且稳定的情况下,消息应恰好一次且有序到达。但是在异常情况下,可能出现消息丢失、消息乱序、消息超时等等情况,在前文深入浅出etcd/raft —— 0x03 Raft选举介绍Step方法时,介绍了一些对过期消息的处理方式,重复的地方本文不再赘述。当follower收到过期的日志复制请求时,会拒绝该请求,随后follower会回退其nextIndex以重传之后的日志。

2. etcd/raft中的日志结构

在分析etcd/raft的日志复制的实现时,首先要了解etcd/raft中Raft日志结构的实现方式。etcd/raft中Raft日志是通过结构体raftLog实现的。本节将介绍raftLog的设计与实现。

2.1 raftLog的设计

etcd/raft中Raft日志是通过raftLog结构体记录的。raftLog结构体中,既有还未持久化的数据,也有已经持久化到稳定存储的数据;其中数据既有日志条目,也有快照。如果直观的给出raftLog中数据的逻辑结构,其大概如下图所示。

/posts/code-reading/etcdraft-made-simple/4-log/assets/raftLog.svg
raftLog逻辑结构示意图

raftLog中的数据,按照是否已持久化到稳定存储,可分为两部分:已持久化到稳定存储的部分(stable)和还未持久化到稳定存储的部分(unstable)。无论是stable的部分还是unstable的部分中,都可能包含快照或日志,且每部分的快照中包含的已压缩的日志比该部分相应的未压缩的日志更旧。需要注意的是,在etcd/raft的实现中,在同一时刻,raftLog中的4个段可能并不是同时存在的。

提示
关于日志unstable段对算法安全性的讨论详见5.1节

在etcd/raft的日志操作中,有4个经常使用的索引:

索引名 描述
committed 在该节点所知数量达到quorum的节点保存到了稳定存储中的日志里,index最高的日志的index。
applied 在该节点的应用程序已应用到其状态机的日志里,index最高的日志的index。
其中, $ applied \le committed $ 总是成立的。
firstIndex 在该节点的日志中,最新的快照之后的第一条日志的index。
lastIndex 在该节点的日志中,最后一条日志的index。

这里需要注意的是,所有的这些索引都是相对于当前节点而不是整个集群的,例如,当index为$i_1$的日志已被集群中数量达到quorum的节点保存到稳定存储时,一些节点可能还不知道$i_1$已被commit。

raftLog中unstable的部分保存在unstable结构体中,而stable的部分稍有些复杂。为了让使用etcd/raft模块的开发者能够根据自己的需求自定义Raft日志存储,stable的部分不是直接通过内部的结构体实现的。go.etcd.io/etcd/raft/storage.go文件中定义了Storage接口,只要实现了该接口,都可以用来存储stable日志。在深入浅出etcd/raft —— 0x02 etcd/raft总体设计的引言中,笔者提到Storage接口只定义了读取稳定存储中的日志、快照、状态的方法(如下图所示),etcd/raft并不关心也不知道开发者写入稳定存储的方式。那么,etcd/raft是怎样将unstable中的数据写入到稳定存储中的呢?

/posts/code-reading/etcdraft-made-simple/4-log/assets/overview.svg
etcd/raft职责示意图

深入浅出etcd/raft —— 0x01 raftexample中,笔者通过了官方提供的raftexample示例,介绍了使用etcd/raft的开发者与Node接口打交道并处理Ready结构体的方式(在深入浅出etcd/raft —— 0x02 etcd/raft总体设计中也有提到)。其中,开发者需要将Ready结构体EntitiesSnapshot字段中的数据保存到稳定存储中,这就是将数据从unstable转移到stable中的过程,这种设计也让etcd/raft不需要依赖稳定存储的具体写入方法。下图直观地表示了follower节点从收到leader发来的日志到将其保存至稳定存储中的大致流程(快照的处理方式也同理)。

/posts/code-reading/etcdraft-made-simple/4-log/assets/log-path.svg
日志复制流程示意图

接下来,笔者对Storageunstable进行分析。其中,对Storage的分析包括etcd采用的Storage实现——MemoryStorage

说明
Storage接口定义的是稳定存储的读取方法。之所以etcd使用了基于内存的MemoryStorage,是因为etcd在写入MemoryStorage前,需要先写入预写日志(Write Ahead Log,WAL)或快照。而预写日志和快照是保存在稳定存储中的。这样,在每次重启时,etcd可以基于保存在稳定存储中的快照和预写日志恢复MemoryStorage的状态。也就是说,etcd的稳定存储是通过快照、预写日志、MemoryStorage三者共同实现的。

2.2 Storage的设计与实现

Storage接口定义了etcd/raft中需要的读取稳定存储中日志、快照、状态等方法。


// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
	// TODO(tbg): split this into two interfaces, LogStorage and StateStorage.

	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
}

Storage中定义了需要从稳定存储读取的6种方法(具体定义见上面给出的代码与注释,这里不再赘述)。其中HardState指Raft状态机需要在本地稳定存储中持久化保存的状态,对应论文中的Persistent State(etcd/raft为了优化Raft日志,其保存的字段与原文稍有不同),相应的,SoftState只不需要持久化保存的状态,对应论文中的Volatile State;另外,由于陈旧的日志会被压缩成快照,因此有些方法并不总能获取到所需的值。

接下来对MemoryStorage的实现进行分析。MemoryStorage是etcd中使用的Storage实现,其实现了Storage中定义的读取稳定存储的方法,还实现了相应的写入稳定存储的方法。MemoryStorage结构体中的字段如下:


// MemoryStorage implements the Storage interface backed by an
// in-memory array.
type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex

	hardState pb.HardState
	snapshot  pb.Snapshot
	// ents[i] has raft log position i+snapshot.Metadata.Index
	ents []pb.Entry
}

其中,ents字段就是用来保存日志条目的切片。该切片的首元素被设计为用来保存元数据的条目,而不是真正的日志条目,其保存了快照的最后一条日志对应的term和index,该条目也被成为dummy entryMemoryStorageents字段可以直观表示为下图的结构。

/posts/code-reading/etcdraft-made-simple/4-log/assets/memorystorage.svg
MemoryStorage中ents结构示意图

注意,图中的first indexlast indexStorage接口的FirstIndexLastIndex方法定义的索引,而不是整个raftLogfirst indexlast index

MemoryStorage的实现不是很复杂,其中很多逻辑是在处理越界和dummy entry,这里不再占用篇幅详细解释。此外,MemoryStorage通过互斥锁保证其操作是线程安全的。

2.3 unstable的设计与实现

unstable结构体中保存了还未被保存到稳定存储中的快照或日志条目。


// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
	// the incoming unstable snapshot, if any.
	snapshot *pb.Snapshot
	// all entries that have not yet been written to storage.
	entries []pb.Entry
	offset  uint64

	logger Logger
}

unstable结构体中的offset字段记录了unstable的日志起点,该起点可能比Storage中index最高的日志条目旧,也就是说Storageunstable中的日志可能有部分重叠,因此在处理二者之间的日志时,有一些裁剪日志的操作。

unstable中较为重要的方法如下表所示。

方法
描述
maybeFirstIndex() (uint64, bool) 获取相对整个raftLogfirst index,当unstable无法得知该值时,第二个返回值返回false
maybeLastIndex() (uint64, bool) 获取相对整个raftLoglast index,当unstable无法得知该值时,第二个返回值返回false
maybeTerm(i uint64) (uint64, bool) 获取给定index的日志条目的term,当unstable无法得知该值时,第二个返回值返回false
stableTo(i, t uint64) 通知unstable当前index为i、term为t及其之前的日志已经被保存到了稳定存储中,可以裁剪掉unstable中的这段日志了。裁剪后会根据空间利用率适当地对空间进行优化。
stableSnapTo(i uint64) 通知unstable当前index在i及其之前的快照已经保存到了稳定存储中,如果unstable中保存了该快照,那么可以释放该快照了。
restore(s pb.Snapshot) 根据快照恢复unstable的状态(设置unstbale中的offsetsnapshot,并将entries置空)。
truncateAndAppend(ents []pb.Entry) 对给定的日志切片进行裁剪,并将其加入到unstable保存的日志中。
slice(lo uint64, hi uint64) 返回给定范围内的日志切片。首先会通过mustCheckOutOfBounds(lo, hi uint64)方法检查是否越界,如果越界会因此panic。

unstable中的first indexlast index的实现与Storage稍有不同。unstablemaybeFirstIndex方法与maybeLastIndex方法获取的是相对整个raftLogfirst indexlast index,当unstable无法得知整个raftLogfirst indexlast index时,这两个方法的第二个返回值会被置为false。这种设计与raftLog的实现有关,在raftLogfirstIndexlastIndex方法中,首先会调用unstablemaybeFirstIndex方法或maybeLastIndex方法,如果查询的索引不在unstable中时,其会继续询问StorageunstablemaybeFirstIndex方法与maybeLastIndex方法的实现如下。


// maybeFirstIndex returns the index of the first possible entry in entries
// if it has a snapshot.
func (u *unstable) maybeFirstIndex() (uint64, bool) {
	if u.snapshot != nil {
		return u.snapshot.Metadata.Index + 1, true
	}
	return 0, false
}

// maybeLastIndex returns the last index if it has at least one
// unstable entry or snapshot.
func (u *unstable) maybeLastIndex() (uint64, bool) {
	if l := len(u.entries); l != 0 {
		return u.offset + uint64(l) - 1, true
	}
	if u.snapshot != nil {
		return u.snapshot.Metadata.Index, true
	}
	return 0, false
}

简单来说,只有unstable中包含快照时,unstable才可能得知整个raftLogfirst index的位置(快照前的日志不会影响快照后的状态);而只有当unstable中既没有日志也没有快照时,unstable才无法得知last index的位置。

unstable中另一处比较重要的方法是truncateAndAppend。当raftLog需要将新日志保存到unstable中时会调用该方法。该方法会根据给定的日志切片的范围和当前unstable中日志切片的范围对二者进行适当地裁剪,其逻辑如下。


func (u *unstable) truncateAndAppend(ents []pb.Entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// after is the next index in the u.entries
		// directly append
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		u.logger.Infof("replace the unstable entries from index %d", after)
		// The log is being truncated to before our current offset
		// portion, so set the offset and replace the entries
		u.offset = after
		u.entries = ents
	default:
		// truncate to after and copy to u.entries
		// then append
		u.logger.Infof("truncate the unstable entries before index %d", after)
		u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
		u.entries = append(u.entries, ents...)
	}
}

该方法首先会获取给定日志切片中第一个日志条目的index(after),将其与unstable中已有日志的index进行比较,以确定处理方法:

  1. after恰好是unstable的下一条日志时,直接将其追加到unstable当前保存的日志之后。
  2. afterunstable的第一条日志还早时,此时给定的日志切片与unstable中的日志可能有冲突的部分(如Raft算法中leader强制覆盖follower中的日志时),为了更简单地处理冲突,直接将unstable中保存的日志替换为给定日志。
  3. afteroffset之后但与unstable中部分日志重叠时,重叠部分和之后部分可能会有冲突,因此裁剪掉unstable的日志中在after及其之后的部分,并将给定日志追加到其后。

另外,在unstablestableTo方法裁剪完日志后,会调用shrinkEntriesArray方法优化空间利用率。即如果剩余日志条目小于用来保存日志的切片容量的一半时,将剩余的日志拷贝到容量恰好为剩余日志长度的新切片中,并释放对原切片的引用。需要注意的是,这里不能直接释放原切片的空间或在原切片上进行修改,因为程序的其它部分可能还持有对原切片的引用。


// shrinkEntriesArray discards the underlying array used by the entries slice
// if most of it isn't being used. This avoids holding references to a bunch of
// potentially large entries that aren't needed anymore. Simply clearing the
// entries wouldn't be safe because clients might still be using them.
func (u *unstable) shrinkEntriesArray() {
	// We replace the array if we're using less than half of the space in
	// it. This number is fairly arbitrary, chosen as an attempt to balance
	// memory usage vs number of allocations. It could probably be improved
	// with some focused tuning.
	const lenMultiple = 2
	if len(u.entries) == 0 {
		u.entries = nil
	} else if len(u.entries)*lenMultiple < cap(u.entries) {
		newEntries := make([]pb.Entry, len(u.entries))
		copy(newEntries, u.entries)
		u.entries = newEntries
	}
}

2.4 raftLog的实现

在介绍了Storage接口和unstable结构体后,接下来继续看raftLog的具体实现。raftLog结构体源码如下:


type raftLog struct {
	// storage contains all stable entries since the last snapshot.
	storage Storage

	// unstable contains all unstable entries and snapshot.
	// they will be saved into storage.
	unstable unstable

	// committed is the highest log position that is known to be in
	// stable storage on a quorum of nodes.
	committed uint64
	// applied is the highest log position that the application has
	// been instructed to apply to its state machine.
	// Invariant: applied <= committed
	applied uint64

	logger Logger

	// maxNextEntsSize is the maximum number aggregate byte size of the messages
	// returned from calls to nextEnts.
	maxNextEntsSize uint64
}

可以看到,raftLogStorage接口实例storageunstable结构体实例unstable组成。在2.1节提到的4个常用索引中,committedapplied索引是通过raftLog的字段实现的,而firstIndexlastIndex是通过raftLog的方法实现的:


func (l *raftLog) firstIndex() uint64 {
	if i, ok := l.unstable.maybeFirstIndex(); ok {
		return i
	}
	index, err := l.storage.FirstIndex()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}
	return index
}

func (l *raftLog) lastIndex() uint64 {
	if i, ok := l.unstable.maybeLastIndex(); ok {
		return i
	}
	i, err := l.storage.LastIndex()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}
	return i
}

firstIndexlastIndex的实现方式在2.3节中已经介绍过,这里不再赘述。raftLog在创建时,会将unstableoffset置为storagelast index + 1,并将committedapplied置为storageforst index - 1


// newLog returns log using the given storage and default options. It
// recovers the log to the state that it just commits and applies the
// latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
	return newLogWithSize(storage, logger, noLimit)
}

// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
	if storage == nil {
		log.Panic("storage must not be nil")
	}
	log := &raftLog{
		storage:         storage,
		logger:          logger,
		maxNextEntsSize: maxNextEntsSize,
	}
	firstIndex, err := storage.FirstIndex()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}
	lastIndex, err := storage.LastIndex()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}
	log.unstable.offset = lastIndex + 1
	log.unstable.logger = logger
	// Initialize our committed and applied pointers to the time of the last compaction.
	log.committed = firstIndex - 1
	log.applied = firstIndex - 1

	return log
}

raftLog提供了如下的较为重要的方法:

方法
描述
maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) 尝试追加新日志,并更新committed索引(详见下文分析)。
unstableEntries() []pb.Entry 返回全部unstable中的日志条目。
nextEnts() (ents []pb.Entry) 返回可被应用到状态机的日志条目(已提交但还未应用),返回的长度受创建raftLog时指定的maxNextEntsSize限制。
hasNextEnts() bool 返回是否存在可被应用到状态机的日志。该方法不会调用slice方法,性能更高。
hasPendingSnapshot() bool 返回是否有未应用到状态机的快照(即unstable中保存的快照)。
commitTo(tocommit uint64) 更新committed索引,该方法会检查参数合法性。
appliedTo(i uint64) 更新applied索引,该方法会检查参数合法性。
stableTo(i, t uint64) 通知unstable当前已保存到稳定存储中最后的日志的index与term,让其适当裁剪日志。
stableSnapTo(i uint64) 通知unstable当前已保存到稳定存储中最后的快照的index,让其释放快照。
lastTerm() uint64 获取最后一条日志的term。
term(i uint64) (uint64, error) 获取给定index的日志条目的term。
entries(i, maxsize uint64) ([]pb.Entry, error) 获取index从i开始的最多maxsize条日志。
allEntries() []pb.Entry 获取全部日志条目。
isUpToDate(lasti, term uint64) bool 判断给定的term和index对应的日志条目是否至少与当前最后一个日志条目一样新。
matchTerm(i, term uint64) bool 判断给定的index与term是否日志中相应index的条目的term匹配。
maybeCommit(maxIndex, term uint64) bool 如果给定的index和term对应的日志条目还未被提交,将日志提交到给日志条目处并返回true,否则返回false
restore(s pb.Snapshot) 将给定快照应用到(unstable)日志中。
slice(lo, hi, maxSize uint64) ([]pb.Entry, error) 返回index从lohi的最多maxSize条日志,该方法会检查参数是否合法。

appendmaybeAppend是向raftLog写入日志的方法。二者的区别在于append不会检查给定的日志切片是否与已有日志有冲突,因此leader向raftLog中追加日志时会调用该函数;而maybeAppend会检查是否有冲突并找到冲突位置,并试图通过覆盖本地日志的方式解决冲突。但是,二者都会检查给定的日志起点是否在committed索引位置之前,如果在其之前,这违背了Raft算法的Log Matching性质,因此会引起panic(其实follower不会将committed之前的日志传给该函数,因此永远不会进入该分支)。源码如下:


// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) {
		lastnewi = index + uint64(len(ents))
		ci := l.findConflict(ents)
		switch {
		case ci == 0:
		case ci <= l.committed:
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			offset := index + 1
			l.append(ents[ci-offset:]...)
		}
		l.commitTo(min(committed, lastnewi))
		return lastnewi, true
	}
	return 0, false
}

func (l *raftLog) append(ents ...pb.Entry) uint64 {
	if len(ents) == 0 {
		return l.lastIndex()
	}
	if after := ents[0].Index - 1; after < l.committed {
		l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
	}
	l.unstable.truncateAndAppend(ents)
	return l.lastIndex()
}

maybeAppend方法会检查参数的合法性,当可以追加时,其会返回追加的最后一条日志的index与true,否则返回0与false。其需要的参数有这批日志的前一个日志条目的index与term(用于校验匹配)、leader最新确认的committed索引、和待追加的日志ents

首先,maybeAppend方法会检查这批日志的前一个条目的index和这批日志的term与raftLog对应条目的index与term是否匹配,如果不匹配则返回(0, false)。如果匹配无误,其会调用findConflict方法寻找待追加的日志与已有日志的第一个冲突条目的index或第一条新日志的index。在进一步分析前,先看一下findConflict的实现方式:


// findConflict finds the index of the conflict.
// It returns the first pair of conflicting entries between the existing
// entries and the given entries, if there are any.
// If there is no conflicting entries, and the existing entries contains
// all the given entries, zero will be returned.
// If there is no conflicting entries, but the given entries contains new
// entries, the index of the first new entry will be returned.
// An entry is considered to be conflicting if it has the same index but
// a different term.
// The first entry MUST have an index equal to the argument 'from'.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
	for _, ne := range ents {
		if !l.matchTerm(ne.Index, ne.Term) {
			if ne.Index <= l.lastIndex() {
				l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
					ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
			}
			return ne.Index
		}
	}
	return 0
}

findConflict返回的情况可以分为3种:

  1. 如果给定的日志与已有的日志的index和term冲突,其会返回第一条冲突的日志条目的index。
  2. 如果没有冲突,且给定的日志的所有条目均已在已有日志中,返回0.
  3. 如果没有冲突,且给定的日志中包含已有日志中没有的新日志,返回第一条新日志的index。

maybeAppend会根据findConflict的返回值确定接下来的处理方式:

  1. 如果返回0,说明既没有冲突又没有新日志,直接进行下一步处理。
  2. 如果返回值小于当前的committed索引,说明committed前的日志发生了冲突,这违背了Raft算法保证的Log Matching性质,因此会引起panic。
  3. 如果返回值大于committed,既可能是冲突发生在committed之后,也可能是有新日志,但二者的处理方式都是相同的,即从将从冲突处或新日志处开始的日志覆盖或追加到当前日志中即可。

除了会引起panic的情况外,该方法接下来会调用commitTo方法,更新当前的committed索引为给定的新日志中最后一条日志的index(lastnewi)和传入的新的committed中较小的一个。commitTo方法保证了committed索引只会前进而不会回退,而使用lastnewi和传入的committed中的最小值则是因为传入的数据可能有如下两种情况:

  1. leader给follower复制日志时,如果复制的日志条目超过了单个消息的上限,则可能出现leader传给follower的committed值大于该follower复制完这条消息中的日志后的最大index。此时,该follower的新committed值为lastnewi
  2. follower能够跟上leader,leader传给follower的日志中有未确认被法定数量节点稳定存储的日志,此时传入的committedlastnewi小,该follower的新committed值为传入的committed值。

raftLogslice方法是会返回指定的日志片段,该方法会检查给定的范围是否可以访问。


// slice returns a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
	err := l.mustCheckOutOfBounds(lo, hi)
	if err != nil {
		return nil, err
	}
	if lo == hi {
		return nil, nil
	}
	var ents []pb.Entry
	if lo < l.unstable.offset {
		storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
		if err == ErrCompacted {
			return nil, err
		} else if err == ErrUnavailable {
			l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
		} else if err != nil {
			panic(err) // TODO(bdarnell)
		}

		// check if ents has reached the size limitation
		if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
			return storedEnts, nil
		}

		ents = storedEnts
	}
	if hi > l.unstable.offset {
		unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
		if len(ents) > 0 {
			combined := make([]pb.Entry, len(ents)+len(unstable))
			n := copy(combined, ents)
			copy(combined[n:], unstable)
			ents = combined
		} else {
			ents = unstable
		}
	}
	return limitSize(ents, maxSize), nil
}

在获取raftLog中的日志切片时,该方法首先会通过mustCheckOutOfBounds方法检查给定的范围是否越界。


// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
	if lo > hi {
		l.logger.Panicf("invalid slice %d > %d", lo, hi)
	}
	fi := l.firstIndex()
	if lo < fi {
		return ErrCompacted
	}

	length := l.lastIndex() + 1 - fi
	if hi > fi+length {
		l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
	}
	return nil
}

mustCheckOutOfBounds检查了如下几个方面:

  1. 是否满足$lo \le hi$。(slice获取的是左闭右开区间$[lo,hi)$的日志切片。)
  2. 是否满足$lo \ge firstIndex$,否则该范围中部分日志已被压缩,无法获取。
  3. 是否满足$hi \le lastIndex+1$,否则该范围中部分日志还没被追加到当前节点的日志中,无法获取。

slice确保给定范围没有越界后,如果这段范围跨了stable和unstable两部分,那么该方法会分别从Storage获取$[lo,\text{unstable.offset})$、从unstable获取$[\text{unstable.offset},hi)$;否则直接从其中一部分获取完整的切片。在返回切片前,silce还会按照maxSize参数限制返回的切片长度。

3. 复制进度跟踪

在《In Search of an Understandable Consensus Algorithm (Extended Version)》中,leader只通过 nextInext[]matchIndex[] 来跟踪follower的日志进度。而etcd/raft为了解耦不同情况下的日志复制逻辑并实现一些日志复制相关的优化,还需要记录一些其它信息。因此,etcd/raft中leader使用Progress结构体来跟踪每个follower(和learner)的日志复制进度。

3.1 Progress结构体

Progess结构体是leader用来跟踪follower日志复制进度的结构,即“表示从leader视角看到的follower的进度”。leader会为每个follower(和learner)维护各自的Progress结构。官方提供了Progress设计文档,该文档简单介绍了其设计与功能。

Progress的结构如下:


// Progress represents a follower’s progress in the view of the leader. Leader
// maintains progresses of all followers, and sends entries to the follower
// based on its progress.
//
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
type Progress struct {
	Match, Next uint64
	// State defines how the leader should interact with the follower.
	//
	// When in StateProbe, leader sends at most one replication message
	// per heartbeat interval. It also probes actual progress of the follower.
	//
	// When in StateReplicate, leader optimistically increases next
	// to the latest entry sent after sending replication message. This is
	// an optimized state for fast replicating log entries to the follower.
	//
	// When in StateSnapshot, leader should have sent out snapshot
	// before and stops sending any replication message.
	State StateType

	// PendingSnapshot is used in StateSnapshot.
	// If there is a pending snapshot, the pendingSnapshot will be set to the
	// index of the snapshot. If pendingSnapshot is set, the replication process of
	// this Progress will be paused. raft will not resend snapshot until the pending one
	// is reported to be failed.
	PendingSnapshot uint64

	// RecentActive is true if the progress is recently active. Receiving any messages
	// from the corresponding follower indicates the progress is active.
	// RecentActive can be reset to false after an election timeout.
	//
	// TODO(tbg): the leader should always have this set to true.
	RecentActive bool

	// ProbeSent is used while this follower is in StateProbe. When ProbeSent is
	// true, raft should pause sending replication message to this peer until
	// ProbeSent is reset. See ProbeAcked() and IsPaused().
	ProbeSent bool

	// Inflights is a sliding window for the inflight messages.
	// Each inflight message contains one or more log entries.
	// The max number of entries per message is defined in raft config as MaxSizePerMsg.
	// Thus inflight effectively limits both the number of inflight messages
	// and the bandwidth each Progress can use.
	// When inflights is Full, no more message should be sent.
	// When a leader sends out a message, the index of the last
	// entry should be added to inflights. The index MUST be added
	// into inflights in order.
	// When a leader receives a reply, the previous inflights should
	// be freed by calling inflights.FreeLE with the index of the last
	// received entry.
	Inflights *Inflights

	// IsLearner is true if this progress is tracked for a learner.
	IsLearner bool
}

Progress中有两个重要的索引:matchnextmatch表示leader所知的该follower的日志中匹配的日志条目的最高index,如果leader不知道该follower的日志状态时,match为0;next表示leader接下来要给该follower发送的日志的第一个条目的index。根据Raft算法论文,next是可能因异常回退的,而match是单调递增的。next小于match的节点会被认为是落后的节点。

Progress的一些常用的方法如下表所示:

方法
描述
ResetState(state StateType) 重置状态为目标状态,该方法会清空所有状态记录的数据。该方法由BecomeXXX方法调用。
BecomeProbe() 将follower转为Probe状态。
BecomeReplicate() 将follower转为Replicate状态。
BecomeSnapshot(snapshoti uint64) 将follower转为Snapshot状态,并指定需要为其发送的快照的index。
MaybeUpdate(n uint64) bool 用于更新follower的进度(match index),如果传入的进度比当前进度旧,则不会更新进度并返回false,该方法还会根据传入的进度更新next index。leader会在收到来自follower的MsgAppResp消息时调用该方法。
OptimisticUpdate(n uint64) 不做检查直接更新next index,用于StateReplicate状态下日志复制流水线优化。
MaybeDecrTo(rejected, last uint64) bool 用于回退next index,该方法会根据参数判断是否需要回退,如果参数是来自过期的消息,那么不会回退。如果回退,则会返回true。
IsPaused() bool 判断为该follower发送消息的发送窗口是否阻塞,发送窗口大小与该follower的状态和Raft的配置有关。

以上的很多方法都与follower的状态有关,因此这里先介绍Progress中规定的3中follower状态。

3.2 follower的3种状态

为了更加清晰地处理leader为follower复制日志的各种情况,etcd/raft将leader向follower复制日志的行为分成三种,记录在ProgressState字段中:

  1. StateProbe:当leader刚刚当选时,或当follower拒绝了leader复制的日志时,该follower的进度状态会变为StateProbe类型。在该状态下,leader每次心跳期间仅为follower发送一条MsgApp消息,且leader会根据follower发送的相应的MsgAppResp消息调整该follower的进度。
  2. StateReplicate:该状态下的follower处于稳定状态,leader会优化为其复制日志的速度,每次可能发送多条MsgApp消息(受Progress的流控限制,后文会详细介绍)。
  3. StateSnapshot:当follower所需的日志已被压缩无法访问时,leader会将该follower的进度置为StateSnapshot状态,并向该follower发送快照。leader不会为处于StateSnapshot状态的follower发送任何的MsgApp消息,直到其成功收到快照。
提示
每条MsgApp消息可以包含多个日志条目。

Progress中的PendingSnapshotProbeSent字段是StateProebeStateSnapshot状态下需要记录的字段,后文会详细讲解。

Progress中的RecentActive字段用来标识该follower最近是否是“活跃”的。该字段除了用于Check Quorum外(详见深入浅出etcd/raft —— 0x03 Raft选举),在日志复制时,leader不会将不活跃的follower转为StateSnapshot状态或发送快照。(这是为了修复issue#3378中提到的问题,感兴趣的读者可以查看该issue和issue#3976)。

ProgressInflights字段是对日志复制操作进行流控的字段。虽然ConfigMaxSizePerMsg字段限制了每条MsgApp消息的字节数,但是在StateReplicate状态下优化日志复制时,每次可能会发送多条MsgApp消息。因此,Config中又加入了MaxInflightMsgs字段来限制每次发送的MsgApp消息数。Inflights实现了MaxInflightMsgs字段配置的流控。

Inflights结构体实现了一个动态扩容的FIFO队列,其中记录了每条MsgAppIndex字段的值,以在收到MsgAppResp的ack时释放队列。Inflights的实现也比较简单,感兴趣的读者可以自行阅读源码学习其实现,这里不再赘述。

Progress的三种状态看做是不同大小的Inflights下的行为(其实并不是这样实现的):

  1. StateProbe => Inflight.size = 1
  2. StateReplicate => Inflight.size = MaxInflightMsgs
  3. StateSnapshot => Inflight.size = 0

IsPaused方法中看到类似的逻辑:


// IsPaused returns whether sending log entries to this node has been throttled.
// This is done when a node has rejected recent MsgApps, is currently waiting
// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
// operation, this is false. A throttled node will be contacted less frequently
// until it has reached a state in which it's able to accept a steady stream of
// log entries again.
func (pr *Progress) IsPaused() bool {
	switch pr.State {
	case StateProbe:
		return pr.ProbeSent
	case StateReplicate:
		return pr.Inflights.Full()
	case StateSnapshot:
		return true
	default:
		panic("unexpected state")
	}
}

3.3 状态转换与更新回退

在进一步分析etcd/raft的日志复制实现时,需要先简单了解BecomeXXX在进行状态转换时的做的操作及更新进度、回退进度的操作。


// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
// optionally and if larger, the index of the pending snapshot.
func (pr *Progress) BecomeProbe() {
	// If the original state is StateSnapshot, progress knows that
	// the pending snapshot has been sent to this peer successfully, then
	// probes from pendingSnapshot + 1.
	if pr.State == StateSnapshot {
		pendingSnapshot := pr.PendingSnapshot
		pr.ResetState(StateProbe)
		pr.Next = max(pr.Match+1, pendingSnapshot+1)
	} else {
		pr.ResetState(StateProbe)
		pr.Next = pr.Match + 1
	}
}

BecomeProbe分为两种情况,一种是从StateSnapshot进入StateProbe状态,当leader得知follower成功应用了快照后,需要调用NodeReportSnapshot方法,该方法会调用BecomeProbe将该follower的进度状态转为StateProbe。此时,可以将next index置为该快照的index的下一条。在一般情况下,则从match index处开始检测冲突(Next是下一条应为该follower发送的日志的index,因此应为当前认为的最后一条匹配日志的index+1)。


// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
func (pr *Progress) BecomeReplicate() {
	pr.ResetState(StateReplicate)
	pr.Next = pr.Match + 1
}

// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
// snapshot index.
func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
	pr.ResetState(StateSnapshot)
	pr.PendingSnapshot = snapshoti
}

BecomeReplicateBecomeSnapshot逻辑都很简单,在重置状态后,二者分别设置了相应的next index和正在发送的快照的index。

接下来分析更新match indexnext index与回退next index的相关逻辑:


// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
// index acked by it. The method returns false if the given n index comes from
// an outdated message. Otherwise it updates the progress and returns true.
func (pr *Progress) MaybeUpdate(n uint64) bool {
	var updated bool
	if pr.Match < n {
		pr.Match = n
		updated = true
		pr.ProbeAcked()
	}
	pr.Next = max(pr.Next, n+1)
	return updated
}

// OptimisticUpdate signals that appends all the way up to and including index n
// are in-flight. As a result, Next is increased to n+1.
func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }

MaybeUpdate会根据传入的index更新MatchNext到更高的值,如果Match更新,则会返回true,同时立刻对StateProbe状态的follower进行确认,否则返回false。其调用者会根据返回值判断该follower是否跟上了复制进度。


// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
// arguments are the index the follower rejected to append to its log, and its
// last index.
//
// Rejections can happen spuriously as messages are sent out of order or
// duplicated. In such cases, the rejection pertains to an index that the
// Progress already knows were previously acknowledged, and false is returned
// without changing the Progress.
//
// If the rejection is genuine, Next is lowered sensibly, and the Progress is
// cleared for sending log entries.
func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
	if pr.State == StateReplicate {
		// The rejection must be stale if the progress has matched and "rejected"
		// is smaller than "match".
		if rejected <= pr.Match {
			return false
		}
		// Directly decrease next to match + 1.
		//
		// TODO(tbg): why not use last if it's larger?
		pr.Next = pr.Match + 1
		return true
	}

	// The rejection must be stale if "rejected" does not match next - 1. This
	// is because non-replicating followers are probed one entry at a time.
	if pr.Next-1 != rejected {
		return false
	}

	pr.Next = max(min(rejected, last+1), 1)
	pr.ProbeSent = false
	return true
}

MaybeDecrTo的参数有follower拒绝的MsgApp请求的index(rejected,即MsgAppMsgAppRespIndex)和该follower最后一条日志的索引(last,即MsgAppRespRejectHint)。其中,rejected参数是用来判断该消息是否是过期的消息的,其判断逻辑如下:

  • 如果follower的状态为StateReplicateNext应该是跟上Match的进度的,那么如果rejected不大于Match,那么该消息过期。
  • 在其它状态下,Next可能没有跟上Match的进度,因此不能通过Match判断。由于其它状态下至多只会为其发送一条日志复制请求,因此只要rejected不等于Next-1,该消息就是过期的。

MaybeDecrTo不会对过期的消息进行处理。否则,将回退NextNext的回退有两种方案:

  • 回退一条日志。即新的Next为上一条Next-1,这里的Next-1即为发送MsgApp时用于日志匹配的Index字段的值,也是rejected的值。
  • 快速回退,回退到该follower的最后一条日志。即新的Next为该follower最后一条日志的后一条日志的index,即last+1

4. etcd/raft中日志复制实现

4.1 节点启动时日志处理

在节点启动时,日志的last index就是稳定存储Storagelast index。follower和candidate不需要对日志进行额外的处理,而leader需要获取每个follower(和learner)的进度,并以当前term提交一条空日志条目,以提交之前term的日志(详见《In Search of an Understandable Consensus Algorithm (Extended Version)》中5.4.2 Committing entries from previous terms)。同样,当新leader当选时,也需要做同样的操作。如下是becomeLeaderreset中与日志复制有关的源码:


func (r *raft) becomeLeader() {
	// ... ...
	r.reset(r.Term)
	// ... ...

	// Followers enter replicate mode when they've been successfully probed
	// (perhaps after having received a snapshot as a result). The leader is
	// trivially in this state. Note that r.reset() has initialized this
	// progress with the last index already.
	r.prs.Progress[r.id].BecomeReplicate()

	// ... ...

	emptyEnt := pb.Entry{Data: nil}
	if !r.appendEntry(emptyEnt) {
		// This won't happen because we just called reset() above.
		r.logger.Panic("empty entry was dropped")
	}
	// As a special case, don't count the initial empty entry towards the
	// uncommitted log quote. This is because we want to preserve the
	// behavior of allowing one entry larger than quote if the current
	// usage is zero.
	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

func (r *raft) reset(term uint64) {
	// ... ...
	r.prs.ResetVotes()
	r.prs.Visit(func(id uint64, pr *tracker.Progress) {
		*pr = tracker.Progress{
			Match:     0,
			Next:      r.raftLog.lastIndex() + 1,
			Inflights: tracker.NewInflights(r.prs.MaxInflight),
			IsLearner: pr.IsLearner,
		}
		if id == r.id {
			pr.Match = r.raftLog.lastIndex()
		}
	})
	// ... ...
}

becomeLeader调用reset方法时,会初始化所有节点的next index为leader日志的last index + 1。因为leader刚当选时不知道除了自己之外的节点的复制进度,将除自己外的所有节点的match index置为0,而将自己的match index置为自己的last index

随后,leader会在当前term为自己的日志追加一条空日志条目,以提交之前term的日志(详见《In Search of an Understandable Consensus Algorithm (Extended Version)》中5.4.2 Committing entries from previous terms)。

在将控日志条目加入到日志后,有一行r.reduceUncommittedSize([]pb.Entry{emptyEnt})代码。想了解这样代码的作用,需要先了解etcd/raft中避免新日志过多无法处理速度跟不上的机制。

Config中,可以看到如下的一条配置:


	// MaxUncommittedEntriesSize limits the aggregate byte size of the
	// uncommitted entries that may be appended to a leader's log. Once this
	// limit is exceeded, proposals will begin to return ErrProposalDropped
	// errors. Note: 0 for no limit.
	MaxUncommittedEntriesSize uint64

该配置用于限制leader日志中未提交日志的最大字节数,如果超过该值则丢弃新提议,以避免新日志过多处理速度跟不上。当该值为0时,表示不设限制。etcd/raft是以如下方式实现该约束的:

  • 在leader调用appendEntry方法向日志追加新条目时,appendEntry方法会调用increaseUncommittedSize(ents []pb.Entry) bool方法,该方法会根据配置与raft结构体中的uncommittedSize字段判断追加后会不会超过MaxUncommittedEntriesSize的限制,如果超过了该限制,会返回false,appendEntry方法会拒绝这些提议,如果没有超过限制,则仅增大uncommittedSize字段字段并返回true。需要注意的是,当uncommittedSize字段为0时不会拒绝提议,以保证leader不会因单条较大的MsgProp消息阻塞;同样该方法也不会拒绝空日志条目,因为其常用于新当选的leader提交之前的term的日志或离开joint configuration
  • 在etcd/raft的使用者调用NodeAdvance方法时,会调用reduceUncommittedSize(ents []pb.Entry)方法,以释放流控容量。

increaseUncommittedSizereduceUncommittedSize的源码如下:


// increaseUncommittedSize computes the size of the proposed entries and
// determines whether they would push leader over its maxUncommittedSize limit.
// If the new entries would exceed the limit, the method returns false. If not,
// the increase in uncommitted entry size is recorded and the method returns
// true.
//
// Empty payloads are never refused. This is used both for appending an empty
// entry at a new leader's term, as well as leaving a joint configuration.
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
	var s uint64
	for _, e := range ents {
		s += uint64(PayloadSize(e))
	}

	if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
		// If the uncommitted tail of the Raft log is empty, allow any size
		// proposal. Otherwise, limit the size of the uncommitted tail of the
		// log and drop any proposal that would push the size over the limit.
		// Note the added requirement s>0 which is used to make sure that
		// appending single empty entries to the log always succeeds, used both
		// for replicating a new leader's initial empty entry, and for
		// auto-leaving joint configurations.
		return false
	}
	r.uncommittedSize += s
	return true
}

// reduceUncommittedSize accounts for the newly committed entries by decreasing
// the uncommitted entry size limit.
func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
	if r.uncommittedSize == 0 {
		// Fast-path for followers, who do not track or enforce the limit.
		return
	}

	var s uint64
	for _, e := range ents {
		s += uint64(PayloadSize(e))
	}
	if s > r.uncommittedSize {
		// uncommittedSize may underestimate the size of the uncommitted Raft
		// log tail but will never overestimate it. Saturate at 0 instead of
		// allowing overflow.
		r.uncommittedSize = 0
	} else {
		r.uncommittedSize -= s
	}
}

而在becomeLeader中,写入空日志条目后,空日志条目也可能会占用一部分容量(与计算日志大小的方式有关,之前etcd/raft中直接通过protobuf生成的Size()方法计算,其会受其它字段影响,详见pull#10199)。因此,为了不将空日志条目的大小记在其中,其调用了reduceUncommittedSize方法。

becomeLeader执行完后,就会进入正常的处理逻辑。此时,所有follower的进度状态为默认状态StateProbe,以便leader获取follower的进度。

4.2 leader中的日志提议

“提议”是新的日志条目的起点,因此这里从日志的提议开始分析。

日志的提议是通过MsgProp消息实现的。candidate或pre candidate直接丢弃MsgProp消息,follower会将MsgProp消息转发给leader,只有leader会对MsgProp消息做真正的处理:

	// stepCandidate
	// ... ...
	case pb.MsgProp:
		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
		return ErrProposalDropped

	// stepFollower :
	// ... ...
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)

	// stepLeader :
	// ... ...
	case pb.MsgProp:
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}

		// Process ConfChange Msg
		//... ...

		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		r.bcastAppend()
		return nil

在leader将MsgProp中的提议追加到本地日志之前,还需要做一些判断与处理:

  1. 首先leader会检查自己的Progress结构是否还存在,以判断自己是否已经被ConfChange操作移出了集群,如果该leader被移出了集群,则不会处理该提议。
  2. 接着,leader还会判断当前是否在进行leader transfer,如果该leader正在将领导权转移给其它节点,那么同样不会处理该提议。
  3. 如果提议中包含ConfChange消息,会做特殊处理,在后文介绍ConfChange时会分析这部分逻辑,这里暂时不做介绍。
  4. 如果在追加提议中的日志后会超过MaxUncommittedSize的限制,则不会追加该提议。这部分逻辑在4.1 节点启动时日志处理已经做过介绍,这里不再赘述。

如果leader成功地将这些日志追加到了本地日志中,leade会调用bcastAppend方法,为所有follower(和learner)广播日志追加消息。

4.3 leader为follower复制日志

leader通过bcastAppend方法为follower(和learner)复制日志,该方法及其相关方法源码如下:


// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendAppend(id)
	})
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
	r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.prs.Progress[to]
	if pr.IsPaused() {
		return false
	}
	m := pb.Message{}
	m.To = to

	term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
	if len(ents) == 0 && !sendIfEmpty {
		return false
	}

	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries

		// ... ... #1

	} else {

		// ... ... #2

	}
	r.send(m)
	return true
}

leader在调用bcastAppend方法时,会向所有其它节点广播MsgAppMsgSnap消息,且即使是空消息也会广播。这里需要关注的是maybeSendAppend的实现,该函数是向一个节点发送MsgAppMsgSnap消息的方法。

该方法的大致流程如下:

  1. 首先通过PrgoressIsPaused方法检查该节点进度,如果该节点进度阻塞了,那么不会向其发送消息。
  2. 获取用于日志匹配的日志条目(index为next index - 1的日志)的term。
  3. 获取该节点的next index之后的日志。
  4. 如果日志长度为0且不需要发送空日志,那么直接返回。
  5. 如果步骤2、3中任一步骤产生了错误,说明用于日志匹配的条目已被压缩,该节点落后了太多,因此需要为其发送MsgSnap消息;否则,发送MsgApp消息。
  6. 调用send方法,填充需要发送的消息中缺失的字段(如TermFrom,具体逻辑见send方法的源码,这里不再赘述),并将消息放入该节点的信箱。由于etcd/raft不负责通信模块,因此这里不会真正发送,而是通过Ready结构体将需要发送的消息交给etcd/raft的使用者处理。

接下来先来分析第5步中MsgApp消息的生成方式:


		// ... ... #2

		m.Type = pb.MsgApp
		m.Index = pr.Next - 1
		m.LogTerm = term
		m.Entries = ents
		m.Commit = r.raftLog.committed
		if n := len(m.Entries); n != 0 {
			switch pr.State {
			// optimistically increase the next when in StateReplicate
			case tracker.StateReplicate:
				last := m.Entries[n-1].Index
				pr.OptimisticUpdate(last)
				pr.Inflights.Add(last)
			case tracker.StateProbe:
				pr.ProbeSent = true
			default:
				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
			}
		}

这段逻辑配置了MsgApp消息的相关字段。IndexLogTerm字段是用于日志匹配的日志(即发送的日志的上一条日志)的index与term(用于日志匹配的term字段为LogTerm,消息的Term字段为该节点当前的term,部分消息需要自己指定,部分消息由send方法填充)。Entries字段保存了需要复制的日志条目。Commit字段为leader提交的最后一条日志的索引。

如果该消息携带的日志非空,该方法还会更新该follower的进度状态:

  • 如果节点处于StateReplicate状态,此时通过流水线的方式优化日志复制速度,直接更新其Next索引(详见1.3节),并通过Inflights进行流控(详见3.2节)。
  • 如果节点处于StateProbe状态,此时将ProbeSent置为true,阻塞后续的消息,直到收到确认。

在分析了MsgApp消息的生成方式后,接下来分析MsgSnap消息的生成:


		// ... ... #1

		if !pr.RecentActive {
			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
			return false
		}

		m.Type = pb.MsgSnap
		snapshot, err := r.raftLog.snapshot()
		if err != nil {
			if err == ErrSnapshotTemporarilyUnavailable {
				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
				return false
			}
			panic(err) // TODO(bdarnell)
		}
		if IsEmptySnap(snapshot) {
			panic("need non-empty snapshot")
		}
		m.Snapshot = snapshot
		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
		pr.BecomeSnapshot(sindex)
		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

在准备快照之前,这段逻辑线判断了该follower节点最近是否是活跃的,如果不活跃则不会为其发送快照(详见3.2节)。

在生成快照并检测快照无误后,需要通过BecomeSnapshot方法将该follower的状态转为StateSnapshot,以阻塞该节点后续的MsgApp消息。

在follower转为StateSnapshot后,只有两种跳出StateSnapshot的方法:

  1. follower节点应用快照后会发送MsgAppResp消息,该消息会报告当前follower的last index。如果follower应用了快照后last index就追赶上了其match index,那么leader会直接将follower的状态转移到StateRelicate状态,为其继续复制日志。
  2. leader节点的使用者还需要主动调用NodeReportSnapshot方法告知leader节点快照的应用状态,leader会将该follower的状态转移到StateProbe状态(与方法1重复的消息会被忽略)。

方法1的逻辑会在后文介绍leader对MsgAppResp消息的处理时介绍,这里仅介绍方法2。

ReadyEntries字段的注释中可以看到,如果需要发送的日志中包含MsgSnap消息,那么当前节点(一定是leader节点)必须在目标节点应用完该消息后调用NodeReportSnapshot方法上报快照状态。该方法在rawnode中的实现方式如下:


// ReportSnapshot reports the status of the sent snapshot.
func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
	rej := status == SnapshotFailure

	_ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej})
}

ReportSnapshot方法会将一条MsgSnapStatus消息应用给leader状态机。如果快照应用失败,该消息的Reject字段会被置为true。只有leader需要处理MsgSnapStatus消息,其处理方式如下:


		// stepLeader
		// ... ...

		case pb.MsgSnapStatus:
		if pr.State != tracker.StateSnapshot {
			return nil
		}
		// TODO(tbg): this code is very similar to the snapshot handling in
		// MsgAppResp above. In fact, the code there is more correct than the
		// code here and should likely be updated to match (or even better, the
		// logic pulled into a newly created Progress state machine handler).
		if !m.Reject {
			pr.BecomeProbe()
			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
		} else {
			// NB: the order here matters or we'll be probing erroneously from
			// the snapshot index, but the snapshot never applied.
			pr.PendingSnapshot = 0
			pr.BecomeProbe()
			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
		}
		// If snapshot finish, wait for the MsgAppResp from the remote node before sending
		// out the next MsgApp.
		// If snapshot failure, wait for a heartbeat interval before next try
		pr.ProbeSent = true

从这段逻辑中可以看到,无论快照应用是否成功,leader都会将该follower的进度状态转为StateProbe状态。不同之处仅在于在调用BecomeProbe方法之前是否将PendingSnapshot的值置为0。读者可以回顾ResetState(StateProbe)的实现(详见3.3节),在没有将其置为0时,下一次检测日志匹配时会从该follower的match index + 1和该快照的index+1二者中较大者开始检测;而将其置为0后,只会从该follower的match index + 1开始检测。

4.4 follower处理来自leader的日志

follower处理来自leader的日志复制消息时,同样分为对MsgApp和对MsgSnap的处理,handleAppendEntries方法用来处理MsgApp消息、handleSnapshot用来处理MsgSnap消息。在处理这两种消息时,都会使用MsgAppResp方法对其进行相应。


func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
	}
}

follower对MsgApp消息的处理可分为如下情况:

  1. 如果用于日志匹配的条目在committed之前,说明这是一条过期的消息,因此直接返回MsgAppResp消息,并将消息的Index字段置为committed的值,以让leader快速更新该follower的next index
  2. 接下来,验证用于日志匹配的字段TermIndex是否与本地的日志匹配。如果匹配并保存了日志,则返回MsgAppResp消息,并将消息的Index字段置为本地最后一条日志的index,以让leader发送后续的日志。
  3. 如果日志不匹配,返回的MsgAppRespReject字段会被置为true,且RejectHint字段值为本地最后一条日志的索引,以便leader快速回退其next index。同时,MsgAppIndex字段会透传给MsgAppResp,以便leader校验该消息是否为过期的消息。

关于RejectHint的使用在1.1节3.3节中已经介绍过,这里不再赘述。需要注意的是,这里的“返回”,指的同样是将消息存入相应节点的信箱中,等待etcd/raft模块的使用者处理Ready结构体时发送给相应的节点。

handleSnapshot的处理方式如下:


func (r *raft) handleSnapshot(m pb.Message) {
	sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
	if r.restore(m.Snapshot) {
		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
	} else {
		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
	}
}

处理MsgSnap消息时,handleSnapshot方法会调用restore方法尝试应用快照。如果快照应用功能成功,则返回一条MsgAppResp消息,该消息的Index字段为本地最后一条日志的index;而如果快照没有被应用,那么返回的MsgAppResp消息的Index字段会被置为本地的committed索引。

可以看出,对MsgSnap消息的处理,重点在restore方法的实现。


// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
func (r *raft) restore(s pb.Snapshot) bool {
	if s.Metadata.Index <= r.raftLog.committed {
		return false
	}
	
	// ... ...

	// Now go ahead and actually restore.

	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}

	r.raftLog.restore(s)

	// ... ...

	r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
	return true
}

restore方法中有一些对ConfChange的处理,这部分会在本系列后续的文章中介绍,这里暂时略过。除此之外,restore中还有一些防止不应发生的情况的“Defense in depth”代码,这里也不做介绍,感兴趣的读者可以自行结合注释了解。

restore对快照做了如下处理:

  1. 如果快照的index没超过本地的committed索引,这说明快照过旧,因此不做处理直接返回false。
  2. 将快照的index和term与本地日志匹配,如果成功匹配,说明本地日志已经包含了快照覆盖的日志,因此不要应用该快照。同时,因为快照覆盖的日志都应是已被提交的日志,这也说明了本地的committed索引落后了,因此调用raftLogcommitTo方法,让本地committed索引快速前进到该快照的index,然后直接返回false。
  3. 如果到这里方法仍没返回,则可以将快照应用到本地。调用raftLogrestore方法,并返回true。

无论是处理MsgApp消息还是处理MsgSnap消息,返回的消息都是MsgAppResp。下一节中将分析leader对MsgAppResp消息的处理方式。

4.5 leader处理来自follower的日志复制响应

stepLeader方法在处理MsgAppResp消息时,会根据该消息和发送该消息的follower的状态来进行不同的处理:

	
	// stepLeader
	// ... ...

	case pb.MsgAppResp:
		pr.RecentActive = true

		if m.Reject {
			r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
				r.id, m.RejectHint, m.From, m.Index)
			if pr.MaybeDecrTo(m.Index, m.RejectHint) {
				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
				if pr.State == tracker.StateReplicate {
					pr.BecomeProbe()
				}
				r.sendAppend(m.From)
			}
		} else {
			oldPaused := pr.IsPaused()
			if pr.MaybeUpdate(m.Index) {
				switch {
				case pr.State == tracker.StateProbe:
					pr.BecomeReplicate()
				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
					// TODO(tbg): we should also enter this branch if a snapshot is
					// received that is below pr.PendingSnapshot but which makes it
					// possible to use the log again.
					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
					// Transition back to replicating state via probing state
					// (which takes the snapshot into account). If we didn't
					// move to replicating state, that would only happen with
					// the next round of appends (but there may not be a next
					// round for a while, exposing an inconsistent RaftStatus).
					pr.BecomeProbe()
					pr.BecomeReplicate()
				case pr.State == tracker.StateReplicate:
					pr.Inflights.FreeLE(m.Index)
				}

				if r.maybeCommit() {
					r.bcastAppend()
				} else if oldPaused {
					// If we were paused before, this node may be missing the
					// latest commit index, so send it.
					r.sendAppend(m.From)
				}
				// We've updated flow control information above, which may
				// allow us to send multiple (size-limited) in-flight messages
				// at once (such as when transitioning from probe to
				// replicate, or when freeTo() covers multiple messages). If
				// we have more entries to send, send as many messages as we
				// can (without sending empty messages for the commit index)
				for r.maybeSendAppend(m.From, false) {
				}
				
				// ... ...

			}
		}

这段逻辑首先处理了MsgAppRespReject字段为true的情况,这只会在follower处理MsgApp消息时发现日志条目不匹配时发生。因此,处理这种消息时,调用了MaybeDecrTo方法回退其Next索引。如果回退失败,说明这是一条过期的消息,不做处理;如果回退成功,且该节点为StateReplicate状态,则调用BecomeProbe使其转为StateProbe状态来查找最后一条匹配日志的位置。回退成功时,还会再次为该节点调用sendAppend方法,以为其发送MsgApp消息。

在处理MsgAppRespReject为false的消息时,其会调用MaybeUpdate方法来判断该消息的Index字段是否跟上了该follower的match index,并在需要时更新其next index。如果该消息没有跟上match index,那么不会对该消息做其它处理。其原因有三:

  1. 这条消息是过期的消息,不需要处理。
  2. 这条消息可能是follower应用快照发来的响应,且此时该follower仍未跟上其match index(可能是follower重启恢复后导致的)。此处后续处理逻辑即为在4.3节中提到的跳出StateSnapshot的第1中情况;如果这里因没跟上match index而没有跳出StateSnapshot状态,也会在etcd/raft模块使用者主动调用ReportSnapshot方法时跳出该状态。因此不会阻塞。
  3. 这条消息可能是StateProbe状态的follower发来的确认相应,但此时该follower仍未跟上其match index(可能是follower重启恢复后导致的)。因在一次心跳周期内,leader仅应向处于StateProbe状态的follower发送1条MsgApp消息,因此其释放应在心跳相关的逻辑中,该逻辑会在后文分析。因此也不会阻塞。

在分析完为什么这里仅处理跟上match indexMsgAppResp消息后,接下来其处理方式。

首先,该方法会根据发送该消息的follower的状态进行处理:

  1. 如果该follower处于StateProbe状态且现在跟上了进度,则将其转为StateReplica状态。
  2. 如果该follower处于StateSnapshot状态且现在跟上了进度,且从该follower发送该消息后到leader处理这条消息时,leader没有为其发送新快照(通过比较MatchPendingSnapshot判断),则将其转为StateReplica状态。
  3. 如果该follower处于StateReplicate状态,那么释放Inflights中该消息的Index字段值之前的所有消息。因为收到的MsgAppResp可能是乱序的,因此需要释放之前的所有消息(过期消息不会被处理)。

接下来,该方法调用了maybeCommit方法,该方法会根据所有节点的进度更新leader的commit index,在commit index有更新时返回true,否则返回false(该方法中有与成员变更相关的逻辑,这里暂时不对其进行分析,而是将其留给后续的文章,这里只需要知道其功能即可)。如果commit index有更新,那么调用bcastAppend方法广播新的committed索引。如果commit index没有更新,还需要进一步判断该follower之前是否是阻塞的,如果是那么为该follower发送一条日志复制消息以更新其committed索引,因为在该节点阻塞时可能错过了committed索引的更新消息。

接着,通过for循环继续为该节点发送新的日志复制消息。因为日志复制部分有流控逻辑,因此这里的循环不会成为死循环。这样做可以尽可能多地为节点复制日志,以提高日志复制效率。

最后这里还有一处leader transfer的逻辑,此处在本系列介绍Raft选举时有提到过,这里不再赘述。

4.6 心跳消息中日志复制相关操作

除了MsgAppMsgSnapMsgAppResp消息外,心跳消息MsgHeartbeat即其相应的相应消息MsgHeartbeatResp中也有一些与日志复制相关的逻辑(如StateProbe状态下释放阻塞状态的逻辑)。虽然这部分逻辑不多但同样重要,本节分析这部分逻辑。

首先是leader为follower发送心跳消息时的相关逻辑:


// stepLeader
// ... ...
case pb.MsgBeat:
	r.bcastHeartbeat()

// ... ...

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
	// Attach the commit as min(to.matched, r.committed).
	// When the leader sends out heartbeat message,
	// the receiver(follower) might not be matched with the leader
	// or it might not have all the committed entries.
	// The leader MUST NOT forward the follower's commit to
	// an unmatched index.
	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
	m := pb.Message{
		To:      to,
		Type:    pb.MsgHeartbeat,
		Commit:  commit,
		Context: ctx,
	}

	r.send(m)
}

tickHeartbeat方法中,每次心跳会将一条MsgBeat应用到状态机。该消息会触发bcastHeartbeat方法,为其它节点广播心跳消息。bcastHeartbeat方法中有一些与实现线性一致性读相关的逻辑,这里将其留给本系列的后续文章。这里只需要看该方法最后调用的sendHeartbeat方法,方法生成的MsgHeartbeat消息中的Index字段为leader的committed索引。而在follower处理MsgHeartbeat消息时,会根据该字段更新自己的committed索引,以避免空闲集群没有新提议无法更新follower的committed状态的问题。


func (r *raft) handleHeartbeat(m pb.Message) {
	r.raftLog.commitTo(m.Commit)
	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

随后,follower会向leader发送MsgHeartbeatResp消息作为响应。leader在处理该消息时,主要做的也是线性一致性读相关的处理,但也有部分与日志复制相关的逻辑:


	case pb.MsgHeartbeatResp:
		pr.RecentActive = true
		pr.ProbeSent = false

		// free one slot for the full inflights window to allow progress.
		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
			pr.Inflights.FreeFirstOne()
		}
		if pr.Match < r.raftLog.lastIndex() {
			r.sendAppend(m.From)
		}
		
		// ... ...

在leader收到新条例响应时,会重置ProbeSent为false,以在下一个心跳周期继续为处于StateProbe的follower复制日志。

同时,如果该follower处于StateReplicate状态且其用于流控的Inflights已满,leader会为其释放一个Inflights的槽位,以保证在每个心跳周期处于StateReplicate状态的follower都至少能收到一条MsgApp消息。

最后,如果该节点的match index小于leader当前最后一条日志,则为其调用sendAppend方法来复制新日志。

5. Q & A

5.1 为什么raftLog使用了unstable也能保证安全性?

etcd/raft为了能够批处理网络与磁盘I/O,在raftLog中设计了一段还未保存到稳定存储的unstable段。在阅读日志复制部分代码时,有些读者可能会有这一疑惑:

  • follower回复MsgAppResp请求时Index字段为整个raftLoglast index,其中包括了unstable段。而leader会根据MsgAppRespIndex字段更新follower的match index,且leader会根据quorum的match index计算committed index。那么会不会出现被commit的日志其实还没有被quorum的节点保存到稳定存储从而无法保证安全性的情况?

显然,如果日志在commit之前没有被quorum的节点保存到稳定存储,那么的确存在日志丢失的情况。在《Consensus: Bridging theory and practice》的11.7.3 Avoiding persistent storage writes中确实提到了这种设计引用1。但是etcd/raft中,其实并不会出现没有被quorum节点保存到稳定存储就commit的情况。这与Ready要求的字段处理顺序有关。

首先,正如上文提到,因为etcd/raft中的网络操作也是批处理设计,因此send方法只是将消息放入信箱,而不是立刻将其发出(etcd/raft也没有通信模块)。因此,当follower收到MsgApp请求时,执行的操作实际上是(不考虑特殊情况):

  1. 将新日志追加到unstable中。
  2. 将包含unstablelast indexMsgAppResp消息放入信箱,等待发送。

当用户收到下一个Ready结构体时,其收到的其实是如下内容:


// node.go
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
	return rd
}

// log.go
func (l *raftLog) unstableEntries() []pb.Entry {
	if len(l.unstable.entries) == 0 {
		return nil
	}
	return l.unstable.entries
}

可以看到,Ready结构体的Entries字段是全量的unstable段的日志,Messages字段是全量的信箱中的消息。而Ready结构体的处理顺序必须满足如下顺序:

  1. 先将ReadyEntriesHardStateSnapshot保存到稳定存储(如果值非空)。
  2. 再发送ReadyMessages字段中的消息。

因此,在etcd/raft模块的使用者将含有unstablelast indexMsgAppResp消息发出之前,unstable中的所有日志已经被保存到了稳定存储中。所以,当leader收到该MsgAppResp并根据其Index字段更新该follower的match index时,match index之前的消息确实被保存到了该follower的稳定存储中。

关于稳定存储与安全性,《Consensus: Bridging theory and practice》给出了更详细的描述与形式化的证明,这里引用2再摘录部分与本问题相关的段落,便于读者参考。

引用1

11.7.3 Avoiding persistent storage writes

Many papers suggest using replication rather than stable storage for durability. For example, in Viewstamped Replication Revisited, servers do not write log entries to stable storage. When a server restarts, its log is not used for voting until it learns the current information (its disk is only used as an optimization to avoid network transfers). The trade-off is that data loss is possible in catastrophic events. For example, if a majority of the cluster were to restart simultaneously, the cluster would have potentially lost entries and would not be able to form a new view. Raft could be extended in similar ways to support disk-less operation, but we think the risk of availability or data loss usually outweighs the benefits.

引用2

3.8 Persisted state and server restarts

… …

Each server also persists new log entries before they are counted towards the entries’ commitment; this prevents committed entries from being lost or “uncommitted” when servers restart.

… …

The state machine can either be volatile or persistent. A volatile state machine must be recovered after restarts by reapplying log entries (after applying the latest snapshot; see Chapter 5). A persistent state machine, however, has already applied most entries after a restart; to avoid reapplying them, its last applied index must also be persistent.

… …

If a server loses any of its persistent state, it cannot safely rejoin the cluster with its prior identity. Such a server can usually be added back into the cluster with a new identity by invoking a cluster membership change (see Chapter 4). If a majority of the cluster loses its persistent state, however, log entries may be lost and progress on cluster membership changes will not be possible; to proceed, a system administrator would need to admit the possibility of data loss.

5.2 Entries、HardState、Snapshot持久化顺序有要求吗?

在处理Ready结构体时,除了要保证先持久化再发送消息的顺序,需要持久化的字段的保存顺序也值得关注。官方的建议是按照EntriesHardStateSnapshot的顺序持久化。因为在raft初始化加载HardState时,会检查commit index是否在[snapshot last index, log last index)范围内,


// raft.go
func (r *raft) loadState(state pb.HardState) {
	if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
		r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
	}
	r.raftLog.committed = state.Commit
	r.Term = state.Term
	r.Vote = state.Vote
}

在etcd的预写日志wal的实现中,EntriesHardState时同步落盘的,以避免重启时不一致的问题。

6. 总结

本文会对etcd/raft中Raft日志复制算法的实现与优化进行分析。由于etcd/raft中对日志复制的优化大部分属于实现上的优化,因此本文讲解优化理论的部分较少,而讲解etcd/raft中日志复制实现的部分较多。

因为日志复制的逻辑涉及到的方面多、逻辑复杂、经过数年的版本演进部分逻辑难以理解,因此本文详细地分析了etcd/raft中与日志复制相关的几乎所有逻辑,以供读者参考。这里不建议读者通读本文讲解实现的部分,而是按照自己的节奏阅读源码,在遇到难以理解的部分时可以将本文作为补充参考。

参考文献

[1] Ongaro D, Ousterhout J. In search of an understandable consensus algorithm[C]//2014 {USENIX} Annual Technical Conference ({USENIX}{ATC} 14). 2014: 305-319.

[2] Ongaro D, Ousterhout J. In search of an understandable consensus algorithm (extended version)[J]. Retrieved July, 2016, 20: 2018.

[3] Ongaro D. Consensus: Bridging theory and practice[D]. Stanford University, 2014.

[4] Raft 笔记(五) – Log replication. 我叫尤加利(技术博客)

[5] Raft协议实现学习之—初始化和Leader Election过程. BUCKET & HAMMER(技术博客)(其中对unstable的讨论存在问题,但该文仍提出了一些很好的问题)