论文阅读《Presto: SQL on Everything》

论文《Presto: SQL on Everything》介绍的是 Facebook 在生产环境中长期使用并开源的分布式 SQL 查询引擎 Presto,该引擎允许用户用统一的 SQL 查询 Hadoop 数据仓库、RDBMS、NoSQL、流系统,以及各种内部服务。论文中的 Presto 同时支撑交互式分析、Batch ETL、A/B 实验、外部报表等场景,这些负载对延迟、吞吐、并发和资源隔离的要求差异很大,因此 Presto 的核心设计可以概括为:以 Connector 连接异构数据源,以流水线执行降低查询延迟,以自适应调度和资源管理支撑多租户负载。

整体架构

Presto 是一个典型的 MPP 查询引擎。所谓 MPP,可以理解为把一次 SQL 查询拆成多个可并行执行的计算单元,分发到多台机器上同时执行,最后再汇总结果。

一个 Presto 集群主要由两类节点组成:

  • Coordinator:负责接收 SQL、排队、解析、语义分析、逻辑计划生成、查询优化、分布式计划生成和任务调度。
  • Worker:负责执行具体计算任务,包括读取外部数据、执行算子、处理 Shuffle 数据、维护中间状态和返回结果。

一次查询的大致执行链路如下:

1
2
3
4
5
6
7
8
Client
-> Coordinator 接收 SQL
-> Parser / Analyzer 解析与语义分析
-> Planner 生成逻辑计划
-> Optimizer 改写和优化计划
-> Scheduler 拆分 Stage / Task / Split
-> Worker 执行 Pipeline / Operator
-> Client 拉取最终结果

这条链路中有几个核心概念:

  • Stage:分布式执行计划中的一个阶段,Stage 之间通常通过 Shuffle 交换数据。
  • Task:Stage 在某个 Worker 上的执行实例,同一个 Stage 会分发成多个 Task 并行执行。
  • Split:外部数据源中可被处理的一小块数据,例如 HDFS 文件的某个 offset 范围、MySQL 分片中的一段数据。
  • Pipeline:Task 内部的一串算子链,数据在 Pipeline 中尽量以流式方式向下游传递。
  • Operator:具体的执行算子,例如 Scan、Filter、Project、HashJoin、Aggregation。
  • Page / Block:Presto 内部的列式数据表示,其中 Page 表示一批行,Block 表示 Page 中的一列。

这些概念可以按“从大到小”的层级来理解。一次 Query 不是直接被丢给某台机器执行,而是先被拆成多个 Stage;Stage 描述的是一段可以独立并行执行的计划片段,Stage 之间通过 Shuffle 传递数据。每个 Stage 会进一步拆成多个 Task,同一批 Task 执行相同的计算逻辑,只是处理的数据不同。对于需要读取外部数据的 Task,输入数据又会被拆成很多 Split,每个 Split 是一个可以独立调度的数据切片。Task 拿到 Split 后会在 Worker 本地把数据转成 Page,并让 Page 依次流过 Scan、Filter、Join、Aggregation 等 Operator。

可以把这个过程类比成工厂流水线:Stage 像不同车间,例如“读取原料”、“组装零件”、“最终质检”;Task 像同一个车间里的多条并行产线;Split 像分配给每条产线的一箱箱原料;Operator 像产线上的工序;Page 则是工序之间传递的一批半成品。这样理解之后,Presto 的并行性就很清楚了:跨机器的并行主要来自 Stage 被拆成多个 Task,读数据的并行主要来自 Split 的细粒度划分,而单机内部的执行效率则依赖 Pipeline 和 Operator 之间持续流动的 Page。

举个例子,假设要扫描一张按天分区的 orders 表。Coordinator 会先生成一个负责扫描的 Stage;如果集群里有 100 个 Worker,这个 Stage 可能会被拆成 100 个 Task 分发出去;Connector 会把 dt = '2021-01-12' 这个分区下的文件切成很多 Split,例如每个 Split 对应一个文件片段;Worker 上的 Task 拿到 Split 后,通过 Scan Operator 读出 Page,再交给 Filter、Project、Join、Aggregation 等后续 Operator 处理。如果后续聚合需要按 orderkey 重新分布数据,当前 Stage 的输出就会通过 Shuffle 发送到下一个 Stage。这样看,Stage 是分布式执行的边界,Task 是单机执行的容器,Split 是并行读取的最小调度单位,Page 则是算子之间真正流动的数据批次。

可以用一个简单 SQL 来串起这些概念:

1
2
3
4
5
6
7
8
SELECT
orderkey,
sum(tax) AS total_tax
FROM orders
JOIN lineitem
ON orders.orderkey = lineitem.orderkey
WHERE discount = 0
GROUP BY orderkey;

如下图所示,Coordinator 会先把 SQL 解析成逻辑计划:扫描 orderslineitem,对 lineitem 做过滤,再 Join,最后聚合。随后优化器会把它改写成分布式计划,例如把扫描、Join、聚合拆到不同 Stage 中,Stage 内部再拆成多个 Task 分发给 Worker。Worker 读取 Split 后,以 Page 为单位在 Operator 之间传递数据。

1
2
3
4
5
Aggregate [SUM(tax)]
└── LeftJoin [ON orderkey]
├── Scan [orders]
└── Filter [discount = 0]
└── Scan [lineitem]

系统设计

Presto 的设计目标并不是单纯追求某个 Benchmark 上的极致性能,而是在多数据源、多租户、多查询形态下尽量保持低延迟和高资源利用率。

本章节,我们结合论文深入介绍一下 Presto 的系统设计。从 Coordinator 视角看,一条 SQL 会经历几个阶段:

  • Parsing:用 ANTLR 解析 SQL,生成语法树。
  • Analysis:解析表、列、函数、类型、作用域、子查询、聚合、窗口函数等语义信息。
  • Logical Planning:把语法树转换成由 PlanNode 组成的逻辑计划树。
  • Optimization:应用规则和代价模型,改写逻辑计划,生成更适合执行的计划。
  • Distributed Planning:把计划切分成多个 Stage,并在 Stage 之间插入数据交换。
  • Scheduling:把 Stage 分发为 Task,把 Split 分配给 Task,并选择 Worker 执行。
  • Execution:Worker 以 Pipeline 和 Operator 的方式执行任务,并通过 Shuffle 传递中间结果。

可以把逻辑计划理解成“做什么”,把分布式计划理解成“在哪些机器上、以什么并行方式做”。

分布式执行

Presto 的执行是尽量流水线化的。上游 Worker 一旦产生数据,就可以通过 Shuffle 发送给下游 Worker,而不是等整个 Stage 全部结束后再落盘。这种设计和 MapReduce 风格的阶段式执行不同:

  • MapReduce 更像“阶段 A 全部写盘,阶段 B 再读盘”。
  • Presto 更像“上游边生产,下游边消费”。

流水线执行的好处是端到端延迟低,特别适合交互式查询。例如一个带 LIMIT 的查询,可能不需要扫描完整数据集就能返回结果:

1
2
3
4
SELECT *
FROM hive.logs.access_log
WHERE dt = '2021-01-12'
LIMIT 100;

如果系统必须先枚举所有文件、扫描所有 Split,再返回结果,用户体验会很差。Presto 通过 Lazy Split 枚举和流水线执行,让查询尽快启动,并在满足 LIMIT 后尽早结束。

Stage 调度

Presto 支持两类 Stage 调度策略:

  • All-at-once:尽可能同时启动所有 Stage,数据一产生就向下游流动,适合低延迟查询。
  • Phased:按阶段启动,先完成部分依赖再启动后续 Stage,适合控制内存占用。

All-at-once 的优势是响应快,但可能占用更多内存和线程资源;Phased 的优势是更容易控制资源,尤其是在大规模 Hash Join 中,可以先构建 Hash Table,再启动 Probe 侧输入,降低同时运行的状态量。

Task 与 Split 调度

Presto 把 Stage 分成多个 Task,并把外部数据源划分成多个 Split。对于读取外部数据的 Leaf Stage,调度器需要考虑数据位置、网络拓扑和 Connector 约束。

例如在共享存储架构下,所有 Worker 都能读取远端数据,Presto 通常倾向于让更多节点参与扫描,以提高并行度;在 shared-nothing 架构下,数据和计算节点绑定更强,调度器则应该优先把 Task 放到数据所在节点。

Split 调度有一个很重要的设计:Presto 不会一次性枚举并分配所有 Split,而是按小批次懒加载。这样做有几个好处:

  • 查询可以更快开始,不必等待 Hive Connector 枚举海量分区和文件。
  • LIMIT 查询或被用户提前取消的查询,可以避免不必要的 Split 枚举。
  • Worker 只维护较短的 Split 队列,更容易适应不同 Split 处理成本的差异。
  • Coordinator 不需要一次性持有所有 Split 元数据,降低内存压力。

本地执行模型

在 Worker 内部,Presto 以 Driver Loop 推动 Operator 执行。每个 Split 被分配给线程后,会以 Page 为单位在 Operator 之间流动。

Page 是 Presto 内部的列式批数据结构:

1
2
3
4
Page
Block(orderkey)
Block(custkey)
Block(totalprice)

列式结构非常适合分析型查询,因为查询通常只访问部分列。Block 还可以是不同编码形式,例如普通数组、字典编码、Run-Length Encoding。这样 Presto 可以在保持统一执行接口的同时,利用底层列式文件和压缩格式的优势。

Shuffle 与 Backpressure

Stage 之间的数据交换通过 Shuffle 完成。Presto 使用基于 HTTP long-polling 的内存 Shuffle:上游 Task 把中间结果放入输出缓冲区,下游 Task 主动拉取数据。这种设计有两个特点:

  • 低延迟:中间结果不必先持久化到磁盘,下游可以尽快消费。
  • 可反压:如果下游消费慢,上游输出缓冲区会变满,系统会降低相关 Split 的运行并发。

Backpressure 对多租户系统非常关键。假设一个 BI 工具客户端消费结果很慢,如果没有反压机制,上游 Worker 可能持续生产数据,把大量内存占在输出缓冲区里,影响其他查询。Presto 通过监控输入、输出缓冲区利用率,动态调整请求并发和 Split 执行并发,使数据流动速度和消费能力保持匹配。

Connector 体系

Presto 能做到 “SQL on Everything”,关键在于 Connector。Connector 是 Presto 和外部数据源之间的适配层,它把不同系统抽象成统一的表、列、Split 和 Page。

论文中将 Connector API 拆成几类能力:

  • Metadata API:提供库、表、列、类型、统计信息等元数据。
  • Data Location API:提供数据位置、Split 划分、分区、排序、分桶、索引等物理布局信息。
  • Data Source API:Worker 根据 Split 读取外部数据,并转换成 Presto 内部 Page。
  • Data Sink API:支持 ETL 任务把查询结果写回外部系统。

这个抽象不只是为了能读数据,更重要的是让外部数据源把自己的物理特性告诉 Presto 优化器。例如 Hive Connector 可以暴露分区、文件格式、列统计信息;MySQL 分片 Connector 可以暴露索引和分片信息;Raptor 这类面向 Presto 优化的存储则可以暴露更适合本地计算的数据布局。

假设有如下查询:

1
2
3
4
5
SELECT user_id, count(*)
FROM hive.logs.events
WHERE dt = '2021-01-12'
AND event_type = 'purchase'
GROUP BY user_id;

如果 Connector 告诉 Presto 表按 dt 分区,Presto 就可以只读取目标日期分区。如果底层文件是 ORC 或 Parquet,并且文件元数据中记录了列级统计信息,Presto 还能进一步跳过不可能匹配 event_type = 'purchase' 的数据块。也就是说,Connector 暴露的信息越丰富,Presto 的优化空间就越大。

资源管理

Presto 的资源管理主要围绕 CPU 和内存展开。

CPU 方面,Presto 使用协作式多任务模型。每个 Split 只能运行一个较短的时间片,然后让出线程。调度器根据 Task 累计 CPU 时间,将任务放入多级反馈队列。刚进入系统、CPU 消耗少的查询会获得更高优先级,已经消耗大量 CPU 的长查询会逐渐降低优先级。

这种策略对交互式查询很友好。一个很小的查询不应该长时间排在大 ETL 后面;而大查询虽然优先级会下降,但仍然可以持续推进。

内存方面,Presto 将内存分为两类:

  • User memory:用户相对能理解的内存,例如聚合哈希表、Join 哈希表。
  • System memory:系统实现产生的内存,例如 Shuffle buffer。

系统会限制查询的全局内存和单节点内存。当内存不足时,Presto 可以通过 spilling 把部分 Join 或聚合状态写到磁盘,也可以使用 reserved pool 让某个占用内存较大的查询继续执行,避免整个集群陷入死锁式等待。

查询优化

Presto 的查询优化目标可以概括为:少读数据、少传数据、少做重复计算,并尽量利用 Connector 暴露的物理信息。

规则优化

Presto 优化器会对逻辑计划应用一系列规则,直到计划达到稳定状态。常见规则包括:

  • Predicate Pushdown:把过滤条件下推到 Connector 或存储层。
  • Limit Pushdown:把 LIMIT 尽量下推,减少无意义的数据生产。
  • Column Pruning:只读取查询真正需要的列。
  • Projection Pushdown:把表达式计算尽量靠近数据源或早期阶段。
  • Decorrelation:改写相关子查询,使其更适合 Join 或聚合执行。

例如:

1
2
3
4
SELECT user_id
FROM events
WHERE dt = '2021-01-12'
AND event_type = 'purchase';

如果 dt 是分区列,event_type 有文件级统计信息,那么 Presto 可以先做分区裁剪,再利用 ORC/Parquet 元数据跳过无关数据块,最后只读取 user_idevent_type 这两列。相比全表全列扫描,这会显著减少 IO、解压、解码和网络传输成本。

Join 优化

Join 往往是分布式查询中最昂贵的部分,因为它可能需要大量 Shuffle。Presto 会根据统计信息、数据布局和 Connector 能力选择不同 Join 策略。

常见优化包括:

  • Join Reordering:调整多表 Join 顺序,让中间结果尽量小。
  • Join Strategy Selection:选择广播 Join、分区 Join、索引 Join 等策略。
  • Co-located Join:利用相同分区或分桶布局,避免重新 Shuffle。

例如两张表都按 user_id 分桶:

1
2
3
4
5
6
7
8
SELECT
e.exp_id,
count(*)
FROM experiment_users e
JOIN user_events v
ON e.user_id = v.user_id
WHERE e.exp_id = 10086
GROUP BY e.exp_id;

如果优化器不知道两张表的数据布局,它可能会把两边数据重新按 user_id Shuffle;如果 Connector 告诉优化器两张表已经按相同列分布,Presto 就可以选择 co-located join,在本地完成 Join,省掉一次昂贵的网络重分布。

Shuffle 优化

分布式 SQL 中,Shuffle 的代价通常很高,它消耗网络、CPU、内存缓冲区,并增加端到端延迟。因此 Presto 会尽量减少 Stage 之间的数据交换。

优化器会根据 PlanNode 的输出属性和下游所需属性判断 Shuffle 是否必要。例如某个上游输出已经满足下游聚合所需的分区方式,那么对应 Shuffle 就可以省略。如果多个下游算子需要不同的分区属性,优化器也会尝试选择一个能同时满足更多需求的分区方式,减少整体数据交换次数。

这里的关键思想是:分布式计划不只是算子树,还携带数据分布属性。优化器不仅要知道“要做什么计算”,还要知道“数据当前如何分布、下游希望如何分布”。

文件格式优化

Presto 对 ORC、Parquet 等列式格式做了大量优化。Connector 读取数据后返回 Page,Page 中每一列是一个 Block。Block 可以直接承载文件中的压缩编码形式,例如字典编码和 RLE。这带来两个优势:

  • 跳读能力:利用文件 footer、min/max、Bloom Filter 等信息跳过无关数据。
  • 压缩态计算:尽量在字典编码或 RLE 编码上直接计算,避免不必要的解码。

例如一个低基数字段:

1
country: CN, US, CN, CN, US, SG, CN

字典编码后可以表示成:

1
2
dictionary: [CN, US, SG]
indices: [0, 1, 0, 0, 1, 2, 0]

如果一个过滤或表达式只需要对不同字典值计算一次,再映射回 indices,就可以避免对每一行重复计算。对于国家、状态、类型这类低基数字段,这种优化非常有效。

Lazy Loading

Presto 支持 lazy materialization,也就是延迟物化。Connector 可以返回 lazy block,只有当某列真的被访问时,才读取、解压和解码。例如:

1
2
3
SELECT user_id
FROM events
WHERE event_type = 'purchase';

如果 events 表里还有 payloaddevice_infoextra 等大字段,但查询不需要它们,Presto 就可以避免读取或解码这些列。论文中的生产负载实验显示,lazy loading 能显著减少数据读取量和 CPU 消耗。

Code Generation

Presto 使用 JVM bytecode generation 来加速表达式计算和部分算子执行。解释执行虽然灵活,但对海量数据处理来说开销太高,因为每一行都要反复遍历表达式树、判断类型、调用通用函数。例如:

1
WHERE price * (1 - discount) > 100

如果解释执行,系统每处理一行都要按表达式树逐层求值;如果生成字节码,Presto 可以为当前查询生成一段更直接的计算逻辑,让 JVM JIT 更容易做内联、循环展开、分支优化和寄存器分配。

可以把 Code Generation 理解为 不要用一个通用解释器反复解释同一段逻辑,而是为当前 SQL 临时生成一段专用程序

工程取舍

Presto 的设计并不是没有代价。它选择了低延迟、流水线执行和内存优先,也就意味着在一些方面做了主动取舍。

最典型的是容错。论文指出,Presto 对 Coordinator 或 Worker 崩溃没有完整的内建容错能力:Coordinator 失败会导致集群不可用,Worker 失败会导致运行在该 Worker 上的查询失败。Presto 主要依赖客户端重试、外部编排、备用 Coordinator、多活集群和故障节点摘除来缓解。

这和 Spark SQL、Hive on Tez 这类系统不同。后者通常会把中间结果持久化,具备更强的阶段级重算能力,但代价是更高的延迟。Presto 则更偏向于让大多数查询更快完成,如果失败则由外部机制重试。

此外,Presto 也非常依赖可观测性。论文提到 Facebook 的 Presto Worker 会导出大量实时性能指标,并记录 operator、task、stage 级别的统计信息。对于一个多租户查询引擎来说,性能问题可能来自 Connector 读取慢、Join 数据倾斜、Shuffle 过重、客户端消费慢、某个算子内存异常等,没有细粒度指标就很难定位。

总结

本篇论文展示的是一个工业级分布式 SQL 引擎如何在真实生产环境中做系统设计。Presto 的核心不是替代某个存储系统,而是在各种数据源之上提供一个统一、低延迟、可扩展的 SQL 执行层。

从架构设计上看,Presto 通过 Coordinator 和 Worker 组织分布式执行,通过 Connector 接入异构数据源,通过 Stage、Task、Split、Pipeline 和 Operator 把 SQL 拆成可并行执行的计算单元。

从系统设计上看,Presto 依靠流水线执行、Lazy Split 枚举、内存 Shuffle、Backpressure、协作式 CPU 调度和精细化内存管理,在交互式查询和长时间 ETL 之间寻找平衡。

从查询优化上看,Presto 的关键思路是让优化器与 Connector 协作,尽量利用分区、索引、统计信息、文件格式、数据布局和压缩编码,减少不必要的读取、解码、计算和 Shuffle。

因此,Presto 能在大数据计算引擎领域占据重要位置,靠的不是单方面的神奇优化,而是一组围绕“低延迟查询异构数据”这个目标展开的系统性设计。