解读 Daft 数据分区和分批:repartition vs into_partitions vs into_batches
故事从某客户的一次吐槽 Daft 执行性能讲起。话说某一天,一个用户突然在群里吐槽说自己以分布式模式运行的 Daft 任务,只有一个 Worker 节点在参与计算,剩余节点都在旁边吃瓜围观。
于是乎让客户提供了一下任务的 explain 信息,发现是一个比较典型的图片分类打标程序,通过读取图片数据集,对图片进行基本的预处理之后,调用模型进行离线分类打标。这是一个 Daft 非常擅长的图片数据处理任务,并且能够充分发挥 Daft 引擎的优势,为何会被用户吐槽执行慢呢?进一步查看 explain 信息发现了端倪,如下图所示:
没错,只有 1 个 ScanTask! 由于数据集本身存储的只是图片的 URL 信息,所以即使是上百万张图片,也只对应一个不到 20M 的 parquet 文件。在没有开启 ScanTask 任务拆分与合并的前提下,Daft 针对这个数据集只创建了 1 个 ScanTask 进行扫描,并且后续操作没有需要切分 Stage 的算子,因此所有算子都在一个 Pipeline 中执行。这也解释了为什么整个集群只有一个 Worker 节点在工作的原因。
🤔 怎么解决呢?
其实我们只需在 read 操作之后增加 repartition 或 into_partitions 算子强行增加数据分区数即可,目标分区数的设置只要大于等于节点数即可,但通常建议设置为节点数的整数倍。
经过上述调整后所有的 Worker 节点都开始工作起来了,但又遇到了新的问题:用户最终生成的 Lance 数据集包含大量的小文件!这其实是一个目前比较常见的问题,原因在于客户的 UDF 设置了较小的 batch_size 参数值,而早期版本在写 Lance 数据集时并没有攒批机制,导致 UDF 每次输出都会以 Lance Fragment 文件形式落盘,从而产生大量小文件。
🤔 怎么解决呢?
我们只需要在 write 操作之前增加 into_batches 算子强行按照行数对数据进行攒批即可,从而控制当数据行数达到一定量级后写 Lance Fragment 文件。
关于 repartition 和 into_batches,官网文档也用专门的页面进行说明:
Use
df.repartition(...)if you are running on a distributed runner and want to split your data into a fixed number of partitions (finite units of work) across workers.Use
df.into_batches(...)if you want to split your data so that each unit (batch) contains a similar number of rows, regardless of whether you are running distributed or on a single node.
似乎看起来很简单,如果你的需求是将数据切分成指定数目的分区,则使用 repartition;如果你的需求是希望将数据按指定的条数切分一个个批次,则使用 into_batches。然而,在实际使用中我们经常会面对客户的疑问,例如:
- 为什么加了 repartition 之后任务运行好像卡住了?甚至 OOM 了?
- 同样是分区,我应该选择 repartition 还是 into_partitions?二者的区别又是什么?
- 都说 into_partitions 和 into_batches 是流式执行的,为什么还是会触发 spill 呢?
- …
因此,本文打算深入剖析一下 repartition、into_partitions 和 into_batches 各自的特性、应用场景,以及背后的实现机制。希望阅读完本文之后,你能够游刃有余的选择合适的方法控制 Daft 的数据分区和分批。
分区 & 分批
使用 repartition 实现数据重分区
Daft 虽然是基于“morsel-driven”设计和实现的计算引擎,并刻意在弱化分区的概念,但在部分场景或 API 定义方面依然保留了分区的字样,典型的包括:读取数据时按分区规划 ScanTask;提供 repartition 算子以实现对数据的重新分区。这样的设计主要还是降低用户的理解成本,毕竟分区的概念已经深入人心。如下所示展示了 repartition API 的定义:
1 |
|
关于 repartition 的一些说明:
- 参数 num 用于设置目标分区数,允许设置为 None 以保持分区数目不变。
- 在实现层面支持 random、hash 和 range 三种分区策略,而在策略选择上完全由引擎控制,例如用户通过
partition_by参数设置了分区列,则会采用 hash 分区策略。 - 目前仅 Ray Runner 支持 repartition 操作,暂不支持 Native Runner。
- 用户可以显式通过调用 repartition 算子触发重分区操作,一些算子在底层实现上也会复用 repartition 的能力,典型的包括 Join、GroupBy、Distinct 等。
Daft 在 repartition 的实现层面提供了多种数据 Shuffle 策略,包括:
- map_reduce:基础 Shuffle 策略实现,适用于分区相对较少的场景,该策略下 Daft 仅仅是将所有输入分区的数据先物化到内存,然后转置重组为输出分区。
- pre_shuffle_merge:当分区几何平均数超过一定阈值时(默认为 200),Daft 会切换采用该策略,通过在 Worker 节点对需要 Shuffle 的数据分区进行预合并以减少输入分区数,从而降低分区数量过多导致到元数据管理、任务调度和数据传输开销。
- flight_shuffle:基于 Arrow Flight 协议的 Shuffle 实现,该策略下会将需要 Shuffle 的数据先写入本地磁盘,再通过 Flight RPC 按需读取,适用于大表 Join 场景,能突破内存限制,避免 OOM。
用户可以通过 shuffle_algorithm 参数手动选择数据 Shuffle 策略,该参数默认为 auto,即让 Daft 自行决策使用哪种 Shuffle 方式。现阶段,当 shuffle_algorithm=auto 时,Daft 会在 map_reduce 和 pre_shuffle_merge 两种策略之间进行选择,判断的依据取决于 Map 端和 Reduce 端分区数的几何平均数,如果该平均数值超过 200 则选择 pre_shuffle_merge 策略,否则使用 map_reduce 策略。
本小节接下来的内容主要介绍 map_reduce 和 pre_shuffle_merge 两种策略的设计与实现,而对于新引入的 flight_shuffle 策略将留到以后通过专门的文章进行介绍。
基于 map_reduce 策略的 repartition 实现
作为最基础的一种 Shuffle 实现策略,map_reduce 执行链路可以抽象成一个经典的“Map 端切分 + Reduce 端聚合” 的过程,其核心特点是:Map 端先把每个输入分区切分成 N 个数据分片(N 为目标分区数),然后在 Reduce 端从 [0, N) 按照索引将相同索引值对应的数据分片聚合作为同一个下游 Task 的输入,形成新的输出分区。
为了便于描述,假设上游 Task 一共有 3 个输入分区,目标分区数为 5。如下图所示,那么 Map 端的切分会产生近似 3 × 5 = 15 个“数据分片”,随后通过一次“矩阵转置”把这些碎片按目标分区重新分组。
Map 端本地将数据切分为 N 个桶
在每个 Worker 节点本地执行过程中,Daft 会将输入分区数据按目标分区数切成 N 份,此时需要考虑具体的分桶策略:
- Hash:对
partition_by表达式求值后做哈希取模,并将余数相同的行稳定映射到同一个桶中。 - Random:按随机方式将行打散分布到不同的桶中,以实现数据随机均衡。
- Range:内部算子分桶策略,目前主要应用于数据排序场景,根据分区边界定义将相同范围的行落到对应范围的桶中。
Reduce 端按分区索引构建并执行下游 Task
Daft 会等待所有 Map 端输出的物化,即阻塞等待所有 Map 端 Task 的执行完成。Map 阶段物化产生的输出可以看成是一个 M × N 的矩阵,其中的每一行对应一个输入分区,每一列对应一个输出分区。因此,Daft 接下来需要做一次“矩阵转置” 将行和列调换,这样就得到一个 N × M 的矩阵,接下来只需要从 [0, N) 遍历即可获取到下游各个 Task 的输入分区数据。
Daft 会基于各个输入分区创建下游 Task,并通过 InMemoryScan 充当下游 Task 的数据摄取节点。在 InMemoryScan 节点内部会持有派发给当前 Task 的分区引用集合(对应 psets 变量),并在执行时通过引用定位并加载数据。
基于 pre_shuffle_merge 策略的 repartition 实现
对于 map_reduce 这类 Shuffle 策略,针对输入的每个分区都会将其切分成 N 份,因此当上游有 M 个输入分区时,中间产物规模接近 M × N,当 Shuffle 输入分区数和输出分区数都很大时会带来显著的元数据管理、调度,以及数据传输压力。
🤔 怎么解决呢?
这里的问题在于
M × N得到的分区积数非常大,所以最容易想到的解法是减小 M 或 N 的值来降低乘积。由于 N 是用户指定的目标分区数,我们不能随意改变,所以只能减小 M 的值,这也是 pre_shuffle_merge 策略的设计目标。
对于 pre_shuffle_merge Shuffle 策略,可以理解为在 map_reduce 策略之前额外插入了一个分区合并的过程。如下图所示,通过将由上游相同 Worker 上产出的大量小分区按照一定策略进行分组,然后再执行标准的 map_reduce 分区过程。
当 Shuffle 的输入分区与输出分区非常多时,直接采用 map_reduce 策略会产生大量碎片与任务调度压力,预合并能显著减少进入真正 Shuffle 的输入 Task 数量,从而降低中间分区碎片数量、RPC 连接数,以及集群的元数据管理和任务调度负担。
触发条件与阈值设定
当 shuffle_algorithm=auto 时,Daft 会基于输入分区数 M 和目标分区数 N 计算 sqrt(M × N) 几何平均数。当该几何平均数默认超过 200 时,Daft 会切换为选择 pre_shuffle_merge 策略,否则走 map_reduce 策略。
如果采用 pre_shuffle_merge 策略,Daft 会在预合并阶段使用基于 pre_shuffle_merge_threshold 参数设定的阈值来决定何时把一批分区数据输出合并成一个新 Task,该参数默认值为 1GB。
按节点合并输入分区
预合并阶段会先将上游 Task 的输出分区按所属 Worker 进行分桶,并合并以作为下游 Task 的输入,核心流程包括:
- 按分区所属 Worker 分桶:每个桶收集来自同一 Worker 上产出的物化输出。
- 累计桶内数据大小:当桶内数据累计大小达到
pre_shuffle_merge_threshold参数设定的阈值时,即将桶内这批数据以“InMemoryScan + psets”的形式打包作为下游 Task 的输入。 - 节点亲和性调度:合并后的 Task 输入带有 Worker 节点亲和性,因此 Daft 会尽量让下游 Task 在对应的 Worker 上执行,以减少跨 Worker 节点拉取分区数据的开销。
后续过程与上一节完全一致,即按照 map_reduce 策略创建 N 个输出分区,并为每个分区生成下游 Task。
使用 into_partitions 实现数据重分区
上面小节我们介绍并分析了 repartition 对数据进行重分区的特点和实现。通过对数据进行全局 Shuffle,使用 repartition 能够较好的实现数据分布的均衡性,同时也支持按列对数据进行重分布。不过因为要对全量数据进行物化,所以使用 repartition 对数据进行重分区是一个相对较重的操作,稍有不慎就会引发 OOM。
如果不追求数据分布的绝对均衡,只是希望调整一下分区的数目,则 Daft 提供了 into_partitions 这一种更加轻量的数据重分区实现。如下所示展示了 into_partitions API 的定义:
1 |
|
相对于 repartition 而言,into_partitions 在 API 层面要简单一些,只接收一个 num 参数,以允许用户设置目标分区数。Daft 在内部会判断上游 Task 的数目与目标分区数的大小关系,以决定在执行时是否需要对输入分区进行处理,如果需要处理的话是切分还是合并,具体细节我们下面展开来介绍。
切分:上游 Task 数目小于目标分区数
如果上游 Task 数目小于用户指定的目标分区数,则需要对数据进行进一步切分以得到更多的分区。这里假设上游 Task 数目为 3,用户通过调用 .into_partitions(10) 希望将数据重新组织成 10 个分区,所以需要控制上游每个 Task 产出 3~4 个分区才能达到目的。
如上图所示,Daft 内部核心也是通过编排每个上游 Task 输出的分区数来实现的,具体执行过程:
- 基于上游 Task 数目 3 和目标分区数 10 计算得到第 1 个 Task 需要产出 4 个分区,剩余 2 个 Task 需要各自产出 3 个分区,总共累计 10 个分区;
- 为每个上游 Task 绑定一个 IntoPartitions 节点以实现节点内重分区操作,比如为第 1 个 Task 绑定
IntoPartitions(4),剩余 2 个 Task 需要分别绑定IntoPartitions(3); - 将上述步骤构造好的 Task 提交执行,并按提交顺序阻塞等待第 1 个 Task 的执行完成;
- 当第 1 个 Task 执行完成后会产出 4 个分区,Daft 会为每个分区绑定 1 个 InMemoryScan 节点作为下游 Task 的输入,并继续提交下游任务。
从上述执行过程可以看出,into_partitions 并不会像 repartition 那样等待上游全部 Task 完成物化再构建并提交执行下游 Task,而是只要一个上游 Task 执行完成后即可开始构建下游 Task,所以更加流式。 这主要得益于总的分区数,以及一个目标分区数据由哪些上游 Task 产生是事先确定的,并且 into_partitions 并不像 repartition 那样追求数据分布的绝对均衡性。
在单机节点上,IntoPartitionsSink 负责将输入的数据按照指定的分区数进行分区。IntoPartitionsSink 本身是一个 Block 节点,它会先收集当前 Pipeline 所有的产出数据,并最后按照设定的分区数按行实施重新切分(例如上述示例中第一个 Task 的分区数是 4,因此会在本地切分成 4 个分区)。如果数据行数不能填充所有分区,则会构造空的分区以保证输入的分区数满足要求。
合并:上游 Task 数目大于目标分区数
如果上游 Task 数目大于用户指定的目标分区数,则需要对数据进行进一步合并以得到更少的分区。这里假设上游 Task 数目为 10,用户通过调用 .into_partitions(3) 希望将数据重新组织成 3 个分区,所以需要控制上游每 3~4 个 Task 产出 1 个分区才能达到目的。
如上图所示,Daft 核心实现也是通过对上游 Task 进行编排分组,以保证多个 Task 产出 1 个分区来实现的,具体执行过程:
- 基于上游 Task 数目 10 和目标分区数 3 计算得到第 1 个目标分区需要 4 个上游 Task 产出数据,剩余 2 个目标分区分别各自需要 3 个上游 Task 产出数据,以实现对这 10 个上游 Task 进行分组;
- 将上述步骤构造好的 Task 按分组提交执行,并按提交顺序阻塞等待第 1 个分组 Task 的执行完成;
- 为每个分组物化结果绑定 1 个
InMemoryScan + IntoPartitions(1)以实现读取并合并分区,并作为下游的输入。
上面曾提到 Daft 是以流式的方式执行 into_partitions,但我们在实际使用时体感并不是很强烈,仍然会大概率看到触发 Ray Object Store Spill,尤其是分区合并的场景。这是因为:
- 单机节点上 IntoPartitionsSink 本身是一个 Block 节点,它会阻塞等待当前节点上 Task 所有的数据处理完成才会对外输出,而分区合并场景下单个 IntoPartitionsSink 需要等待处理完成的数据相对要多很多。
- 虽然 into_partitions 并不需要等待所有上游 Task 执行完成才开始构造下游 Task,但仍然需要按顺序等待至少一个上游 Task 完成拿到物化执行结果。然而,Daft 每次会提交执行一批上游 Task,所以在 Daft 等待物化结果的期间,任何一个 Task 都可能已经执行完成并将结果存储到了 Ray Object Store,从而导致 Object Store 的使用率上涨,甚至触发 Spill。
使用 into_batches 实现数据重分批
上面介绍的 repartition 和 into_partitions 都是对数据集重新分区,也就是说我们事先已经设定好了 N 个桶,然后让 Daft 将数据按策略均匀的分发到各个桶中。本小节我们要介绍的 into_batches 则是对数据进行重新分批,区别在于我们事先并不知道要设定多少个桶,但是我们知道每个桶最多能放多少行数据,然后让 Daft 按照这个上限值将数据封装打包,至于最后需要多少个桶才能容纳所有数据,取决于我们输入的数据量,也取决于我们设置的桶的大小(即 batch_size)。如下所示展示了 into_batches API 的定义:
1 |
|
同 into_partitions 一样,into_batches 在 API 层面也非常简单,只接收一个 batch_size 参数以允许用户设置目标批次大小。Daft 在内部则会采用 Local 和 Global 两阶段执行,具体细节将在接下来展开介绍。
- 阶段一:在 Worker 节点本地进行攒批或切分
阶段一采用宽松模式(即 strict=false),在该模式下假设 batch_size=100,则只要接收到的数据总行数落在 [80, 100] 区间即可以对外输出,如果数据落在区间之外则会进行攒批或切分。例如:
1 | 输入: [30, 60, 150, 20] |
上述示例在 Worker 节点内部的处理过程如下表所示:
| 序号 | 输入 | 输出 | 缓冲 | 说明 |
|---|---|---|---|---|
| 0 | 30 | - | 30 | 小于区间范围,继续攒批 |
| 1 | 60 | 90 | - | 落在区间范围内,触发输出 |
| 2 | 150 | 100 | 50 | 大于区间范围,按照区间上界进行切分输出,并将剩余数据暂存在缓冲区中 |
| 3 | 20 | - | 70 | 小于区间范围,继续攒批 |
| 4 | - | 70 | - | 末尾批次,直接输出 |
- 阶段二:在协调节点上攒批来自所有 Worker 节点的批次
虽然阶段一已经对数据按照 batch_size 配置,在 Worker 节点本地实施攒批或切分处理,使得输出的数据批次大小大部分都落在区间范围内,但依然会有一些“尾巴数据”不能满足要求。因此,Daft 在设计上引入了阶段二的设计,让协调节点对来自所有 Worker 节点的数据进行进一步攒批。
阶段二会提交阶段一的任务并等待至少一个任务执行完成,然后对任务执行结果进行攒批处理,以实现所有发往下游 Task 的数据批次长度都至少不小于 batch_size 的 80%。从这里也可以看出,into_batches 是流式执行的,无需等待所有的数据完成物化。不过需要说明的一点是,虽然大部分场景都可以将来自 Worker 节点的批次数据直接透传,但仍然避免不了对小批次数据进行合并的情况,甚至合并结果长度会超过 batch_size 设置的值。 此时 Daft 也只是简单的将其作为下游 Task 的输入进行透传,并不会做进一步的切分。
如上图所示,我们依然以 batch_size=100 为例,假设有来自两个 Worker 节点的 Task 输出,分别为 [90, 100, 70] 和 [100, 60],这些 Task 输出的数据经过阶段一的整合已经基本满足 batch_size 要求,但仍然存在一些尾巴数据。接下来阶段二会进一步整合,策略是:1)如果数据批次长度落在 [80, 100] 区间范围内则直接向下游透传;2)如果数据批次长度小于 80 则攒批。
因此对于上述示例,长度为 90、100 的批次可以直接往下游透传,但是长度为 70 的批次则需要与后续长度为 100 的批次攒批成长度 170 的批次。虽然这个长度已经超过了 batch_size 设置的值,但考虑毕竟是少数情况,所以现阶段 Daft 也是允许的。
总结
本文我们从一个实际案例切入,深入剖析了 Daft 的数据分区和分批机制,涉及 repartition、into_partitions 和 into_batches 三个算子。最后,我们以一张表格作为结语来总结这三个算子的特性差异和选型建议,助你更加高效的使用 Daft 进行多模态数据处理。
| 对比维度 | repartition | into_partitions | into_batches |
|---|---|---|---|
| 切分类型 | 分区 | 分区 | 分批 |
| 切分策略 | 支持 Random、Hash、Range 三种策略 | 简单的分割/合并,不考虑数据大小或内容 | 按目标行数进行分割 |
| Shuffle | 全局 Shuffle(昂贵操作) | 无 Shuffle,仅分割/合并相邻分区 | 无 Shuffle,仅按行数分割 |
| 顺序保证 | 不保证顺序(会被打乱) | 保持数据顺序 | 保持数据顺序 |
| 性能开销 | 高(需全局数据重组和物化) | 中(本地分割/合并,仅物化部分 Task 即可) | 低(仅物化部分 Task 即可) |
| 数据分布 | 可按列值聚集或均匀分布 | 不改变数据分布,仅调整分区边界 | 不改变数据分布,仅按行数条数数据边界 |
| 使用限制 | 1) 仅支持 Ray Runner;2)需要全局数据重组和物化 | 1)仅支持 Ray Runner;2)数据分布均衡性不及 repartition | 1)支持 Ray Runner 和 Native Runner;2)不能严格保证批次大小,尽力而为 |
| 选型建议 | 重量级分区操作,适用于需要改变数据分区数,且对数据的分布均衡性有要求或者希望将相同 key 分布到相同节点的场景 | 轻量级分区操作,适用于简单调整分区数的场景,对数据分布情况不做要求 | 轻量级分批操作,主要用于优化内存占用和任务运行并行度 |