预聚合是OLAP系统中常用的一种优化手法,在经过在加载数据时就进行部分聚合核算,生成聚合后的中间表或视图,从而在查询时直接运用这些预先核算好的聚合成果,进步查询功能,完成这种预聚合办法大多都运用物化视图来完成。

ClickHouse社区完成的Projection功用类似于物化视图,原始的概念来源于Vertica,在原始表数据加载时,依据聚合SQL界说的表达式,核算写入数据的聚合数据与原始数据同步写入存储。在数据查询的过程中,假如查询 SQL 经过匹配剖析能够经过聚合数据核算得到,直接查询聚合数据减少核算开支,大幅进步查询功能。

ClickHouse Projection是针对物化视图现有问题,在查询匹配,数据一致性上扩展了运用场景:

  • 支撑normal projection,依照不同列进行数据重排,关于不同条件快速过滤数据
  • 支撑aggregate projection, 运用聚合查询在源表上直接界说出预聚合模型
  • 查询剖析能依据查询价值,主动挑选最优Projection进行查询优化,无需改写查询
  • projeciton数据存储于原始part目录下,在任一时刻针对任一数据改换操作均提供一致性保证
  • 维护简略,不需别的界说新表,在原始表添加projection属性

ByteHouse 是火山引擎依据ClickHouse研制的一款剖析型数据库产品,是一同支撑实时和离线导入的自助数据剖析平台,能够对 PB 级海量数据进行高效剖析。具有真实时剖析、存储-核算分离、多级资源隔离、云上全托管服务四大特点,为了更好的兼容社区的projection功用,扩展projection运用场景,ByteHouse对Projection进行了匹配场景和架构上进行了优化。在ByteHouse商用客户功能测验projection的功能测验,在1.2亿条的实践出产数据集中进行测验,查询并发才能进步10~20倍,下面从projeciton在优化器查询改写和依据ByteHouse框架改善两个方面谈一谈现在的优化工作。

Projection运用

为了进步ByteHouse对社区有很好的兼容性,ByteHouse保留了原有语法的支撑,projection操作分为创立,删去,物化,删去数据几个操作。为了便于理解后面的优化运用行为剖析系统例子作为剖析的对象。

语法

-- 新增projection界说
ALTER  TABLE [db]. table  ADD PROJECTION name ( SELECT  < COLUMN LIST EXPR > [ GROUP  BY ] [ ORDER  BY ] )
-- 删去projection界说而且删去projection数据 
ALTER  TABLE [db]. table  DROP PROJECTION name
-- 物化原表的某个partition数据
ALTER  TABLE [db.] table MATERIALIZE PROJECTION name IN  PARTITION partition_name
-- 删去projection数据但不删去projection界说
ALTER  TABLE [db.] table CLEAR PROJECTION name IN  PARTITION partition_name 

实例

CREATE DATABASE IF NOT EXISTS tea_data;
创立原始数据表
CREATE TABLE tea_data.events(
  app_id UInt32,
  user_id UInt64,
  event_type UInt64,
  cost UInt64,
  action_duration UInt64,
  display_time UInt64,
  event_date Date
) ENGINE = CnchMergeTree PARTITION BY toDate(event_date)
ORDER BY
  (app_id, user_id, event_type);
创立projection前写入 2023-05-28 分区测验数据
INSERT INTO tea_data.events
SELECT
    number / 100,
    number % 10,
    number % 3357,
    number % 166,
    number % 5,
    number % 40,
    '2023-05-28 05:11:55'
FROM system.numbers LIMIT 100000;
创立聚合projection
ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_1
(
SELECT
    app_id,
    user_id,
    event_date,
    sum(action_duration)
    GROUP BY app_id,
    user_id, event_date
);
创立projection后写入 2023-05-29 分区测验数据
INSERT INTO tea_data.events
SELECT
    number / 100,
    number % 10,
    number % 3357,
    number % 166,
    number % 5,
    number % 40,
    '2023-05-29 05:11:55'
FROM system.numbers LIMIT 100000;
Note:CnchMergeTree是ByteHouse特有的引擎

Query Optimizer扩展Projection改写

ByteHouse优化器

ByteHouse 优化器为业界现在仅有的ClickHouse 优化器方案。ByteHouse 优化器的才能简略总结如下:

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

  • RBO:支撑:列裁剪、分区裁剪、表达式简化、子查询解关联、谓词下推、冗余算子消除、Outer-JOIN 转 INNER-JOIN、算子下推存储、分布式算子拆分等常见的启发式优化才能。
  • CBO:依据 Cascade 查找框架,完成了高效的 Join 枚举算法,以及依据 Histogram 的价值预算,对 10 表全连接等级规划的 Join Reorder 问题,能够全量枚举并寻求最优解,一同针对大于10表规划的 Join Reorder 支撑启发式枚举并寻求最优解。CBO 支撑依据规则扩展查找空间,除了常见的 Join Reorder 问题以外,还支撑 Outer-Join/Join Reorder,Magic Set Placement 等相关优化才能。
  • 分布式方案优化:面向分布式MPP数据库,生成分布式查询方案,而且和 CBO 结合在一同。相对业界干流完成:分为两个阶段,首先寻求最优的单机版方案,然后将其分布式化。咱们的方案则是将这两个阶段交融在一同,在整个 CBO 寻求最优解的过程中,会结合分布式方案的诉求,从价值的角度挑选最优的分布式方案。关于 Join/Aggregate 的还支撑 Partition 属性打开。
  • 高阶优化才能:完成了 Dynamic Filter pushdown、单表物化视图改写、依据价值的 CTE (公共表达式同享)。

借助ByteHouse优化器强大的才能,针对projection原有完成的几点局限性做了优化,下面咱们先来看一下社区在projection改写的详细完成。

社区Projection改写完成

在非优化器履行模式下,对原始表的聚合查询可经过 aggregate projection 加快,即读取 projection 中的预聚合数据而不是原始数据。核算支撑了 normal partition 和 projection partition 的混合查询,假如一个 partition 的 projection 还没物化,能够运用原始数据进行核算。

详细改写履行逻辑:

  1. 方案阶段

    1. 将原查询方案和已有projection 进行匹配挑选能满意查询要求的projection candidates;
    2. 依据最小的 mark 读取数挑选最优的 projection candidate;
    3. 对原查询方案中的 ActionDAG 进行改写和折叠,之后用于 projection part 数据的后续核算;
    4. 将当前数据处理阶段进步到 WithMergeableState;
  2. 履行阶段

    1. MergeTreeDataSelectExecutor 会将 aggregate 之前的核算进行拆分:关于 normal part,运用原查询方案进行核算;关于 projection part,运用改写后 ActionDAG 结构QueryPipeline;
    2. 将两份数据合并,用于 aggregate 之后的核算。

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

ByteHouse优化器改写完成

优化器会将查询切分为不同的plan segment分发到worker节点并行履行,segment之间经过exchange交换数据,在plan segment内部依据query plan 构建pipeline履行,以下面简略聚合查询为例,说明优化器如何匹配projection。

Q1:
SELECT
    app_id,
    user_id,
    sum(action_duration)
FROM tea_data.events
WHERE event_date = '2023-05-29'
GROUP BY
    app_id,
    user_id

在履行方案阶段优化器尽量的将 TableScan 上层的 Partial Aggregation Step,Projection 和 Filter 下推到 TableScan 中,在将plan segment发送到worker节点后,在依据查询价值挑选合适projection进行匹配改写,从下面的履行方案上看,射中projection会在table scan中直接读取AggregateFunction(sum, UInt64)的state数据,比较于没有射中projection的履行方案减少了AggregaingNode的聚合运算。

Q1查询方案(optimizer_projection_support=0) Q1查询方案(optimizer_projection_support=1)
ClickHouse进阶|性能提升20倍!深度解析Projection优化实践
ClickHouse进阶|性能提升20倍!深度解析Projection优化实践
ClickHouse进阶|性能提升20倍!深度解析Projection优化实践
ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

混合读取Projection

Projection在创立之后不支撑更新schema,只能创立新的projection,但是在一些关于projection schema 改变需求频频事务场景下,需要同一个查询既能够读取旧projection也能读取新projection,所以在匹配时需要从partition维度进行匹配而不是从projection界说的维度进行匹配,混合读取不同projection的数据,这样会使查询愈加灵敏,更好的适应事务场景,下面举个详细的实例:

创立新的projection
ALTER TABLE tea_data.events ADD PROJECTION agg_sum_proj_2
(
SELECT
    app_id,
    sum(action_duration),
    sum(cost)
    GROUP BY app_id
);
写入 2023-05-30 的数据
INSERT INTO tea_data.events
SELECT
    number / 10,
    number % 100,
    number % 23,
    number % 3434,
    number % 23,
    number % 55,
    '2023-05-30 04:12:43'
FROM system.numbers LIMIT 100000;
履行查询
Q2:
SELECT
    app_id,
    sum(action_duration)
FROM tea_data.events
WHERE event_date >= '2023-05-28'
GROUP BY app_id
Q2履行方案 依照partition来匹配projection
ClickHouse进阶|性能提升20倍!深度解析Projection优化实践
ClickHouse进阶|性能提升20倍!深度解析Projection优化实践
查询过滤条件WHERE event_date >= ‘2023-05-28’ 会读取是三个分区的数据, 而且agg_sum_proj_1, agg_sum_proj_2都满意Q2的查询条件,所以table scan会读取2023-05-28的原始数据,2023-05-29会读取agg_sum_proj_1的数据,2023-05-30因为agg_sum_proj_2相关于 agg_sum_proj_1的数据聚合度更高,读取价值较小,挑选读取agg_sum_proj_2的数据,混合读取不同projection的数据。

原始表Schema更新

当对原始表添加新字段(维度或目标),对应projection 不包括这些字段,这时候为了使用projection一般情况下需要删去projection重新做物化,比较浪费资源,假如优化器匹配算法能正确处理不存在缺省字段,并运用缺省值参加核算就能够处理这个问题。


ALTER TABLE tea_data.events ADD COLUMN device_id String after event_type;
ALTER TABLE tea_data.events ADD COLUMN stay_time UInt64 after device_id;
履行查询
Q3:
SELECT
    app_id,
    device_id,
    sum(action_duration),
    max(stay_time)
FROM tea_data.events
WHERE event_date >= '2023-05-28'
GROUP BY app_id,device_id
Q3履行方案 默认值参加核算
ClickHouse进阶|性能提升20倍!深度解析Projection优化实践
从查询方案能够看出,即使agg_sum_proj_1和agg_sum_proj_2 并不包括新增的维度字段device_id,目标字段stay_time, 依然能够射中原始的partiton的projection,而且运用默认值来参加核算,这样能够使用旧的projection进行查询加快。

ByteHouse Projection完成

Projection是依照ByteHouse的存算分离架构进行设计的,Projecton数据由分布式存储统一进行办理,而针对projection的查询和核算则在无状况的核算节点上进行。比较于社区版,ByteHouse Projection完成了以下优势:

  • 关于Projection数据的存储节点和核算节点能够独立扩展,即能够依据不同事务关于Projection的运用需求,增加存储或许核算节点。
  • 当进行Projection查询时,能够依据不同Projection的数据查询量来分配核算节点的资源,从而完成资源的隔离和优化,进步查询功率。
  • Projection的元数据存储非常轻量,在事务数据急剧变化的时候,核算节点能够做到事务无感知扩缩容,无需额外的Projection数据迁移。

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

Projection数据存储

在ByteHouse中,多个projections数据与data数据存储在一个同享存储文件中。文件的外部数据对projections内部的内容没有感知,相当于一个黑盒。当需要读取某个projection时,经过checksums里边存储的projection指针,定位到特定projection方位,完成projection数据解析与加载。

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

Write操作

Projection写入分为两部分,先在本地做数据写入,产生part文件存储在worker节点本地,然后经过dumpAndCommitCnchParts将数据dump到长途同享存储。

  • 写入本地

    • 经过writeTempPart()将block写入本地,当写完原始part后,循环经过办法addProjectionPart()将每一个projection写入part文件夹,并添加到new_part中进行办理。
  • dump到长途存储

    • dumpCnchParts()的时候,依照上述的存储格局,写入完原始part中的bin和mark数据后,循环将每一个projection文件夹中的数据写入到同享存储文件中,并记载方位和巨细到checksums,如下:
    • 写入header
    • 写入data
    • 写入projections
    • 写入Primary index
    • 写入Checksums
    • 写入Metainfo
    • 写入Unique Key Index
    • 写入data footger

Merge操作

随着时刻的推移,针对同一个partition会存在越来越多的parts,而parts越多查询过滤时的价值就会越大。因而,ByteHouse在后台进程中会merge同一个partition的parts组成更大的part,从而减少part的数量进步查询的功率。

  • 关于每一个要merge的part

    • 关于part中的每一列,缓存对应的segments到本地
    • 创立MergeTreeReaderStreamWithSegmentCache,经过长途文件buffer或许本地segments的buffer初始化
  • 经过MergingSortedTransform或AggregatingSortedTransform等将sources交融成PipelineExecutingBlockInputStream

  • 创立MergedBlockOutputStream

关于projection,进行如下操作

  • 建立每一个projection的读取流,本地缓存buffer或许长途文件buffer
  • 原始表merge过程,对parts中的projections进行merge
  • 经过dumper将新的完好part存储到远端

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

Mutate操作

ByteHouse采用MVCC的方法,针对mutate涉及的列,新增一个delta part版别存储此次mutate涉及到的列。相应地,咱们在mutate的时候,结构projection的mutate操作的inputstream,将mutate后的projection和原始表数据一同写到同一个delta part中。

  • 在MutationsInterpreter里边,经过InterpreterSelectQuery(mutation_ast)获取BlockInputStream
  • projection经过block和InterpreterSelectQuery(projection.ast)重新构建

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

Materialize物化操作

如下图所示,依据ByteHouse的part办理方法,针对mutate操作或新增物化操作,咱们为part生成新的delta part,在下图part中,它所办理的三个projections由base part中的proj2,delta part#1中的proj1’,以及delta part#2中的proj3共同构成。当parts加载完成后,delta part#2会存储base part中的proj2的指针和delta part#1中的proj1’指针,以及自身的proj3指针,对上层提供统一的访问服务。

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

Worker端磁盘缓存

现在,CNCH中针对不同数据设计了不同的缓存类型

  • DiskCacheSegment:办理bin和mark数据
  • ChecksumsDiskCacheSegment:办理checksums数据
  • PrimaryIndexDiskCacheSegment:办理主键索引数据
  • BitMapIndexDiskCacheSegment:办理bitmap索引数据

针对Projection中的数据,别离经过上述的DiskCache,ChecksumsDiskCache和PrimaryIndexDiskCache对bin,mark,checksums以及索引进行缓存。

别的,为了加快Projection数据的加载过程,咱们新增了MetaInfoDiskCacheSegment用于缓存Projection相关的元数据信息。

实践事例剖析

某真实用户场景的数据集,咱们使用它对Projection功能进行了测验。

该数据集约1.2亿条,包括projection约240G巨细,测验机器 80CPU(s) / 376G Mem,配置如下:

  • SET allow_experimental_projection_optimization = 1
  • use_uncompressed_cache = true
  • max_threads = 1
  • log_level = error
  • 敞开Projection查询并发度80,封闭Projection查询并发度为30

测验成果

敞开Projection后,针对1.2亿条的数据集,查询功能进步10~20倍。

QPS (敞开Projection) QPS (封闭Projection)
Q1 87.365 5.697
Q2 124.780 4.511

表结构


CREATE TABLE user.trades(                                     
 `type` UInt8,
 `status` UInt64,
 `block_hash` String, 
 `sequence_number` UInt64, 
 `block_timestamp` DateTime, 
 `transaction_hash` String, 
 `transaction_index` UInt32, 
 `from_address` String, 
 `to_address` String,
 `value` String,
 `input` String,
 `nonce` UInt64, 
 `contract_address` String,
 `gas` UInt64,
 `gas_price` UInt64,
 `gas_used` UInt64, 
 `effective_gas_price` UInt64, 
 `cumulative_gas_used` UInt64, 
 `max_fee_per_gas` UInt64, 
 `max_priority_fee_per_gas` UInt64, 
 `r` String,
 `s` String,
 `v` UInt64,
 `logs_count` UInt32,
 PROJECTION tx_from_address_hit
  (                                         
    SELECT *                                                
    ORDER BY from_address
  ),                                            
 PROJECTION tx_to_address_hit (                                           
    SELECT *                                                 
    ORDER BY to_address 
 ),                                                  
 PROJECTION tx_sequence_number_hit (                                      
    SELECT *                                              
    ORDER BY sequence_number 
 ),                            
 PROJECTION tx_transaction_hash_hit (                                         
    SELECT *                                                  
    ORDER BY transaction_hash 
 )                         
)
ENGINE=CnchMergeTree()
PRIMARY KEY (transaction_hash, from_address, to_address) 
ORDER BY (transaction_hash, from_address, to_address) 
PARTITION BY toDate(toStartOfMonth(`block_timestamp`));                            

敞开Projection

Q1

WITH tx AS ( SELECT * FROM user.trades WHERE from_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC,transaction_index DESC UNION ALL SELECT * FROM user.trades WHERE to_address = '0x9686cd65a0e998699faf938879fb' ORDER BY sequence_number DESC, transaction_index DESC ) SELECT * FROM tx LIMIT 100;

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

Q2

with tx as (select sequence_number, transaction_index, transaction_hash, input from user.trades where from_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc,transaction_index desc UNION ALL select sequence_number, transaction_index, transaction_hash, input from user.trades where to_address = '0xdb03b11f5666d0e51934b43bd' order by sequence_number desc, transaction_index desc) select sequence_number, transaction_hash, substring(input,18) as func_sign from tx order by sequence_number desc, transaction_index desc limit 100 settings max_threads = 1, allow_experimental_projection_optimization = 1, use_uncompressed_cache = true;

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

封闭Projection

Q1

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

Q2

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

ClickHouse进阶|性能提升20倍!深度解析Projection优化实践

进入官方交流群,了解更多ClickHouse&ByteHouse干货

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。