JStorm 源码解析:Nimbus 的启动和运行机制

本篇我们一起分析一下 nimbus 节点的启动和运行机制。Nimbus 节点是 Storm 集群的调度者和管理者,它是集群与用户交互的窗口,负责 topology 任务的分配、启动和运行,也管理着集群中所有的 supervisor 节点的运行,监控着整个集群的运行状态,并将集群运行信息汇集给 UI 进行展示。

Nimbus 节点的启动过程位于 NimbusServer 类中,这是一个驱动类,main 方法中会加载集群配置文件,包括 default.yaml 和 storm.yaml,并将配置文件内容与启动时的命令行参数一起封装成 map 对象便于后续使用,真正的启动逻辑位于 NimbusServer#launchServer 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void launchServer(final Map conf, INimbus inimbus) {
LOG.info("Begin to start nimbus with conf " + conf);
try {
// 1. 验证当前为分布式运行模式,不允许以本地模式运行
StormConfig.validate_distributed_mode(conf);

// 2. 创建当前 JVM 进程对应的目录:${storm.local.dir}/nimbus/pids/${pid},如果存在历史运行记录,则会进行清除
this.createPid(conf);

// 3. 注册 shutdown hook 方法,用于在 JVM 进程终止时执行清理逻辑
this.initShutdownHook();

// 4. 模板方法
inimbus.prepare(conf, StormConfig.masterInimbus(conf));

// 5. 基于 conf 创建 NimbusData 对象
data = this.createNimbusData(conf, inimbus);

// 6. 注册一个 follower 线程
this.initFollowerThread(conf);

// 7. 创建并启动一个后端 HTTP 服务(默认端口为 7621,主要用于查看和下载 nimbus 的日志数据)
int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf);
hs = new Httpserver(port, conf);
hs.start();

// 8. 如果集群运行在 YARN 上,则初始化容器心跳线程
this.initContainerHBThread(conf);

// 9. 创建 ServiceHandler(实现了 Nimbus.Iface),并启动 Thrift 服务,用于处理 Nimbus 请求
serviceHandler = new ServiceHandler(data);
this.initThrift(conf);
} catch (Throwable e) {
if (e instanceof OutOfMemoryError) {
LOG.error("Halting due to out of memory error...");
}
LOG.error("Fail to run nimbus ", e);
} finally {
this.cleanup();
}
LOG.info("Quit nimbus");
}

整个启动过程可以概括如下:

  1. 检测运行模式是否为集群模式,不允许以本地模式运行;
  2. 在本地创建对应的进程目录:${storm.local.dir}/nimbus/pids/${pid}
  3. 注册 shutdown hook 方法,用于在集群销毁时执行相应的清理逻辑;
  4. 模板方法,如果用户实现了 INimbus#prepare 方法,则会在这里被调度;
  5. 创建并初始化封装 nimbus 运行数据的 NimbusData 对象;
  6. 注册一个 follower 线程,用于支持 HA 机制;
  7. 启动一个 HTTP 服务,主要用于查看和下载 nimbus 节点的运行日志数据;
  8. 如果集群运行在 YARN 上,则初始化容器的心跳线程;
  9. 启动 nimbus thrift 服务。

整个方法的运行逻辑还是相当清晰的,下面就其中一些关键步骤深入分析,主要包含 NimbusData 的实例化过程、HA 机制,以及 thrift 服务的初始化启动过程。

NimbusData 的实例化过程

首先来看一下 NimbusData 的实例化过程,位于 NimbusServer#createNimbusData 方法中,该方法基于前面加载的集群配置信息创建 NimbusData 类实例,并在构造方法中执行了一系列的初始化逻辑。NimbusData 是 nimbus 端非常重要的一个类,封装了 nimbus 节点所有的运行数据,这里挑重点分析一下其构造的初始化过程:

  1. 创建上传和下载传输通道处理器;
  2. 创建并初始化对应的 blobstore 实例;
  3. 创建 StormZkClusterState 对象,并设置本地缓存。

创建上传和下载传输通道处理器位于 NimbusData#createFileHandler 方法中,前面我们在分析 topology 构建和提交过程时曾分析过 jar 文件的上传过程,在开始上传之前客户端会先通知 nimbus 节点做一些准备工作,其中就包含创建文件上传通道,对于创建完成的通道会记录到一个 TimeCacheMap 类型的 uploaders 字段中等待后续取用。在 supervisor 从 nimbus 节点下载对应 topology 的 jar 文件时会创建相应的下载传输通道,并记录到 TimeCacheMap 类型的 downloaders 字段中。本方法就是对这两个字段执行初始化的过程,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void createFileHandler() {
// 注册一个 callback 方法,基于回调的方式关闭管道或输入流
ExpiredCallback<Object, Object> expiredCallback = new ExpiredCallback<Object, Object>() {
@Override
public void expire(Object key, Object val) {
try {
LOG.info("Close file " + String.valueOf(key));
if (val != null) {
if (val instanceof Channel) {
Channel channel = (Channel) val;
channel.close();
} else if (val instanceof BufferFileInputStream) {
BufferFileInputStream is = (BufferFileInputStream) val;
is.close();
}
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}

}
};

/*
* 获取文件上传和下载的超时时间,默认为 30 秒
*
* During upload/download with the master,
* how long an upload or download connection is idle before nimbus considers it dead and drops the connection.
*/
int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);

/*
* TimeCacheMap 在实例化时会启动一个守护线程,
* 并依据超时时间循环从 buckets 中去除对象,并应用执行 callback 的 expire 方法
* 这里的 expire 逻辑是执行关闭管道或输入流
*/
uploaders = new TimeCacheMap<>(file_copy_expiration_secs, expiredCallback);
downloaders = new TimeCacheMap<>(file_copy_expiration_secs, expiredCallback);
}

该方法主要完成了 3 件事情,其中 1 和 2 比较直观,而 3 则在 TimeCacheMap 类实例化时完成,3 件事情分别如下:

  1. 为通道或流创建回调策略,用于关闭通道或流;
  2. 实例化 uploaders 和 downloaders 属性;
  3. 启动一个守护线程,该线程会定期对过期的通道应用注册的回调策略。

我们来看一下步骤 3 的逻辑,TimeCacheMap 是一个自定义的 map 类型,包含 map 类型常用的方法,同时具备超时机制,在实例化对象时会创建并启动一个后台线程,用于定时的应用超时回调策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
if (numBuckets < 2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
}
buckets = new LinkedList<>();
for (int i = 0; i < numBuckets; i++) {
buckets.add(new HashMap<K, V>());
}

// 注册回调策略
this.callback = callback;
final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis / (numBuckets - 1);

/*
* cleaner 线程会一直循环的执行,
* 间隔指定时间从缓冲区尾部获取对象,并为该对象应用 callback 的 expire 方法
*/
this.cleaner = new Thread(new Runnable() {

@Override
public void run() {
while (!AsyncLoopRunnable.getShutdown().get()) {
Map<K, V> dead;
JStormUtils.sleepMs(sleepTime);
synchronized (lock) {
// 从缓冲区队尾获取对象
dead = buckets.removeLast();
// 添加一个空的 map 到缓冲区,从而保证线程的正常运行
buckets.addFirst(new HashMap<K, V>());
}
if (TimeCacheMap.this.callback != null) {
for (Entry<K, V> entry : dead.entrySet()) {
TimeCacheMap.this.callback.expire(entry.getKey(), entry.getValue());
}
}
}
}
});
cleaner.setDaemon(true);
cleaner.start();
}

这里我们以 uploaders 为例,当客户端请求 nimbus 执行文件上传准备时,nimbus 会为本次请求创建一个上传通道,同时记录到 uploaders 中,本质上是记录到了 TimeCacheMap 的 buckets 字段头部。在实例化 uploaders 时,方法会创建相应的守护线程,每间隔指定时间(默认是 30 秒)从 buckets 尾部移除超时的通道,并应用回调策略,这里也就是在 createFileHandler 方法开始时创建的关闭通道回调策略。NimbusData#mkBlobCacheMap 方法的逻辑与 createFileHandler 基本相同。

下面来看一下 BlobStore 实例的创建和初始化过程,BlobStore 是一个键值存储对象,用于存储 topology 对象,以及 topology 配置信息等。Storm 默认提供了两类存储实现:本地文件存储(LocalFsBlobStore)和 HDFS 文件存储(HdfsBlobStore)。如果是本地存储则需要 ZK 的介入来保证数据一致性,而采用 HDFS 存储则会使用 HDFS 自带的备份和一致性保证。在 NimbusData 实例化过程中会调用 BlobStoreUtils#getNimbusBlobStore 方法创建并初始化 BlobStore 实例,方法会检查 nimbus.blobstore.class 配置,该配置用于指定具体的 BlobStore 实现类全称类名(包括 HdfsBlobStore),如果没有指定则默认采用 LocalFsBlobStore 实现,并在实例化后调用对应的 prepare 方法执行初始化,这里以 LocalFsBlobStore#prepare 进行说明。对于本地模式而言,会采用 ${storm.local.dir}/blobs/ 作为存储的基础路径,并以 FileBlobStoreImpl 类实例操作本地文件,同时会创建对应的 ZK 客户端用于操作 ZK,以维护数据的一致性。

最后来看一下 StormZkClusterState 类对象的创建。StormZkClusterState 类也是一个非常重要的类,它实现了 StormClusterState 接口。Storm 可以看做是基于 ZK 的分布式实时任务调度系统,基于 ZK 实现对整个集群中任务和节点的协调和调度,而集群与 ZK 之间的通信都依赖于 StormZkClusterState 类对象,其实例化过程中所做的主要工作就是在 ZK 上创建相应的一级目录,并注册一个数据变更回调策略,用于触发监听相应路径数据变更时的回调处理器。创建的路径包括:

1
2
3
4
5
6
7
8
9
10
- supervisors
- topology
- assignments
- assignments_bak
- tasks
- taskbeats
- taskerrors
- metrics
- blobstore
- gray_upgrade

这里我们对上面的路径进行一个简单的说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
+ ${zk_root_dir}
| ---- + topology: 记录集群中所有正在运行的 topology 数据
| ---- | ---- + ${topology_id}: 指定 topology 的相关信息(名称、开始运行时间、运行状态等)

| ---- + supervisors: 记录集群中所有 supervisor 节点的心跳信息
| ---- | ---- + ${supervivor_id}: 指定 supervisor 的心跳信息(心跳时间、主机名称、所有 worker 的端口号、运行时间等)

| ---- + assignments: 记录提交给集群的 topology 任务分配信息
| ---- | ---- + ${topology_id}: 指定 topology 的任务分配信息(对应 nimbus 上的代码目录、所有 task 的启动时间、每个 task 与节点和端口的映射关系等)

| ---- + assignments_bak: 记录提交给集群的 topology 任务分配信息的备份

| ---- + tasks: 记录集群中所有 topology 的 task 信息
| ---- | ---- + ${topology_id}: 指定 topology 的所有 task 信息
| ---- | ---- | ---- + ${task_id}: 指定 task 所属的组件 ID 和类型(spout/bolt)

| ---- + taskbeats: 记录集群中所有 task 的心跳信息
| ---- | ---- + ${topology_id}: 记录指定 topology 下所有 task 的心跳信息、topologyId,以及 topologyMasterId 等
| ---- | ---- | ---- + ${task_id}: 指定 task 的心跳信息(最近一次心跳时间、运行时长、统计信息等)

| ---- + taskerrors: 记录集群中所有 topology 的 task 运行错误信息
| ---- | ---- + ${topology_id}: 指定 topology 下所有 task 的运行错误信息
| ---- | ---- | ---- + ${task_id}: 指定 task 的运行错误信息

| ---- + metrics: 记录集群中所有 topology 的 metricsId

| ---- + blobstore: 记录集群对应的 blobstore 信息,用于协调数据一致性

| ---- + gray_upgrade: 记录灰度发布中的 topologyId

Storm 集群的运行严重依赖于 ZK 进行协调,所以在集群较大的时候 ZK 有可能成为瓶颈,JStorm 在这一块引入了缓存进行优化,因为 ZK 中的数据有相当一部分是很少变更的,采用缓存策略可以提升访问速度,又减小对于 ZK 的读压力。缓存实例的创建也在 NimbusData 实例化期间完成,相应逻辑位于 NimbusData#createCache 方法中,该方法会创建一个 NimbusCache 缓存类对象,并将其记录到 StormZkClusterState 的相应属性中。NimbusCache 采用了两级缓存设计,即内存和文件,构造方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public NimbusCache(Map conf, StormClusterState zkCluster) {
super();

// 获取本地缓存的具体实现类
String dbCacheClass = this.getNimbusCacheClass(conf);
LOG.info("NimbusCache db cache will use {}", dbCacheClass);

try {
dbCache = (JStormCache) Utils.newInstance(dbCacheClass);

String dbDir = StormConfig.masterDbDir(conf);
// 设置本地缓存数据存放目录
conf.put(RocksDBCache.ROCKSDB_ROOT_DIR, dbDir); // ${storm.local.dir}/nimbus/rocksdb
// 是否在 nimbus 启动时清空数据,默认为 true
conf.put(RocksDBCache.ROCKSDB_RESET, ConfigExtension.getNimbusCacheReset(conf));
dbCache.init(conf);
if (dbCache instanceof TimeoutMemCache) {
memCache = dbCache;
} else {
memCache = new TimeoutMemCache();
memCache.init(conf);
}
}
// 省略 catch 代码块
this.zkCluster = zkCluster;
}

JStormCache 接口声明了缓存的基本操作,针对该接口 Storm 主要提供了两类实现:TimeoutMemCache 和 RocksDBCache。对于文件存储而言,如果是本地模式,或者 linux 和 mac 以外的平台均采用 TimeoutMemCache,否则会检查 nimbus.cache.class 配置是否有指定相应的缓存实现类,如果没有指定的话,Storm 会采用 RocksDBCache 作为文件存储。RocksDBCache 的存储实现基于 rocksdb,这是一个由 Facebook 开发和维护的嵌入式键值数据库,借用了 leveldb 项目的核心代码,以及来自 HBase 的设计思想,可以简单将其理解为本地版本的 HBase

Nimbus 节点的 HA 机制

Nimbus 节点在整个 Storm 集群中地位无可厚非,但是单点的设计对于目前大环境下的高可用来说是欠缺的,虽然 nimbus 本身的运行数据是无状态的,但是当 nimbus 节点宕机后,我们还是希望有其它 nimbus 节点能够快速顶替上来,以保证业务 topology 的正常运行。早期的 Storm 实现存在单点的问题,所以 JStorm 在改写的时候引入了 HA 机制来解决这一问题,对于 JStorm 来说一个集群运行过程中只能有一个 nimbus leader 节点,但是可以启动多个 nimbus follower 节点,当 leader 节点宕机之后,follower 节点们可以依据优先级竞选成为 leader 节点。实际上集群刚刚启动时所有的 nimbus 节点都是 follower,不过在短时间内就会依赖于 HA 机制从中选出一个 leader 节点。

JStorm HA 机制依赖于 ZK 实现,会在 ZK 根节点下创建 nimbus_master 和 nimbus_slave 两个临时节点,顾名思义,nimbus_master 用于存储 nimbus leader 的相关信息,其实就是节点对应的 IP 和端口,而 nimbus_slave 主要存储 nimbus follower 的相关信息。简单的说,nimbus 在启动时会抢占式在 ZK 上创建临时节点(EPHEMERAL 类型),先创建成功者成为 leader,余下的成为 follower,这些 follower 会定期检查 nimbus_master 节点是否存在,因为是 EPHEMERAL 类型,所以当 leader 宕机之后对应的节点会被 ZK 主动删除,此时余下的 follower 感知到 leader 不存在会立即抢占式顶替上来,这也算是 ZK 的典型应用场景。

HA 机制的启动过程位于 NimbusServer#initFollowerThread 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void initFollowerThread(Map conf) {
// 如果当前 nimbus 成为 leader,则会触发此回调执行初始化操作
Callback leaderCallback = new Callback() {
@Override
public <T> Object execute(T... args) {
try {
init(data.getConf());
} catch (Exception e) {
LOG.error("Nimbus init error after becoming a leader", e);
JStormUtils.halt_process(0, "Failed to init nimbus");
}
return null;
}
};
// 创建并启动 follower 线程
follower = new FollowerRunnable(data, 5000, leaderCallback);
Thread thread = new Thread(follower);
thread.setDaemon(true);
thread.start();
LOG.info("Successfully init Follower thread");
}

方法的主要逻辑就是为当前 nimbus 节点创建并启动一个 follower 线程,相应的实现位于 FollowerRunnable 类中,该类实例化的过程中会执行以下几件事情:

  1. 判断当前是否是以集群模式运行,对于本地模式不适用于 HA 机制;
  2. 将当前节点的 IP 和端口号信息注册到 ZK 的 nimbus_slave 和 nimbus_slave_detail 目录下,表示当前节点是一个 nimbus follower 节点;
  3. 检查当前节点是否存在 leader,如果不存在则尝试成为 leader 节点;
  4. 如果使用本地存储 blobstore 数据则记录状态信息到 ZK,以保证数据的一致性。

当 follower 线程启动之后,follower 默认会每间隔 5 秒钟检查一次当前集群是否存在 nimbus leader 节点,如果不存在则会尝试成为 leader,该过程位于 FollowerRunnable#tryToBeLeader 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void tryToBeLeader(final Map conf) throws Exception {
// 依据候选 nimbus 从节点的优先级来决定当前 nimbus 从节点是否有资格尝试成为 leader
boolean allowed = this.check_nimbus_priority();
if (allowed) {
// 回调策略再次尝试
RunnableCallback masterCallback = new RunnableCallback() {
@Override
public void run() {
try {
tryToBeLeader(conf);
} catch (Exception e) {
LOG.error("tryToBeLeader error", e);
JStormUtils.halt_process(30, "Cant't be master" + e.getMessage());
}
}
};
// 尝试成为 leader 节点
LOG.info("This nimbus can be leader");
data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, hostPort, masterCallback);
} else {
LOG.info("This nimbus can't be leader");
}
}

方法首先会基于所有 follower 的优先级来决定当前 follower 节点是否有资格尝试成为 leader,对于有资格的 follower 会调用 StormZkClusterState#try_to_be_leader 方法尝试在 ZK 上创建 nimbus_master 临时节点并写入自己的 IP 和端口号。如果对应 nimbus_master 节点已经存在,则说明已经有 leader 选举出来,则当前尝试失败,否则如果不存在 NodeExistsException 异常则表示竞选成功。

如果集群已经存在 leader,则方法会判断对应的 leader 是否是当前 follower 自身,如果是的话且上一次的 leader 不存在或是其它 follower 节点,则会触发之前在 NimbusServer#initFollowerThread 方法中定义的回调策略,本质上是调用了 NimbusServer#init 方法,该方法主要执行以下初始化逻辑:

  1. 执行 NimbusData#init 方法;
  2. 清除一些老的 topology(在 ZK 上有记录但是在本地没有对应的 topology 文件);
  3. 启动 topology 任务分配后台线程,也就是 TopologyAssign 线程(之前在分析 topology 任务分配过程的篇章中有专门介绍);
  4. 更新集群中 topology 的状态信息(设置为 startup)和心跳信息;
  5. 启动定时清理任务,默认每隔 10 分钟会清理上传到本地的 topology 文件(inbox 目录,更多本地目录说明如下);
  6. 启动 metrics 监控任务。
1
2
3
4
5
6
7
8
9
+ ${nimbus_local_dir}
| ---- + nimbus
| ---- | ---- + inbox: 存放客户端上传的 jar 包
| ---- | ---- | ---- + stormjar-{uuid}.jar: 对应一个具体的 jar 包
| ---- | ---- + stormdist
| ---- | ---- | ---- + ${topology_id}
| ---- | ---- | ---- | ---- + stormjar.jar: 包含当前拓扑所有代码的 jar 包(从 inbox 那复制过来的)
| ---- | ---- | ---- | ---- + stormcode.ser: 当前拓扑对象的序列化文件
| ---- | ---- | ---- | ---- + stormconf.ser: 当前拓扑的配置信息文件

Thrift 服务的初始化启动过程

最后我们来看一下 nimbus 服务的启动过程。Thrift 是一种接口描述语言和二进制通讯协议,同时也是一个强大的 RPC 中间件,跨语言高效通讯是其主要卖点。Nimbus 启动起来本质上就是一个 thrift 服务,在介绍 topology 任务提交过程时我们就已经接触到与 nimbus 节点通信的过程,本质上也是 RPC 服务调用的过程。所有 RPC 接口的实现均位于 ServiceHandler 类中,该类实现了 Nimbus.Iface 接口,NimbusServer 主要调用 NimbusServer#initThrift 方法来启动 thrift 服务,过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void initThrift(Map conf) throws TTransportException {
// 获取 thrift 端口,默认为 8627
Integer thrift_port = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT)); // ${nimbus.thrift.port}
TNonblockingServerSocket socket = new TNonblockingServerSocket(thrift_port);

// ${nimbus.thrift.max_buffer_size}
Integer maxReadBufSize = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE));
// 设置服务运行参数
THsHaServer.Args args = new THsHaServer.Args(socket);
args.workerThreads(ServiceHandler.THREAD_NUM); // 64
args.protocolFactory(new TBinaryProtocol.Factory(false, true, maxReadBufSize, -1));
args.processor(new Nimbus.Processor<Iface>(serviceHandler));
args.maxReadBufferBytes = maxReadBufSize;

thriftServer = new THsHaServer(args);

LOG.info("Successfully started nimbus: started Thrift server...");
thriftServer.serve();
}

方法实现了一个标准的 thrift 服务启动过程,如果对于 thrift 不熟悉可以参考 Thrift: The Missing Guide。Nimbus 节点启动后默认监听 8627 端口,然后等待客户端的请求。到此,一个 nimbus 节点启动的主要流程就基本完成了。