privatevoiddoSnapshot(final Closure done){ if (this.snapshotExecutor != null) { // 调用 SnapshotExecutor 生成快照 this.snapshotExecutor.doSnapshot(done); } else { if (done != null) { final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported"); Utils.runClosureInThread(done, status); } } }
// 正在安装快照 if (this.downloadingSnapshot.get() != null) { Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is loading another snapshot.")); return; }
// 正在生成快照,不允许重复执行 if (this.savingSnapshot) { Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is saving another snapshot.")); return; }
// 状态机调度器最后应用的 LogEntry 已经被快照,说明没有新的数据可以被快照 if (this.fsmCaller.getLastAppliedIndex() == this.lastSnapshotIndex) { // There might be false positive as the getLastAppliedIndex() is being updated. // But it's fine since we will do next snapshot saving in a predictable time. doUnlock = false; this.lock.unlock(); this.logManager.clearBufferedLogs(); Utils.runClosureInThread(done); return; }
// 可以被快照的数据量小于阈值,暂不生成快照 finallong distance = this.fsmCaller.getLastAppliedIndex() - this.lastSnapshotIndex; if (distance < this.node.getOptions().getSnapshotLogIndexMargin()) { // If state machine's lastAppliedIndex value minus lastSnapshotIndex value is // less than snapshotLogIndexMargin value, then directly return. if (this.node != null) { LOG.debug("Node {} snapshotLogIndexMargin={}, distance={}, so ignore this time of snapshot by snapshotLogIndexMargin setting.", this.node.getNodeId(), distance, this.node.getOptions().getSnapshotLogIndexMargin()); } doUnlock = false; this.lock.unlock(); Utils.runClosureInThread(done); return; }
// 创建并初始化快照写入器,默认使用 LocalSnapshotWriter 实现类 final SnapshotWriter writer = this.snapshotStorage.create(); if (writer == null) { Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer.")); reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer."); return; }
// 标记当前正在安装快照 this.savingSnapshot = true; // 创建一个回调,用于感知异步快照生成状态 final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null); if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) { // 往 Disruptor 队列投递事件失败 Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down.")); return; } this.runningJobs.incrementAndGet(); } finally { if (doUnlock) { this.lock.unlock(); } }
voidcontinueRun(final Status st){ // 更新已经被快照的 logIndex 和 term 状态值,更新 LogManager 状态 finalint ret = onSnapshotSaveDone(st, this.meta, this.writer); if (ret != 0 && st.isOk()) { st.setError(ret, "node call onSnapshotSaveDone failed"); } if (this.done != null) { Utils.runClosureInThread(this.done, st); } }
// com.alipay.sofa.jraft.storage.snapshot.SnapshotExecutorImpl#onSnapshotSaveDone intonSnapshotSaveDone(final Status st, final SnapshotMeta meta, final SnapshotWriter writer){ int ret; this.lock.lock(); try { ret = st.getCode(); // InstallSnapshot can break SaveSnapshot, check InstallSnapshot when SaveSnapshot // because upstream Snapshot maybe newer than local Snapshot. if (st.isOk()) { // 已安装的快照相对于本次生成的快照数据要新 if (meta.getLastIncludedIndex() <= this.lastSnapshotIndex) { ret = RaftError.ESTALE.getNumber(); if (this.node != null) { LOG.warn("Node {} discards an stale snapshot lastIncludedIndex={}, lastSnapshotIndex={}.", this.node.getNodeId(), meta.getLastIncludedIndex(), this.lastSnapshotIndex); } writer.setError(RaftError.ESTALE, "Installing snapshot is older than local snapshot"); } } } finally { this.lock.unlock(); }
// 生成快照成功 if (ret == 0) { // 记录快照元数据信息 if (!writer.saveMeta(meta)) { LOG.warn("Fail to save snapshot {}.", writer.getPath()); ret = RaftError.EIO.getNumber(); } } // 生成快照失败 else { if (writer.isOk()) { writer.setError(ret, "Fail to do snapshot."); } } // 关闭快照写入器 try { writer.close(); } catch (final IOException e) { LOG.error("Fail to close writer", e); ret = RaftError.EIO.getNumber(); } boolean doUnlock = true; this.lock.lock(); try { // 生成快照成功 if (ret == 0) { // 更新最新快照对应的 logIndex 和 term 值 this.lastSnapshotIndex = meta.getLastIncludedIndex(); this.lastSnapshotTerm = meta.getLastIncludedTerm(); doUnlock = false; this.lock.unlock(); // 更新 LogManager 状态,并将本地已快照的日志剔除 this.logManager.setSnapshot(meta); // should be out of lock doUnlock = true; this.lock.lock(); } if (ret == RaftError.EIO.getNumber()) { reportError(RaftError.EIO.getNumber(), "Fail to save snapshot."); } // 清除正在生成快照的标记 this.savingSnapshot = false; this.runningJobs.countDown(); return ret;
} finally { if (doUnlock) { this.lock.unlock(); } } }
publicvoidinstallSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response, final RpcRequestClosure done){ // 从请求中获取快照元数据信息 final SnapshotMeta meta = request.getMeta(); // 新建一个下载快照的任务 final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done); // DON'T access request, response, and done after this point // as the retry snapshot will replace this one. // 尝试注册当前任务,可能存在有其它任务正在运行的情况 if (!registerDownloadingSnapshot(ds)) { LOG.warn("Fail to register downloading snapshot."); // This RPC will be responded by the previous session return; } Requires.requireNonNull(this.curCopier, "curCopier"); try { // 等待从 Leader 复制快照数据完成 this.curCopier.join(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("Install snapshot copy job was canceled."); return; }
this.lock.lock(); try { // SnapshotExecutor 已被停止 if (this.stopped) { LOG.warn("Register DownloadingSnapshot failed: node is stopped."); ds.done.sendResponse(RpcFactoryHelper // .responseFactory() // .newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EHOSTDOWN, "Node is stopped.")); returnfalse; } // 正在生成快照 if (this.savingSnapshot) { LOG.warn("Register DownloadingSnapshot failed: is saving snapshot."); ds.done.sendResponse(RpcFactoryHelper // .responseFactory().newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EBUSY, "Node is saving snapshot.")); returnfalse; }
ds.responseBuilder.setTerm(this.term); // 安装快照请求中的 term 值与当前节点的 term 值不匹配 if (ds.request.getTerm() != this.term) { LOG.warn("Register DownloadingSnapshot failed: term mismatch, expect {} but {}.", this.term, ds.request.getTerm()); ds.responseBuilder.setSuccess(false); ds.done.sendResponse(ds.responseBuilder.build()); returnfalse; } // 需要安装的快照数据已经被快照 if (ds.request.getMeta().getLastIncludedIndex() <= this.lastSnapshotIndex) { LOG.warn( "Register DownloadingSnapshot failed: snapshot is not newer, request lastIncludedIndex={}, lastSnapshotIndex={}.", ds.request.getMeta().getLastIncludedIndex(), this.lastSnapshotIndex); ds.responseBuilder.setSuccess(true); ds.done.sendResponse(ds.responseBuilder.build()); returnfalse; } final DownloadingSnapshot m = this.downloadingSnapshot.get(); // null 表示当前没有正在进行中的安装快照操作 if (m == null) { this.downloadingSnapshot.set(ds); Requires.requireTrue(this.curCopier == null, "Current copier is not null"); // 从指定的 URI 下载快照数据 this.curCopier = this.snapshotStorage.startToCopyFrom(ds.request.getUri(), newCopierOpts()); if (this.curCopier == null) { this.downloadingSnapshot.set(null); LOG.warn("Register DownloadingSnapshot failed: fail to copy file from {}.", ds.request.getUri()); ds.done.sendResponse(RpcFactoryHelper // .responseFactory() // .newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EINVAL, "Fail to copy from: %s", ds.request.getUri())); returnfalse; } this.runningJobs.incrementAndGet(); returntrue; }
// A previous snapshot is under installing, check if this is the same snapshot and resume it, // otherwise drop previous snapshot as this one is newer
// m 为正在安装快照的任务,ds 为当前任务
// 当前新注册的任务与正在执行的任务安装的是同一份快照数据 if (m.request.getMeta().getLastIncludedIndex() == ds.request.getMeta().getLastIncludedIndex()) { // m is a retry // Copy |*ds| to |*m| so that the former session would respond this RPC. saved = m; this.downloadingSnapshot.set(ds); result = false; } // 正在执行的安装快照任务操作的数据更新,忽略当前任务 elseif (m.request.getMeta().getLastIncludedIndex() > ds.request.getMeta().getLastIncludedIndex()) { // |is| is older LOG.warn("Register DownloadingSnapshot failed: is installing a newer one, lastIncludeIndex={}.", m.request.getMeta().getLastIncludedIndex()); ds.done.sendResponse(RpcFactoryHelper // .responseFactory() // .newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EINVAL, "A newer snapshot is under installing")); returnfalse; } // 当前安装快照任务操作的数据相对于正在执行的任务更新 else { // 正在执行的任务已经进入了 loading 阶段 if (this.loadingSnapshot) { LOG.warn("Register DownloadingSnapshot failed: is loading an older snapshot, lastIncludeIndex={}.", m.request.getMeta().getLastIncludedIndex()); ds.done.sendResponse(RpcFactoryHelper // .responseFactory() // .newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EBUSY, "A former snapshot is under loading")); returnfalse; } Requires.requireNonNull(this.curCopier, "curCopier"); // 停止当前正在执行的任务 this.curCopier.cancel(); LOG.warn( "Register DownloadingSnapshot failed: an older snapshot is under installing, cancel downloading, lastIncludeIndex={}.", m.request.getMeta().getLastIncludedIndex()); ds.done.sendResponse(RpcFactoryHelper // .responseFactory() // .newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EBUSY, "A former snapshot is under installing, trying to cancel")); returnfalse; } } finally { this.lock.unlock(); } if (saved != null) { // Respond replaced session LOG.warn("Register DownloadingSnapshot failed: interrupted by retry installling request."); saved.done.sendResponse(RpcFactoryHelper // .responseFactory() // .newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EINTR, "Interrupted by the retry InstallSnapshotRequest")); } return result; }
注册新的快照文件下载任务的整体执行流程可以概括为:
如果当前 SnapshotExecutor 已被停止,则放弃注册新的任务;
否则,如果当前正在生成快照文件,则放弃注册新的任务;
否则,校验安装快照请求中指定的 term 值是否与当前节点的 term 值相匹配,如果不匹配则说明请求来源节点已经不再是 LEADER 角色,放弃为本次安装快照请求注册新的任务;