通过图片分类任务探寻 Daft 运行机制之 UDF 篇

在大数据时代,用户通常习惯通过 SQL 开展数据分析任务,用户编写的 SQL 语句最终会被解析、转换成计算引擎的内置算子执行。然而,在如今 DATA + AI 的时代背景下,数据处理分析任务绕不开对于模型的调用,加上算法同学通常对 SQL 不够熟悉,因此“DataFrame + UDF”的应用组合逐渐成为主流,UDF 也由大数据时代的二等公民逐渐走到台前,扮演着与内置算子相当甚至更重要的角色。

image

在此前的两篇文章《通过图片分类任务探寻 Daft 运行机制之 Swordfish 引擎篇》和《通过图片分类任务探寻 Daft 运行机制之 Flotilla 引擎篇》中,我们从一个图片分类任务示例切入,分别分析了 Daft 单机执行引擎 Swordfish 和分布式执行引擎 Flotilla 的运行机制。本文我们继续沿用这个示例,将视角进一步收窄到一个更具体的问题,即 分布式模式下,Daft UDF 究竟是如何被执行的?

图片分类示例程序

我们再来回顾一下这个图片分类任务示例程序,可以简化为如下结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
df = (
daft.read_parquet("s3://.../*.parquet", io_config=IO_CONFIG)
.with_column("image", col("bytes").decode_image(mode=ImageMode.RGB))
# 图片预处理 UDF
.with_column(
"tensor",
col("image").apply(
func=lambda img: preprocess(img),
return_dtype=daft.DataType.tensor(dtype=daft.DataType.float32()),
),
)
# 图片分类打标 UDF
.with_column("labels", ImageClassifier(col("tensor")))
.exclude("url", "image", "tensor")
)

df.write_lance("s3://...", mode="overwrite", io_config=IO_CONFIG)

其中图片分类打标 UDF 大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@daft.udf(
return_dtype=DataType.list(DataType.float32()),
num_cpus=0,
num_gpus=0.5,
concurrency=16,
batch_size=8,
)
class ImageClassifier:
def **init**(self, top_k=5):
# 昂贵初始化:加载 ResNet50 模型和 ImageNet 标签
self.model = load_resnet50()
self.labels = load_imagenet_labels()

def **call**(self, images):
# 每次处理一个 batch 的图片 tensor
return predict(self.model, images)

这个示例里其实有两个 UDF:

  • Lambda UDF col("image").apply(lambda ...):以内联的方式对数据实施按行处理。
  • Class UDF ImageClassifier:包含相对耗时的模型加载初始化操作,并且需要调度到 GPU 节点上执行。

Daft UDF 的执行模式

从用户视角来看,UDF 只是 DataFrame 表达式中的一个普通函数调用,但从执行引擎视角看,需要区分执行模式(单机 or 分布式)分而治之。在单机模式下,UDF 主要由 NativeExecutor 本地执行,而在分布式模式下,Ray Runner 会在每个 RaySwordfishActor 内部复用这套本地执行机制,并在额外引入基于 Ray Actor 的分布式执行路径。

Swordfish UDF 执行模式

单机模式下,查询计划会直接落到本地 NativeExecutor,由 Swordfish 的本地执行管线负责把 UDFProject 翻译成 UdfOperator 执行节点,再由 UdfOperator 调用用户编写的 Python 逻辑。从这个角度看,单机模式下的 UDF 执行按执行载体可以概括为两类:

执行模式 触发方式 执行位置 适用场景
线程模式 默认行为 NativeExecutor 所在进程 轻量 Python 转换、普通数据清洗、I/O 型 async UDF
进程模式 显式指定 use_process=True 参数 UdfOperator 懒启动的 daft.execution.udf_worker 子进程 进程隔离、规避 GIL、非线程安全依赖、昂贵初始化实例复用

线程模式是最轻量级的执行方式,比如图片预处理中的 Lambda UDF,如果没有额外设置 use_process=True,通常就会在当前 NativeExecutor 进程内执行:

1
2
3
4
5
6
7
df = df.with_column(
"tensor",
col("image").apply(
func=lambda img: preprocess(img),
return_dtype=daft.DataType.tensor(dtype=daft.DataType.float32()),
),
)

如果 UDF 依赖的第三方库不适合在线程环境中复用,或者长时间持有 GIL,就可以显式通过设置 use_process=True 开启进程模式:

1
2
3
4
5
6
@daft.func(return_dtype=DataType.tensor(DataType.float32()), use_process=True)
def preprocess_in_process(image):
# 假设 native_image_lib 不是线程安全的,或者会长时间持有 GIL。
import native_image_lib

return native_image_lib.preprocess(image)

在进程模式下,如果设置了 concurrency / max_concurrency 参数,则 Daft 至多会启动对应数量的进程运行该 UDF。以 ImageClassifier 为例,设置 concurrency=16 并不是一开始就立刻拉起 16 个进程,而是表示这条 UDF 在本地最多可以拥有 16 个并发执行实例。当这些实例真正处理输入时,最多会懒启动 16 个 daft.execution.udf_worker 子进程。

在实现上,Daft 会先把用户设置的并发值记录到 UDFProperties 中,并在创建 UdfOperator 节点时使用该值作为并发参数。如果用户没有显式指定,则 Daft 会根据本机可用 CPU 和资源请求推导一个默认并发度。随后,UdfOperator 会按这个并发度创建对应数量的本地执行槽位(Worker State),每个槽位持有一份 UDF 表达式,以及一个负责执行 UDF 的句柄。

Flotilla UDF 执行模式

分布式模式下,Daft 的执行边界会多一层 Ray Runner。Flotilla 会先将查询计划拆成一系列 SwordfishTask,然后把这些 task 分发给 Ray 集群中的 RaySwordfishActor。每个 RaySwordfishActor 内部仍然持有 NativeExecutor 单机执行引擎,因此单机模式中的线程模式和进程模式并没有消失,只是被包在了 RaySwordfishActor 中。

真正新增的是 Actor 模式,对于 Ray Runner 来说,非 async UDF 一旦设置了 concurrency / max_concurrency 参数,就会在分布式计划翻译阶段被识别为 Actor UDF,并启动运行一组独立的 Ray UDFActor 实例。

执行模式 触发方式 执行位置 适用场景
线程模式 默认行为 RaySwordfishActor 内部的 NativeExecutor 轻量转换、普通预处理
进程模式 设置 use_process=True 参数 RaySwordfishActor 内部额外启动的 UDF 子进程 非线程安全依赖、GIL、native library 隔离
Actor 模式 非 async UDF 设置 concurrency / max_concurrency 参数 独立的 Ray UDFActor 实例 GPU 推理、模型状态复用、跨节点资源调度

在图片分类任务示例中,预处理 UDF 没有设置并发和进程隔离,因此会随着 SwordfishTask 进入某个 RaySwordfishActor,并在该 Actor 内部的 NativeExecutor 中以线程模式完成执行。ImageClassifier 则不同,它是一个带状态的 Class UDF,并且显式设置了 concurrency=16batch_size=8 参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@daft.udf(
return_dtype=DataType.list(DataType.float32()),
num_cpus=0,
num_gpus=0.5,
concurrency=16,
batch_size=8,
)
class ImageClassifier:
def **init**(self):
# 模型只在每个 UDFActor 初始化时加载一次。
self.model = load_resnet50()

def **call**(self, images):
return predict(self.model, images)

在 Ray Runner 中,设置 concurrency=16 参数会触发以 Actor 模式执行该 UDF。Daft 会创建 16 个 Ray UDFActor 实例,并让这些长期运行的 Actor 复用 ImageClassifier 的模型状态,而参数 batch_size=8 则决定每次发送给 UDFActor 的输入批次大小,用于平衡吞吐、延迟和内存占用。

关于 async UDF 的限制说明

截止目前,Ray Runner 下的 Actor 模式只适用于非 async UDF。对于 async UDF,max_concurrency 的语义仍然是限制协程并发数,而不是创建 Ray UDFActor 实例。因此,如果 UDF 存在昂贵的初始化操作,例如加载模型或常驻 GPU 资源,通常应该使用同步的 Class UDF,并通过设置 concurrency / max_concurrency 参数控制 Actor 数量,而不是把执行方法声明为 async

Daft UDF 的实现机制

前面我们已经从使用者视角区分了单机执行和分布式执行模式下 UDF 都有哪些执行模式,本小节我们继续来分析这些模式在引擎内部是如何落地的?这一节仍然先从单机模式说起,因为分布式模式中的线程模式和进程模式本质上也是复用这套本地执行机制。

Swordfish UDF 实现机制

单机模式下,查询计划会直接交给 NativeExecutor 执行。图片分类任务示例程序中的的 with_column(...apply(...))ImageClassifier(...) 算子最终都会先被表达成 PhysicalPlan 中的 UDF 节点,然后再由 NativeExecutor 翻译成本地执行管线里的算子。

如下所示,最底部的 ScanTaskSource 负责读取图片字节和文件名;紧接着的 Project 负责执行内置的图片解码;随后出现第一个 UDF 节点,也就是图片预处理中的 Lambda UDF;再往上是一个 Project,用来整理中间列和保留原始列;最上面的 UDF 节点则对应 ImageClassifier,负责把对图片进行分类打标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
* UDF **main**.ImageClassifier:
| Expr = py_udf(col(0: **TruncateRootUDF_0-2-0**)) as labels
| Passthrough Columns = [col(1: bytes), col(2: name)]
| Properties = { batch_size = 8, concurrency = 16, async = false, scalar = false }
| Resource request = { num_cpus = 0, num_gpus = 0.5 }
| Stats = { Approx num rows = 801,395,720, Approx size bytes = 30.04 GiB, Accumulated selectivity = 1.00 }
| Batch Size = 8
|
* Project: col(2: **TruncateRootUDF_0-2-0**), col(0: bytes), col(1: name)
| Stats = { Approx num rows = 801,395,720, Approx size bytes = 30.04 GiB, Accumulated selectivity = 1.00 }
| Batch Size = Range(0, 8]
|
* UDF **main**.<lambda>:
| Expr = py_udf(col(0: **TruncateRootUDF_1-2-0**)) as **TruncateRootUDF_0-2-0**
| Passthrough Columns = [col(1: bytes), col(2: name)]
| Properties = { concurrency = 12, async = false, scalar = false }
| Resource request = None
| Stats = { Approx num rows = 801,395,720, Approx size bytes = 30.04 GiB, Accumulated selectivity = 1.00 }
| Batch Size = Range(0, 8]
|
* Project: image_decode(col(0: bytes), lit("raise"), lit(PyObject(RGB))) as **TruncateRootUDF_1-2-0**, col(0: bytes), col(1: name)
| Stats = { Approx num rows = 801,395,720, Approx size bytes = 30.04 GiB, Accumulated selectivity = 1.00 }
| Batch Size = Range(0, 8]
|
* ScanTaskSource:
| Num Scan Tasks = 111
| Estimated Scan Bytes = 32256179872
| Num Parallel Scan Tasks = 8
| Schema: {bytes#Binary, name#Utf8}
| Scan Tasks: [ ... ]
| Stats = { Approx num rows = 801,395,720, Approx size bytes = 30.04 GiB, Accumulated selectivity = 1.00 }
| Batch Size = Range(0, 8]

这段计划里有几个信息很关键。

  1. 两个 UDF 节点都带有 Expr 字段,它描述了真正要执行的 Python 表达式。例如 Lambda UDF 的输入是解码后的图片列,输出是一个临时 tensor 列;ImageClassifier 的输入则是这个 tensor 列,输出是最终的 labels 列。计划中的 __TruncateRootUDF_... 是 Daft 生成的中间列名,用来把上下游表达式串起来。
  2. UDF 节点只负责产生自己的输出列,并不会重新计算整张表。Passthrough Columns 表示哪些列不需要传给 Python UDF,但需要在 UDF 执行完成后继续保留下来。比如 ImageClassifier 只需要 tensor 作为输入,但 bytesname 仍然要跟着 labels 一起传到下游。
  3. Properties 和 Batch Size 会影响 UdfOperator 的本地执行方式。ImageClassifier 的 batch_size=8 会让输入按更适合模型推理的批次进入 UDF;concurrency=16 则会成为本地 UDF 算子的并发度上限。Lambda UDF 中出现的 concurrency=12 则可以理解为本地执行器根据资源推导出的并发度。

围绕这份 PhysicalPlan,NativeExecutor 要做的事情就很明确了:每遇到一个 UDF 节点,就在本地执行管线中创建一个 UdfOperator 节点。这里的 UdfOperator 可以理解为 Swordfish 专门用来执行 Python UDF 的本地算子,它从上游接收 MicroPartition,裁剪出 UDF 需要的输入列,调用 UDF 执行计算逻辑得到输出列,再把输出列和 Passthrough Columns 拼回结果中。

对于计划里的每一个 UDF 节点,创建 UdfOperator 时都要先确定两个关键属性,即 并发度执行句柄。并发度来自 UDF Properties 或运行时资源推导,决定本地最多有多少个 UDF 执行槽位;执行句柄则决定每个槽位是在当前进程内执行,还是通过独立 Python 子进程执行。源码逻辑可以简化为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
let concurrency = udf_properties
.concurrency
.map(|c| c.get())
.unwrap_or(max_concurrency_from_cpu_and_resource_request);

let use_process = (udf_properties.is_actor_pool_udf()
|| udf_properties.use_process.unwrap_or(false))
&& is_arrow_dtype;

let udf_handle = if use_process {
UdfHandle::Process(None) // 首次执行时懒启动 Python UDF 子进程
} else {
UdfHandle::Thread
};

接下来分别看 Thread 和 Process 两条路径。它们虽然执行载体不同,但都服务于上面 PhysicalPlan 中的同一种 UDF 节点:上游传来的 MicroPartition 会先被拆成 RecordBatch;每个 batch 再按 Expr 实际依赖的列做裁剪,只把 Required Columns 传给 UDF;UDF 返回单列结果后,UdfOperator 再把这列结果和计划中的 Passthrough Columns 拼回新的 MicroPartition。

1
2
3
4
5
6
7
8
9
10
11
12
13
for batch in input.record_batches() {
// 只取 UDF 真正需要的输入列,避免把整张表都传给 Python
let func_input = batch.get_columns(required_cols.as_slice());

let result = match udf_handle {
UdfHandle::Thread => eval_input_inline(func_input),
UdfHandle::Process(_) => eval_input_with_handle(func_input),
};

// UDF 只负责产出新列,其余列由 passthrough columns 拼回
let passthrough = batch.eval_expression_list(passthrough_columns.as_slice())?;
output.push(passthrough.append_column(output_schema, result)?);
}

在线程模式下,执行链路可以拆成几步来看:

  1. 本地执行器 NativeExecutor 启动执行管线时,会根据 UDF 算子 UdfOperator 的并发度创建多个执行槽位。
  2. 上游输入被切成一个个数据批次后,会由本地执行管线提交给计算运行时调度执行。
  3. 某个执行槽位拿到输入数据后,会在当前进程内调用进程内执行方法(eval_input_inline)。这个方法会先执行 UDF 初始化逻辑(_initialize_udfs()),完成 Class UDF 实例化或函数包装初始化。
  4. 初始化完成后,Daft 会应用 Expression 求值逻辑(RecordBatch.eval_expression_with_metrics)执行 UDF 表达式。

也就是说,示例程序中的 preprocess(img) 函数并不是被 UdfOperator 直接裸调,而是先被包进 Expression 系统中,再由记录批次的 Expression 求值逻辑逐行或按批调用。

线程模式的容错边界也在当前进程内。用户 UDF 抛出的异常会沿着表达式求值链路向上传播;如果 UDF 配置了重试次数(max_retries),调用层会先按重试策略重新执行;如果最终仍失败,则根据错误处理策略(on_error)决定是抛出异常,还是记录日志并返回空值。由于没有进程隔离,底层 native library 的崩溃可能会影响当前 worker 进程,这也是进程模式存在的原因。

在进程模式下,执行链路会多一层父子进程协议:

  1. 每个执行槽位第一次使用进程句柄(UdfHandle::Process(None))时,才会创建 Python 侧的 UDF 子进程控制句柄 UdfHandle。
  2. 这个控制句柄会通过 subprocess.Popen 执行 python -m daft.execution.udf_worker 命令,启动 UDF 子进程。
  3. 父进程和子进程之间会建立带认证的本地 socket 连接,用来传递控制消息和共享内存元信息。
  4. 连接建立后,父进程会把序列化后的 UDF 表达式发送给子进程。
  5. 子进程收到表达式后并不会立刻执行用户代码,而是在第一次收到输入数据批次时再执行 UDF 初始化逻辑,完成用户 UDF 的初始化。

进程模式下的数据传输也发生在这条父子进程协议里:

  1. 父进程先把裁剪后的 UDF 输入列序列化成 IPC 字节流,并写入共享内存。
  2. 父进程只通过 socket 把共享内存的名称和大小发送给子进程。
  3. 子进程读取共享内存恢复出记录批次,然后调用表达式求值逻辑执行 UDF。
  4. 执行完成后,子进程把输出记录批次再次写入共享内存,并把位置返回给父进程。

这种设计避免了把大块数据直接塞进 socket,更适合批量数据传输。

进程模式的容错分两层。用户代码抛出的 UDF 异常会被子进程捕获,并把错误消息、调用栈,以及可序列化的原始异常传回父进程,父进程再重新抛出,尽量保留用户侧调用栈。如果子进程异常退出、连接断开,或者返回了非预期响应,父进程会把它视为运行时错误。同时,父进程会持续读取子进程标准输出,避免用户 UDF 打印过多日志导致管道阻塞。执行槽位销毁时,控制句柄还会向子进程发送退出信号并等待回收,超时则终止子进程。

Flotilla UDF 实现机制

分布式模式下,Daft 并不是把每一次 UDF 调用都直接提交成 Ray Task,而是先由 Flotilla 将查询计划拆成一系列 SwordfishTask,再把这些任务调度到 Ray 集群中的 RaySwordfishActor 节点。RaySwordfishActor 可以理解为运行在 Ray Worker 节点上的 Swordfish 执行容器;每个 RaySwordfishActor 内部都会持有一个 NativeExecutor,用来执行被分配到当前 Actor 的本地 PhysicalPlan。

因此,分布式实现机制可以分成两层来看:外层是 Flotilla 和 Ray 负责的任务调度层,决定哪个 SwordfishTask 交给哪个 RaySwordfishActor;内层是 NativeExecutor 负责的本地执行层,决定 UDF 最终由本地线程、本地子进程,还是远端 UDFActor 来执行。

示例程序在分布式模式下生成的 PhysicalPlan 如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
* ActorUDF: **main**.ImageClassifier
| Projection = [col(1: bytes), col(2: name), py_udf(col(0: **TruncateRootUDF_0-2-0**)) as labels]
| Properties = { batch_size = 8, concurrency = 16, async = false, scalar = false }
| Resource request = { num_cpus = 0, num_gpus = 0.5 }
|
* Project: col(2: **TruncateRootUDF_0-2-0**), col(0: bytes), col(1: name)
| Resource request = None
|
* UDF **main**.<lambda>:
| Expr = py_udf(col(0: **TruncateRootUDF_1-2-0**)) as **TruncateRootUDF_0-2-0**
| Passthrough Columns = [col(1: bytes), col(2: name)]
| Properties = { async = false, scalar = false }
| Resource request = None
|
* Project: image_decode(col(0: bytes), lit("raise"), lit(PyObject(RGB))) as **TruncateRootUDF_1-2-0**, col(0: bytes), col(1: name)
| Resource request = None
|
* ScanTaskSource:
| Num Scan Tasks = 111
| Estimated Scan Bytes = 32256179872
| Schema: {bytes#Binary, name#Utf8}
| Scan Tasks: [ ... ]

这份计划里有两个和 UDF 相关的节点:预处理逻辑仍然是普通 UDF;ImageClassifier 则被提升成 ActorUDF。二者的差异不是发生在用户函数调用那一刻,而是发生在分布式计划翻译阶段。Daft 会先判断 UDF 是否需要 Actor Pool,是则生成 ActorUDF,否则继续生成普通 UDFNode。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
impl UDFProperties {
pub fn is_actor_pool_udf(&self) -> bool {
// 只有设置了 concurrency,且不是 async UDF,才进入 Actor 模式。
self.concurrency.is_some() && !self.is_async
}
}

match logical_plan {
LogicalPlan::UDFProject(udf) if udf.is_actor_pool_udf() => {
ActorUDF::new(...)
}
LogicalPlan::UDFProject(udf) => {
UDFNode::new(...)
}
}

这个分流决定了后续的执行形态:普通 UDFNode 会随着 SwordfishTask 一起进入 RaySwordfishActor,并在其中复用前面章节分析过的 UdfOperator;ActorUDF 则会先启动一组 Ray UDFActor 实例,然后把一个 DistributedActorPoolProject 算子追加到每个 SwordfishTask 的本地计划中。

对于普通 UDFNode 来说,分布式模式只是给单机执行机制套了一层 Ray 调度外壳:Flotilla 负责切分 SwordfishTask,Ray 负责把任务送到 RaySwordfishActor,而真正执行 UDF 的仍然是 RaySwordfishActor 内部的 NativeExecutor。进入 NativeExecutor 之后,线程模式继续选择 UdfHandle::Thread,进程模式继续选择 UdfHandle::Process,后续的列裁剪、UDF 初始化、表达式求值、输出列与 Passthrough Columns 拼接,都和单机实现基本一致。

因此,线程模式和进程模式在当前章节我们不再过多撰述,它们的差异主要体现在 RaySwordfishActor 内部使用当前进程还是本地 UDF 子进程执行用户代码。真正值得展开的,是 Actor 模式这条 Ray Runner 新增的链路。

Actor 模式的实现机制

作为分布式模式下独有的执行模式,Actor 模式不再让 UDF 留在 RaySwordfishActor 内部直接执行,而是把带状态的 UDF 提升为一组长期运行的 Ray UDFActor 实例,再让 SwordfishTask 中的 DistributedActorPoolProject 算子负责把输入数据发送给这些 Actor。

这条路径和普通 UDFNode 最大的区别有两个:一是 PhysicalPlan 中的节点从普通 UDF 变成 ActorUDF;二是每个下游 SwordfishTask 的本地计划里会被追加一个 DistributedActorPoolProject,用来把本地数据流接到远端 UDFActor Pool。其中 DistributedActorPoolProject 可以看做是请求发送端,而 UDFActor 可以看作是请求响应端。

Actor 模式的启动链路可以拆成几步来看:

  1. 计划翻译阶段,ImageClassifier 因为设置了 concurrency=16 且不是 async UDF,会被翻译成 ActorUDF。
  2. ActorUDF 开始消费上游任务流时,会根据 UDFProperties 创建一组 Ray UDFActor 实例,数量由 concurrency 参数决定。
  3. 每个 UDFActor 初始化时,会对传入的 UDF 表达式执行 UDF 初始化逻辑,对于 ImageClassifier 来说,模型加载就发生在这里。
  4. UDFActor 全部就绪后,ActorUDF 会把这些 Actor handle 追加到后续 SwordfishTask 的本地计划中,并生成 DistributedActorPoolProject 算子。
  5. SwordfishTask 被调度到 RaySwordfishActor 后,NativeExecutor 会执行 DistributedActorPoolProject,由它负责选择 UDFActor、裁剪输入列、远程调用 Actor,并拼回输出列。

Ray UDFActor 的核心逻辑可以简化为:

1
2
3
4
5
6
7
8
9
10
11
12
13
@ray.remote(max_restarts=4, max_task_retries=4)
class UDFActor:
def **init**(self, uninitialized_projection):
# 初始化 UDF 表达式;ImageClassifier 的模型会在这里加载
self.projection = ExpressionsProjection([
expr._initialize_udfs()
for expr in uninitialized_projection
])

def eval_input(self, input):
# 后续输入批次会复用同一个 actor 内部的 UDF 状态
mp = MicroPartition._from_pymicropartition(input)
return mp.eval_expression_list(self.projection)._micropartition

不过,启动一组 UDFActor 还只是第一步。对于每个 RaySwordfishActor 内部的 DistributedActorPoolProject 来说,它拿到的是同一组 Actor handle,真正发送请求前还要先做一次本地性筛选。Python 侧会读取当前 Ray 进程所在节点 ID,再查询每个 UDFActor 的节点 ID,并把它们分成 localremote 两类:

1
2
3
4
5
6
7
8
current_node_id = ray.get_runtime_context().get_node_id()

for actor_handle in actor_handles:
actor_state = actors(actor_handle.actor_id())
if actor_state["Address"]["NodeID"] == current_node_id:
local_actors.append(actor_handle)
else:
remote_actors.append(actor_handle)

随后 Rust 侧的 DistributedActorPoolProjectOperator 会优先选择 local Actor,只有当前 RaySwordfishActor 所在节点上没有可用 UDFActor 时,才会回退到 remote Actor 列表

1
2
3
4
5
6
7
let (local_actor_handles, remote_actor_handles) =
ActorHandle::get_actors_on_current_node(actor_handles)?;

let actor_handles = match local_actor_handles.len() {
0 => remote_actor_handles,
_ => local_actor_handles,
};

这个设计的核心原因主要是考虑数据本地性。DistributedActorPoolProject 发送给 UDFActor 的不是一个轻量控制消息,而是裁剪后的 MicroPartition。假如当前 RaySwordfishActor 所在节点刚好有 UDFActor,那么优先调用本地 Actor 可以减少跨节点数据传输,也能降低远程调用延迟。反过来,如果当前节点没有本地 UDFActor,Daft 也不会让任务卡住,而是回退到远端 Actor,保证执行能够继续推进。

举个例子,假设 Ray 集群里有三个节点,UDFActor 分布如下:

当前 RaySwordfishActor 所在节点 全部 UDFActor Daft 实际候选列表
node-a A1、A2 在 node-a;B1 在 node-b;C1 在 node-c A1、A2
node-b A1、A2 在 node-a;B1 在 node-b;C1 在 node-c B1
node-d A1、A2 在 node-a;B1 在 node-b;C1 在 node-c A1、A2、B1、C1

也就是说,只要本节点存在 local Actor,Daft 就不会把请求发给 remote Actor;只有没有 local Actor 时,才会在全部 remote Actor 中选择。

确定候选 Actor 列表之后,Daft 还要决定具体使用哪个 Actor。这里并不是每次都从第一个 Actor 开始,而是先随机生成一个初始偏移量,然后在创建 DistributedActorPoolProject 的执行状态时按轮询方式取模选择:

1
2
3
4
5
let init_counter = rand::rng().random_range(0..actor_handles.len());

let next_actor_handle_idx =
self.counter.fetch_add(1, Ordering::SeqCst) % self.actor_handles.len();
let next_actor_handle = &self.actor_handles[next_actor_handle_idx];

这里的随机起点是为了避免所有 RaySwordfishActor 都从候选列表的第一个 Actor 开始打请求,造成第一个 Actor 短时间内成为热点;而后续的取模轮询则保证同一个 DistributedActorPoolProject 内部的多个执行状态能较均匀地分摊到候选 Actor 上。

继续用上面的例子说明。如果当前节点是 node-a,候选列表是 [A1, A2],这表示当前节点只有两个候选 UDFActor。假设接下来创建 4 个执行状态:

  • 如果随机起点是 0,这 4 个执行状态会依次绑定到 A1、A2、A1、A2。
  • 如果随机起点是 1,这 4 个执行状态会依次绑定到 A2、A1、A2、A1。

这里重复出现的 A1 和 A2 并不表示又创建了新的 UDFActor,而是表示多个执行状态在同一组候选 UDFActor 上轮询绑定。

如果当前节点是 node-d,没有 local Actor,候选列表退化为 [A1, A2, B1, C1]。假设随机起点是 2,并且接下来创建 6 个执行状态,那么它们会依次绑定到 B1、C1、A1、A2、B1、C1。这样虽然发生了跨节点调用,但请求仍然会在可用 Actor 之间轮询展开。

需要注意的是,选择 Actor 的动作发生在算子执行状态创建阶段,而不是每处理一行数据都重新选择一次。DistributedActorPoolProject 的并发度被设置为候选 Actor 数量的 2 倍,也就是每个 Actor 最多可以对应两个本地提交执行状态,用来让输入批次在 UDFActor 执行期间仍然能够提前排队。

从调度策略上看,这里仍然有优化空间

当前实现是 local 优先 + 随机起点轮询,并不会感知不同 DistributedActorPoolProject 之间的全局请求分布,也不会根据 UDFActor 当前的 in-flight 请求数或队列长度做动态选择。如果每个 RaySwordfishActor 只提交很少的请求,且多个执行端随机起点刚好相同,请求仍可能短时间集中到同一个或某几个 UDFActor 上;当 UDF 本身执行耗时较长时,这种倾斜会更明显。

DistributedActorPoolProject 是 Actor 模式连接本地数据流和远端 UDFActor Pool 的关键算子。它的执行过程和 UdfOperator 有相似之处:同样要裁剪输入列,同样要求 UDF 返回单列结果,也同样要把输出列和 Passthrough Columns 拼回去。不同之处在于,真正执行 UDF 的地方已经从当前进程变成了 Ray UDFActor 实例。具体到每个执行状态处理输入时,过程大致如下:

  1. 执行状态已经绑定到某个 UDFActor。
  2. 输入 MicroPartition 被裁剪,只保留 ImageClassifier 依赖的 tensor 列(该特性由本文作者贡献 #5884)。
  3. 裁剪后的输入通过 actor.eval_input(...) 发送给 UDFActor。
  4. UDFActor 使用已经初始化好的模型状态执行表达式,并返回单列输出。
  5. DistributedActorPoolProject 检查输出列数和行数是否与输入匹配。
  6. 检查通过后,再把 labels 输出列和 Passthrough Columns 拼回新的 MicroPartition。

这里的 concurrency=16 控制的是 UDFActor 的数量,也就是模型服务实例的数量;batch_size=8 控制的是每次送入 UDFActor 的输入批次大小;num_cpusnum_gpusray_options 则会转化为 Ray Actor 的资源请求,影响这些 UDFActor 实例被调度运行在哪些节点上。

Actor 模式的容错边界也比线程模式和进程模式更靠外:

  • 启动阶段,ActorUDF 会超时等待 UDFActor 准备就绪。需要说明的是,这里并不要求所有 UDFActor 都准备就绪,只要至少一个 UDFActor 在超时前就绪,Daft 就会拿这部分已就绪的 Actor 继续执行。当所有 UDFActor 在既定时间内都未完成准备时,任务会失败并提示调大 actor_udf_ready_timeout 等待超时参数。
  • 运行阶段,UDFActor 本身依赖 Ray Actor 的重启和任务重试配置。当所有下游 SwordfishTask 完成后,ActorUDF 会统一回收这些 UDFActor 实例。

回到图片分类任务,整条链路就变得很清楚了:图片解码和预处理继续贴着 RaySwordfishActor 内部的数据流执行;图片分类阶段则被拆出去,由一组复用模型状态的 UDFActor 承接。最终,DistributedActorPoolProject 把 tensor 列送到 UDFActor 执行计算,并返回 labels 打标列,再把它和原始的 bytesname 等 Passthrough Columns 拼回同一条数据流中。

结语

本文从内部视角梳理了 Daft UDF 在单机和分布式执行引擎中的运行机制。可以看到,Daft 并没有把 UDF 简单视为表达式系统外侧的一段 Python 回调,而是将其纳入 PhysicalPlan、执行算子、资源配置和调度策略之中,作为与内置算子同等重要的一等公民进行实现。

在单机模式下,NativeExecutor 通过 UdfOperator 统一承接线程模式和进程模式:前者强调低开销的进程内执行,后者强调通过子进程提供更清晰的隔离边界。在分布式模式下,Ray Runner 仍然复用这套本地执行链路,并进一步通过 Actor 模式让重状态 UDF 能够借助 Ray Actor 能力长期存活、跨任务复用。

当然,UDF 运行机制背后的细节远不止本文展开的这些,更多细节还需要读者亲自去阅读源码。本文更多是尝试把执行模式、数据裁剪、远程调用和容错边界串成一条主线,帮助读者从整体上理解 Daft 如何实现并执行用户编写的 UDF 逻辑。

参考