深入浅出etcd/raft —— 0x03 Raft选举
本文为原创文章,转载请严格遵守CC BY-NC-SA协议。
0. 引言
本文会对etcd/raft中Raft选举算法的实现与优化进行分析。这里假定读者阅读过Diego Ongaro的《In Search of an Understandable Consensus Algorithm (Extended Version)》(这里有笔者的翻译,笔者英语水平一般,欢迎指正。),其中提到的部分,本文中不会做详细的解释。对etcd/raft的总体结构不熟悉的读者,可以先阅读《深入浅出etcd/raft —— 0x02 etcd/raft总体设计》。
本文首先会简单介绍etcd/raft对Raft选举部分的算法优化,然后通过源码分析etcd/raft的选举实现。
1. Raft选举算法优化
在leader选举方面,etcd/raft对《In Search of an Understandable Consensus Algorithm (Extended Version)》中介绍的基本Raft算法做了三种优化。这三种优化都在Diego Ongaro的博士论文《CONSENSUS: BRIDGING THEORY AND PRACTICE》的6.4 Processing read-only queries more efficiently和9.6 Preventing disruptions when a server rejoins the cluster中有提到。
etcd/raft实现的与选举有关的优化有Pre-Vote、Check Quorum、和Leader Lease。在这三种优化中,只有Pre-Vote和Leader Lease最初是对选举过程的优化,Check Quorum期初是为了更高效地实现线性一致性读(Linearizable Read)而做出的优化,但是由于Leader Lease需要依赖Check Quorum,因此笔者也将其放在这里一起讲解。本系列将etcd/raft对实现线性一致性读的优化留在了后续的文章中,本文仅介绍为了实现更高效的线性一致性读需要在选举部分做出的优化。
除此之外,etcd/raft还实现了Leader Transfer,即主动地进行leader的交接。其实现方式比较简单,只需要让希望成为新leader节点主动发起投票请求即可,这里不再过多讲解。需要注意的是,Leader Transfer不保证交接一定成功,只有目标节点能够得到数量达到quorum的选票时才能当选leader,Leader Transfer类型的投票不受Pre-Vote、Check Quorum、Leader Lease机制约束。
1.1 Pre-Vote
如下图所示,当Raft集群的网络发生分区时,会出现节点数达不到quorum(达成共识至少需要的节点数)的分区,如图中的Partition 1。
在节点数能够达到quorum的分区中,选举流程会正常进行,该分区中的所有节点的term最终会稳定为新选举出的leader节点的term。不幸的是,在节点数无法达到quorum的分区中,如果该分区中没有leader节点,因为节点总是无法收到数量达到quorum的投票而不会选举出新的leader,所以该分区中的节点在election timeout超时后,会增大term并发起下一轮选举,这导致该分区中的节点的term会不断增大。
如果网络一直没有恢复,这是没有问题的。但是,如果网络分区恢复,此时,达不到quorum的分区中的节点的term值会远大于能够达到quorum的分区中的节点的term,这会导致能够达到quorum的分区的leader退位(step down)并增大自己的term到更大的term,使集群产生一轮不必要的选举。
Pre-Vote机制就是为了解决这一问题而设计的,其解决的思路在于不允许达不到quorum的分区正常进入投票流程,也就避免了其term号的增大。为此,Pre-Vote引入了“预投票”,也就是说,当节点election timeout超时时,它们不会立即增大自身的term并请求投票,而是先发起一轮预投票。收到预投票请求的节点不会退位。只有当节点收到了达到quorum的预投票响应时,节点才能增大自身term号并发起投票请求。这样,达不到quorum的分区中的节点永远无法增大term,也就不会在分区恢复后引起不必要的一轮投票。
1.2 Check Quorum
在Raft算法中,保证线性一致性读取的最简单的方式,就是讲读请求同样当做一条Raft提议,通过与其它日志相同的方式执行,因此这种方式也叫作Log Read。显然,Log Read的性能很差。而在很多系统中,读多写少的负载是很常见的场景。因此,为了提高读取的性能,就要试图绕过日志机制。
但是,直接绕过日志机制从leader读取,可能会读到陈旧的数据,也就是说存在stale read的问题。在下图的场景中,假设网络分区前,Node 5是整个集群的leader。在网络发生分区后,Partition 0分区中选举出了新leader,也就是图中的Node 1。
但是,由于网络分区,Node 5无法收到Partition 0中节点的消息,Node 5不会意识到集群中出现了新的leader。此时,虽然它不能成功地完成日志提交,但是如果读取时绕过了日志,它还是能够提供读取服务的。这会导致连接到Node 5的client读取到陈旧的数据。
Check Quorum可以减轻这一问题带来的影响,其机制也非常简单:让leader每隔一段时间主动地检查follower是否活跃。如果活跃的follower数量达不到quorum,那么说明该leader可能是分区前的旧leader,所以此时该leader会主动退位转为follower。
需要注意的是,Check Quorum并不能完全避免stale read的发生,只能减小其发生时间,降低影响。如果需要严格的线性一致性,需要通过其它机制实现。
1.3 Leader Lease
分布式系统中的网络环境十分复杂,有时可能出现网络不完全分区的情况,即整个整个网络拓补图是一个连通图,但是可能并非任意的两个节点都能互相访问。
这种现象不止会出现在网络故障中,还会出现在成员变更中。在通过ConfChange
移除节点时,不同节点应用该ConfChange
的时间可能不同,这也可能导致这一现象发生。
在上图的场景下,Node 1与Node 2之间无法通信。如果它们之间的通信中断前,Node 1是集群的leader,在通信中断后,Node 2无法再收到来自Node 1的心跳。因此,Node 2会开始选举。如果在Node 2发起选举前,Node 1和Node 3中都没有新的日志,那么Node 2仍可以收到能达到quorum的投票(来自Node 2本身的投票和来自Node 3的投票),并成为leader。
Leader Lease机制对投票引入了一条新的约束以解决这一问题:当节点在election timeout超时前,如果收到了leader的消息,那么它不会为其它发起投票或预投票请求的节点投票。也就是说,Leader Lease机制会阻止了正常工作的集群中的节点给其它节点投票。
Leader Lease需要依赖Check Quorum机制才能正常工作。接下来笔者通过一个例子说明其原因。
假如在一个5个节点组成的Raft集群中,出现了下图中的分区情况:Node 1与Node 2互通,Node 3、Node 4、Node 5之间两两互通、Node 5与任一节点不通。在网络分区前,Node 1是集群的leader。
在既没有Leader Lease也没有Check Quorum的情况下,Node 3、Node 4会因收不到leader的心跳而发起投票,因为Node 2、Node 3、Node 4互通,该分区节点数能达到quorum,因此它们可以选举出新的leader。
而在使用了Leader Lease而不使用Check Quorum的情况下,由于Node 2仍能够收到原leader Node 1的心跳,受Leader Lease机制的约束,它不会为其它节点投票。这会导致即使整个集群中存在可用节点数达到quorum的分区,但是集群仍无法正常工作。
而如果同时使用了Leader Lease和Check Quorum,那么在上图的情况下,Node 1会在election timeout超时后因检测不到数量达到quorum的活跃节点而退位为follower。这样,Node 2、Node 3、Node 4之间的选举可以正常进行。
1.4 引入的新问题与解决方案
引入Pre-Vote和Check Quorum(etcd/raft的实现中,开启Check Quorum会自动开启Leader Lease)会为Raft算法引入一些新的问题。
当一个节点收到了term比自己低的消息时,原本的逻辑是直接忽略该消息,因为term比自己低的消息仅可能是因网络延迟的迟到的旧消息。然而,开启了这些机制后,在如下的场景中会出现问题:
场景1: 如上图所示,在开启了Check Quorum / Leader Lease后(假设没有开启Pre-Vote,Pre-Vote的问题在下一场景中讨论),数量达不到quorum的分区中的leader会退位,且该分区中的节点永远都无法选举出leader,因此该分区的节点的term会不断增大。当该分区与整个集群的网络恢复后,由于开启了Check Quorum / Leader Lease,即使该分区中的节点有更大的term,由于原分区的节点工作正常,它们的选举请求会被丢弃。同时,由于该节点的term比原分区的leader节点的term大,因此它会丢弃原分区的leader的请求。这样,该节点永远都无法重新加入集群,也无法当选新leader。(详见issue #5451、issue #5468)。
场景2: Pre-Vote机制也有类似的问题。如上图所示,假如发起预投票的节点,在预投票通过后正要发起正式投票的请求时出现网络分区。此时,该节点的term会高于原集群的term。而原集群因没有收到真正的投票请求,不会更新term,继续正常运行。在网络分区恢复后,原集群的term低于分区节点的term,但是日志比分区节点更新。此时,该节点发起的预投票请求因没有日志落后会被丢弃,而原集群leader发给该节点的请求会因term比该节点小而被丢弃。同样,该节点永远都无法重新加入集群,也无法当选新leader。(详见issue #8501、issue #8525)。
场景3: 在更复杂的情况中,比如,在变更配置时,开启了原本没有开启的Pre-Vote机制。此时可能会出现与上一条类似的情况,即可能因term更高但是log更旧的节点的存在导致整个集群的死锁,所有节点都无法预投票成功。这种情况比上一种情况更危险,上一种情况只有之前分区的节点无法加入集群,在这种情况下,整个集群都会不可用。(详见issue #8501、issue #8525)。
为了解决以上问题,节点在收到term比自己低的请求时,需要做特殊的处理。处理逻辑也很简单:
- 如果收到了term比当前节点term低的leader的消息,且集群开启了Check Quorum / Leader Lease或Pre-Vote,那么发送一条term为当前term的消息,令term低的节点成为follower。(针对场景1、场景2)
- 对于term比当前节点term低的预投票请求,无论是否开启了Check Quorum / Leader Lease或Pre-Vote,都要通过一条term为当前term的消息,迫使其转为follower并更新term。(针对场景3)
2. etcd/raft中Raft选举的实现
本节中,笔者将分析etcd/raft中选举部分的实现。
2.1 MsgHup与hup
在etcd/raft的实现中,选举的触发是通过MsgHup
消息实现的,无论是主动触发选举还是因election timeout超时都是如此:
// *** node.go ***
func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }
// *** rawnode.go ***
func (rn *RawNode) Campaign() error {
return rn.raft.Step(pb.Message{
Type: pb.MsgHup,
})
}
// *** raft.go ***
// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
因此可以跟着MsgHup
的处理流程,分析etcd/raft中选举的实现。正如笔者在《深入浅出etcd/raft —— 0x02 etcd/raft总体设计》中所说,etcd/raft通过raft
结构体的Step
方法实现Raft状态机的状态转移。
func (r *raft) Step(m pb.Message) error {
// ... ...
switch m.Type {
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection)
} else {
r.hup(campaignElection)
}
// ... ...
}
// ... ...
}
Step
方法在处理MsgHup
消息时,会根据当前配置中是否开启了Pre-Vote
机制,以不同的CampaignType
调用hup
方法。CampaignType
是一种枚举类型(go语言的枚举实现方式),其可能值如下表所示。
值 | 描述 |
---|---|
campaignPreElection |
表示Pre-Vote的预选举阶段。 |
campaignElection |
表示正常的选举阶段(仅超时选举,不包括Leader Transfer)。 |
campaignTransfer |
表示Leader Transfer阶段。 |
接下来对hup
的实现进行分析。
func (r *raft) hup(t CampaignType) {
if r.state == StateLeader {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
return
}
if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return
}
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}
hup
方法会对节点当前状态进行一些检查,如果检查通过才会试图让当前节点发起投票或预投票。首先,hup
会检查当前节点是否已经是leader,如果已经是leader那么直接返回。接下来,hup
通过promotable()
方法判断当前节点能否提升为leader。
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
pr := r.prs.Progress[r.id]
return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}
promotable()
的判定规则有三条:
- 当前节点是否已被集群移除。(通过
ProgressTracker.ProgressMap
映射中是否有当前节点的id的映射判断。当节点被从集群中移除后,被移除的节点id会被从该映射中移除。笔者会在后续讲解集群配置变更的文章中详细分析其实现。) - 当前节点是否为learner节点。
- 当前节点是否还有未被保存到稳定存储中的快照。
这三条规则中,只要有一条为真,那么当前节点就无法成为leader。在hup
方法中,除了需要promotable()
为真,还需要判断一条规则:
- 当前的节点已提交的日志中,是否有还未被应用的集群配置变更
ConfChange
消息。
如果当前节点已提交的日志中还有未应用的ConfChange
消息,那么该节点也无法提升为leader。
只有当以上条件都满足后,hup
方法才会调用campaign
方法,根据配置,开始投票或预投票。
2.2 campaign
campaign
是用来发起投票或预投票的重要方法。
// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
if !r.promotable() {
// This path should not be hit (callers are supposed to check), but
// better safe than sorry.
r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
}
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
因为调用campaign
的方法不止有hup
,campaign
方法首先还是会检查promotable()
是否为真。
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
在开启Pre-Vote后,首次调用campaign
时,参数为campaignPreElection
。此时会调用becomePreCandidate
方法,该方法不会修改当前节点的Term
值,因此发送的MsgPreVote
消息的Term
应为当前的Term + 1
。而如果没有开启Pre-Vote或已经完成预投票进入正式投票的流程或是Leader Transfer时(即使开启了Pre-Vote,Leader Transfer也不会进行预投票),会调用becomeCandidate
方法。该方法会增大当前节点的Term
,因此发送MsgVote
消息的Term
就是此时的Term
。becomeXXX
用来将当前状态机的状态与相关行为切换相应的角色,笔者会在后文详细分析其实现与修改后的行为。
接下来,campaign
方法开始发送投票请求。在向其它节点发送请求之前,该节点会先投票给自己:
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
poll
方法会在更新本地的投票状态并获取当前投票结果。如果节点投票给自己后就赢得了选举,这说明集群是以单节点的模式启动的,那么如果当前是预投票阶段当前节点就能立刻开启投票流程、如果已经在投票流程中或是在Leader Transfer就直接当选leader即可。如果集群不是以单节点的模式运行的,那么就需要向其它有资格投票的节点发送投票请求:
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
请求的Term
字段就是之前记录的term
,即预投票阶段为当前Term + 1
、投票阶段为当前的Term
。
2.3 Step方法与step
在前文中,笔者提到过Step
函数是Raft状态机状态转移的入口方法,Step
方法的参数是Raft消息。Step
方法会检查消息的Term
字段,对不同的情况进行不同的处理。Step
方法还会对与选举相关的一些的消息进行特殊的处理。最后,Step
会调用raft
接口体step
字段中记录的函数签名。step
字段的定义如下:
// Definition of `stepFunc`
type stepFunc func(r *raft, m pb.Message) error
// step field in struct `raft`
step stepFunc
上一节中提到的becomeXXX
函数会让状态机切换到相应角色,并切换raft
结构体的step
字段中记录的函数。让不同角色的节点能够用不同的逻辑来处理Raft消息。
在调用step
字段记录的函数处理请求前,Step
会根据消息的Term
字段,进行一些预处理。
2.3.1 对Term为0的消息的预处理
// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
// local message
// case ... ...
}
etcd/raft使用Term
为0的消息作为本地消息,Step
不会对本地消息进行特殊处理,直接进入之后的逻辑。
2.3.2 对Term大于当前节点Term的消息的预处理
case m.Term > r.Term:
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
}
switch {
case m.Type == pb.MsgPreVote:
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
对于Term
大于当前节点的Term
的消息,如果消息类型为MsgVote
或MsgPreVote
,先要检查这些消息是否需要处理。其判断规则如下:
force
:如果该消息的CampaignType
为campaignTransfer
,force
为真,表示该消息必须被处理。inLease
:如果开启了Check Quorum(开启Check Quorum会自动开启Leader Lease),且election timeout超时前收到过leader的消息,那么inLease
为真,表示当前Leader Lease还没有过期。
如果!force && inLease
,说明该消息不需要被处理,可以直接返回。
对于Term
大于当前节点的Term
的消息,Step
还需要判断是否需要切换自己的身份为follower,其判断规则如下:
- 如果消息为
MsgPreVote
消息,那么不需要转为follower。 - 如果消息为
MsgPreVoteResp
且Reject
字段不为真时,那么不需要转为follower。 - 否则,转为follower。
在转为follower时,新的Term
就是该消息的Term
。如果消息类型是MsgApp
、MsgHeartbeat
、MsgSnap
,说明这是来自leader的消息,那么将lead
字段直接置为该消息的发送者的id,否则暂时不知道当前的leader节点是谁。
2.3.3 对Term小于当前节点Term的消息的预处理
最后,如果消息的Term
比当前Term
小,因存在1.4节中提到的问题,除了忽略消息外,还要做额外的处理:
case m.Term < r.Term:
if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this could
// also mean that this node has advanced its term number during a network
// partition, and it is now unable to either win an election or to rejoin
// the majority on the old term. If checkQuorum is false, this will be
// handled by incrementing term numbers in response to MsgVote with a
// higher term, but if checkQuorum is true we may not advance the term on
// MsgVote and must generate other messages to advance the term. The net
// result of these two features is to minimize the disruption caused by
// nodes that have been removed from the cluster's configuration: a
// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
// but it will not receive MsgApp or MsgHeartbeat, so it will not create
// disruptive term increases, by notifying leader of this node's activeness.
// The above comments also true for Pre-Vote
//
// When follower gets isolated, it soon starts an election ending
// up with a higher term than leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of higher term would force leader to step down.
// However, this disruption is inevitable to free this stuck node with
// fresh election. This can be prevented with Pre-Vote phase.
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
} else if m.Type == pb.MsgPreVote {
// Before Pre-Vote enable, there may have candidate with higher term,
// but less log. After update to Pre-Vote, the cluster may deadlock if
// we drop messages with a lower term.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
} else {
// ignore other cases
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
}
return nil
}
这段代码实现了1.4节中的解决方案,这里不再赘述。
MsgAppResp
类型的消息作为回复,这只是因为etcd社区认为没必要为其新增一种消息类型。所以这里的代码建议读者阅读相应的commit与issue下的讨论(相应的issue已在1.4节中给出)。2.3.4 不通过step处理的情况
除了在预处理阶段中直接丢弃的消息外,还有一些消息不会通过step
字段记录的函数处理。笔者先来介绍这些消息,之后分角色介绍step
与becomeXXX
在不同情况下的处理方式。
switch m.Type {
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection)
} else {
r.hup(campaignElection)
}
case pb.MsgVote, pb.MsgPreVote:
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// ...and we believe the candidate is up to date.
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
// Note: it turns out that that learners must be allowed to cast votes.
// This seems counter- intuitive but is necessary in the situation in which
// a learner has been promoted (i.e. is now a voter) but has not learned
// about this yet.
// For example, consider a group in which id=1 is a learner and id=2 and
// id=3 are voters. A configuration change promoting 1 can be committed on
// the quorum `{2,3}` without the config change being appended to the
// learner's log. If the leader (say 2) fails, there are de facto two
// voters remaining. Only 3 can win an election (due to its log containing
// all committed entries), but to do so it will need 1 to vote. But 1
// considers itself a learner and will continue to do so until 3 has
// stepped up as leader, replicates the conf change to 1, and 1 applies it.
// Ultimately, by receiving a request to vote, the learner realizes that
// the candidate believes it to be a voter, and that it should act
// accordingly. The candidate's config may be stale, too; but in that case
// it won't win the election, at least in the absence of the bug discussed
// in:
// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
// it's local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
default:
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}
第一种情况是大家熟悉的MsgHup
消息,这种消息的处理见2.1节。
第二种情况是MsgVote
和MsgPreVote
消息。首先需要判断该节点能否为其投票。其判断规则有3条:
- 如果该节点当前已为消息的发送者投过票,说明这可能是重复的消息,该节点可以安全地为其重新投票。
- 如果该节点还没有投过票且当前term内还没有leader,那么该节点可以为其投票。
- 如果这是
MsgPreVote
消息且其Term
大于当前节点的Term
,那么该节点可以为其投票。
如果满足以上3个条件中的任一条,且该消息中的Index
和Term
至少与当前节点的日志一样新,那么该节点为其发送相应的投票消息。如果该消息的是MsgVote
消息,该节点需要记录其将选票投给了谁,并重置election timeout
的计时器。
否则,节点会拒绝该请求,为其发送一条Reject
为true的MsgVoteResp
或MsgPreVote
消息。,以避免1.4节中提到的问题。
除了这些情况外,消息都会通过step
字段记录的函数,按照不同的节点角色进行处理。
2.4 becomeXXX与stepXXX
在上文中笔者介绍过,becomeXXX
函数用于将切换Raft状态机的角色,stepXXX
是Raft状态机的相应角色下状态转移的行为。etcd/raft中becomeXXX
共有四种:becomeFollower
、becomeCandidate
、becomePreCandidate
、becomeLeader
,stepXXX
共有三种:stepLeader
、stepCandidate
、stepFollower
,becomeCandidate
和becomePreCandidate
相应的行为均为stepCandidate
。
本节中,笔者将介绍becomeXXX
和stepXXX
中与选举相关的逻辑。
2.4.1 Candidate、PreCandidate
Candidate
和PreCandidate
的行为有很多相似之处,本节笔者将分析二者行为并比对异同之处。
func (r *raft) becomeCandidate() {
// ... ...
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
func (r *raft) becomePreCandidate() {
// ... ...
// Becoming a pre-candidate changes our step functions and state,
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r.step = stepCandidate
r.prs.ResetVotes()
r.tick = r.tickElection
r.lead = None
r.state = StatePreCandidate
r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}
func (r *raft) reset(term uint64) {
if r.Term != term {
r.Term = term
r.Vote = None
}
r.lead = None
r.electionElapsed = 0
r.heartbeatElapsed = 0
r.resetRandomizedElectionTimeout()
r.abortLeaderTransfer()
r.prs.ResetVotes()
// ... ...
}
预选举与选举的区别在主要在于预选举不会改变状态机的term也不会修改当前term的该节点投出的选票。下表列出了becomePreCandidate
和becomeCandidate
修改或未修改的与选举相关的重要字段:
重要字段 | becomePreCandidate | becomCandidate | 描述 |
---|---|---|---|
step |
stepCandidate |
stepCandidate |
step 行为 |
tick |
tickElection |
tickElection |
tick 行为 |
Vote |
mot modified | current node.id |
当前term将选票投给谁 |
state |
StatePreCandidate |
StateCandidate |
状态机角色 |
lead |
None | None | 当前term的leader |
prs.Votes |
rest | reset | 收到的选票 |
无论是PreCandidate
还是PreCandidate
,其行为都是stepCandidate
。其中,部分字段是通过reset
函数修改的。reset
方法用于状态机切换角色时初始化相关字段。因为切换到PreCandidate
严格来说并不算真正地切换角色,因此becomePreCandidate
中没有调用reset
方法,而becomeCandidate
、becomeLeader
、becomeFollower
都调用了reset
方法。本文仅关注reset
中与选举有关的部分,reset
中还有一些与日志复制相关的逻辑,笔者会在后续的文章中分析。
接下来分析stepCandiate
中与选举相关的逻辑:
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}
从如上代码中,可以看到PreCandidate
或Candidate
对不同种消息的处理方式:
MsgProp
、MsgTimeoutNow
:丢弃。MsgApp
、MsgHeartbeat
、MsgSnap
:收到了来自leader的消息,转为follower。- 相应地
MsgPreVoteResp
或MsgVoteResp
:通过poll
记录选票并获取当前选举状态。
在条件3中,当前节点在获取选举状态后,会根据不同的状态做出不同的处理:
VotePending
:暂无选举结果,不做处理。VoteWon
:赢得选举,如果当前状态机的角色是PreCandidate
,那么调用campaign
进行正式选举;如果当前状态机的角色是Candidate
,那么当选leader,并向集群广播MsgAppend
消息以通知集群中节点已有leader产生。VoteLost
:选举失败,变为follower。
2.4.2 Leader
leader中与选举相关逻辑的比重较少,这里简单介绍一下。
首先,是becomeLeader
及其修改的选举相关的重要字段:
func (r *raft) becomeLeader() {
// ... ...
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
// ... ...
}
重要字段 | becomeLeader | 描述 | |
---|---|---|---|
step |
stepLeader |
step 行为 |
|
tick |
tickHeartbeat |
tick 行为 |
|
Vote |
None or not modified | 当前term将选票投给谁,如果term没有更新,那么不会修改该字段。 | |
state |
StateLeader |
状态机角色 | |
lead |
current node.id | 当前term的leader | |
prs.Votes |
rest | 收到的选票 |
接下来,分析stepLeader
中与选举相关的逻辑:
func stepLeader(r *raft, m pb.Message) error {
// These message types do not require any progress for m.From.
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
// The leader should always see itself as active. As a precaution, handle
// the case in which the leader isn't in the configuration any more (for
// example if it just removed itself).
//
// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
// leader steps down when removing itself. I might be missing something.
if pr := r.prs.Progress[r.id]; pr != nil {
pr.RecentActive = true
}
if !r.prs.QuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
// Mark everyone (but ourselves) as inactive in preparation for the next
// CheckQuorum.
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
if id != r.id {
pr.RecentActive = false
}
})
return nil
// ... ...
}
// All other message types require a progress for m.From (pr).
pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
}
switch m.Type {
// ... ...
case pb.MsgTransferLeader:
if pr.IsLearner {
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
return nil
}
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
return nil
}
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
}
if leadTransferee == r.id {
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
return nil
}
// Transfer leadership to third party.
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
r.electionElapsed = 0
r.leadTransferee = leadTransferee
if pr.Match == r.raftLog.lastIndex() {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
}
}
return nil
}
stepLeader
中处理的消息可以分为两类,一类是不需要知道谁是发送者的消息(大多数为本地消息),另一类需要知道谁是发送者的消息(大多数为来自其它节点的消息)。
stepLeader
中对第一类消息的处理方式如下:
MsgBeat
:该消息为heartbeat timeout超时后通知leader广播心跳消息的消息。因此,收到该消息后,广播心跳消息。MsgCheckQuorum
:该消息为开启Check Quorum时election timeout超时后通知leader进行相关操作的消息。因此,检查活跃的节点数是否达到quorum,如果无法达到,那么退位为follower(其相关操作涉及ProgressTracker
,笔者会在后续的文章中分析,这里只需要知道其作用即可)。
第二类消息中,与选举相关的只有MsgTransferLeader
消息:
- 忽略来自learner的
MsgTransferLeader
消息。 - 判断是否正在进行Leader Transfer,如果正在进行但转移的目标相同,那么不再做处理;如果正在进行但转移的目标不同,那么打断正在进行的Leader Transfer,而执行新的转移。
- 如果转移目标是当前节点,而当前节点已经是leader了,那么不做处理。
- 记录转移目标,以用做第2步中是否打断上次转移的依据。
- 判断转移目标的日志是否跟上了leader。如果跟上了,向其发送
MsgTimeoutNow
消息,让其立即超时并进行新的选举;否则正常向其发送日志。如果转移目标的日志没有跟上leader,则leader在处理转移目标对其日志复制消息的响应时,会判断其是否跟上了leader,如果那时跟上了则向其发送MsgTimeoutNow
消息,让其立即超时并进行新的选举。这部分代码可在处理MsgAppResp
消息时找到:
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
此处代码只会在follower跟上了其match index才会执行,详情请见本系列后续文章。
2.4.3 Follower
follower中与选举相关的逻辑不是很多。
首先,还是对becomeFollower
中的与选举相关的逻辑进行分析:
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
重要字段 | becomeFollower | 描述 | |
---|---|---|---|
step |
stepFollower |
step 行为 |
|
tick |
tickElection |
tick 行为 |
|
state |
StateFollower |
状态机角色 | |
lead |
参数lead |
当前term的leader |
接下来分析stepFollower
中对与选举相关的消息的处理:
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
// ... ...
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
// ... ...
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
// ... ...
case pb.MsgSnap:
r.electionElapsed = 0
r.lead = m.From
// ... ...
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
return nil
}
m.To = r.lead
r.send(m)
case pb.MsgTimeoutNow:
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
// Leadership transfers never use pre-vote even if r.preVote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
r.hup(campaignTransfer)
// ... ...
return nil
}
可以看到,follower在收到来自leader的MsgApp
、MsgHeartbeat
、MsgSnap
消息后,会更新当前记录的leader并重置election timeout
定时器。而收到应发给leader的消息后,会把消息转发给leader(如MsgTransferLeader
消息,这里给出的代码中还省略了一些消息)。因此,这里真正需要关心的与选举相关的消息只有MsgTimeoutNow
。
在2.4.2节中可以看到,MsgTimeoutNow
消息是发生Leader Transfer时,leader通知目标节点立即超时并发起选举请求的消息。因此,这里直接以campaignTransfer
作为参数调用了hup
方法。在2.1节和2.2节中可以看到,hup
和campaign
方法对campaignTransfer
和campaignVote
的处理几乎一致,只有在写入发起投票的消息时,如果进行的是Leader Transfer,那么会将campaignTransfer
写入到消息的Context
字段中。在消息的接收方处理该消息时,如果Context
字段为campaignTransfer
,那么不会直接忽略该消息,这一点我们可以在2.3.2节中看到。
3. 总结
本文首先介绍了etcd/raft实现的Raft选举优化,并介绍了使用选举优化后引入的新问题与解决方案,接着对etcd/raft中与选举有关的源码层层深入地分析。
由于选举是Raft算法中重要且复杂的部分,因此其代码分布比较零散。只想了解etcd/raft中对Raft算法做出的优化的读者可以只看本文的第一章。对于想要深入etcd/raft源码实现的读者,建议自己先按照自己的节奏阅读源码,对于不太理解的地方可以参考本文的分析,如果直接从本文的分析入手,可能会感觉有些绕。
参考文献
[1] Ongaro D, Ousterhout J. In search of an understandable consensus algorithm[C]//2014 {USENIX} Annual Technical Conference ({USENIX}{ATC} 14). 2014: 305-319.
[2] Ongaro D, Ousterhout J. In search of an understandable consensus algorithm (extended version)[J]. Retrieved July, 2016, 20: 2018.
[3] Ongaro D. Consensus: Bridging theory and practice[D]. Stanford University, 2014.