SOFA-JRaft 源码解析:主节点选举机制

主节点选举(Leader Election)是 Raft 算法的核心组成部分,也是 Raft 算法库的主要应用场景之一。Raft 算法设计了 term 和 logIndex 两个属性,分别用于表示 Leader 节点的任期,以及集群运行期间接收到的指令对应的日志条目的 ID,这两个属性都是单调递增的。一个 Leader 节点在任期内会始终向其管理的所有 Follower 节点宣示主权,以避免这些 Follower 节点发动革命,推翻自己的政权,成为新的 Leader 节点。然而,世事无常,如果 Leader 节点因为某些原因不能或未能即时向某些 Follower 节点宣示自己的主权,则这些 Follower 节点在等待一段随机的时间之后就会尝试竞选成为新的 Leader 节点。

之所以这里采用随机化的等待时间,是为了避免两个或多个 Follower 节点同时发起选举进程,进而出现这些节点都没有赢得过半数的选票。于是,这些节点又在同一时间发起下一轮选举进程,延长了集群无 Leader 节点的时间,而通过随机化各个 Follower 节点等待的时间则能够很好的解决此类问题。

当然,也并不是所有的 Follower 节点都有参选的资格,Raft 算法要求节点在给参选节点投票时必须保证参选节点满足以下两个条件之一:

  1. 参选节点的 term 值大于投票节点,否则拒绝为其投票。
  2. 如果参选节点与投票节点的 term 值相同,则需要保证参选节点的 logIndex 值不小于投票节点。

这两个条件的目的都在于保证当前参选节点本地的日志数据不能比投票节点要陈旧。

上一篇我们分析了 JRaft 算法库的整体架构和节点的初始化启动过程,当一个节点启动之后即会启动对应的预选举计时器,不断检查 Leader 节点的有效性,并随时准备发动新一轮的选举革命,本文我们就针对 JRaft 关于主节点选举的实现展开分析。

Leader 选举

JRaft 在设计层面将选举的过程拆分为预选举和正式选举两个过程,之所以这样设计是为了避免无效的选举进程递增 term 值,进而造成浪费,同时也会导致正常运行的 Leader 节点执行角色降级。Raft 算法要求当节点接收到 term 值更大的请求时需要递增本地的 term 值,以此实现集群中 term 值的同步。对于 Leader 节点而言,当收到 term 值更大的请求时,该节点会认为集群中有新的 Leader 节点生成,于是需要执行角色降级。这一机制能够保证在出现网络分区等问题时,在网络恢复时能够促使 term 值较小的 Leader 节点退位为 Follower 节点,从而实现让集群达到一个新的平稳状态。然而,如果集群中某个 Follower 节点因为某些原因未能接收到 Leader 节点的主权宣示指令,就会一直尝试发动新一轮的选举革命,进而递增 term 值,导致 Leader 节点执行角色降级,最终影响整个集群的正常运行。

预选举的引入则能够很好的解决此类问题,当一个 Follower 节点尝试发起一轮新的选举革命时,该节点不会立即递增 term 值,而是尝试将 term 值加 1 去试探性的征集选票,只有当集群中过半数的节点同意投票的前提下才会进入正式投票的环节,这样对于无效选举而言一般只会停留在预选举阶段,不会对集群的正常运行造成影响。

下面来看一下 JRaft 关于预选举和正式选举的具体实现。

预选举

当启动一个 JRaft 节点时,如果初始化集群节点配置不为空,则节点会调用 NodeImpl#stepDown 方法执行角色降级操作。所谓角色降级实际上是一个宽泛的说法,因为 NodeImpl#stepDown 方法会在多种场景下被调用。而这里调用该方法的背景是一个 FOLLOWER 节点刚刚启动的时候,所以除了初始化一些本地状态之外,整个角色降级过程重点做的一件事就是启动预选举计时器 electionTimer。

预选举计时器 electionTimer 是一个典型的 RepeatedTimer 应用,关于 RepeatedTimer 的实现和运行机制我们在上一篇已经介绍过,本小节我们重点关注在预选举场景下该计时器针对 RepeatedTimer#onTrigger 方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

@Override
protected void onTrigger() {
handleElectionTimeout();
}

@Override
protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};

方法 RepeatedTimer#onTrigger 会被计时器周期性调度,而具体的执行逻辑则委托给 NodeImpl#handleElectionTimeout 方法执行。为了尽量避免多个节点同时发起预选举请求,计时器 electionTimer 覆盖实现了 RepeatedTimer#adjustTimeout 方法,以实现对于调度周期进行随机化处理,默认随机区间为 1~2s。

方法 NodeImpl#handleElectionTimeout 是预选举的入口,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void handleElectionTimeout() {
boolean doUnlock = true;
this.writeLock.lock();
try {
// 预选举必须由 Follower 节点发起
if (this.state != State.STATE_FOLLOWER) {
return;
}

// 与当前 Leader 节点的租约还有效,暂不发起预选举
if (isCurrentLeaderValid()) {
return;
}

/* 尝试开始发起预选举 */

// 清空本地记录的 leaderId
resetLeaderId(PeerId.emptyPeer(),
new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.", this.leaderId));

// 基于节点优先级判断是否继续发起预选举
if (!allowLaunchElection()) {
return;
}

doUnlock = false;
// 发起预选举
preVote();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}

上述方法的执行流程可以概括如下:

  1. 如果当前节点不是 FOLLOWER 角色,则放弃预选举;
  2. 否则,如果当前节点与 Leader 节点之间的租约仍然有效,则放弃预选举;
  3. 否则,清空本地记录的 Leader 节点 ID,回调 FSMCaller#onStopFollowing 方法;
  4. 基于节点优先级判断是否允许发起预选举,如果允许则发起预选举进程。

Follower 节点会在本地记录最近一次收到来自 Leader 节点的 RPC 请求时间戳,如果该时间戳距离当前时间小于选举超时时间,则说明当前节点与 Leader 节点之间的租约仍然有效,无需继续发起预选举。

方法 NodeImpl#resetLeaderId 会清空本地记录的 Leader 节点 ID,如果当前节点不是 Leader 角色,并且正在追随某个 Leader 节点,则该方法会回调 FSMCaller#onStopFollowing 方法将停止追随的事件透传给状态机。业务可以通过覆盖实现 StateMachine#onStopFollowing 方法捕获这一事件。

如果当前节点的优先级允许当前节点继续发起预选举,则接下去会调用 NodeImpl#preVote 方法发起预选举进程,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private void preVote() {
long oldTerm;
try {
LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm);
// 当前节点正在安装快照,则放弃预选举
if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn("Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.",
getNodeId(), this.currTerm);
return;
}
// 当前节点不是一个有效的节点
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
return;
}
oldTerm = this.currTerm;
} finally {
this.writeLock.unlock();
}

// 从本地磁盘获取最新的 LogId
final LogId lastLogId = this.logManager.getLastLogId(true);

boolean doUnlock = true;
this.writeLock.lock();
try {
// pre_vote need defense ABA after unlock&writeLock
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
return;
}

// 初始化预选举选票
this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
// 遍历向除自己以外的所有连通节点发送 RequestVote RPC 请求,以征集选票
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(true) // it's a pre-vote request. 标记为预选举
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm + 1) // next term,预选举阶段不会真正递增 term 值
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
// 发送请求
this.rpcService.preVote(peer.getEndpoint(), done.request, done);
}
// 自己给自己投上一票
this.prevVoteCtx.grant(this.serverId);
// 检查是否赢得选票
if (this.prevVoteCtx.isGranted()) {
doUnlock = false;
// 如果赢得选票,则继续发起选举进程
electSelf();
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}

预选举的整体执行流程可以概括如下:

  1. 校验当前节点是否正在安装快照,如果是则放弃预选举;
  2. 校验当前节点是否位于节点配置列表中,如果不是则说明当前节点不是一个有效节点,放弃预选举;
  3. 从本地磁盘获取最新的 LogId,包含 logIndex 和 term 值;
  4. 初始化预选举选票 Ballot 实例;
  5. 遍历向除自己以外的所有连通节点发送 RequestVote RPC 请求,以征集选票,同时给自己投上一票;
  6. 如果票数过半,则执行 NodeImpl#electSelf 操作进入正式投票环节。

整体流程比较直观,其中方法 NodeImpl#electSelf 属于正式投票环境,我们将在下一小节展开分析,本小节我们继续关注以下两个方面:

  1. LogManager 加载最新 logIndex 和对应 term 值的过程。
  2. 节点对于 RequestVote RPC 预选举请求的处理过程。
加载最新 LogId 数据

上一篇我们分析了 LogManager 的初始化过程,本小节我们继续分析 LogManager 是如何从本地加载返回最新的 logIndex 和对应 term 值数据的,即 LogManager#getLastLogId 方法。该方法接收一个 boolean 类型参数,用于设置是否需要将内存中的数据刷盘,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public LogId getLastLogId(final boolean isFlush) {
LastLogIdClosure c;
this.readLock.lock();
try {
// 直接返回内存中记录的 lastLogIndex,以及对应的 term 值
if (!isFlush) {
if (this.lastLogIndex >= this.firstLogIndex) {
return new LogId(this.lastLogIndex, unsafeGetTerm(this.lastLogIndex));
}
return this.lastSnapshotId;
}
// 将内存中的数据刷盘,并返回最新的 logIndex 和对应的 term 值
else {
// 生成快照之后未产生新的数据
if (this.lastLogIndex == this.lastSnapshotId.getIndex()) {
return this.lastSnapshotId;
}
c = new LastLogIdClosure();
// 往消息队列中发布一个 LAST_LOG_ID 事件
offerEvent(c, EventType.LAST_LOG_ID);
}
} finally {
this.readLock.unlock();
}

// 等待刷盘完成
try {
c.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
return c.lastLogId;
}

前面在分析 LogManager 初始化过程时我们介绍了 LogManager 在初始化期间会启动一个 Disruptor 消息队列,并对其运行流程进行了简单的介绍。上述方法如果设置 isFlush = true 则会往该队列提交一个 LAST_LOG_ID 类型事件,并阻塞等待该事件处理完成。方法 StableClosureEventHandler#onEvent 中实现了对 Disruptor 中消息的处理逻辑,并定义了一个 AppendBatcher 类型的属性用于缓存收集到的 LogEntry 数据。在响应 LAST_LOG_ID 事件之前,StableClosureEventHandler 会调用 AppendBatcher#flush 方法将收集到的 LogEntry 数据刷盘,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
LogId flush() {
if (this.size > 0) {
// 将数据落盘,并返回最新的 LogId
this.lastId = appendToStorage(this.toAppend);
for (int i = 0; i < this.size; i++) {
// 清空缓存的 LogEntry 数据
this.storage.get(i).getEntries().clear();
Status st = null;
try {
if (LogManagerImpl.this.hasError) {
// LogManager 运行异常
st = new Status(RaftError.EIO, "Corrupted LogStorage");
} else {
st = Status.OK();
}
// 回调响应
this.storage.get(i).run(st);
} catch (Throwable t) {
LOG.error("Fail to run closure with status: {}.", st, t);
}
}
this.toAppend.clear();
this.storage.clear();

}
this.size = 0;
this.bufferSize = 0;
return this.lastId;
}

private LogId appendToStorage(final List<LogEntry> toAppend) {
LogId lastId = null;
if (!this.hasError) {
final long startMs = Utils.monotonicMs();
final int entriesCount = toAppend.size();
this.nodeMetrics.recordSize("append-logs-count", entriesCount);
try {
// ... metrics
// 将 LogEntry 写入 RocksDB
final int nAppent = this.logStorage.appendEntries(toAppend);
if (nAppent != entriesCount) {
LOG.error("**Critical error**, fail to appendEntries, nAppent={}, toAppend={}", nAppent, toAppend.size());
reportError(RaftError.EIO.getNumber(), "Fail to append log entries");
}
// 获取最新的 LogId
if (nAppent > 0) {
lastId = toAppend.get(nAppent - 1).getId();
}
toAppend.clear();
} finally {
this.nodeMetrics.recordLatency("append-logs", Utils.monotonicMs() - startMs);
}
}
return lastId;
}

上述实现最终会调用 LogStorage#appendEntries 方法将数据落盘,并返回最新的 LogId 实例。LogStorage 默认的实现是 RocksDBLogStorage 类,即将数据写入 RocksDB 存储引擎,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public int appendEntries(final List<LogEntry> entries) {
if (entries == null || entries.isEmpty()) {
return 0;
}
final int entriesCount = entries.size();
final boolean ret = executeBatch(batch -> {
final WriteContext writeCtx = newWriteContext();
// 遍历分类型将 LogEntry 写入 RocksDB
for (int i = 0; i < entriesCount; i++) {
final LogEntry entry = entries.get(i);
// 配置类型的 LogEntry,编码之后写入 default 和 conf column family
if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
addConfBatch(entry, batch);
}
// 其它类型的 LogEntry,编码之后写入 default column family
else {
writeCtx.startJob();
addDataBatch(entry, batch, writeCtx);
}
}
writeCtx.joinAll();
// 模板方法
doSync();
});

if (ret) {
return entriesCount;
} else {
return 0;
}
}

上一篇曾介绍过 RocksDBLogStorage 设置了两个 column family,即 conf family 和 data family,其中后者复用了 RocksDB 提供的默认 column family。由上述实现可以看到,JRaft 针对配置类型的 LogEntry 会同时写入这两个 family 中,而其它类型的 LogEntry 仅会写入到 data family 中。

RequestVote 预选举请求处理

发起预选举的节点会以 RPC 的方式向集群中的其它节点发送 RequestVote RPC 请求,以征集选票,各节点会基于本地的运行状态决定是否为其投上自己的一票。需要注意的两点是:

  1. 预选举阶段的 RequestVote 请求会设置 preVote = true,以标识自己是一个预选举请求,用来与正式投票阶段的 RequestVote 请求请求相区别。
  2. 为了避免 term 值无谓的递增,预选举阶段不会真正递增 term 值,而只是将 term 加 1 进行试探性的发起投票。

方法 RaftServerService#handlePreVoteRequest 实现了对于预选举阶段 RequestVote 请求的处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public Message handlePreVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
// 当前节点处于非活跃状态,响应错误
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}
// 解析发起投票的节点 ID
final PeerId candidateId = new PeerId();
if (!candidateId.parse(request.getServerId())) {
// 解析错误,响应错误
LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(), request.getServerId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse candidateId failed: %s.", request.getServerId());
}
boolean granted = false;
// noinspection ConstantConditions
do {
// 当前节点与对应 leader 节点之间的租约仍然有效,拒绝投票
if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
break;
}
// 发起投票节点的 term 值小于当前节点,拒绝投票
if (request.getTerm() < this.currTerm) {
LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm);
// A follower replicator may not be started when this node become leader, so we must check it.
// 如果当前节点是 leader 节点,检查与发起投票节点之间的复制关系
checkReplicator(candidateId);
break;
} else if (request.getTerm() == this.currTerm + 1) {
// A follower replicator may not be started when this node become leader, so we must check it.
// check replicator state
checkReplicator(candidateId);
}
doUnlock = false;
this.writeLock.unlock();

// 获取本地最新的 LogId
final LogId lastLogId = this.logManager.getLastLogId(true);

doUnlock = true;
this.writeLock.lock();
// 封装请求中的 logIndex 和 term 值
final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
// 如果请求的 term 值更大,或者在 term 值相等的前提下,请求的 logIndex 不小于当前节点的 logIndex 值,
// 则投上自己的一票
granted = requestLastLogId.compareTo(lastLogId) >= 0;

LOG.info(
"Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
lastLogId);
} while (false);

// 响应
return RequestVoteResponse.newBuilder() //
.setTerm(this.currTerm) //
.setGranted(granted) //
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}

整体响应预选举 RequestVote 请求的执行流程可以概括为:

  1. 如果当前节点处于非活跃状态,则响应错误;
  2. 否则,解析候选节点的节点 ID,如果解析出错,则响应错误;
  3. 否则,如果当前节点与对应 Leader 节点之间的租约仍然有效,则拒绝投票;
  4. 否则,如果候选节点的 term 值相较于当前节点小,则拒绝投票;如果当前节点正好是 Leader 节点,还需要检查候选节点与当前节点之间的复制关系;
  5. 否则,获取本地最新的 logIndex 和对应的 term 值,如果候选节点的 term 和 logIndex 值更新,则同意投票,否则拒绝投票。

如果当前节点是 Leader 节点,但是仍然有节点发起预选举进程,则说明当前节点与目标节点之间的复制关系存在问题,需要重新建立复制关系,并启动对应的复制器 Replicator。关于 Replicator,我们将会在后面介绍日志复制机制时再深入分析,这里暂且跳过。

发起预选举的节点在发送 RequestVote RPC 请求时会为每个请求绑定一个 OnPreVoteRpcDone 回调,当目标节点返回响应时会应用该回调以处理 RequestVote 响应。具体的处理过程由 NodeImpl#handlePreVoteResponse 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
boolean doUnlock = true;
this.writeLock.lock();
try {
// 当前节点已经不是 FOLLOWER 角色,可能已经预选举成功了,忽略响应
if (this.state != State.STATE_FOLLOWER) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, " +
"state not in STATE_FOLLOWER but {}.", getNodeId(), peerId, this.state);
return;
}
// 当前节点的 term 值已经发生变化,忽略响应
if (term != this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, " +
"term={}, currTerm={}.", getNodeId(), peerId, term, this.currTerm);
return;
}
// 目标节点的 term 值较当前节点更大,需要 stepdown,主要是更新本地的 term 值
if (response.getTerm() > this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.",
getNodeId(), peerId, response.getTerm(), this.currTerm);
stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term pre_vote_response."));
return;
}
LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.",
getNodeId(), peerId, response.getTerm(), response.getGranted());
// check granted quorum?
if (response.getGranted()) {
// 目标节点同意投票
this.prevVoteCtx.grant(peerId);
// 检查是否预选举成功
if (this.prevVoteCtx.isGranted()) {
doUnlock = false;
// 进入正式投票环境
electSelf();
}
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}

对于预选举 RequestVote 响应的整体处理流程可以概括如下:

  1. 校验当前节点是否仍然是 FOLLOWER 角色,如果不是则忽略响应,可能已经预选举成功了;
  2. 否则,校验当前节点的 term 值是否发生变化,如果是则忽略响应;
  3. 否则,如果目标节点的 term 值较当前节点更大,则忽略响应,并执行 stepdown;
  4. 否则,如果目标节点拒绝投票,则忽略响应;
  5. 否则,如果目标节点同意投票,则更新得票数,并检查是否预选举成功,如果是则进入正式投票环节。

如果当前节点在预选举期间收到 term 值更大的 RequestVote 响应,则会执行 stepdown 逻辑。此时节点的角色仍然是 FOLLOWER,所以除了重置本地状态和再次启动预选举计时器之外,一个重要的工作就是更新当前节点的 term 值,以保证与当前集群已知的最大 term 值看齐。

JRaft 在实现层面大量应用了回调机制,例如上述在处理预选举响应时会让每个目标节点的响应在同意投票的前提下都会回调触发一次 Ballot#grant 操作以更新得票数,并调用 Ballot#isGranted 方法检查得票数是否过半,如果是则进入正式投票的环节。此类异步回调机制在整个 JRaft 设计和实现中比较常见,其思想值得借鉴,不过重度依赖回调可能会让程序陷入 Callback Hell,需要把控尺度。

正式选举

当预选举成功之后,节点接下去会执行 NodeImpl#electSelf 方法进入正式选举进程。实际上,正式选举与预选举在执行流程上基本相同,但是仍然有些细微的差别,本小节一起来分析一下。

触发正式选举进程,除了发生在预选举成功之后之外,主要还包括另外两个场景:

  1. 在只有一个节点的情况下,此时该节点一定能够竞选成功,所以没有进行预选举的必要。
  2. 正式选举阶段超时,此时需要再次发起一轮新的正式选举进程,这也是正式选举计时器 voteTimer 的职责。

方法 NodeImpl#electSelf 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
private void electSelf() {
long oldTerm;
try {
LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
// 当前节点不是一个合法节点
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
return;
}
// 当前节点第一次尝试正式选举,需要暂时停止预选举计时器,避免期间再次触发预选举
if (this.state == State.STATE_FOLLOWER) {
LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
this.electionTimer.stop();
}
// 清空本地记录的 Leader 节点 ID
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
"A follower's leader_id is reset to NULL as it begins to request_vote."));
// 切换角色为 CANDIDATE
this.state = State.STATE_CANDIDATE;
// 正式投票环境真正递增 term 值,而预选举阶段不会
this.currTerm++;
// 更新 votedId 字段,标记投票给自己
this.votedId = this.serverId.copy();
LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
// 启动正式选举计时器,当选举超时会再次触发正式选举进程
this.voteTimer.start();
// 初始化正式选举选票
this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
oldTerm = this.currTerm;
} finally {
this.writeLock.unlock();
}

// 从本地加载最新的 logIndex 和对应的 term 值
final LogId lastLogId = this.logManager.getLastLogId(true);

this.writeLock.lock();
try {
// vote need defense ABA after unlock&writeLock
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
return;
}
// 遍历向除自己以外的所有连通节点发送 RequestVote RPC 请求,以征集选票
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(false) // It's not a pre-vote request. 标记是正式选举
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm) // 这里是递增后的 term 值
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
// 发送 RPC 请求
this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
}

// 更本地元数据信息
this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
// 给自己投上一票
this.voteCtx.grant(this.serverId);
// 检查是否竞选成功
if (this.voteCtx.isGranted()) {
// 成为 leader 节点
becomeLeader();
}
} finally {
this.writeLock.unlock();
}
}

正式选举进程的整体执行流程可以概括如下:

  1. 校验当前节点是否是合法节点,即属于集群节点配置集合中的一员,如果不是则放弃参选;
  2. 如果当前节点是 FOLLOWER 角色,说明是刚刚从预选举阶段过渡而来,需要停止预选举计时器 electionTimer,避免期间再次发起新的预选举进程;
  3. 重置本地记录的 leader 节点的 ID;
  4. 切换节点为 CANDIDATE 角色、递增 term 值,以及更新 votedId 为当前节点 ID;
  5. 启动正式选举计时器 voteTimer,用于当正式选举超时时,再次发起一轮新的正式选举进程;
  6. 初始化正式选票 Ballot 实例;
  7. 获取本地最新的 logIndex 和对应的 term 值;
  8. 遍历向除自己以外的所有连通节点发送 RequestVote RPC 请求,以征集选票,同时给自己投上一票;
  9. 更新本地元数据信息,即 term 值和 votedId 值;
  10. 如果票数过半,则执行 NodeImpl#becomeLeader 操作以切换角色为 LEADER,即竞选成功。

总的来说,正式选举与预选举阶段的执行流程基本相同,不过在正式选举阶段会真正递增 term 值。

下面来看一下节点对于正式选举 RequestVote RPC 请求的处理过程,实现位于 NodeImpl#handleRequestVoteRequest 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
// 节点处于非活跃状态,响应错误
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}
// 解析发起正式选举的节点 ID
final PeerId candidateId = new PeerId();
// 解析失败,响应错误
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(), request.getServerId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse candidateId failed: %s.", request.getServerId());
}

// noinspection ConstantConditions
do {
// check term
if (request.getTerm() >= this.currTerm) {
LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm);
// 候选节点的 term 值大于当前节点,执行 stepdown
if (request.getTerm() > this.currTerm) {
// increase current term, change state to follower
stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term RequestVoteRequest."));
}
}
// 候选节点的 term 值小于当前节点,拒绝投票
else {
// ignore older term
LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm);
break;
}
doUnlock = false;
this.writeLock.unlock();

// 从本地获取最新的 logIndex 和对应的 term 值
final LogId lastLogId = this.logManager.getLastLogId(true);

doUnlock = true;
this.writeLock.lock();
// vote need ABA check after unlock&writeLock
if (request.getTerm() != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
break;
}

// 如果 logIsOk,则说明候选节点的 term 值大于当前节点,或者 term 相同,但是候选节点的 logIndex 不比当前节点小
final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
.compareTo(lastLogId) >= 0;

// 如果 logIsOk,且当前节点目前没有投票给其它节点
if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
"Raft node votes for some candidate, step down to restart election_timer."));
this.votedId = candidateId.copy();
// 更新本地元数据信息
this.metaStorage.setVotedFor(candidateId);
}
} while (false);

// 发送 RequestVote RPC 响应
return RequestVoteResponse.newBuilder() //
.setTerm(this.currTerm) //
.setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}

响应正式选举 RequestVote 请求的整体执行流程可以概括为:

  1. 如果当前节点处于非活跃状态,则响应错误;
  2. 否则,解析候选节点的节点 ID,如果解析出错则响应错误;
  3. 否则,如果候选节点的 term 值小于当前节点,则拒绝投票;
  4. 否则,如果候选节点的 term 值大于当前节点,则需要执行 stepdown;
  5. 如果候选节点的 term 值更新,或者 term 值相同但是对应的 logIndex 不小于当前节点,且当前节点未投票给其它节点,则同意投票,同时更新本地元数据信息;
  6. 否则,拒绝投票。

关于步骤 4,此时处理 RequestVote RPC 请求的节点角色仍然是 FOLLOWER,所以除了重置本地状态和再次启动预选举计时器之外,一个重要的工作就是更新当前节点的 term 值,以保证与当前集群已知的最大 term 值看齐。

发起正式选举请求的节点在发送 RequestVote RPC 请求时会为每个请求绑定一个 OnRequestVoteRpcDone 回调,当目标节点返回响应时会应用该回调以处理 RequestVote 响应。具体的处理过程由 NodeImpl#handleRequestVoteResponse 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
this.writeLock.lock();
try {
// 当前节点已经不是 CANDIDATE 角色,可能以及竞选成功,或者被打回 FOLLOWER 角色了,忽略响应
if (this.state != State.STATE_CANDIDATE) {
LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.",
getNodeId(), peerId, this.state);
return;
}

// 期间 term 值已经发生变化,忽略响应
if (term != this.currTerm) {
LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.",
getNodeId(), peerId, term, this.currTerm);
return;
}

// 目标节点的 term 值比当前节点大,需要执行 stepdown
if (response.getTerm() > this.currTerm) {
LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.",
getNodeId(), peerId, response.getTerm(), this.currTerm);
stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term request_vote_response."));
return;
}
// check granted quorum?
if (response.getGranted()) {
this.voteCtx.grant(peerId);
// 如果票数过半,则竞选成功
if (this.voteCtx.isGranted()) {
becomeLeader();
}
}
} finally {
this.writeLock.unlock();
}
}

对于正式选举 RequestVote 响应的整体处理流程可以概括如下:

  1. 校验当前节点是不是 CANDIDATE 角色,如果不是则可能已经竞选成功,或者被打回成了 FOLLOWER 角色,忽略响应;
  2. 否则,校验等待响应期间节点的 term 值是否发生变化,如果是则忽略响应;
  3. 否则,如果目标节点的 term 值相较于当前节点更大,则需要忽略响应,并执行 stepdown;
  4. 否则,如果目标节点同意投票,则更新选票计数,否则忽略响应;
  5. 如果票数过半,则执行 NodeImpl#becomeLeader 方法成为 LEADER 角色。

关于步骤 3,当前节点角色为 CANDIDATE,所以执行 stepdown 会让当前节点停止正式选举计时器,并切换角色为 FOLLOWER,并再次启动预选举计时器。此外,还会更新当前节点的 term 值,以保证与当前集群已知的最大 term 值看齐。

如果当前节点票数过半,则接下去会调用 NodeImpl#becomeLeader 方法执行从 CANDIDATE 到 LEADER 的角色转变,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private void becomeLeader() {
// 前置角色必须是 CANDIDATE
Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.",
getNodeId(), this.currTerm, this.conf.getConf(), this.conf.getOldConf());
// 停止正式选举计时器
stopVoteTimer();
// 切换角色为 LEADER
this.state = State.STATE_LEADER;
// 更新本地记录的 leader 节点 ID
this.leaderId = this.serverId.copy();
// 设置复制器组的 term 值
this.replicatorGroup.resetTerm(this.currTerm);

// 处理 Follower 节点:遍历将集群中除自己以外的 Follower 节点纳为自己的 Follower,并建立到这些节点的复制关系
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
if (!this.replicatorGroup.addReplicator(peer)) {
LOG.error("Fail to add a replicator, peer={}.", peer);
}
}

// 处理 Learner 节点:遍历将集群中除自己以外的 Learner 节点纳为自己的 Learner,并建立到这些节点的复制关系
// Learner 节点只是复制日志,不会对日志的提交做决策
for (final PeerId peer : this.conf.listLearners()) {
LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) {
LOG.error("Fail to add a learner replicator, peer={}.", peer);
}
}

// 重置选票箱
this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
// Register _conf_ctx to reject configuration changing before the first log is committed.
if (this.confCtx.isBusy()) {
throw new IllegalStateException();
}
// 将当前集群配置信息写入日志
this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
// 启动 stepdown 计时器
this.stepDownTimer.start();
}

整体的执行流程可以概括为:

  1. 校验当前节点角色是否为 CANDIDATE,LEADER 角色的前置角色必须是 CANDIDATE;
  2. 停止正式选举计时器 voteTimer;
  3. 切换节点角色为 LEADER;
  4. 建立到除自己以外的所有节点之间的复制关系,包括 Follower 和 Learner;
  5. 重置选票箱 BallotBox;
  6. 将当前集群的节点配置信息记录到日志中;
  7. 启动 stepdown 计时器 stepDownTimer。

JRaft 中的节点区分 Learner 和非 Learner 角色,官方对于 Learner 角色的说明如下:

Learner 节点也叫只读节点,只读节点类似于 Follower 节点,将从 Leader 复制日志并应用到本地状态机中,但是不参与选举,复制日志成功也不会被认为是多数派的一员。简而言之,除了复制日志以外,只读成员不参与其他任何 Raft 算法过程。一般应用在为某个服务创建一个只读服务的时候,实现类似读写分离的效果,或者数据冷备等场景。

当一个节点竞选成功成为 LEADER 角色之后,按照 Raft 的强 Leader 约束,所有集群中的其它节点将成为该 Leader 节点的 Follower 节点。所以,Leader 节点需要建立到这些节点的复制关系,包括 Learner 和非 Learner 节点。关于 ReplicatorGroup#addReplicator 的实现将在后面介绍日志复制机制时再展开分析。

方法 ConfigurationCtx#flush 会将当前集群的节点配置信息作为当前节点成为 LEADER 角色之后的第一条日志同步给集群中的 Follower 节点,关于日志复制机制这里先不展开,后面将用一篇文章针对性介绍。这里需要关注的一点是 Leader 节点在将日志数据同步出去之前会设置一个 ConfigurationChangeDone 回调,并在日志数据被 committed 之后触发执行 ConfigurationChangeDone#run 方法。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
public void run(final Status status) {
if (status.isOk()) {
// 尝试让集群节点配置趋于稳定
onConfigurationChangeDone(this.term);
if (this.leaderStart) {
// 回调状态机 StateMachine#onLeaderStart 逻辑
getOptions().getFsm().onLeaderStart(this.term);
}
} else {
LOG.error("Fail to run ConfigurationChangeDone, status: {}.", status);
}
}

上述方法在节点成为 Leader 时会回调由应用程序实现的 StateMachine#onLeaderStart 方法。此外,方法 NodeImpl#onConfigurationChangeDone 则尝试让集群节点配置趋于稳定,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void onConfigurationChangeDone(final long term) {
this.writeLock.lock();
try {
// 期间 term 值发生变更,或者当前节点已经不是 Leader,直接跳过
if (term != this.currTerm || this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
LOG.warn("Node {} process onConfigurationChangeDone at term {} while state={}, currTerm={}.",
getNodeId(), term, this.state, this.currTerm);
return;
}
// 将集群配置状态切换到下一个阶段
this.confCtx.nextStage();
} finally {
this.writeLock.unlock();
}
}

在 Leader 选举场景下,集群节点配置上下文 ConfigurationCtx 的 stage 分为 STAGE_STABLESTAGE_JOINT 两类,前者表示集群配置已经趋于稳定,而后者则表示集群目前存在新老配置过渡的情况。ConfigurationCtx 针对这两类 stage 的处理逻辑实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// com.alipay.sofa.jraft.core.NodeImpl.ConfigurationCtx#nextStage
case STAGE_JOINT:
this.stage = Stage.STAGE_STABLE;
// 再次应用配置变更,剔除老的配置信息
this.node.unsafeApplyConfiguration(new Configuration(this.newPeers, this.newLearners), null, false);
break;
case STAGE_STABLE:
// 当前集群节点配置是否包含当前节点
final boolean shouldStepDown = !this.newPeers.contains(this.node.serverId);
reset(new Status());
if (shouldStepDown) {
this.node.stepDown(this.node.currTerm, true,
new Status(RaftError.ELEADERREMOVED, "This node was removed."));
}
break;

具体执行逻辑如上述代码注释。关于 Raft 算法的集群节点配置变更算是一个相对复杂的问题,这里不打算展开说明,后续考虑用一篇文章针对性介绍。

此外,在节点成为 LEADER 角色之后会将集群配置信息作为第一条日志进行提交,还有另外一个考虑。当一个节点刚刚竞选成为 LEADER 角色时,此时该节点本地的 committedIndex 值并不一定是当前整个系统范围内最新的 committedIndex 值,这会影响线性一致性读结果的准确性,而通过提交日志操作则能够保证新的 Leader 节点的 committedIndex 被更新为集群范围内的最新值。

Leader 让权

Leader 节点需要定期检查自己的权威是否持续有效,即集群中过半数的 Follower 节点都能响应自己的心跳请求,如果不是则需要让权。这一过程由 stepdown 计时器 stepDownTimer 负责,由前面 NodeImpl#becomeLeader 方法的实现也可以看到在节点成为 LEADER 角色之后会启动 stepdown 计时器。该计时器的核心逻辑由 NodeImpl#handleStepDownTimeout 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void handleStepDownTimeout() {
this.writeLock.lock();
try {
// 当前节点不是 LEADER 角色,无需让权
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm, this.state);
return;
}
final long monotonicNowMs = Utils.monotonicMs();
// 检查集群中是否有超过半数的 Follower 节点仍然在响应自己的心跳请求,如果不是则执行让权操作
checkDeadNodes(this.conf.getConf(), monotonicNowMs);
if (!this.conf.getOldConf().isEmpty()) {
checkDeadNodes(this.conf.getOldConf(), monotonicNowMs);
}
} finally {
this.writeLock.unlock();
}
}

private void checkDeadNodes(final Configuration conf, final long monotonicNowMs) {
// Check learner replicators at first.
for (PeerId peer : conf.getLearners()) {
// 确定到所有 Learner 节点的复制关系都建立了
checkReplicator(peer);
}
// Ensure quorum nodes alive.
final List<PeerId> peers = conf.listPeers();
final Configuration deadNodes = new Configuration();
// 如果集群中认同当前 Leader 节点的 Follower 节点数过半,则无需让权
if (checkDeadNodes0(peers, monotonicNowMs, true, deadNodes)) {
return;
}
LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, " +
"term={}, deadNodes={}, conf={}.", getNodeId(), this.currTerm, deadNodes, conf);
final Status status = new Status();
status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(), peers.size());
// 集群中认同当前 Leader 节点的 Follower 节点数小于一半,执行让权操作
stepDown(this.currTerm, false, status);
}

其中方法 NodeImpl#checkDeadNodes0 会检查目标 Follower 节点与当前 Leader 节点最近一次的 RPC 请求时间戳,以此决定对应的租约是否仍然有效,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private boolean checkDeadNodes0(final List<PeerId> peers,
final long monotonicNowMs,
final boolean checkReplicator,
final Configuration deadNodes) {
// 获取租约时长,默认为选举超时的 90%
final int leaderLeaseTimeoutMs = this.options.getLeaderLeaseTimeoutMs();
int aliveCount = 0;
// 记录向所有活跃节点发送 RPC 请求的最小时间戳
long startLease = Long.MAX_VALUE;
// 遍历逐个检查目标 Follower 节点
for (final PeerId peer : peers) {
if (peer.equals(this.serverId)) {
aliveCount++;
continue;
}
// 检查到目标节点之间的复制关系,避免因为缺失复制关系而误将目标节点判为死亡
if (checkReplicator) {
checkReplicator(peer);
}
// 获取最近一次成功向目标节点发送 RPC 请求的时间戳
final long lastRpcSendTimestamp = this.replicatorGroup.getLastRpcSendTimestamp(peer);
// 到目标节点的租约仍然有效,则视目标节点仍然活跃
if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) {
aliveCount++; // 活跃节点数加 1
// 更新向所有活跃节点发送 RPC 请求的最小时间戳
if (startLease > lastRpcSendTimestamp) {
startLease = lastRpcSendTimestamp;
}
continue;
}
// 记录死亡节点
if (deadNodes != null) {
deadNodes.addPeer(peer);
}
}
// 活跃节点数过半,说明当前 Leader 节点仍然有效,更新时间戳(向所有活跃节点发送 RPC 请求的最小时间戳)
if (aliveCount >= peers.size() / 2 + 1) {
updateLastLeaderTimestamp(startLease);
return true;
}
return false;
}

如果过半数的 Follower 节点对应的租约都失效,则当前 Leader 节点需要执行让权操作,因为集群有可能正在或已经选举出新的 Leader 节点。如果过半数的 Follower 节点对应的租约仍然有效,则上述操作会使用 NodeImpl#lastLeaderTimestamp 字段记录向这些 Follower 节点成功发送 RPC 请求的最早时间。该字段对于 Leader 节点而言用于在 LeaseRead 策略的线性一致性读场景下判断当前 Leader 节点是否仍然有效,具体将在后面介绍线性一致性读机制的文章中展开介绍。

让权操作的过程由 NodeImpl#stepDown 方法实现,该方法我们在前面已经多次遇到过,只是每次调用时节点的状态不同而已。此时,节点以 LEADER 角色调用该方法,除了将角色切换成 FOLLOWER、初始化本地状态,以及启动预选举计时器 electionTimer 之外,在此之前还会执行如下一段逻辑:

1
2
3
4
5
6
7
8
9
// 停止 stepdown 计时器
stopStepDownTimer();
// 清空选票箱
this.ballotBox.clearPendingTasks();
// signal fsm leader stop immediately
if (this.state == State.STATE_LEADER) {
// 向状态机调度器发布 LEADER_STOP 事件
onLeaderStop(status);
}

状态机调度器的运行机制我们在前面已经介绍过,LEADER_STOP 状态机事件会触发 FSMCaller 回调应用程序实现的 StateMachine#onLeaderStop 方法,这与我们前面介绍的回调应用程序实现的 StateMachine#onLeaderStart 方法相呼应。

总结

主节点选举是 Raft 算法的核心组成部分,是支持 Raft 集群运行所不可或缺的一部分。Raft 算法采用 Strong Leader 的设计,要求整个 Raft 集群必须只有一个 Leader 节点,所有其它 Follower 节点必须服从于 Leader 节点。这为简化共识算法的设计和实现起到了积极的作用,但我们也不能否认 Leader 节点在整个 Raft 算法的运行过程中负担了太多。

本文我们从源码层面分析了 JRaft 算法库关于主节点选举机制的实现。有别于 Raft 算法,JRaft 将主节点选举拆分为预选举和正式选举两个阶段,以此避免无效的选举影响 Leader 节点的正常运行,进而最终影响整个集群的稳定性。此外,通过实现我们也能感受到计时器在 Raft 算法设计中的重要地位,而随机化计时时间这么一个小小的优化则解决了协议运行所面临的重大隐患。

参考

  1. Raft Consensus Algorithm
  2. SOFA-JRaft 官网
  3. SOFA-JRaft:选举机制剖析