Kafka 源码解析:Broker 节点的启动与关闭

从本篇开始我们分析 Kafka 服务端组件的实现。Kafka 集群由多个 broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。本篇我们主要分析单个 broker 节点上 Kafka 实例的启动和关闭过程,关于集群整体的协调运行机制将在后面按照组件逐一进行分析。

Kafka 提供了 kafka-server-start.sh 脚本来简化服务的启动操作,脚本中通过调用 kafka.Kafka 类来启动 Kafka 服务,这也是 Kafka 整个服务端的驱动类。在 Kafka 服务启动过程中,首先会解析并封装命令行传递的参数,然后创建负责 Kafka 服务启动和关闭操作的 KafkaServerStartable 类对象,并调用 KafkaServerStartable#startup 方法启动服务。

Kafka 驱动类的 main 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def main(args: Array[String]): Unit = {
try {
// 解析命令行参数
val serverProps = getPropsFromArgs(args)
// 创建 kafkaServerStartable 对象,期间会初始化监控上报程序
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

// 注册一个钩子方法,当 JVM 被关闭时执行 shutdown 逻辑,本质上是在执行 KafkaServer#shutdown 方法
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
kafkaServerStartable.shutdown()
}
})

// 本质上调用的是 KafkaServer#startup 方法
kafkaServerStartable.startup()
// 阻塞等待 kafka server 运行线程关闭
kafkaServerStartable.awaitShutdown()
} catch {
case e: Throwable =>
fatal(e)
System.exit(1)
}
System.exit(0)
}

KafkaServerStartable 实际只是对 KafkaServer 的简单封装,相应方法实现都只是简单调用了 KafkaServer 类中同名的方法,所以下文我们主要分析 KafkaServer 类的实现。KafkaServer 是对单个 broker 节点生命周期的描绘,其主要逻辑是用来启动和关闭单个 broker 节点,KafkaServer 类字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class KafkaServer(val config: KafkaConfig, // 配置信息对象
time: Time = Time.SYSTEM, // 时间戳工具
threadNamePrefix: Option[String] = None,
kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() // 监控上报程序
) extends Logging with KafkaMetricsGroup {

/** 标识节点已经启动完成 */
private val startupComplete = new AtomicBoolean(false)
/** 标识节点正在执行关闭操作 */
private val isShuttingDown = new AtomicBoolean(false)
/** 标识节点正在执行启动操作 */
private val isStartingUp = new AtomicBoolean(false)
/** 阻塞主线程等待 KafkaServer 的关闭 */
private var shutdownLatch = new CountDownLatch(1)
/** 记录 broker 节点的当前状态 */
val brokerState: BrokerState = new BrokerState
/** Api 接口类,用于分发各种类型的请求 */
var apis: KafkaApis = _
/** 权限控制相关 */
var authorizer: Option[Authorizer] = None
var credentialProvider: CredentialProvider = _
/** 网络 socket 服务 */
var socketServer: SocketServer = _
/** 简单的连接池实现,用于管理所有的 KafkaRequestHandler */
var requestHandlerPool: KafkaRequestHandlerPool = _
/** 日志数据管理 */
var logManager: LogManager = _
/** 管理当前 broker 节点上的分区副本 */
var replicaManager: ReplicaManager = _
/** topic 增删管理 */
var adminManager: AdminManager = _
/** 动态配置管理 */
var dynamicConfigHandlers: Map[String, ConfigHandler] = _
var dynamicConfigManager: DynamicConfigManager = _
/** group 协调管理组件 */
var groupCoordinator: GroupCoordinator = _
/** 集群控制组件 */
var kafkaController: KafkaController = _
/** 定时任务调度器 */
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
/** broker 节点活跃性检查 */
var kafkaHealthcheck: KafkaHealthcheck = _
/** broker 缓存整个集群中全部分区的状态信息 */
var metadataCache: MetadataCache = _
/** ZK 操作工具类 */
var zkUtils: ZkUtils = _

// ... 省略方法定义

}

在开始分析 KafkaServer 的启动和关闭逻辑之前,我们首先看一下最简单的 KafkaServer#awaitShutdown 方法实现。在 KafkaServer 中定义了一个 CountDownLatch 类型的 KafkaServer#shutdownLatch 字段,初始 count 值设置为 1,而 KafkaServer#awaitShutdown 方法只是简单的调用了 CountDownLatch#await 方法来阻塞主线程。当 KafkaServer#shutdown 方法执行完成后会调用 CountDownLatch#countDown 方法将 count 值设置为 0,从而让主线程从阻塞态中恢复,并最终关闭整个服务。

服务启动过程分析

方法 KafkaServer#shutdown 的实现我们稍后进行分析,下面首先看一下 Kafka 服务的启动过程,即 KafkaServer#startup 方法的实现。该方法实现较长,这里先对方法的整体执行流程进行概括,然后挑一些重点的步骤做进一步分析:

  1. 运行状态校验,如果当前 broker 节点正在执行关闭操作,则此时不允许再次启动服务,所以抛出异常;如果当前服务已经启动完成,即处于运行状态,则直接返回,不需要重复启动;否则设置正在启动标记;
  2. 设置当前 broker 节点的状态为 Starting,标识 broker 节点正在启动;
  3. 初始化定时任务调度器 KafkaScheduler;
  4. 创建 ZkUtils 工具类对象,用于操作 ZK,期间会在 ZK 上创建一些基本的节点;
  5. 从 ZK 上获取当前 broker 所属集群的 clusterId,如果不存在则创建一个;
  6. 获取当前 broker 节点的 brokerId;
  7. 初始化一些监控相关的配置;
  8. 创建并启动 LogManager,用于管理记录在本地的日志数据;
  9. 创建 MetadataCache 对象,用于为当前 broker 节点缓存整个集群中全部分区的状态信息;
  10. 创建并启动 SocketServer,用于接收并处理来自客户端和其它 broker 节点的请求;
  11. 创建并启动 ReplicaManager,用于管理当前 broker 节点上的分区副本信息;
  12. 创建并启动 KafkaController,每个 broker 节点都会创建并启动一个 KafkaController 实例,但是只有一个 broker 会成为 leader 角色,负责管理集群中所有的分区和副本的状态,也是集群与 ZK 进行交互的媒介;
  13. 创建并启动 GroupCoordinator,负责管理分配给当前 broker 节点的消费者 group 的一个子集;
  14. 创建并初始化 Authorizer 对象,用于权限管理;
  15. 创建 KafkaApis 对象,用于分发接收到的各种类型请求;
  16. 创建 KafkaRequestHandlerPool 线程池对象,用于管理所有 KafkaRequestHandler 线程;
  17. 创建并启动动态配置管理器,用于监听 ZK 的变更;
  18. 将自己的 brokerId 注册到 ZK 中(/brokers/ids/{brokerId} 路径,临时节点),用于标记当前 broker 节点是否存活;
  19. 设置当前 broker 节点的状态为 RunningAsBroker,表示当前 broker 节点已经启动完成,可以对外提供服务;
  20. 更新相关状态标记,标识当前节点的 Kafka 服务启动完成。

下面针对上述流程中的 2、3、4 和 6 几个步骤做进一步说明,对于流程中涉及到的相关类(LogManager、SocketServer、ReplicaManager、KafkaController,以及 GroupCoordinator 等)的实例化和启动的过程会在后续的文章中针对性的分析。

首先来看一下 步骤 2 ,这一步本身的逻辑比较简单,就是将当前 broker 节点的状态设置为 Starting,标识当前 broker 节点正在执行启动操作。我们主要来看一下 broker 节点的状态定义和状态转换,Kafka 为 broker 节点定义了 6 种状态,如下:

1
2
3
4
5
6
7
8
sealed trait BrokerStates { def state: Byte }

case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }

关于每种状态的解释和状态转换图如下:

  • NotRunning :初始状态,标识当前 broker 节点未运行。
  • Starting :标识当前 broker 节点正在启动中。
  • RecoveringFromUncleanShutdown :标识当前 broker 节点正在从上次非正常关闭中恢复。
  • RunningAsBroker :标识当前 broker 节点启动成功,可以对外提供服务。
  • PendingControlledShutdown :标识当前 broker 节点正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown :标识当前 broker 节点正在执行 shutdown 操作。

image

所谓 controlled shutdown,实际上是 Kafka 提供的一种友好的关闭 broker 节点的机制。除了因为硬件等原因导致的节点非正常关闭,一些场景下管理员也需要通过命令行发送 ControlledShutdownRequest 请求来主动关闭指定的 broker 节点,例如迁移机房、升级软件,修改 Kafka 配置等。关于 controlled shutdown 机制,我们将在后面分析 KafkaController 组件时再展开分析。

下面继续来看一下 步骤 3 ,KafkaScheduler 是一个基于 ScheduledThreadPoolExecutor 的定时任务调度器实现,实现了 Scheduler 特质:

1
2
3
4
5
6
trait Scheduler {
def startup()
def shutdown()
def isStarted: Boolean
def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
}

其中 startup 和 shutdown 方法分别用于启动和关闭调度器,而 isStarted 方法用于检测当前调度器是否已经启动,方法 schedule 用于注册需要进行周期性调度的任务。

步骤 4 调用了 KafkaServer#initZk 方法创建 ZkUtils 对象,ZkUtils 是对 zkclient 的封装,用于操作 ZK。方法 KafkaServer#initZk 会基于 zookeeper.connect 配置获取对应的 ZK 连接,并在 ZK 上创建一些基本的节点。主要的 ZK 节点包括:

  • /brokers/ids/{id}: 记录集群中可用的 broker 的 ID。
  • /brokers/topics/{topic}/partitions: 记录一个 topic 中所有分区的分配信息,以及 AR 集合。
  • /brokers/topics/{topic}/partitions/{partition_id}/state: 记录分区 leader 副本所在的 broker 节点 ID、年代信息、ISR 集合,以及 zkVersion 等。
  • /controller: 记录集群 controller leader 所在 broker 节点的 ID。
  • /controller_epoch: 记录集群 controller leader 的年代信息。
  • /admin/reassign_partitions: 记录需要执行副本重新分配的分区。
  • /admin/preferred_replica_election: 记录需要进行优先副本选举的分区,优先副本是在创建分区时指定的第一个副本。
  • /admin/delete_topics: 记录待删除的 topic 集合。
  • /isr_change_notification: 记录一段时间内 ISR 集合发生变化的分区。
  • /config: 记录一些配置信息。

最后来看一下 步骤 6 获取当前 broker 节点的 brokerId 的过程。我们在启动 Kafka 服务之前可以在配置中通过 broker.id 配置项为当前 broker 节点设置全局唯一的 ID,也可以指定让 Kafka 自动生成。解析 brokerId 的过程位于 KafkaServer#getBrokerId 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private def getBrokerId: Int = {
// 获取配置的 brokerId
var brokerId = config.brokerId
val brokerIdSet = mutable.HashSet[Int]()

// 遍历 log.dirs 配置的 log 目录列表
for (logDir <- config.logDirs) {
// 在每一个 log 目录下面创建一个 meta.properties 文件,内容包含当前 broker 节点的 ID 和版本信息
val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
brokerMetadataOpt.foreach { brokerMetadata =>
brokerIdSet.add(brokerMetadata.brokerId)
}
}

if (brokerIdSet.size > 1) {
// 不允许多个 broker 节点共享同一个 log 目录
// ... 抛出 InconsistentBrokerIdException 异常,略
} else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId) {
// 配置的 brokerId 与 meta.properties 中记录的 brokerId 不一致
// ... 抛出 InconsistentBrokerIdException 异常,略
} else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) {
// 如果没有配置,则自动创建 brokerId,通过 ZK 保证 brokerId 的全局唯一性
brokerId = generateBrokerId
} else if (brokerIdSet.size == 1) {
// 从 meta.properties 中获取 brokerId
brokerId = brokerIdSet.last
}

brokerId
}

在 broker 节点的每个 log 目录下有一个 meta.properties 文件,记录了当前 broker 节点的 ID 和版本信息。如果当前 broker 节点不是第一次启动,那么 Kafka 可以通过该文件约束 broker.id 配置需要前后保持一致。此外,Kafka 还通过该文件保证一个 log 目录不被多个 broker 节点共享。

服务关闭过程分析

Broker 节点在关闭对应的 Kafka 服务时,首先会设置状态为 BrokerShuttingDown,表示正在执行关闭操作,然后开始关闭注册的相关组件,并在这些组件全部关闭成功之后,更新 broker 状态为 NotRunning。相关实现位于 KafkaServer#shutdown 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def shutdown() {
try {
info("shutting down")

// 如果正在启动,则不允许关闭
if (isStartingUp.get)
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")

if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown())
// 设置 broker 状态为 BrokerShuttingDown,表示当前 broker 正在执行关闭操作
brokerState.newState(BrokerShuttingDown)

/* 依次关闭相应注册的组件 */

if (socketServer != null) CoreUtils.swallow(socketServer.shutdown())
if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown())
CoreUtils.swallow(kafkaScheduler.shutdown())
if (apis != null) CoreUtils.swallow(apis.close())
CoreUtils.swallow(authorizer.foreach(_.close()))
if (replicaManager != null) CoreUtils.swallow(replicaManager.shutdown())
if (adminManager != null) CoreUtils.swallow(adminManager.shutdown())
if (groupCoordinator != null) CoreUtils.swallow(groupCoordinator.shutdown())
if (logManager != null) CoreUtils.swallow(logManager.shutdown())
if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown())
if (zkUtils != null) CoreUtils.swallow(zkUtils.close())
if (metrics != null) CoreUtils.swallow(metrics.close())

// 设置 broker 状态为 NotRunning,表示关闭成功
brokerState.newState(NotRunning)

// 设置状态标记
startupComplete.set(false)
isShuttingDown.set(false)
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
shutdownLatch.countDown()
info("shut down completed")
}
} catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer shutdown.", e)
isShuttingDown.set(false)
throw e
}
}

整体执行流程如代码注释,比较简单,相关组件的关闭逻辑我们将在后续文章分析具体组件时再进行介绍。

总结

本文我们主要分析了 Kafka 服务启动和关闭的过程。Kafka 在设计上将各个主要功能模块都拆分成了一个个组件进行实现,服务启动的过程实际上就是实例化并启动各个组件的过程,关闭过程也是如此。到目前为止,我们主要是分析了服务整体启动的执行流程,关于各个组件的启动逻辑,将在后面的文章中分析具体组件时再针对性介绍。