概要速览

PrestoDB的Aria项目曾于2020年发布过一组实验性功用,用来提高对表(经过Hive连接器连接并以ORC格局存储数据)的扫描功用。

在本文中,咱们将在根据Docker的PrestoDB测验环境中对这些新功用进行基础性的测验。[1]

Presto

Presto 是一款能够大规模并行处理 (MPP) 的SQL履行引擎。履行引擎与数据存储是分离的,该项目包括大量插件(又称为连接器,connector),它们为Presto引擎供给查询的数据。数据存储中的数据被读取后,交由Presto履行查询操作,比方数据连接(joining)和聚合(aggregation)。这种数据存储和履行分离的架构答应单个Presto实例查询多个数据源,然后供给了十分强大的联合查询层。

Presto有许多可用的连接器,社区也会定时供给用以访问数据存储的新式连接器。

Hive 连接器

Hive 连接器一般被视为Presto的规范连接器。咱们通常用它连接到 Hive Metastore,以此来获取Metastore 中界说的表的元数据信息。数据通常存储在 HDFS 或 S3中,而Metastore供给有关文件存储位置和格局的信息;最常用的是ORC格局,但也支撑 Avro 和 Parquet等其他格局。Hive 连接器答应 Presto 引擎并行地将数据从HDFS/S3扫描到引擎中来履行查询。ORC格局是一种十分规范且常见的数据存储格局,能供给很好的压缩比和功用。

两个用于履行查询的中心服务

Presto有两个用于履行查询的中心服务:一个担任查询解析和任务调度等责任的Coordinator,以及多个担任并行履行查询的Worker。理论上,Coordinator也能够充任Worker的角色,但在出产环境中不会这么操作。鉴于咱们在这里测验的是Presto,为便利起见,咱们只运用一个节点,既作为 Coordinator 也作为 Worker。[2]

咱们将运用单个Docker容器来进行本次Presto的测验。请点击查看布置文档,文档的末尾处有怎么完结单节点 Presto布置的示例。

下面来介绍Presto是怎么履行一条查询语句的:

首先,Presto coordinator先对查询语句进行解析,然后拟定出一个履行方案(下文会供给示例展现)。方案拟定完结之后就会被分成几个阶段(或片段),每个阶段将履行一系列操作,即引擎用来履行查询的特定函数。履行方案通常从连接器扫描数据开端,然后履行一系列操作,如数据过滤、部分聚合以及在Presto worker节点之间交流数据来履行数据连接和最终的数据聚合等。一切这些阶段被分成多个分片(split),即Presto中的并行履行单元。Worker 并行履行可装备数量的分片,然后获得所需的结果。引擎中的一切数据都保存在内存中(前提是不超越集群的容量阈值)。

Hive连接器(以及一切其他连接器)担任将输入数据集拆分为多个分片,供 Presto 并行读取。作为一项优化办法,Presto 引擎将告知连接器查询中运用的谓词(predicate)以及选定的列(column)——称为谓词下推 (predicate pushdown),这使得连接器能够在把数据供给给Presto引擎之前过滤掉不必要的数据,这也是本文的重点所在。

为了演示谓词下推,咱们来看一个基本查询——统计某个数据表内契合条件的行数。咱们的查询示例是根据基准测验数据集TPC-H的lineitem数据表进行的。TPC-H的lineitem表中大约有6亿行记载,它们的shipdate字段取值介于1992和1998之间。下面的查询语句是针对lineitem数据表的设置条件过滤谓词,筛选出shipdate字段为1992年的数据行。咱们先在不启用Aria增强会话特点的情况下,经过运转 EXPLAIN 命令来观察一下查询方案:

presto:tpch> EXPLAIN (TYPE DISTRIBUTED) SELECT COUNT(shipdate) FROM lineitem WHERE shipdate BETWEEN DATE '1992-01-01' AND DATE '1992-12-31';
Fragment 0 [SINGLE]
    Output layout: [count]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[_col0] => [count:bigint]
            _col0 := count
        - Aggregate(FINAL) => [count:bigint]
                count := ""presto.default.count""((count_4))
            - LocalExchange[SINGLE] () => [count_4:bigint]
                - RemoteSource[1] => [count_4:bigint]
Fragment 1 [SOURCE]
    Output layout: [count_4]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Aggregate(PARTIAL) => [count_4:bigint]
            count_4 := ""presto.default.count""((shipdate))
       -ScanFilter[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{domains={shipdate=[ [[1992-01-01, 1992-12-31]] ]}}]'}, grouped = false, filterPredicate = shipdate BETWEEN (DATE 1992-01-01) AND (DATE 1992-12-31)] => [shipdate:date]
                Estimates: {rows: 600037902 (2.79GB), cpu: 3000189510.00, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: 6000379020.00, memory: 0.00, network: 0.00}
                LAYOUT: tpch.lineitem{domains={shipdate=[ [[1992-01-01, 1992-12-31]] ]}}
                shipdate := shipdate:date:10:REGULAR

查询方案按自下而上顺序来阅读,从 Fragment 1 开端,并行扫描 lineitem 表,运用谓词对shipdate列进行过滤,然后对每个分片履行部分聚合,并将该部分结果交流到下一阶段 Fragment 0 来履行最终的聚合,之后再将结果发送到客户端,查询方案流程拜见下图:(图中靠近底部的水平线标示出哪些代码在 Hive 连接器中履行,哪些代码在 Presto 引擎中履行。)

Meta项目功能测试 | 开启PrestoDB和Aria扫描优化

现在咱们来履行这个查询!

presto:tpch> SELECT COUNT(shipdate) FROM lineitem WHERE shipdate BETWEEN DATE '1992-01-01' AND DATE '1992-12-31';
  _col0  
----------
 76036301
(1 row)
Query 20200609_154258_00019_ug2v4, FINISHED, 1 node
Splits: 367 total, 367 done (100.00%)
0:09 [600M rows, 928MB] [63.2M rows/s, 97.7MB/s]

咱们看到,lineitem 表包括7600多万行shipdate列取值为1992年的记载。履行这个查询大约花费了9 秒,总共处理了 6 亿行数据。

现在咱们来激活会话特点 pushdown_subfields_enabled 和hive.pushdown_filter_enabled,以启用 Aria 功用,下面咱们来看一下查询方案发生了怎样的变化:

presto:tpch> SET SESSION pushdown_subfields_enabled=true;
SET SESSION
presto:tpch> SET SESSION hive.pushdown_filter_enabled=true;
SET SESSION
presto:tpch> EXPLAIN (TYPE DISTRIBUTED) SELECT COUNT(shipdate) FROM lineitem WHERE shipdate BETWEEN DATE '1992-01-01' AND DATE '1992-12-31';
Fragment 0 [SINGLE]
    Output layout: [count]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[_col0] => [count:bigint]
            _col0 := count
        - Aggregate(FINAL) => [count:bigint]
                count := ""presto.default.count""((count_4))
            - LocalExchange[SINGLE] () => [count_4:bigint]
                - RemoteSource[1] => [count_4:bigint]
Fragment 1 [SOURCE]
    Output layout: [count_4]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Aggregate(PARTIAL) => [count_4:bigint]
            count_4 := ""presto.default.count""((shipdate))
        - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=lineitem, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.lineitem{domains={shipdate=[ [[1992-01-01, 1992-12-31]] ]}}]'}, grouped = false] => [shipdate:date]
                Estimates: {rows: 540034112 (2.51GB), cpu: 2700170559.00, memory: 0.00, network: 0.00}
                LAYOUT: tpch.lineitem{domains={shipdate=[ [[1992-01-01, 1992-12-31]] ]}}
                shipdate := shipdate:date:10:REGULAR
                    :: [[1992-01-01, 1992-12-31]]

注意:查询方案的首要变化坐落底部,即TableScan 操作中包括了shipdate 列。连接器现已接收到shipdate列上的谓词条件——取值介于1992-01-01 和 1992-12-31之间。如下图所示,该谓词被下推到连接器,免去了查询引擎过滤这些数据的必要性。

Meta项目功能测试 | 开启PrestoDB和Aria扫描优化

咱们再一次运转这个查询!

presto:tpch> SELECT COUNT(shipdate) FROM lineitem WHERE shipdate BETWEEN DATE '1992-01-01' AND DATE '1992-12-31';
  _col0  
----------
 76036301
(1 row)
Query 20200609_154413_00023_ug2v4, FINISHED, 1 node
Splits: 367 total, 367 done (100.00%)
0:05 [76M rows, 928MB] [15.5M rows/s, 189MB/s]

运转查询后,咱们得到了相同的结果,但查询时间简直缩短了一半,更重要的是,查询只扫描了7600万行!连接器现已将谓词应用于shipdate 列,而不是让引擎来处理谓词,因而节省了CPU周期,继而加快了查询速度。针对不同的查询和数据集情况可能有所不同,但如果是经过Hive连接器查询ORC文件的场景,该方案绝对值得一试。

**文章作者:**Adam Shook
原文于2020年6月15日宣布在作者的个人博客上:datacatessen.com

参阅

  1. [^](zhuanlan.zhihu.com/p/541968653… engineering.fb.com/2019/06/10/…
  2. [^](zhuanlan.zhihu.com/p/541968653… prestodb.io/docs/curren…