论文阅读《Presto: SQL on Everything》
论文《Presto: SQL on Everything》介绍的是 Facebook 在生产环境中长期使用并开源的分布式 SQL 查询引擎 Presto,该引擎允许用户用统一的 SQL 查询 Hadoop 数据仓库、RDBMS、NoSQL、流系统,以及各种内部服务。论文中的 Presto 同时支撑交互式分析、Batch ETL、A/B 实验、外部报表等场景,这些负载对延迟、吞吐、并发和资源隔离的要求差异很大,因此 Presto 的核心设计可以概括为:以 Connector 连接异构数据源,以流水线执行降低查询延迟,以自适应调度和资源管理支撑多租户负载。
整体架构
Presto 是一个典型的 MPP 查询引擎。所谓 MPP,可以理解为把一次 SQL 查询拆成多个可并行执行的计算单元,分发到多台机器上同时执行,最后再汇总结果。
一个 Presto 集群主要由两类节点组成:
- Coordinator:负责接收 SQL、排队、解析、语义分析、逻辑计划生成、查询优化、分布式计划生成和任务调度。
- Worker:负责执行具体计算任务,包括读取外部数据、执行算子、处理 Shuffle 数据、维护中间状态和返回结果。
一次查询的大致执行链路如下:
1 | Client |
这条链路中有几个核心概念:
- Stage:分布式执行计划中的一个阶段,Stage 之间通常通过 Shuffle 交换数据。
- Task:Stage 在某个 Worker 上的执行实例,同一个 Stage 会分发成多个 Task 并行执行。
- Split:外部数据源中可被处理的一小块数据,例如 HDFS 文件的某个 offset 范围、MySQL 分片中的一段数据。
- Pipeline:Task 内部的一串算子链,数据在 Pipeline 中尽量以流式方式向下游传递。
- Operator:具体的执行算子,例如 Scan、Filter、Project、HashJoin、Aggregation。
- Page / Block:Presto 内部的列式数据表示,其中 Page 表示一批行,Block 表示 Page 中的一列。
这些概念可以按“从大到小”的层级来理解。一次 Query 不是直接被丢给某台机器执行,而是先被拆成多个 Stage;Stage 描述的是一段可以独立并行执行的计划片段,Stage 之间通过 Shuffle 传递数据。每个 Stage 会进一步拆成多个 Task,同一批 Task 执行相同的计算逻辑,只是处理的数据不同。对于需要读取外部数据的 Task,输入数据又会被拆成很多 Split,每个 Split 是一个可以独立调度的数据切片。Task 拿到 Split 后会在 Worker 本地把数据转成 Page,并让 Page 依次流过 Scan、Filter、Join、Aggregation 等 Operator。
可以把这个过程类比成工厂流水线:Stage 像不同车间,例如“读取原料”、“组装零件”、“最终质检”;Task 像同一个车间里的多条并行产线;Split 像分配给每条产线的一箱箱原料;Operator 像产线上的工序;Page 则是工序之间传递的一批半成品。这样理解之后,Presto 的并行性就很清楚了:跨机器的并行主要来自 Stage 被拆成多个 Task,读数据的并行主要来自 Split 的细粒度划分,而单机内部的执行效率则依赖 Pipeline 和 Operator 之间持续流动的 Page。
举个例子,假设要扫描一张按天分区的 orders 表。Coordinator 会先生成一个负责扫描的 Stage;如果集群里有 100 个 Worker,这个 Stage 可能会被拆成 100 个 Task 分发出去;Connector 会把 dt = '2021-01-12' 这个分区下的文件切成很多 Split,例如每个 Split 对应一个文件片段;Worker 上的 Task 拿到 Split 后,通过 Scan Operator 读出 Page,再交给 Filter、Project、Join、Aggregation 等后续 Operator 处理。如果后续聚合需要按 orderkey 重新分布数据,当前 Stage 的输出就会通过 Shuffle 发送到下一个 Stage。这样看,Stage 是分布式执行的边界,Task 是单机执行的容器,Split 是并行读取的最小调度单位,Page 则是算子之间真正流动的数据批次。
可以用一个简单 SQL 来串起这些概念:
1 | SELECT |
如下图所示,Coordinator 会先把 SQL 解析成逻辑计划:扫描 orders 和 lineitem,对 lineitem 做过滤,再 Join,最后聚合。随后优化器会把它改写成分布式计划,例如把扫描、Join、聚合拆到不同 Stage 中,Stage 内部再拆成多个 Task 分发给 Worker。Worker 读取 Split 后,以 Page 为单位在 Operator 之间传递数据。
1 | Aggregate [SUM(tax)] |
系统设计
Presto 的设计目标并不是单纯追求某个 Benchmark 上的极致性能,而是在多数据源、多租户、多查询形态下尽量保持低延迟和高资源利用率。
本章节,我们结合论文深入介绍一下 Presto 的系统设计。从 Coordinator 视角看,一条 SQL 会经历几个阶段:
- Parsing:用 ANTLR 解析 SQL,生成语法树。
- Analysis:解析表、列、函数、类型、作用域、子查询、聚合、窗口函数等语义信息。
- Logical Planning:把语法树转换成由 PlanNode 组成的逻辑计划树。
- Optimization:应用规则和代价模型,改写逻辑计划,生成更适合执行的计划。
- Distributed Planning:把计划切分成多个 Stage,并在 Stage 之间插入数据交换。
- Scheduling:把 Stage 分发为 Task,把 Split 分配给 Task,并选择 Worker 执行。
- Execution:Worker 以 Pipeline 和 Operator 的方式执行任务,并通过 Shuffle 传递中间结果。
可以把逻辑计划理解成“做什么”,把分布式计划理解成“在哪些机器上、以什么并行方式做”。
分布式执行
Presto 的执行是尽量流水线化的。上游 Worker 一旦产生数据,就可以通过 Shuffle 发送给下游 Worker,而不是等整个 Stage 全部结束后再落盘。这种设计和 MapReduce 风格的阶段式执行不同:
- MapReduce 更像“阶段 A 全部写盘,阶段 B 再读盘”。
- Presto 更像“上游边生产,下游边消费”。
流水线执行的好处是端到端延迟低,特别适合交互式查询。例如一个带 LIMIT 的查询,可能不需要扫描完整数据集就能返回结果:
1 | SELECT * |
如果系统必须先枚举所有文件、扫描所有 Split,再返回结果,用户体验会很差。Presto 通过 Lazy Split 枚举和流水线执行,让查询尽快启动,并在满足 LIMIT 后尽早结束。
Stage 调度
Presto 支持两类 Stage 调度策略:
- All-at-once:尽可能同时启动所有 Stage,数据一产生就向下游流动,适合低延迟查询。
- Phased:按阶段启动,先完成部分依赖再启动后续 Stage,适合控制内存占用。
All-at-once 的优势是响应快,但可能占用更多内存和线程资源;Phased 的优势是更容易控制资源,尤其是在大规模 Hash Join 中,可以先构建 Hash Table,再启动 Probe 侧输入,降低同时运行的状态量。
Task 与 Split 调度
Presto 把 Stage 分成多个 Task,并把外部数据源划分成多个 Split。对于读取外部数据的 Leaf Stage,调度器需要考虑数据位置、网络拓扑和 Connector 约束。
例如在共享存储架构下,所有 Worker 都能读取远端数据,Presto 通常倾向于让更多节点参与扫描,以提高并行度;在 shared-nothing 架构下,数据和计算节点绑定更强,调度器则应该优先把 Task 放到数据所在节点。
Split 调度有一个很重要的设计:Presto 不会一次性枚举并分配所有 Split,而是按小批次懒加载。这样做有几个好处:
- 查询可以更快开始,不必等待 Hive Connector 枚举海量分区和文件。
LIMIT查询或被用户提前取消的查询,可以避免不必要的 Split 枚举。- Worker 只维护较短的 Split 队列,更容易适应不同 Split 处理成本的差异。
- Coordinator 不需要一次性持有所有 Split 元数据,降低内存压力。
本地执行模型
在 Worker 内部,Presto 以 Driver Loop 推动 Operator 执行。每个 Split 被分配给线程后,会以 Page 为单位在 Operator 之间流动。
Page 是 Presto 内部的列式批数据结构:
1 | Page |
列式结构非常适合分析型查询,因为查询通常只访问部分列。Block 还可以是不同编码形式,例如普通数组、字典编码、Run-Length Encoding。这样 Presto 可以在保持统一执行接口的同时,利用底层列式文件和压缩格式的优势。
Shuffle 与 Backpressure
Stage 之间的数据交换通过 Shuffle 完成。Presto 使用基于 HTTP long-polling 的内存 Shuffle:上游 Task 把中间结果放入输出缓冲区,下游 Task 主动拉取数据。这种设计有两个特点:
- 低延迟:中间结果不必先持久化到磁盘,下游可以尽快消费。
- 可反压:如果下游消费慢,上游输出缓冲区会变满,系统会降低相关 Split 的运行并发。
Backpressure 对多租户系统非常关键。假设一个 BI 工具客户端消费结果很慢,如果没有反压机制,上游 Worker 可能持续生产数据,把大量内存占在输出缓冲区里,影响其他查询。Presto 通过监控输入、输出缓冲区利用率,动态调整请求并发和 Split 执行并发,使数据流动速度和消费能力保持匹配。
Connector 体系
Presto 能做到 “SQL on Everything”,关键在于 Connector。Connector 是 Presto 和外部数据源之间的适配层,它把不同系统抽象成统一的表、列、Split 和 Page。
论文中将 Connector API 拆成几类能力:
- Metadata API:提供库、表、列、类型、统计信息等元数据。
- Data Location API:提供数据位置、Split 划分、分区、排序、分桶、索引等物理布局信息。
- Data Source API:Worker 根据 Split 读取外部数据,并转换成 Presto 内部 Page。
- Data Sink API:支持 ETL 任务把查询结果写回外部系统。
这个抽象不只是为了能读数据,更重要的是让外部数据源把自己的物理特性告诉 Presto 优化器。例如 Hive Connector 可以暴露分区、文件格式、列统计信息;MySQL 分片 Connector 可以暴露索引和分片信息;Raptor 这类面向 Presto 优化的存储则可以暴露更适合本地计算的数据布局。
假设有如下查询:
1 | SELECT user_id, count(*) |
如果 Connector 告诉 Presto 表按 dt 分区,Presto 就可以只读取目标日期分区。如果底层文件是 ORC 或 Parquet,并且文件元数据中记录了列级统计信息,Presto 还能进一步跳过不可能匹配 event_type = 'purchase' 的数据块。也就是说,Connector 暴露的信息越丰富,Presto 的优化空间就越大。
资源管理
Presto 的资源管理主要围绕 CPU 和内存展开。
CPU 方面,Presto 使用协作式多任务模型。每个 Split 只能运行一个较短的时间片,然后让出线程。调度器根据 Task 累计 CPU 时间,将任务放入多级反馈队列。刚进入系统、CPU 消耗少的查询会获得更高优先级,已经消耗大量 CPU 的长查询会逐渐降低优先级。
这种策略对交互式查询很友好。一个很小的查询不应该长时间排在大 ETL 后面;而大查询虽然优先级会下降,但仍然可以持续推进。
内存方面,Presto 将内存分为两类:
- User memory:用户相对能理解的内存,例如聚合哈希表、Join 哈希表。
- System memory:系统实现产生的内存,例如 Shuffle buffer。
系统会限制查询的全局内存和单节点内存。当内存不足时,Presto 可以通过 spilling 把部分 Join 或聚合状态写到磁盘,也可以使用 reserved pool 让某个占用内存较大的查询继续执行,避免整个集群陷入死锁式等待。
查询优化
Presto 的查询优化目标可以概括为:少读数据、少传数据、少做重复计算,并尽量利用 Connector 暴露的物理信息。
规则优化
Presto 优化器会对逻辑计划应用一系列规则,直到计划达到稳定状态。常见规则包括:
- Predicate Pushdown:把过滤条件下推到 Connector 或存储层。
- Limit Pushdown:把
LIMIT尽量下推,减少无意义的数据生产。 - Column Pruning:只读取查询真正需要的列。
- Projection Pushdown:把表达式计算尽量靠近数据源或早期阶段。
- Decorrelation:改写相关子查询,使其更适合 Join 或聚合执行。
例如:
1 | SELECT user_id |
如果 dt 是分区列,event_type 有文件级统计信息,那么 Presto 可以先做分区裁剪,再利用 ORC/Parquet 元数据跳过无关数据块,最后只读取 user_id 和 event_type 这两列。相比全表全列扫描,这会显著减少 IO、解压、解码和网络传输成本。
Join 优化
Join 往往是分布式查询中最昂贵的部分,因为它可能需要大量 Shuffle。Presto 会根据统计信息、数据布局和 Connector 能力选择不同 Join 策略。
常见优化包括:
- Join Reordering:调整多表 Join 顺序,让中间结果尽量小。
- Join Strategy Selection:选择广播 Join、分区 Join、索引 Join 等策略。
- Co-located Join:利用相同分区或分桶布局,避免重新 Shuffle。
例如两张表都按 user_id 分桶:
1 | SELECT |
如果优化器不知道两张表的数据布局,它可能会把两边数据重新按 user_id Shuffle;如果 Connector 告诉优化器两张表已经按相同列分布,Presto 就可以选择 co-located join,在本地完成 Join,省掉一次昂贵的网络重分布。
Shuffle 优化
分布式 SQL 中,Shuffle 的代价通常很高,它消耗网络、CPU、内存缓冲区,并增加端到端延迟。因此 Presto 会尽量减少 Stage 之间的数据交换。
优化器会根据 PlanNode 的输出属性和下游所需属性判断 Shuffle 是否必要。例如某个上游输出已经满足下游聚合所需的分区方式,那么对应 Shuffle 就可以省略。如果多个下游算子需要不同的分区属性,优化器也会尝试选择一个能同时满足更多需求的分区方式,减少整体数据交换次数。
这里的关键思想是:分布式计划不只是算子树,还携带数据分布属性。优化器不仅要知道“要做什么计算”,还要知道“数据当前如何分布、下游希望如何分布”。
文件格式优化
Presto 对 ORC、Parquet 等列式格式做了大量优化。Connector 读取数据后返回 Page,Page 中每一列是一个 Block。Block 可以直接承载文件中的压缩编码形式,例如字典编码和 RLE。这带来两个优势:
- 跳读能力:利用文件 footer、min/max、Bloom Filter 等信息跳过无关数据。
- 压缩态计算:尽量在字典编码或 RLE 编码上直接计算,避免不必要的解码。
例如一个低基数字段:
1 | country: CN, US, CN, CN, US, SG, CN |
字典编码后可以表示成:
1 | dictionary: [CN, US, SG] |
如果一个过滤或表达式只需要对不同字典值计算一次,再映射回 indices,就可以避免对每一行重复计算。对于国家、状态、类型这类低基数字段,这种优化非常有效。
Lazy Loading
Presto 支持 lazy materialization,也就是延迟物化。Connector 可以返回 lazy block,只有当某列真的被访问时,才读取、解压和解码。例如:
1 | SELECT user_id |
如果 events 表里还有 payload、device_info、extra 等大字段,但查询不需要它们,Presto 就可以避免读取或解码这些列。论文中的生产负载实验显示,lazy loading 能显著减少数据读取量和 CPU 消耗。
Code Generation
Presto 使用 JVM bytecode generation 来加速表达式计算和部分算子执行。解释执行虽然灵活,但对海量数据处理来说开销太高,因为每一行都要反复遍历表达式树、判断类型、调用通用函数。例如:
1 | WHERE price * (1 - discount) > 100 |
如果解释执行,系统每处理一行都要按表达式树逐层求值;如果生成字节码,Presto 可以为当前查询生成一段更直接的计算逻辑,让 JVM JIT 更容易做内联、循环展开、分支优化和寄存器分配。
可以把 Code Generation 理解为 不要用一个通用解释器反复解释同一段逻辑,而是为当前 SQL 临时生成一段专用程序。
工程取舍
Presto 的设计并不是没有代价。它选择了低延迟、流水线执行和内存优先,也就意味着在一些方面做了主动取舍。
最典型的是容错。论文指出,Presto 对 Coordinator 或 Worker 崩溃没有完整的内建容错能力:Coordinator 失败会导致集群不可用,Worker 失败会导致运行在该 Worker 上的查询失败。Presto 主要依赖客户端重试、外部编排、备用 Coordinator、多活集群和故障节点摘除来缓解。
这和 Spark SQL、Hive on Tez 这类系统不同。后者通常会把中间结果持久化,具备更强的阶段级重算能力,但代价是更高的延迟。Presto 则更偏向于让大多数查询更快完成,如果失败则由外部机制重试。
此外,Presto 也非常依赖可观测性。论文提到 Facebook 的 Presto Worker 会导出大量实时性能指标,并记录 operator、task、stage 级别的统计信息。对于一个多租户查询引擎来说,性能问题可能来自 Connector 读取慢、Join 数据倾斜、Shuffle 过重、客户端消费慢、某个算子内存异常等,没有细粒度指标就很难定位。
总结
本篇论文展示的是一个工业级分布式 SQL 引擎如何在真实生产环境中做系统设计。Presto 的核心不是替代某个存储系统,而是在各种数据源之上提供一个统一、低延迟、可扩展的 SQL 执行层。
从架构设计上看,Presto 通过 Coordinator 和 Worker 组织分布式执行,通过 Connector 接入异构数据源,通过 Stage、Task、Split、Pipeline 和 Operator 把 SQL 拆成可并行执行的计算单元。
从系统设计上看,Presto 依靠流水线执行、Lazy Split 枚举、内存 Shuffle、Backpressure、协作式 CPU 调度和精细化内存管理,在交互式查询和长时间 ETL 之间寻找平衡。
从查询优化上看,Presto 的关键思路是让优化器与 Connector 协作,尽量利用分区、索引、统计信息、文件格式、数据布局和压缩编码,减少不必要的读取、解码、计算和 Shuffle。
因此,Presto 能在大数据计算引擎领域占据重要位置,靠的不是单方面的神奇优化,而是一组围绕“低延迟查询异构数据”这个目标展开的系统性设计。