Kafka 源码解析:延时任务调度策略

Kafka 一些组件的命名很是有趣,比如炼狱(purgatory)、死神(reaper)等,在日常开发中也建议大家在类和方法命名上能够以一些能够表达类或方法意图的人或事物的名词进行命名,让项目显得更加的生动。今天我们要分析的组件就是以 purgatory 命名的 DelayedOperationPurgatory,DelayedOperationPurgatory 是一个相对独立的组件,我们可以将其抽取出来用于自己的日常项目中,DelayedOperationPurgatory 主要用于管理延时任务,底层依赖于分层时间轮算法实现。

说到延时任务调度,对于 java 开发者来说,日常用到比较多的可能是 JDK 自带的 Timer、ScheduledThreadPoolExecutor 和 DelayQueue 等,但是对于 Kafka 这类需要频繁执行复杂延时任务的分布式系统来说,这些组件在性能上还稍显不足,所以 Kafka 自定义了分层时间轮算法,提供了 O(m) 时间复杂度(m 为时间轮层级数)的任务插入性能和 O(1) 时间复杂度的任务删除性能,要优于 JDK 自带的基于堆实现的 O(log(n)) 时间复杂度的延时任务调度组件。

传统的时间轮算法在实现上采用单环实现,环上对应指定数量的时间格,然后将延时任务分散到对应的时间格上,随着时间指针的推进触发相应的任务执行。这样设计的缺点在于只能添加位于一个特定区间内的延时任务,如果延时任务的跨度较长,则时间轮需要被设计的非常大,在一定程度上是一种浪费,同时也无法彻底解决其表现力不足的问题。分层时间轮则采用多环实现,每个环的时间格在粒度上存在差异(一般上层时间轮一格的时间跨度等于下层整个时间轮的跨度),通过多环嵌套能够让时间轮表示任意延迟时长的任务调度。

时间轮算法定义了从时间维度触发延时任务的执行,实际应用中可能还需要允许从其他维度对延迟任务在到达延时时间之前提前触发。为此,Kafka 定义了 DelayedOperation 和 DelayedOperationPurgatory 组件,通过在注册延时任务时让每个延时任务关注一个或多个 key,当这些 key 状态发生变更时触发调用对应的延时任务,以实现对延时任务的多维度控制。

分层时间轮

Kafka 的分层时间轮算法在实现上主要涉及 TimingWheel、TimerTaskList、TimerTaskEntry,以及 TimerTask 这 4 个类,各个类的作用说明如下:

  • TimerTask :特质类型,用于描述延时任务。
  • TimerTaskList :时间格,采用环形双向链表实现,记录位于同一个时间格中的延时任务。
  • TimerTaskEntry :时间格链表中的一个结点,是对延时任务 TimerTask 的封装。
  • TimingWheel :时间轮,采用定长数组记录放置时间格。

TimerTask 和 TimerTaskEntry

本小节来看一下分层时间轮算法的具体实现,首先来看一下对于延时任务的描述,即 TimerTask 特质,而 TimerTask 特质和类 TimerTaskEntry 在实现上是相互引用的,所以需要将这两者结合起来分析。TimerTask 继承了 Runnable 接口,其字段定义如下:

1
2
3
4
5
6
7
8
9
10
trait TimerTask extends Runnable {

/** 当前任务的延迟时长(单位:毫秒) */
val delayMs: Long
/** 封装当前定时任务的链表节点 */
private[this] var timerTaskEntry: TimerTaskEntry = _

// ... 省略方法定义

}

其中,字段 TimerTask#delayMs 用于设置当前延时任务的延时时长,例如延迟 1 分钟执行,则 delayMs 即等于 1 * 60 * 1000,即 60000 毫秒。而字段 TimerTask#timerTaskEntry 则用于封装当前延时任务并记录到时间格中,属于延时任务与时间格之间建立关系的桥梁。TimerTask 中定义了绑定和获取 timerTaskEntry 的方法 TimerTask#setTimerTaskEntryTimerTask#getTimerTaskEntry

需要知晓的一点是,如果当前任务之前已经被添加到时间格中,即对应的 timerTaskEntry 已经被赋值过,当再次执行绑定时,如果是绑定到新的时间格结点则需要先从之前绑定的时间格中移除当前延时任务。此外,TimerTask#cancel 方法用于取消对应的延时任务,实际上就是调用 TimerTaskEntry#remove 方法从时间格中移除对应的结点。

TimerTaskEntry 类本质上是一个链表结点的定义,其字段定义了结点的前置和后置指针,以及所属的时间格,类字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[timer] class TimerTaskEntry(val timerTask: TimerTask, // 封装的延时任务
val expirationMs: Long // 延时时间戳,即延时任务的延时时间 + 当前时间戳
) extends Ordered[TimerTaskEntry] {

/** 所属时间格 */
@volatile var list: TimerTaskList = _
/** 后置指针 */
var next: TimerTaskEntry = _
/** 前置指针 */
var prev: TimerTaskEntry = _

// ... 省略方法定义

}

TimerTaskEntry 类对象在被构造时会建立延时任务与结点之间的映射关系,并提供了获取延时任务取消状态,以及移除对应延时任务的操作。这里的 TimerTaskEntry#expirationMs 字段是延时任务到期时间戳,也就是延时任务应该被触发执行的时间戳,在计算上等于延时任务的 TimerTask#delayMs 时间加上任务被添加到时间轮中的时间戳。

TimerTaskList

TimerTaskList 描述了时间轮的一格(即时间格),在实现上采用双向链表实现,用于封装位于特定时间区间范围内的所有的延时任务,其字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {

/** 根结点 */
private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = root
/** 记录当前时间格对应时间区间上界 */
private[this] val expiration = new AtomicLong(-1L)

// ... 省略方法定义

}

TimerTaskList 的默认构造方法接收一个 AtomicInteger 类型的计数器变量,该变量在整个时间轮设计中是共享的,用于记录整个分层时间轮中持有的延时任务总数。同时 TimerTaskList 还定义了一个 TimerTaskList#expiration 字段,用于记录当前时间格对应时间区间的上界。TimerTaskList 提供了添加和移除延时任务的方法:TimerTaskList#addTimerTaskList#remove。需要注意的一点是,如果当前添加的延时任务在之前被添加过,则再次添加时会先移除之前的添加记录。同时,TimerTaskList 还提供了 TimerTaskList#flush 方法,该方法接收一个 TimerTaskEntry => Unit 类型的函数 f,用于从当前时间格中移除所有延时任务,并对每个任务应用 f 函数,在后面分析推进时间轮指针时将会看到,借助 TimerTaskList#flush 方法可以执行一个时间格中所有到期且未被取消的任务,并对未到期的任务重新放入对应层级的时间轮中,继续等待调度。

TimingWheel

介绍完了 TimerTask、TimerTaskEntry 和 TimerTaskList,最后来重点看一下分层时间轮 TimingWheel 的实现,其字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private[timer] class TimingWheel(tickMs: Long, // 当前时间轮中一格的时间跨度
wheelSize: Int, // 时间轮的格数
startMs: Long, // 当前时间轮的创建时间
taskCounter: AtomicInteger, // 各层级时间轮共用的任务计数器,用于记录时间轮中总的任务数
queue: DelayQueue[TimerTaskList]) { // 各个层级时间轮共用一个任务队列

/** 时间轮指针,将时间轮划分为到期部分和未到期部分 */
private[this] var currentTime = startMs - (startMs % tickMs) // 修剪成 tickMs 的倍数,近似等于创建时间
/**
* 当前时间轮的时间跨度,
* 只能处理时间范围在 [currentTime, currentTime + interval] 之间的延时任务,超过该范围则需要将任务添加到上层时间轮中
*/
private[this] val interval = tickMs * wheelSize
/** 每一项都对应时间轮中的一格 */
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
/** 对于上层时间轮的引用 */
@volatile private[this] var overflowWheel: TimingWheel = _

// ... 省略方法定义

}

一个时间轮中包含多个时间格,TimingWheel 使用 TimingWheel#wheelSize 字段记录单层时间轮中的时间格格数,并使用 TimingWheel#tickMs 字段记录一个时间格的时间跨度,同时用一个数组 TimingWheel#buckets 记录这些时间格。作为分层结构设计,TimingWheel 定义了 TimingWheel#overflowWheel 字段用于对上层时间轮进行引用,上层时间轮的时间格跨度为当前时间轮的总时间跨度,对应 TimingWheel#interval 字段。一个 TimingWheel 对应的具体延时任务处理时间是在 [startMs - (startMs % tickMs), startMs - (startMs % tickMs) + interval) 之间,超过该区间的任务将会被提交给上层时间轮进行管理。字段 TimingWheel#currentTime 表示对应时间轮的指针,在 TimingWheel 刚刚被构造出来时,其值等于 startMs - (startMs % tickMs),之所以需要减去 (startMs % tickMs),是为了保持与 tickMs 对齐。各层级的 TimingWheel 共用一个 DelayQueue 对象,其中记录了所有的延时任务,DelayQueue 的每个结点对应一个隶属于某个时间轮的时间格对象。

介绍完了 TimingWheel 的字段定义,我们来看一下 TimingWheel 的方法实现,TimingWheel 总共定义了 3 个方法:TimingWheel#addOverflowWheelTimingWheel#addTimingWheel#advanceClock

方法 TimingWheel#addOverflowWheel 用于添加并初始化上层时间轮,在 Kafka 的分层时间轮算法设计中,上层时间轮是按需添加的,只要在当前时间轮容纳不了给定的延时任务时,才会触发将该延时任务提交给上层时间轮管理,此时如果上层时间轮还未定义,则会调用该方法初始化上层时间轮。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[this] def addOverflowWheel(): Unit = {
synchronized {
if (overflowWheel == null) {
// 创建上层时间轮
overflowWheel = new TimingWheel(
tickMs = interval, // tickMs 是当前时间轮的时间跨度 interval
wheelSize = wheelSize, // 时间轮的格数不变
startMs = currentTime, // 创建时间即当前时间
taskCounter = taskCounter, // 全局唯一的任务计数器
queue // 全局唯一的任务队列
)
}
}
}

创建上层时间轮无非就是新建一个 TimingWheel 对象,并赋值给当前时间轮的 TimingWheel#overflowWheel 字段。这里需要注意的地方就是对应上层时间轮的字段赋值,由方法实现可以看出上层时间轮中每一个时间格的时间跨度 tickMs 等于当前时间轮的总时间跨度 interval,而时间格格数仍保持不变,对应的任务计数器 taskCounter 和任务队列 queue 都是全局共用的。

方法 TimingWheel#add 用于往时间轮中添加延时任务,该方法接收一个 TimerTaskEntry 类型对象,即对延时任务 TimerTask 的封装。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
// 获取任务的到期时间戳
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
// 任务已经被取消,则不应该被添加
false
} else if (expiration < currentTime + tickMs) {
// 任务已经到期,则不应该被添加
false
} else if (expiration < currentTime + interval) {
// 任务正好位于当前时间轮的时间跨度范围内,
// 依据任务的到期时间查找此任务所属的时间格,并将任务添加到对应的时间格中
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)

// 更新对应时间格的时间区间上界,如果是第一次往对应时间格中添加延时任务,则需要将时间格记录到全局任务队列中
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
} else {
// 已经超出了当前时间轮的时间跨度范围,将任务添加到上层时间轮中
if (overflowWheel == null) this.addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}

上述方法的返回值为 Boolean 类型,后面的分析中将会看到,如果方法返回 false 且对应的任务未被取消,则会立即提交执行该任务。针对那些已经到期或已经被取消的任务会立即返回 false,这类任务不需要被添加到时间轮中。对于剩下的延时任务,如果任务的到期时间正好位于当前时间轮处理的时间区间内,则会将任务添加到时间轮对应的时间格中,同时将对应的时间格记录到全局 DelayQueue 中用于后续管理。如果待添加的延时任务已经超出了当前时间轮的处理范围,则会提交给上层时间轮进行管理,这一步会尝试触发创建并初始化上层时间轮。

方法 TimingWheel#advanceClock 用于推动当前时间轮指针(对应 TimingWheel#currentTime 字段),如果存在上层时间轮,则会尝试继续推动上层时间轮。方法实现如下:

1
2
3
4
5
6
7
8
9
10
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
// 尝试推动指针,可能会往前推进多个时间格
currentTime = timeMs - (timeMs % tickMs)

// 尝试推动上层时间轮指针
if (overflowWheel != null)
overflowWheel.advanceClock(currentTime)
}
}

定时器

上面介绍的 TimingWheel 提供了添加延时任务和推进时间轮指针的操作,而具体执行延时任务的操作则交由定时器 SystemTimer 完成。SystemTimer 类实现了 Timer 特质,该特质描绘了定时器应该具备的基本方法,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
trait Timer {

/**
* 添加延时任务,如果任务到期则会立即触发执行
*
* @param timerTask
*/
def add(timerTask: TimerTask): Unit

/**
* 推动时间轮指针,期间会执行已经到期的任务
*
* @param timeoutMs
* @return 是否有任务被执行
*/
def advanceClock(timeoutMs: Long): Boolean

/**
* 获取时间轮中等待被调度的任务数
*
* @return
*/
def size: Int

/**
* 关闭定时器,丢弃未执行的延时任务
*/
def shutdown(): Unit

}

SystemTimer 类字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class SystemTimer(executorName: String,
tickMs: Long = 1, // 默认时间格时间为 1 毫秒
wheelSize: Int = 20, // 默认时间格大小为 20
startMs: Long = Time.SYSTEM.hiResClockMs // 时间轮启动时间戳
) extends Timer {

/** 延时任务执行线程池 */
private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
def newThread(runnable: Runnable): Thread = Utils.newThread("executor-" + executorName, runnable, false)
})
/** 各层级时间轮共用的延时任务队列 */
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
/** 各层级时间轮共用的任务计数器 */
private[this] val taskCounter = new AtomicInteger(0)
/** 分层时间轮中最底层的时间轮 */
private[this] val timingWheel = new TimingWheel(
tickMs = tickMs,
wheelSize = wheelSize,
startMs = startMs,
taskCounter = taskCounter,
delayQueue
)

// ... 省略方法定义

}

由上面的字段定义可以看出 SystemTimer 是对时间轮 TimingWheel 的封装,并提供了线程池 taskExecutor 以执行到期的延时任务。SystemTimer 实现了 Timer 特质中声明的所有方法,其中 SystemTimer#sizeSystemTimer#shutdown 方法的实现都比较简单,下面我们重点来看一下用于添加延时任务的 SystemTimer#add 方法,以及推动时间轮指针的 SystemTimer#advanceClock 方法实现。

方法 SystemTimer#add 会将待添加的延时任务 TimerTask 对象封装成 TimerTaskEntry 对象添加到对应的时间格中,添加的过程调用的是 TimingWheel#add 方法。前面曾介绍过该方法会将未到期的延时任务添加到对应的时间轮中并返回 true,对于已到期或已经被取消的延时任务则会立即返回 false。由下面的实现可以看到,对于那些已经到期但是未被取消的任务,会立即被提交给执行线程予以执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
override def add(timerTask: TimerTask): Unit = {
readLock.lock()
try {
// 将 TimerTask 封装成 TimerTaskEntry 对象,并添加到时间轮中
this.addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
} finally {
readLock.unlock()
}
}

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
// 往时间轮中添加延时任务,同时检测添加的任务是否已经到期
if (!timingWheel.add(timerTaskEntry)) {
// 任务到期但未被取消,则立即提交执行
if (!timerTaskEntry.cancelled)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}

方法 SystemTimer#advanceClock 用于推动时间轮指针,推动的操作本质上是调用 TimingWheel#advanceClock 方法实现,但是区别于 TimingWheel 中单纯的向前移动指针,方法 SystemTimer#advanceClock 会从全局任务队列中获取队头的时间格,并执行时间格中已到期的任务。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/** 将延时任务重新添加到时间轮中 */
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => this.addTimerTaskEntry(timerTaskEntry)

/**
* 推进时间轮指针,同时处理时间格中到期的任务
*/
override def advanceClock(timeoutMs: Long): Boolean = {
// 超时等待获取时间格对象
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
writeLock.lock()
try {
while (bucket != null) {
// 推进时间轮指针,对应的时间戳为当前时间格时间区间上界
timingWheel.advanceClock(bucket.getExpiration)
// 遍历处理当前时间格中的延时任务,提交执行到期但未被取消的任务,
// 对于未到期的任务重新添加到时间轮中继续等待被执行,期间可能会对任务在层级上执行降级
bucket.flush(reinsert)
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}

前面我们曾分析过 TimerTaskList#flush 方法,知道该方法会从对应时间格中移除所有的延时任务并为每个任务应用参数给定的函数,而这里的函数定义则是 reinsert,该函数会对时间格中的每个延时任务应用上面分析过的 SystemTimer#addTimerTaskEntry 操作,即尝试将每个任务再次加入到对应的时间格,并执行已到期未被取消的任务。

这里需要清楚的一点是,从当前时间格中移出但未到期的延时任务,当再次被添加到时间轮中时,不一定会被添加到原来的时间轮中,因为随着时间的流失,距离对应延时任务的时间差也越来越小,这个时候一般会发生时间轮的降级,即从一个较大(时间区间)粒度的时间轮中降落到粒度较小的时间轮中。实际上,从在时间轮中等待到被执行本质上也是一种降级操作,只是这里较小的时间粒度是 0,表示延时任务已经到期,需要立即被执行。

炼狱

上面介绍的定时器 SystemTimer 能够依据给定的延时时间延迟对任务的执行,而在实际应用中时间往往只是触发任务执行的维度之一,一些场景下我们需要对延时任务的执行更加灵活的控制。例如在生产者向服务端发送消息并等待服务端确认时,服务端需要依据客户端指定的 acks 参数等待指定数量的副本确认已完成对当前消息的复制操作才能向客户端发送确认的响应。这本质上是一个异步的操作,而具体的执行时机不能单方面用时间进行衡量,这个时候我们需要利用更多的信息对延迟任务的执行进行控制。

DelayedOperation

DelayedOperation 正是上面描述的这一类延时任务的抽象,它实现了 TimerTask 特质,并定义了以下方法:

  • forceComplete :强制执行延时任务,包括满足执行条件主动触发,以及延时到期。
  • onComplete :延时任务的具体执行逻辑,在整个延时任务的生命周期中只能被调用一次,且只能由 forceComplete 方法调用。
  • onExpiration :当延时任务因为时间到期被执行时会触发该方法中定义的逻辑。
  • tryComplete :检测是否满足延时任务执行条件,若满足则会调用 forceComplete 方法。
  • safeTryComplete :方法 tryComplete 的线程安全版本。
  • isCompleted :检测延时任务是否完成执行。

上述方法中,除了 DelayedOperation#forceComplete 方法外,其余基本为抽象方法声明,这里主要看一下该方法的实现:

1
2
3
4
5
6
7
8
9
10
11
def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) { // CAS 操作修改 completed 字段
// 将当前延时任务从时间轮中移除
this.cancel()
// 立即触发执行延时任务
this.onComplete()
true
} else {
false
}
}

当调用 DelayedOperation#forceComplete 方法时,会将当前延时任务从时间轮中移除并立即触发执行。

DelayedOperation 实现了 TimerTask 特质,所以也间接实现了 Runnable 接口,当延时任务到期时会被提交给定时器的线程执行,其 DelayedOperation#run 方法实现如下:

1
2
3
override def run(): Unit = {
if (forceComplete()) onExpiration()
}

当延时任务到期时会触发 DelayedOperation#forceComplete 方法的执行,如果延时任务在其它线程中被执行,则 DelayedOperation#forceComplete 方法会立即返回 false,也就不会继续触发执行 DelayedOperation#onExpiration 方法。

DelayedOperationPurgatory 类提供了对 DelayedOperation 管理的功能,DelayedOperationPurgatory 维护了一个 Pool 类型(key/value 类型)的 watchersForKey 对象,用于记录延时任务及其关注的 key 之间的映射关系,用于支持时间维度以外的其它维度操作。Pool 封装了 ConcurrentHashMap,并在 ConcurrentHashMap 的基础上添加了 Pool#getAndMaybePut 方法,用于在对应 key 不命中时使用给定的 value 更新键值对。Pool 的默认构造方法接收一个 Option[K => V] 类型的 valueFactory 参数,用于为 key 生成对应的 Watchers 对象。

DelayedOperationPurgatory

DelayedOperationPurgatory 类提供了对 DelayedOperation 进行管理的功能,其中定义的 Pool 对象的 key 是 Any 类型,表示可以使用任意类型的对象作为 key,而 value 则是 Watchers 类型,用于封装关注相同 key 的 DelayedOperation 集合,底层依赖于 ConcurrentLinkedQueue 实现。Watchers 中定义的方法实现将在后面分析 DelayedOperationPurgatory 类时一同分析。DelayedOperationPurgatory 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
timeoutTimer: Timer, // 定时器
brokerId: Int = 0, // 所在 broker 节点 ID
purgeInterval: Int = 1000, // 执行清理操作的阈值
reaperEnabled: Boolean = true // 是否启用后台指针推进器
) extends Logging with KafkaMetricsGroup {

/** 用于管理 DelayedOperation,其中 key 是 Watcher 中的 DelayedOperation 集合所关心的对象 */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
/** watchersForKey 读写锁 */
private val removeWatchersLock = new ReentrantReadWriteLock()
/** 记录当前 DelayedOperationPurgatory 中延时任务的个数 */
private[this] val estimatedTotalOperations = new AtomicInteger(0)
/**
* 主要具备 2 个作用:
* 1. 推进时间指针
* 2. 定期清理 watchersForKey 中已经完成的延时任务
*/
private val expirationReaper = new ExpiredOperationReaper()

// ... 省略方法定义

}

各字段的含义如代码注释,这里我们主要介绍一下 DelayedOperationPurgatory#expirationReaper 字段,它是 ExpiredOperationReaper 类型。ExpiredOperationReaper 是 DelayedOperationPurgatory 中定义的内部类,它继承自 ShutdownableThread 抽象类,所以我们可以知道它本质上是一个线程类,在构造 DelayedOperationPurgatory 对象时如果设置 reaperEnabled=true 则会启动该线程。ExpiredOperationReaper 类实现如下:

1
2
3
4
5
6
7
private class ExpiredOperationReaper extends ShutdownableThread("ExpirationReaper-%d".format(brokerId), false) {

override def doWork() {
advanceClock(200L)
}

}

ExpiredOperationReaper 的主要作用是调用 DelayedOperationPurgatory#advanceClock 方法在后台推动时间轮指针,并定期清理当前 DelayedOperationPurgatory 中记录的所有已执行完成的延时任务。方法 DelayedOperationPurgatory#advanceClock 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def advanceClock(timeoutMs: Long) {
// 尝试推进时间轮指针
timeoutTimer.advanceClock(timeoutMs)

// 如果当前炼狱中的已完成任务数超过给定阈值 purgeInterval,则尝试清理
if (estimatedTotalOperations.get - delayed > purgeInterval) {
estimatedTotalOperations.getAndSet(delayed())
debug("Begin purging watch lists")
// 遍历各个 Watcher 集合,执行清理操作
val purged = allWatchers.map(_.purgeCompleted()).sum
debug("Purged %d elements from watch lists.".format(purged))
}
}

上述方法会调用定时器的 SystemTimer#advanceClock 方法推进时间轮指针,并估算当前 DelayedOperationPurgatory 中已经完成的延时任务数目是否超过设置的阈值 purgeInterval,默认为 1000,如果超过该阈值则会触发清理工作。清理期间会获取并处理所有在册的 Watchers 对象,通过调用每个 Watchers 对象的 Watchers#purgeCompleted 方法,对 Watchers 中已经执行完成的延时任务对象进行清理,如果某个 key 的 Watchers 对象在被清理之后不再包含任何等待执行的延时任务,则会调用 DelayedOperationPurgatory#removeKeyIfEmpty 方法将对应的 key 从 DelayedOperationPurgatory#watchersForKey 字段中一并移除,防止内存泄露。

下面我们继续来看一下 DelayedOperationPurgatory 中剩余的两个主要方法实现,即 DelayedOperationPurgatory#tryCompleteElseWatchDelayedOperationPurgatory#checkAndComplete 方法。

方法 DelayedOperationPurgatory#tryCompleteElseWatch 用于往 DelayedOperationPurgatory 中添加延时任务,一个延时任务可以被同时关联到多个 key 对象上,这样可以从多个维度触发执行该延时任务。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
assert(watchKeys.nonEmpty, "The watch key list can't be empty")

// 1. 调用延时任务的 tryComplete 方法,尝试完成延迟操作
var isCompletedByMe = operation.safeTryComplete()
// 如果延时任务已经执行完成,则直接返回
if (isCompletedByMe) return true

// 2. 遍历处理 watchKeys,将延时任务添加其关心的 key 对应的 Watchers 中
var watchCreated = false
for (key <- watchKeys) {
// 如果待添加的延时任务已经执行完成,则放弃添加
if (operation.isCompleted) return false

// 添加延时任务添加到对应 key 的 Watchers 集合中,用于从时间维度以外的维度触发延时任务执行
this.watchForOperation(key, operation)

if (!watchCreated) {
watchCreated = true
// 延时任务计数加 1,一个延时任务可能会被添加到多个 key 对应的 Watchers 集合中,但是任务计数只会增加 1 次
estimatedTotalOperations.incrementAndGet()
}
}

// 3. 再次调用延时任务的 tryComplete 方法,尝试完成延迟操作
isCompletedByMe = operation.safeTryComplete()
if (isCompletedByMe) return true

// 4. 对于未执行的延时任务,尝试添加到定时器中,用于从时间维度触发延时任务执行
if (!operation.isCompleted) {
timeoutTimer.add(operation)
// 再次检测延时任务的执行情况,如果已经完成则从定时器中移除
if (operation.isCompleted) {
operation.cancel()
}
}

false
}

注册延时任务的执行流程如下:

  1. 尝试执行延时任务,如果当前已经完成执行则返回;
  2. 否则,遍历给定的 key 集合,将延时任务添加到每个 key 的 Watchers 中,建立从多个维度触发延时任务执行的条件,期间如果延时任务已经完成执行,则不再继续添加;
  3. 再次尝试执行延时任务,如果当前已经完成执行则返回;
  4. 否则,将延时任务添加到定时器中,建立从时间维度触发延时任务执行的条件,如果期间任务已经完成执行,则从时间轮中取消任务。

整个执行流程我们看到方法多次尝试执行延时任务,以保证对应的延时任务能够尽快被触发,同时减少 DelayedOperationPurgatory 不必要的开销。下面我们主要来看一下步骤 2 的执行逻辑,在该步骤中调用了 DelayedOperationPurgatory#watchForOperation 方法将当前延时任务对象添加到对应 key 的 Watchers 中,方法实现如下:

1
2
3
4
5
6
private def watchForOperation(key: Any, operation: T) {
inReadLock(removeWatchersLock) {
val watcher = watchersForKey.getAndMaybePut(key)
watcher.watch(operation)
}
}

其中,方法 Pool#getAndMaybePut 会尝试获取 key 对应的 Watchers 对象,如果不存在则会创建,然后调用 Watchers#watch 方法将延时任务记录到 Watchers 对象的同步队列中。

在整个 DelayedOperationPurgatory#tryCompleteElseWatch 方法的实现上我们可能会疑惑,为什么既要将延时任务添加到定时器 SystemTimer 中,同时还需要将延时任务添加到每个 key 的 Watchers 中?监听这些 key 的意义又是什么呢?

要理解这一设计,就需要回到本小节开头我们对于 DelayedOperation 抽象类设计的讨论,既然已经有了定时器 SystemTimer 可以执行延时任务,为什么还要实现 DelayedOperation 和 DelayedOperationPurgatory 呢?因为单从时间维度不能覆盖所有的应用场景,而将延时任务记录到其关注的 key 对应的 Watchers 对象中,在相关条件满足时,我们可以通过 key 获取到对应的延时任务,并调用 DelayedOperation#forceComplete 方法提前执行满足执行条件的延时任务,从而能够从多个维度对延时任务进行控制。

方法 DelayedOperationPurgatory#checkAndComplete 用于检测关注指定 key 的延时任务是否满足执行条件,如果满足则触发执行,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def checkAndComplete(key: Any): Int = {
// 获取 key 对应的 Watchers 对象
val watchers = inReadLock(removeWatchersLock) {
watchersForKey.get(key)
}
if (watchers == null) 0
// 如果存在对应的 Watchers 对象,则对记录在其中待执行的延时任务尝试触发执行,并移除已经执行完成的任务
else watchers.tryCompleteWatched()
}

// Watchers#tryCompleteWatched
def tryCompleteWatched(): Int = {
var completed = 0

// 遍历处理当前 Watchers 对象中的延时任务
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
// 如果对应的延时任务已经执行完成,则从 Watchers 中移除
if (curr.isCompleted) {
iter.remove()
}
// 尝试执行延时任务
else if (curr.safeTryComplete()) {
iter.remove()
completed += 1
}
}

// 如果 key 对应的 Watchers 已空,则将 key 从 watchersForKey 中移除,防止内存泄露
if (operations.isEmpty) removeKeyIfEmpty(key, this)

completed
}

上述方法会获取指定 key 对应的 Watchers 对象,并遍历 Watchers 中记录的延时任务,如果对应的延时任务已经完成执行,则将其从 Watchers 对象中移除;否则,会调用 DelayedOperation#safeTryComplete 方法尝试执行对应的延时任务;如果当前 key 的 Watchers 对象已经没有待执行的延时任务,则将 key 从 DelayedOperationPurgatory 中移除,避免因为持有对应 key 对象的引用,导致无法被 GC,而最终导致内存泄露。

基于炼狱的延时任务调度示例

上面分析了这么多,本小节我们列举几个基于炼狱调度的真实延时任务示例,进一步加深对 Kafka 延时任务调度机制的理解。前面分析过的 DelayedOperation 是一个抽象类,围绕该抽象类派生出多个子类,本小节我们主要分析其中 2 个比较典型的延时任务实现:DelayedProduce 和 DelayedFetch。

DelayedProduce

当生产者追加消息到集群时(对应 ProduceRequest 请求),实际上是与对应 topic 分区的 leader 副本进行交互,当消息写入 leader 副本成功后,为了保证 leader 节点宕机时消息数据不丢失,一般需要将消息同步到位于 ISR 集合中的全部 follower 副本,只有当 ISR 集合中所有的 follower 副本成功完成对当前消息的记录之后才认为本次消息追加操作是成功的。这里就存在一个延时任务的适用场景,即当消息被成功追加到 leader 副本之后,我们需要创建一个延时任务等待 ISR 集合中所有的 follower 副本完成同步,并在同步操作完成之后对生产者的请求进行响应,Kafka 定义了 DelayedProduce 类来处理这类需求。

DelayedProduce 继承自 DelayedOperation 抽象类,其字段定义如下:

1
2
3
4
5
6
7
8
9
class DelayedProduce(delayMs: Long, // 延迟时长
produceMetadata: ProduceMetadata, // 用于判断 DelayedProduce 是否满足执行条件
replicaManager: ReplicaManager, // 副本管理器
responseCallback: Map[TopicPartition, PartitionResponse] => Unit // 回调函数,在任务满足条件或到期时执行
) extends DelayedOperation(delayMs) {

// ... 省略方法定义

}

其中 ReplicaManager 用于管理一个 broker 节点上的所有分区副本信息,我们将在下一篇中对其进行深入分析,这里我们主要来看一下 ProduceMetadata 样例类,定义如下:

1
2
3
4
5
6
7
8
9
10
11
case class ProduceMetadata(produceRequiredAcks: Short, // 对应 acks 值设置
produceStatus: Map[TopicPartition, ProducePartitionStatus]) { // 记录每个 topic 分区对应的消息追加状态
}

case class ProducePartitionStatus(requiredOffset: Long, // 对应 topic 分区最后一条消息的 offset
responseStatus: PartitionResponse) { // 记录 ProducerResponse 中的错误码

/** 标识是否正在等待 ISR 集合中的 follower 副本从 leader 副本同步 requiredOffset 之前的消息 */
@volatile var acksPending = false

}

ProduceMetadata 类记录了当前延时任务所关注的 topic 分区的消息追加状态,DelayedProduce 延时任务在被构造时会依据消息写入对应 topic 分区 leader 副本是否成功来对其进行初始化:

1
2
3
4
5
6
7
8
9
10
11
// 依据消息写入 leader 分区操作的错误码对 produceMetadata 的 produceStatus 进行初始化
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
if (status.responseStatus.error == Errors.NONE) {
// 对应 topic 分区消息写入 leader 副本成功,等待其它副本同步
status.acksPending = true
status.responseStatus.error = Errors.REQUEST_TIMED_OUT // 默认错误码
} else {
// 对应 topic 分区消息写入 leader 副本失败,无需等待
status.acksPending = false
}
}

下面来看一下 DelayedProduce#tryComplete 方法实现,该方法会检测当前延时任务所关注的 topic 分区的运行状态,当满足以下 3 个条件之一时则认为无需再继续等待对应的 topic 分区:

  1. 对应 topic 分区的 leader 副本不再位于当前 broker 节点上。
  2. 检查 ISR 集合中的所有 follower 副本是否完成同步时出现异常。
  3. ISR 集合中所有的 follower 副本完成了同步操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
override def tryComplete(): Boolean = {
// 遍历处理所有的 topic 分区
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
trace(s"Checking produce satisfaction for $topicPartition, current status $status")
// 仅处理正在等待 follower 副本复制的分区
if (status.acksPending) {
val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
case Some(partition) =>
// 检测对应分区本次追加的最后一条消息是否已经被 ISR 集合中所有的 follower 副本同步
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
case None =>
// 找不到对应的分区对象,说明对应分区的 leader 副本已经不在当前 broker 节点上
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
// 出现异常,或所有的 ISR 副本已经同步完成
if (error != Errors.NONE || hasEnough) {
status.acksPending = false // 不再等待
status.responseStatus.error = error
}
}
}

// 如果所有的 topic 分区都已经满足了 DelayedProduce 的执行条件,即不存在等待 ack 的分区,则结束本次延时任务
if (!produceMetadata.produceStatus.values.exists(_.acksPending))
forceComplete()
else
false
}

如果所有的 topic 分区均不再处于等待状态,则上述方法会触发执行 DelayedProduce#forceComplete 操作,即调用 DelayedProduce#onComplete 方法以回调的形式将各个 topic 分区对应的响应状态发送给客户端。

DelayedFetch

消费者和 follower 副本均会向目标 topic 分区的 leader 副本所在 broker 节点发送 FetchRequest 请求来拉取消息,从接收到请求到准备消息数据,再到发送响应之间的过程同样适用于延时任务,Kafka 定义了 DelayedFetch 类来处理这类需求。

DelayedFetch 同样继承自 DelayedOperation 抽象类,其字段定义如下:

1
2
3
4
5
6
7
8
9
10
class DelayedFetch(delayMs: Long, // 延时任务延迟时长
fetchMetadata: FetchMetadata, // 记录对应 topic 分区的状态信息,用于判定当前延时任务是否满足执行条件
replicaManager: ReplicaManager, // 副本管理器
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit // 响应回调函数
) extends DelayedOperation(delayMs) {

// ... 省略方法定义

}

其中 FetchMetadata 类型字段 DelayedFetch#fetchMetadata 用于记录当前延时任务关注的 topic 分区的状态信息,用于判定当前延时任务是否满足执行条件。样例类 FetchMetadata 的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
case class FetchMetadata(fetchMinBytes: Int, // 读取的最小字节数
fetchMaxBytes: Int, // 读取的最大字节数
hardMaxBytesLimit: Boolean,
fetchOnlyLeader: Boolean, // 是否只读 leader 副本的消息,一般 debug 模式下可以读 follower 副本的数据
fetchOnlyCommitted: Boolean, // 是否只读已完成提交的消息(即 HW 之前的消息),如果是来自消费者的请求则该参数是 true,如果是 follower 则该参数是 false
isFromFollower: Boolean, // fetch 请求是否来自 follower
replicaId: Int, // fetch 的副本 ID
fetchPartitionStatus: Seq[(TopicPartition, FetchPartitionStatus)]) { // 记录每个 topic 分区的 fetch 状态

}

case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) {

}

下面来看一下 DelayedFetch#tryComplete 方法的实现,当满足以下 4 个条件之一时会触发执行延时任务:

  1. 对应 topic 分区的 leader 副本不再位于当前 broker 节点上。
  2. 请求拉取消息的 topic 分区在当前 broker 节点上找不到。
  3. 请求拉取消息的 offset 不位于 activeSegment 对象上,可能已经创建了新的 activeSegment,或者 Log 被截断。
  4. 累计读取的字节数已经达到所要求的最小字节数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
override def tryComplete(): Boolean = {
var accumulatedSize = 0
var accumulatedThrottledSize = 0
// 遍历处理当前延时任务关注的所有 topic 分区的状态信息
fetchMetadata.fetchPartitionStatus.foreach { case (topicPartition, fetchStatus) =>
// 获取上次拉取消息的结束 offset
val fetchOffset = fetchStatus.startOffsetMetadata
try {
if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
// 获取 topic 分区的 leader 副本
val replica = replicaManager.getLeaderReplicaIfLocal(topicPartition)
// 依据发起请求是消费者还是 follower 来确定拉取消息的结束 offset
val endOffset = if (fetchMetadata.fetchOnlyCommitted) replica.highWatermark else replica.logEndOffset

// 校验上次拉取消息完成之后,endOffset 是否发生变化,如果未发送变化则说明数据不够,没有继续的必要,否则继续执行
if (endOffset.messageOffset != fetchOffset.messageOffset) {
if (endOffset.onOlderSegment(fetchOffset)) {
// 条件 3,endOffset 相对于 fetchOffset 较小,说明请求不位于当前 activeSegment 上
debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicPartition))
return forceComplete()
} else if (fetchOffset.onOlderSegment(endOffset)) {
// 条件 3,fetchOffset 位于 endOffset 之前,但是 fetchOffset 落在老的 LogSegment 上,而非 activeSegment 上
debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId)) return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
// fetchOffset 和 endOffset 位于同一个 LogSegment 上,计算累计读取的字节数
val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
if (quota.isThrottled(topicPartition)) accumulatedThrottledSize += bytesAvailable
else accumulatedSize += bytesAvailable
}
}
}
} catch {
// 条件 2,请求 fetch 的 topic 分区在当前 broker 节点上找不到
case _: UnknownTopicOrPartitionException => // Case B
debug("Broker no longer know of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
return forceComplete()
// 条件 1,对应 topic 分区的 leader 副本不再位于当前 broker 节点上
case _: NotLeaderForPartitionException => // Case A
debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
return forceComplete()
}
}

// 条件 4,累计读取的字节数已经达到最小字节限制
if (accumulatedSize >= fetchMetadata.fetchMinBytes
|| ((accumulatedSize + accumulatedThrottledSize) >= fetchMetadata.fetchMinBytes && !quota.isQuotaExceeded()))
forceComplete()
else
false
}

如果延时任务满足执行条件,则会触发执行 DelayedFetch#forceComplete 方法,即调用 DelayedFetch#onComplete 方法以回调的形式将各个 topic 分区对应的响应数据封装成 FetchPartitionData 对象发送给请求方。

总结

本文介绍了 Kafka 基于分层时间轮算法实现的延时任务调度策略,相对于 JDK 内建的实现具备更加高效的性能,并支持除时间维度以外多维度的任务触发机制。这一部分的相关实现相对比较独立和通用,感兴趣的读者可以将其抽取出来作为基础组件用于自己的项目中。