SOFA-JRaft 源码解析:线性一致性读

关于线性一致性读的定义,简单而言就是在 T 时刻执行写入操作,那么在 T 时刻之后一定能够读取到之前写入的值。Raft 算法能够至少保证集群节点数据的最终一致性,也就说就某一特定时刻而言各个节点之间的数据状态允许存在滞后。在分布式场景下如果允许各个节点随机响应用户的读请求,则可能会读取到脏数据。如果希望基于 Raft 算法实现线性一致性读语义,最简单的方式就是将读操作作为一个指令提交给 Raft 集群,由于 Raft 算法能够保证指令执行的顺序性,所以该读操作指令一定能够读取到在此之前写入的值(本文将此类线性一致性读策略命名为 RaftLog Read)。然而,RaftLog Read 的缺点也是显而易见的,每次读操作都需要走一遍完整的 Raft 算法流程势必效率低下,并且大部分的应用场景都具备读多写少的特征,所以该策略势必会让 Raft 集群产生大量的日志文件,增加磁盘和网络的开销。

换一个角度思考,在 Raft 算法中更新操作都由 Leader 节点负责响应,那么极端一点每次都从 Leader 节点读数据是不是就万事大吉了呢?先不说这种方式将一个分布式系统退化成了单机系统,我们还需要考虑下面两个问题:

  • 日志数据从提交到被业务状态机所应用这中间存在一定的时间滞后性,所以直接执行读操作不一定能够读取到最新的数据。
  • 当前 Leader 节点不一定是有效的,因为 Leader 节点的变更通常有一个时间差,而这中间存在导致脏读的可能性。

对于一个分布式系统而言,只能从 Leader 节点读这显然是不能接受的,所以我们的真正需求是能够从各个节点发起读请求,同时还需要保证读写的线性一致性。为此,Raft 算法的最终实现思路可以概括如下:

  1. 确定当前 Leader 节点的有效性;
  2. 从 Leader 节点拉取最新的 comittedIndex 值,即 lastComittedIndex 值;
  3. 等待本地已被业务状态应用的 LogEntry 对应的 logIndex 值超过该 lastComittedIndex 位置。

如果目标 Leader 节点是有效的,那么由于写操作在前(令时刻为 T1),在发起读操作的那一刻(令时刻为 T2(T2 > T1))开始,该 Leader 节点已提交的日志中必定包含 T1 时刻的写操作指令,也就是说该写操作对应 LogEntry 的 logIndex 必定小于等于 lastComittedIndex 值。所以,如果我们在本地等待业务状态机完成对于 lastComittedIndex 位置之前日志的应用,一定能够保证 T2 时刻的读操作能够从本地看到 T2 时刻 Leader 节点的数据状态,又 T2 > T1,所以 T2 时刻的读操作至少能够看到 T1 时刻的写结果。

既然证明了上述思路能够让 Raft 算法实现线性一致性读语义,那么接下去问题就演变成如何确定当前 Leader 节点是否有效。Raft 算法为此提出了两种思路,即:ReadIndex Read 和 Lease Read。

  • ReadIndex Read

我们的目标是确定当前 Leader 节点是否有效,直观的思路就是让该 Leader 节点向所有的 Follower 节点发送一次心跳请求,如果过半数的 Follower 节点都能正常响应该心跳请求,则视为当前 Leader 节点仍然有效,这要是 ReadIndex Read 的核心思想。相对于 RaftLog Read,ReadIndex Read 策略虽然也需要与各个 Follower 节点进行一次 RPC 交互(心跳请求交互的开销极小),但是省去了日志流程,在磁盘和网络开销层面都更加友好,整体性能也完胜 RaftLog Read。

  • Lease Read

ReadIndex Read 相对于 RaftLog Read 策略,在保证线性一致性读语义的前提下在磁盘占用、网络开销,以及读性能等方面都有明显的改善,但是不足之处在于仍然会与各个 Follower 节点进行一次 RPC 交互,能不能消除这一次 RPC 交互呢?为此,Raft 算法基于 clock + heartbeat 的思想提出了 Lease Read 策略。

Lease Read 策略下 Leader 节点会记录一轮成功心跳请求的开始时间(即向各个 Follower 节点成功发送心跳 RPC 请求的最早时间戳),令其为 startTime。由于 Raft 的选举由超时策略触发(令超时时长为 electionTimeout),所以我们可以认为所有的 Follower 节点在 startTime + electionTimeout 时间之前都不会发起新一轮的选举请求(因为心跳请求会重置 Follower 节点的选举计时器),即对应的 Leader 节点在此时间之前一定是有效的,从而避免了向各个 Follower 节点发送心跳请求的操作。

然而,Lease Read 策略的应用有一个前提,即所有节点宿主机的时钟频率是一致的,如果某个节点的时钟频率相对较快则会导致该节点在 startTime + electionTimeout 之前出现超时的情况。为了解决此类时钟漂移的问题,我们可以将 startTime + electionTimeout 修改为 startTime + electionTimeout / clockDriftBound,即容忍一定的时间漂移,不过这只是降低了问题出现的可能性,并没有完全解决问题。好在大多数情况下 CPU 的时钟频率都是准确的,并且 Lease Read 策略相对于 ReadIndex Read 策略在性能层面表现更优(JRaft 给出的统计提升在 15% 左右),所以业务可以基于自己的应用场景决策使用哪种线性一致性读策略。

实现内幕

上面介绍了线性一致性读的定义,以及在 Raft 算法中提供线性一致性读语义的 3 种策略及其优缺点,本小节我们开始从 JRaft 算法库的实现层面去分析如何实现这些策略。

JRaft 节点定义了 Node#readIndex 方法用于实现向 JRaft 集群发送一个线性一致性读请求,并感知当前节点是否完成与 Leader 节点就当前时刻 Leader 节点已经提交的数据的同步状态。该方法接收两个参数,其中 requestContext 参数用于封装一些请求上下文信息,而 done 参数则会在节点完成线性一致性读处理时被回调,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
// 当前节点正在被关闭
if (this.shutdownLatch != null) {
Utils.runClosureInThread(done,
new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
Requires.requireNonNull(done, "Null closure");
// 向 ReadOnlyService 提交一个 ReadIndex 请求,
// 当本地数据与 Leader 节点数据在特定的位置(lastCommittedIndex)同步时,响应回调
this.readOnlyService.addRequest(requestContext, done);
}

如果当前节点处于正常运行状态,则上述方法会调用 ReadOnlyService#addRequest 方法以 Disruptor 消息的形式向集群提交一个线性一致性读请求事件,而处理这一类型事件的逻辑则由 ReadOnlyServiceImpl#executeReadIndexEvents 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
if (events.isEmpty()) {
return;
}

// 构造 ReadIndex 请求
final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
.setGroupId(this.node.getGroupId()) //
.setServerId(this.node.getServerId().toString());

final List<ReadIndexState> states = new ArrayList<>(events.size());

for (final ReadIndexEvent event : events) {
rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
}
final ReadIndexRequest request = rb.build();

// 处理 ReadIndex 请求
this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

上述实现的主要逻辑在于构造 ReadIndex 请求,然后调用 NodeImpl#handleReadIndexRequest 方法处理该请求。发起线性一致性读请求的节点可以是 Leader 节点,也可以是 Follower 或 Learner 节点:

  • 对于 Follower 或 Learner 节点而言,需要将请求转发给 Leader 节点,以获取当前 Leader 节点的 lastCommittedIndex 位置。
  • 对于 Leader 节点而言,其目的是读取本地的 lastCommittedIndex 值返回给请求节点,但是在返回之前需要验证当前 Leader 节点的有效性。

方法 NodeImpl#handleReadIndexRequest 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
final long startMs = Utils.monotonicMs();
this.readLock.lock();
try {
switch (this.state) {
// 当前节点是 LEADER 角色
case STATE_LEADER:
// 基于 ReadIndexRead 或 LeaseRead 策略验证当前 Leader 节点是否仍然有效
readLeader(request, ReadIndexResponse.newBuilder(), done);
break;
// 当前节点是 FOLLOWER 角色
case STATE_FOLLOWER:
// 向 Leader 节点发送 ReadIndex 请求
readFollower(request, done);
break;
// 当前正在执行 LEADER 节点切换
case STATE_TRANSFERRING:
done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
break;
default:
done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
break;
}
} finally {
// ... metrics
}
}

上述方法依据当前节点的角色分而治之,对于 Follower 或 Learner 节点而言只是简单的将 ReadIndex 请求转发给 Leader 节点进行处理,实现比较简单。下面重点来看一下 NodeImpl#readLeader 方法的实现,不管当前 ReadIndex 请求是来自 Leader 节点还是 Follower 节点,最终都需要转发给 Leader 节点执行。方法 NodeImpl#readLeader 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private void readLeader(final ReadIndexRequest request,
final ReadIndexResponse.Builder respBuilder,
final RpcResponseClosure<ReadIndexResponse> closure) {
// 获取仲裁值,即集群节点的半数加 1
final int quorum = getQuorum();

// 当前集群只有一个节点,直接返回 lastCommittedIndex 值
if (quorum <= 1) {
// Only one peer, fast path.
respBuilder.setSuccess(true) //
.setIndex(this.ballotBox.getLastCommittedIndex());
closure.setResponse(respBuilder.build());
closure.run(Status.OK());
return;
}

// 获取本地记录的 lastCommittedIndex 值
final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
// 校验 term 值是否发生变化,以保证对应的 lastCommittedIndex 值是有效的
if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
// Reject read only request when this leader has not committed any log entry at its term
closure.run(new Status(
RaftError.EAGAIN,
"ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.",
lastCommittedIndex, this.currTerm));
return;
}
// 记录 lastCommittedIndex 到请求响应对象中
respBuilder.setIndex(lastCommittedIndex);

// 对于来自 Follower 节点或 Learner 节点的请求,peerId 字段会记录这些节点已知的 leaderId 值,所以不为 null
if (request.getPeerId() != null) {
// request from follower or learner, check if the follower/learner is in current conf.
final PeerId peer = new PeerId();
peer.parse(request.getServerId());
// 请求来源节点并不是当前 Leader 节点管理范围内的节点
if (!this.conf.contains(peer) && !this.conf.containsLearner(peer)) {
closure.run(new Status(RaftError.EPERM,
"Peer %s is not in current configuration: %s.", peer, this.conf));
return;
}
}

// 基于参数决策是走 ReadIndexRead 还是 LeaseRead 策略,默认走 ReadIndexRead 策略,
// 如果是 LeaseRead,则基于时间戳检查集群中是否有过半数的节点仍然认可当前 Leader 节点,
ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
// If leader lease timeout, we must change option to ReadOnlySafe
readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
}

switch (readOnlyOpt) {
// ReadIndexRead 策略
case ReadOnlySafe:
final List<PeerId> peers = this.conf.getConf().getPeers();
Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
// 向所有的 Follower 节点发送心跳请求,以检查当前 Leader 节点是否仍然有效
final ReadIndexHeartbeatResponseClosure heartbeatDone =
new ReadIndexHeartbeatResponseClosure(closure, respBuilder, quorum, peers.size());
// Send heartbeat requests to followers
for (final PeerId peer : peers) {
if (peer.equals(this.serverId)) {
continue;
}
this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
}
break;
// LeaseRead 策略,能够走到这里,说明集群中有超过半数的节点仍然认可当前 Leader 节点
case ReadOnlyLeaseBased:
// Responses to followers and local node.
respBuilder.setSuccess(true);
closure.setResponse(respBuilder.build());
closure.run(Status.OK());
break;
}
}

Leader 节点处理线性一致性读请求的整体执行流程可以概括为:

  1. 如果当前集群只有一个节点,则立即响应成功;
  2. 否则,校验当前节点本地 lastCommittedIndex 值是否有效,如果无效则响应异常;
  3. 否则,对于来自 Follower 或 Learner 节点的请求,需要校验请求来源节点是否是有效节点,即这些节点位于当前 Leader 节点的主权范围内,如果不是则响应异常;
  4. 否则,基于 ReadIndex Read 或 Lease Read 策略判断当前节点 LEADER 角色的有效性,如果有效则响应成功,否则响应失败。

当一个节点刚刚竞选成为 LEADER 角色时,此时该节点本地的 lastCommittedIndex 值并不一定是当前整个系统最新的 lastCommittedIndex 值,所以上述步骤二需要校验本地 lastCommittedIndex 值的有效性,如果对应的 term 值不匹配则一定是无效的。此外,前面在分析主节点选举机制时曾介绍过当一个节点竞选成功后会将当前集群的节点配置信息作为任期内第一条 LogEntry 进行提交,这一操作能够保证 Leader 节点的 lastCommittedIndex 值是集群范围内最新的。

在获取到最新的 lastCommittedIndex 位置之后,只要能够确定当前 Leader 节点是有效的即能返回该 lastCommittedIndex 值。Raft 算法提出了两种策略以验证当前 Leader 节点的有效性,即 ReadIndex Read 和 Lease Read。JRaft 算法库在实现上允许我们在节点启动时通过参数指定具体的策略,默认则采用 ReadIndex Read 策略。

对于 ReadIndex Read 策略而言,Leader 节点会调用 ReplicatorGroup#sendHeartbeat 方法向集群中除自己以外的所有节点发送一次心跳请求,如果集群中过半数的节点能够成功响应该请求,则视为当前 Leader 节点仍然有效。关于心跳机制已在前面分析日志复制机制时介绍过,这里不再重复说明。

对于 Lease Read 策略而言,Leader 节点会基于租约机制判断当前 LEADER 角色是否仍然有效,即判断最近一轮向所有 Follower 节点成功发送 RPC 请求的最早时间距离当前时间是否在租约范围内。判断的过程由 NodeImpl#isLeaderLeaseValid 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean isLeaderLeaseValid() {
final long monotonicNowMs = Utils.monotonicMs();
// 检查距离最近校验当前 Leader 节点有效性的时间是否在租约范围内
if (checkLeaderLease(monotonicNowMs)) {
return true;
}
// 检查管理的所有 Follower 节点是否有超过半数仍然认为当前 Leader 节点有效
checkDeadNodes0(this.conf.getConf().getPeers(), monotonicNowMs, false, null);
return checkLeaderLease(monotonicNowMs);
}

private boolean checkLeaderLease(final long monotonicNowMs) {
// 最近一次向所有活跃 Follower 节点成功发送 RPC 请求的最早时间距离指定时间是否在有效租约范围内
return monotonicNowMs - this.lastLeaderTimestamp < this.options.getLeaderLeaseTimeoutMs();
}

关于 NodeImpl#checkDeadNodes0 方法的实现已在前面介绍主节点选举机制时分析过,这里不再展开。

最后来看一下针对线性一致性读响应 ReadIndexResponseClosure 的处理过程,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void run(final Status status) {
// 响应异常,说明 Leader 节点本地的 lastCommittedIndex 值无效,
// 或者当前请求节点不是一个有效节点,或者节点状态不能够响应当前请求
if (!status.isOk()) {
// 快速失败
notifyFail(status);
return;
}
final ReadIndexResponse readIndexResponse = getResponse();
// 响应失败,说明 Leader 节点的 LEADER 角色无效
if (!readIndexResponse.getSuccess()) {
notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
return;
}

// Success
final ReadIndexStatus readIndexStatus = new ReadIndexStatus(
this.states, this.request, readIndexResponse.getIndex());
// 为各个 ReadIndex 请求填充从 Leader 节点读取到的 lastCommittedIndex 值
for (final ReadIndexState state : this.states) {
// Records current commit log index.
state.setIndex(readIndexResponse.getIndex());
}

boolean doUnlock = true;
ReadOnlyServiceImpl.this.lock.lock();
try {
// 本地已经应用的 LogEntry 的 logIndex 已经超过 lastCommittedIndex 位置,
// 说明就 lastCommittedIndex 位置而言,此位置之前的数据已经能够保证与 Leader 节点同步
if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
// Already applied, notify readIndex request.
ReadOnlyServiceImpl.this.lock.unlock();
doUnlock = false;
// 回调响应各个 ReadIndex 请求
notifySuccess(readIndexStatus);
}
// 本地已经应用的 LogEntry 的 logIndex 还未到达 lastCommittedIndex 位置,
// 缓存当前请求,等待对应的 LogEntry 在本地被应用时回调响应
else {
// Not applied, add it to pending-notify cache.
ReadOnlyServiceImpl.this.pendingNotifyStatus
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
.add(readIndexStatus);
}
} finally {
if (doUnlock) {
ReadOnlyServiceImpl.this.lock.unlock();
}
}
}

如果目标 Leader 节点的 LEADER 角色仍然有效,则当前节点会等待本地被应用到状态机的 LogEntry 的 logIndex 位置不小于从该 Leader 节点获取到的 lastComittedIndex 值。这样可以保证就 lastComittedIndex 这个位置而言,本地与 Leader 的数据是同步的,否则就需要等待本地的数据与 Leader 节点的数据进行同步。ReadOnlyServiceImpl 实现了 LastAppliedLogIndexListener 接口,所以当完成应用一批日志数据到状态机中时,相应的 ReadOnlyServiceImpl#onApplied 方法也会被回调,从而尝试触发执行那些等待数据同步的回调。

总结

本文对线性一致性读的定义进行了介绍,并对 Raft 算法提供线性一致性读语义的 3 种策略进行了说明,同时比对了这些策略的优缺点,最后从 JRaft 的源码层面分析了如何实现线性一致性读。JRaft 支持 ReadIndex Read 和 Lease Read 两种线性一致性读策略,并且默认采用 ReadIndex Read 策略。前面我们曾分析了这两种策略各自的优缺点,业务可以结合自己的应用场景进行决策。不过需要注意的一点是,无论采用哪种策略,线性一致性读都需要与 Leader 节点进行交互,当 QPS 较高时需要考量 Leader 节点的负载能力。

参考

  1. Raft Consensus Algorithm
  2. SOFA-JRaft 官网
  3. SOFA-JRaft:线性一致读实现剖析
  4. TiKV 功能介绍 - Lease Read