Kafka 源码解析:延时任务调度策略
Kafka 一些组件的命名很是有趣,比如炼狱(purgatory)、死神(reaper)等,在日常开发中也建议大家在类和方法命名上能够以一些能够表达类或方法意图的人或事物的名词进行命名,让项目显得更加的生动。今天我们要分析的组件就是以 purgatory 命名的 DelayedOperationPurgatory,DelayedOperationPurgatory 是一个相对独立的组件,我们可以将其抽取出来用于自己的日常项目中,DelayedOperationPurgatory 主要用于管理延时任务,底层依赖于分层时间轮算法实现。
说到延时任务调度,对于 java 开发者来说,日常用到比较多的可能是 JDK 自带的 Timer、ScheduledThreadPoolExecutor 和 DelayQueue 等,但是对于 Kafka 这类需要频繁执行复杂延时任务的分布式系统来说,这些组件在性能上还稍显不足,所以 Kafka 自定义了分层时间轮算法,提供了 O(m)
时间复杂度(m 为时间轮层级数)的任务插入性能和 O(1)
时间复杂度的任务删除性能,要优于 JDK 自带的基于堆实现的 O(log(n))
时间复杂度的延时任务调度组件。
传统的时间轮算法在实现上采用单环实现,环上对应指定数量的时间格,然后将延时任务分散到对应的时间格上,随着时间指针的推进触发相应的任务执行。这样设计的缺点在于只能添加位于一个特定区间内的延时任务,如果延时任务的跨度较长,则时间轮需要被设计的非常大,在一定程度上是一种浪费,同时也无法彻底解决其表现力不足的问题。分层时间轮则采用多环实现,每个环的时间格在粒度上存在差异(一般上层时间轮一格的时间跨度等于下层整个时间轮的跨度),通过多环嵌套能够让时间轮表示任意延迟时长的任务调度。
时间轮算法定义了从时间维度触发延时任务的执行,实际应用中可能还需要允许从其他维度对延迟任务在到达延时时间之前提前触发。为此,Kafka 定义了 DelayedOperation 和 DelayedOperationPurgatory 组件,通过在注册延时任务时让每个延时任务关注一个或多个 key,当这些 key 状态发生变更时触发调用对应的延时任务,以实现对延时任务的多维度控制。
分层时间轮
Kafka 的分层时间轮算法在实现上主要涉及 TimingWheel、TimerTaskList、TimerTaskEntry,以及 TimerTask 这 4 个类,各个类的作用说明如下:
- TimerTask :特质类型,用于描述延时任务。
- TimerTaskList :时间格,采用环形双向链表实现,记录位于同一个时间格中的延时任务。
- TimerTaskEntry :时间格链表中的一个结点,是对延时任务 TimerTask 的封装。
- TimingWheel :时间轮,采用定长数组记录放置时间格。
TimerTask 和 TimerTaskEntry
本小节来看一下分层时间轮算法的具体实现,首先来看一下对于延时任务的描述,即 TimerTask 特质,而 TimerTask 特质和类 TimerTaskEntry 在实现上是相互引用的,所以需要将这两者结合起来分析。TimerTask 继承了 Runnable 接口,其字段定义如下:
1 | trait TimerTask extends Runnable { |
其中,字段 TimerTask#delayMs
用于设置当前延时任务的延时时长,例如延迟 1 分钟执行,则 delayMs 即等于 1 * 60 * 1000
,即 60000 毫秒。而字段 TimerTask#timerTaskEntry
则用于封装当前延时任务并记录到时间格中,属于延时任务与时间格之间建立关系的桥梁。TimerTask 中定义了绑定和获取 timerTaskEntry 的方法 TimerTask#setTimerTaskEntry
和 TimerTask#getTimerTaskEntry
。
需要知晓的一点是,如果当前任务之前已经被添加到时间格中,即对应的 timerTaskEntry 已经被赋值过,当再次执行绑定时,如果是绑定到新的时间格结点则需要先从之前绑定的时间格中移除当前延时任务。此外,TimerTask#cancel
方法用于取消对应的延时任务,实际上就是调用 TimerTaskEntry#remove
方法从时间格中移除对应的结点。
TimerTaskEntry 类本质上是一个链表结点的定义,其字段定义了结点的前置和后置指针,以及所属的时间格,类字段定义如下:
1 | private[timer] class TimerTaskEntry(val timerTask: TimerTask, // 封装的延时任务 |
TimerTaskEntry 类对象在被构造时会建立延时任务与结点之间的映射关系,并提供了获取延时任务取消状态,以及移除对应延时任务的操作。这里的 TimerTaskEntry#expirationMs
字段是延时任务到期时间戳,也就是延时任务应该被触发执行的时间戳,在计算上等于延时任务的 TimerTask#delayMs
时间加上任务被添加到时间轮中的时间戳。
TimerTaskList
TimerTaskList 描述了时间轮的一格(即时间格),在实现上采用双向链表实现,用于封装位于特定时间区间范围内的所有的延时任务,其字段定义如下:
1 | private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { |
TimerTaskList 的默认构造方法接收一个 AtomicInteger 类型的计数器变量,该变量在整个时间轮设计中是共享的,用于记录整个分层时间轮中持有的延时任务总数。同时 TimerTaskList 还定义了一个 TimerTaskList#expiration
字段,用于记录当前时间格对应时间区间的上界。TimerTaskList 提供了添加和移除延时任务的方法:TimerTaskList#add
和 TimerTaskList#remove
。需要注意的一点是,如果当前添加的延时任务在之前被添加过,则再次添加时会先移除之前的添加记录。同时,TimerTaskList 还提供了 TimerTaskList#flush
方法,该方法接收一个 TimerTaskEntry => Unit
类型的函数 f,用于从当前时间格中移除所有延时任务,并对每个任务应用 f 函数,在后面分析推进时间轮指针时将会看到,借助 TimerTaskList#flush
方法可以执行一个时间格中所有到期且未被取消的任务,并对未到期的任务重新放入对应层级的时间轮中,继续等待调度。
TimingWheel
介绍完了 TimerTask、TimerTaskEntry 和 TimerTaskList,最后来重点看一下分层时间轮 TimingWheel 的实现,其字段定义如下:
1 | private[timer] class TimingWheel(tickMs: Long, // 当前时间轮中一格的时间跨度 |
一个时间轮中包含多个时间格,TimingWheel 使用 TimingWheel#wheelSize
字段记录单层时间轮中的时间格格数,并使用 TimingWheel#tickMs
字段记录一个时间格的时间跨度,同时用一个数组 TimingWheel#buckets
记录这些时间格。作为分层结构设计,TimingWheel 定义了 TimingWheel#overflowWheel
字段用于对上层时间轮进行引用,上层时间轮的时间格跨度为当前时间轮的总时间跨度,对应 TimingWheel#interval
字段。一个 TimingWheel 对应的具体延时任务处理时间是在 [startMs - (startMs % tickMs), startMs - (startMs % tickMs) + interval)
之间,超过该区间的任务将会被提交给上层时间轮进行管理。字段 TimingWheel#currentTime
表示对应时间轮的指针,在 TimingWheel 刚刚被构造出来时,其值等于 startMs - (startMs % tickMs)
,之所以需要减去 (startMs % tickMs)
,是为了保持与 tickMs 对齐。各层级的 TimingWheel 共用一个 DelayQueue 对象,其中记录了所有的延时任务,DelayQueue 的每个结点对应一个隶属于某个时间轮的时间格对象。
介绍完了 TimingWheel 的字段定义,我们来看一下 TimingWheel 的方法实现,TimingWheel 总共定义了 3 个方法:TimingWheel#addOverflowWheel
、TimingWheel#add
和 TimingWheel#advanceClock
。
方法 TimingWheel#addOverflowWheel
用于添加并初始化上层时间轮,在 Kafka 的分层时间轮算法设计中,上层时间轮是按需添加的,只要在当前时间轮容纳不了给定的延时任务时,才会触发将该延时任务提交给上层时间轮管理,此时如果上层时间轮还未定义,则会调用该方法初始化上层时间轮。方法实现如下:
1 | private[this] def addOverflowWheel(): Unit = { |
创建上层时间轮无非就是新建一个 TimingWheel 对象,并赋值给当前时间轮的 TimingWheel#overflowWheel
字段。这里需要注意的地方就是对应上层时间轮的字段赋值,由方法实现可以看出上层时间轮中每一个时间格的时间跨度 tickMs 等于当前时间轮的总时间跨度 interval,而时间格格数仍保持不变,对应的任务计数器 taskCounter 和任务队列 queue 都是全局共用的。
方法 TimingWheel#add
用于往时间轮中添加延时任务,该方法接收一个 TimerTaskEntry 类型对象,即对延时任务 TimerTask 的封装。方法实现如下:
1 | def add(timerTaskEntry: TimerTaskEntry): Boolean = { |
上述方法的返回值为 Boolean 类型,后面的分析中将会看到,如果方法返回 false 且对应的任务未被取消,则会立即提交执行该任务。针对那些已经到期或已经被取消的任务会立即返回 false,这类任务不需要被添加到时间轮中。对于剩下的延时任务,如果任务的到期时间正好位于当前时间轮处理的时间区间内,则会将任务添加到时间轮对应的时间格中,同时将对应的时间格记录到全局 DelayQueue 中用于后续管理。如果待添加的延时任务已经超出了当前时间轮的处理范围,则会提交给上层时间轮进行管理,这一步会尝试触发创建并初始化上层时间轮。
方法 TimingWheel#advanceClock
用于推动当前时间轮指针(对应 TimingWheel#currentTime
字段),如果存在上层时间轮,则会尝试继续推动上层时间轮。方法实现如下:
1 | def advanceClock(timeMs: Long): Unit = { |
定时器
上面介绍的 TimingWheel 提供了添加延时任务和推进时间轮指针的操作,而具体执行延时任务的操作则交由定时器 SystemTimer 完成。SystemTimer 类实现了 Timer 特质,该特质描绘了定时器应该具备的基本方法,定义如下:
1 | trait Timer { |
SystemTimer 类字段定义如下:
1 | class SystemTimer(executorName: String, |
由上面的字段定义可以看出 SystemTimer 是对时间轮 TimingWheel 的封装,并提供了线程池 taskExecutor 以执行到期的延时任务。SystemTimer 实现了 Timer 特质中声明的所有方法,其中 SystemTimer#size
和 SystemTimer#shutdown
方法的实现都比较简单,下面我们重点来看一下用于添加延时任务的 SystemTimer#add
方法,以及推动时间轮指针的 SystemTimer#advanceClock
方法实现。
方法 SystemTimer#add
会将待添加的延时任务 TimerTask 对象封装成 TimerTaskEntry 对象添加到对应的时间格中,添加的过程调用的是 TimingWheel#add
方法。前面曾介绍过该方法会将未到期的延时任务添加到对应的时间轮中并返回 true,对于已到期或已经被取消的延时任务则会立即返回 false。由下面的实现可以看到,对于那些已经到期但是未被取消的任务,会立即被提交给执行线程予以执行。
1 | override def add(timerTask: TimerTask): Unit = { |
方法 SystemTimer#advanceClock
用于推动时间轮指针,推动的操作本质上是调用 TimingWheel#advanceClock
方法实现,但是区别于 TimingWheel 中单纯的向前移动指针,方法 SystemTimer#advanceClock
会从全局任务队列中获取队头的时间格,并执行时间格中已到期的任务。方法实现如下:
1 | /** 将延时任务重新添加到时间轮中 */ |
前面我们曾分析过 TimerTaskList#flush
方法,知道该方法会从对应时间格中移除所有的延时任务并为每个任务应用参数给定的函数,而这里的函数定义则是 reinsert,该函数会对时间格中的每个延时任务应用上面分析过的 SystemTimer#addTimerTaskEntry
操作,即尝试将每个任务再次加入到对应的时间格,并执行已到期未被取消的任务。
这里需要清楚的一点是,从当前时间格中移出但未到期的延时任务,当再次被添加到时间轮中时,不一定会被添加到原来的时间轮中,因为随着时间的流失,距离对应延时任务的时间差也越来越小,这个时候一般会发生时间轮的降级,即从一个较大(时间区间)粒度的时间轮中降落到粒度较小的时间轮中。实际上,从在时间轮中等待到被执行本质上也是一种降级操作,只是这里较小的时间粒度是 0,表示延时任务已经到期,需要立即被执行。
炼狱
上面介绍的定时器 SystemTimer 能够依据给定的延时时间延迟对任务的执行,而在实际应用中时间往往只是触发任务执行的维度之一,一些场景下我们需要对延时任务的执行更加灵活的控制。例如在生产者向服务端发送消息并等待服务端确认时,服务端需要依据客户端指定的 acks 参数等待指定数量的副本确认已完成对当前消息的复制操作才能向客户端发送确认的响应。这本质上是一个异步的操作,而具体的执行时机不能单方面用时间进行衡量,这个时候我们需要利用更多的信息对延迟任务的执行进行控制。
DelayedOperation
DelayedOperation 正是上面描述的这一类延时任务的抽象,它实现了 TimerTask 特质,并定义了以下方法:
- forceComplete :强制执行延时任务,包括满足执行条件主动触发,以及延时到期。
- onComplete :延时任务的具体执行逻辑,在整个延时任务的生命周期中只能被调用一次,且只能由 forceComplete 方法调用。
- onExpiration :当延时任务因为时间到期被执行时会触发该方法中定义的逻辑。
- tryComplete :检测是否满足延时任务执行条件,若满足则会调用 forceComplete 方法。
- safeTryComplete :方法 tryComplete 的线程安全版本。
- isCompleted :检测延时任务是否完成执行。
上述方法中,除了 DelayedOperation#forceComplete
方法外,其余基本为抽象方法声明,这里主要看一下该方法的实现:
1 | def forceComplete(): Boolean = { |
当调用 DelayedOperation#forceComplete
方法时,会将当前延时任务从时间轮中移除并立即触发执行。
DelayedOperation 实现了 TimerTask 特质,所以也间接实现了 Runnable 接口,当延时任务到期时会被提交给定时器的线程执行,其 DelayedOperation#run
方法实现如下:
1 | override def run(): Unit = { |
当延时任务到期时会触发 DelayedOperation#forceComplete
方法的执行,如果延时任务在其它线程中被执行,则 DelayedOperation#forceComplete
方法会立即返回 false,也就不会继续触发执行 DelayedOperation#onExpiration
方法。
DelayedOperationPurgatory 类提供了对 DelayedOperation 管理的功能,DelayedOperationPurgatory 维护了一个 Pool 类型(key/value 类型)的 watchersForKey 对象,用于记录延时任务及其关注的 key 之间的映射关系,用于支持时间维度以外的其它维度操作。Pool 封装了 ConcurrentHashMap,并在 ConcurrentHashMap 的基础上添加了 Pool#getAndMaybePut
方法,用于在对应 key 不命中时使用给定的 value 更新键值对。Pool 的默认构造方法接收一个 Option[K => V]
类型的 valueFactory 参数,用于为 key 生成对应的 Watchers 对象。
DelayedOperationPurgatory
DelayedOperationPurgatory 类提供了对 DelayedOperation 进行管理的功能,其中定义的 Pool 对象的 key 是 Any 类型,表示可以使用任意类型的对象作为 key,而 value 则是 Watchers 类型,用于封装关注相同 key 的 DelayedOperation 集合,底层依赖于 ConcurrentLinkedQueue 实现。Watchers 中定义的方法实现将在后面分析 DelayedOperationPurgatory 类时一同分析。DelayedOperationPurgatory 类的字段定义如下:
1 | class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, |
各字段的含义如代码注释,这里我们主要介绍一下 DelayedOperationPurgatory#expirationReaper
字段,它是 ExpiredOperationReaper 类型。ExpiredOperationReaper 是 DelayedOperationPurgatory 中定义的内部类,它继承自 ShutdownableThread 抽象类,所以我们可以知道它本质上是一个线程类,在构造 DelayedOperationPurgatory 对象时如果设置 reaperEnabled=true
则会启动该线程。ExpiredOperationReaper 类实现如下:
1 | private class ExpiredOperationReaper extends ShutdownableThread("ExpirationReaper-%d".format(brokerId), false) { |
ExpiredOperationReaper 的主要作用是调用 DelayedOperationPurgatory#advanceClock
方法在后台推动时间轮指针,并定期清理当前 DelayedOperationPurgatory 中记录的所有已执行完成的延时任务。方法 DelayedOperationPurgatory#advanceClock
的实现如下:
1 | def advanceClock(timeoutMs: Long) { |
上述方法会调用定时器的 SystemTimer#advanceClock
方法推进时间轮指针,并估算当前 DelayedOperationPurgatory 中已经完成的延时任务数目是否超过设置的阈值 purgeInterval,默认为 1000,如果超过该阈值则会触发清理工作。清理期间会获取并处理所有在册的 Watchers 对象,通过调用每个 Watchers 对象的 Watchers#purgeCompleted
方法,对 Watchers 中已经执行完成的延时任务对象进行清理,如果某个 key 的 Watchers 对象在被清理之后不再包含任何等待执行的延时任务,则会调用 DelayedOperationPurgatory#removeKeyIfEmpty
方法将对应的 key 从 DelayedOperationPurgatory#watchersForKey
字段中一并移除,防止内存泄露。
下面我们继续来看一下 DelayedOperationPurgatory 中剩余的两个主要方法实现,即 DelayedOperationPurgatory#tryCompleteElseWatch
和 DelayedOperationPurgatory#checkAndComplete
方法。
方法 DelayedOperationPurgatory#tryCompleteElseWatch
用于往 DelayedOperationPurgatory 中添加延时任务,一个延时任务可以被同时关联到多个 key 对象上,这样可以从多个维度触发执行该延时任务。方法实现如下:
1 | def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { |
注册延时任务的执行流程如下:
- 尝试执行延时任务,如果当前已经完成执行则返回;
- 否则,遍历给定的 key 集合,将延时任务添加到每个 key 的 Watchers 中,建立从多个维度触发延时任务执行的条件,期间如果延时任务已经完成执行,则不再继续添加;
- 再次尝试执行延时任务,如果当前已经完成执行则返回;
- 否则,将延时任务添加到定时器中,建立从时间维度触发延时任务执行的条件,如果期间任务已经完成执行,则从时间轮中取消任务。
整个执行流程我们看到方法多次尝试执行延时任务,以保证对应的延时任务能够尽快被触发,同时减少 DelayedOperationPurgatory 不必要的开销。下面我们主要来看一下步骤 2 的执行逻辑,在该步骤中调用了 DelayedOperationPurgatory#watchForOperation
方法将当前延时任务对象添加到对应 key 的 Watchers 中,方法实现如下:
1 | private def watchForOperation(key: Any, operation: T) { |
其中,方法 Pool#getAndMaybePut
会尝试获取 key 对应的 Watchers 对象,如果不存在则会创建,然后调用 Watchers#watch
方法将延时任务记录到 Watchers 对象的同步队列中。
在整个 DelayedOperationPurgatory#tryCompleteElseWatch
方法的实现上我们可能会疑惑,为什么既要将延时任务添加到定时器 SystemTimer 中,同时还需要将延时任务添加到每个 key 的 Watchers 中?监听这些 key 的意义又是什么呢?
要理解这一设计,就需要回到本小节开头我们对于 DelayedOperation 抽象类设计的讨论,既然已经有了定时器 SystemTimer 可以执行延时任务,为什么还要实现 DelayedOperation 和 DelayedOperationPurgatory 呢?因为单从时间维度不能覆盖所有的应用场景,而将延时任务记录到其关注的 key 对应的 Watchers 对象中,在相关条件满足时,我们可以通过 key 获取到对应的延时任务,并调用 DelayedOperation#forceComplete
方法提前执行满足执行条件的延时任务,从而能够从多个维度对延时任务进行控制。
方法 DelayedOperationPurgatory#checkAndComplete
用于检测关注指定 key 的延时任务是否满足执行条件,如果满足则触发执行,方法实现如下:
1 | def checkAndComplete(key: Any): Int = { |
上述方法会获取指定 key 对应的 Watchers 对象,并遍历 Watchers 中记录的延时任务,如果对应的延时任务已经完成执行,则将其从 Watchers 对象中移除;否则,会调用 DelayedOperation#safeTryComplete
方法尝试执行对应的延时任务;如果当前 key 的 Watchers 对象已经没有待执行的延时任务,则将 key 从 DelayedOperationPurgatory 中移除,避免因为持有对应 key 对象的引用,导致无法被 GC,而最终导致内存泄露。
基于炼狱的延时任务调度示例
上面分析了这么多,本小节我们列举几个基于炼狱调度的真实延时任务示例,进一步加深对 Kafka 延时任务调度机制的理解。前面分析过的 DelayedOperation 是一个抽象类,围绕该抽象类派生出多个子类,本小节我们主要分析其中 2 个比较典型的延时任务实现:DelayedProduce 和 DelayedFetch。
DelayedProduce
当生产者追加消息到集群时(对应 ProduceRequest 请求),实际上是与对应 topic 分区的 leader 副本进行交互,当消息写入 leader 副本成功后,为了保证 leader 节点宕机时消息数据不丢失,一般需要将消息同步到位于 ISR 集合中的全部 follower 副本,只有当 ISR 集合中所有的 follower 副本成功完成对当前消息的记录之后才认为本次消息追加操作是成功的。这里就存在一个延时任务的适用场景,即当消息被成功追加到 leader 副本之后,我们需要创建一个延时任务等待 ISR 集合中所有的 follower 副本完成同步,并在同步操作完成之后对生产者的请求进行响应,Kafka 定义了 DelayedProduce 类来处理这类需求。
DelayedProduce 继承自 DelayedOperation 抽象类,其字段定义如下:
1 | class DelayedProduce(delayMs: Long, // 延迟时长 |
其中 ReplicaManager 用于管理一个 broker 节点上的所有分区副本信息,我们将在下一篇中对其进行深入分析,这里我们主要来看一下 ProduceMetadata 样例类,定义如下:
1 | case class ProduceMetadata(produceRequiredAcks: Short, // 对应 acks 值设置 |
ProduceMetadata 类记录了当前延时任务所关注的 topic 分区的消息追加状态,DelayedProduce 延时任务在被构造时会依据消息写入对应 topic 分区 leader 副本是否成功来对其进行初始化:
1 | // 依据消息写入 leader 分区操作的错误码对 produceMetadata 的 produceStatus 进行初始化 |
下面来看一下 DelayedProduce#tryComplete
方法实现,该方法会检测当前延时任务所关注的 topic 分区的运行状态,当满足以下 3 个条件之一时则认为无需再继续等待对应的 topic 分区:
- 对应 topic 分区的 leader 副本不再位于当前 broker 节点上。
- 检查 ISR 集合中的所有 follower 副本是否完成同步时出现异常。
- ISR 集合中所有的 follower 副本完成了同步操作。
1 | override def tryComplete(): Boolean = { |
如果所有的 topic 分区均不再处于等待状态,则上述方法会触发执行 DelayedProduce#forceComplete
操作,即调用 DelayedProduce#onComplete
方法以回调的形式将各个 topic 分区对应的响应状态发送给客户端。
DelayedFetch
消费者和 follower 副本均会向目标 topic 分区的 leader 副本所在 broker 节点发送 FetchRequest 请求来拉取消息,从接收到请求到准备消息数据,再到发送响应之间的过程同样适用于延时任务,Kafka 定义了 DelayedFetch 类来处理这类需求。
DelayedFetch 同样继承自 DelayedOperation 抽象类,其字段定义如下:
1 | class DelayedFetch(delayMs: Long, // 延时任务延迟时长 |
其中 FetchMetadata 类型字段 DelayedFetch#fetchMetadata
用于记录当前延时任务关注的 topic 分区的状态信息,用于判定当前延时任务是否满足执行条件。样例类 FetchMetadata 的定义如下:
1 | case class FetchMetadata(fetchMinBytes: Int, // 读取的最小字节数 |
下面来看一下 DelayedFetch#tryComplete
方法的实现,当满足以下 4 个条件之一时会触发执行延时任务:
- 对应 topic 分区的 leader 副本不再位于当前 broker 节点上。
- 请求拉取消息的 topic 分区在当前 broker 节点上找不到。
- 请求拉取消息的 offset 不位于 activeSegment 对象上,可能已经创建了新的 activeSegment,或者 Log 被截断。
- 累计读取的字节数已经达到所要求的最小字节数。
1 | override def tryComplete(): Boolean = { |
如果延时任务满足执行条件,则会触发执行 DelayedFetch#forceComplete
方法,即调用 DelayedFetch#onComplete
方法以回调的形式将各个 topic 分区对应的响应数据封装成 FetchPartitionData 对象发送给请求方。
总结
本文介绍了 Kafka 基于分层时间轮算法实现的延时任务调度策略,相对于 JDK 内建的实现具备更加高效的性能,并支持除时间维度以外多维度的任务触发机制。这一部分的相关实现相对比较独立和通用,感兴趣的读者可以将其抽取出来作为基础组件用于自己的项目中。