本文首要介绍了数据库系统中常用的算子 Join 和 Aggregation 在 TiFlash 中的履行状况,包含查询方案生成、编译阶段与履行阶段,以希望读者对 TiFlash 的算子有初步的了解。

视频

www.bilibili.com/video/BV1tt…

算子概要

在阅览本文之前,推荐阅览本系列的前作:核算层 overview,以对 TiFlash 核算层、MPP 框架有必定了解。

在数据库系统中,算子是履行 SQL 首要逻辑的地方。一条 SQL 会被 parser 解析为一棵算子树(查询方案),然后经过 optimizer 的优化,再交给对应的 executor 履行,如下图所示。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

本文的首要内容包含

  1. TiDB 怎么生成与优化 MPP 算子与查询方案
  2. Join 算子在 TiFlash 中的编译(编译指的是将 TiDB-server 下发的履行方案片段生成可履行结构的过程,下同)与履行
  3. Aggregation 算子在 TiFlash 中的编译与履行

构建查询方案

一些背景知识:

  1. 逻辑方案与物理方案:可以简单理解为逻辑方案是指算子要做什么,物理方案是指算子怎样去做这件事。比方,“将数据从表 a 和表 b 中读取出来,然后做 join”描绘的是逻辑方案;而“在 TiFlash 中做 shuffle hash join” 描绘的是物理方案。更多信息可以参阅:TiDB 源码阅览系列文章
  2. MPP:大规模并行核算,一般用来描绘节点间可以交流数据的并行核算,在当时版别(6.1.0,下同)的 TiDB 中,MPP 运算都发生在 TiFlash 节点上。推荐观看:源码解读 – TiFlash 核算层 overview。MPP 是物理方案级别的概念。

MPP 方案

在 TiDB 中,可以在 SQL 前加上 explain 来检查这条 SQL 的查询方案,如下图所示,是一棵由物理算子组成的树,可以检查 TiDB 履行方案概览 来对其有更多的了解。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

MPP 查询方案的共同之处在于查询方案中多出了用于进行数据交流的 ExchangeSender 和 ExchangeReceiver 算子。

履行方案中会有这样的 pattern,代表将会在此处进行数据传输与交流。

     ...
     |_ExchangeReceiver_xx
        |_ ExchangeSender_xx
             …

每个 ExchangeSender 都会有一个 ExchangeType,来标识本次数据传输的类别,包含:

  1. HashPartition,将数据按 Hash 值进行分区之后分发到上游节点。
  2. Broadcast,将自身数据拷贝若干份,播送到一切上游节点中。
  3. PassThrough,将自己的数据悉数传给一个指定节点,此刻接纳方可以是 TiFlash 节点(ExchangeReceiver);也可以是 TiDB-server 节点(TableReader),代表 MPP 运算结束,向 TiDB-server 回来数据。

在上面的查询方案图中,一共有三个 ExchangeSender,id 别离是 19, 13 和 17。其间 ExchangeSender_13 和 ExchangeSender_17 都是将读入后的数据按哈希值 shuffle 到一切节点中,以便进行 join,而 ExchangeSender_19 则是将 join 完结后的数据回来到 TiDB-server 节点中。

增加 Exchange

在优化器的方案探索过程中,会有两处为查询方案树刺进 Exchange 算子:

  1. 一个是 MPP 方案在探索结束后,接入 TiDB 的 tableReader 时。类型为 passThrough type. 源码在函数 func (t *mppTask) convertToRootTaskImpl
  2. 一个是 MPP 方案在探索过程中,发现当时算子的 property(这儿首要指分区特点)不满意上层要求时。例如上层要求需求按 a 列的 hash 值分区,但是基层算子不能满意这个要求,就会刺进一组 Exchange.
func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
   if !t.needEnforceExchanger(prop) {
      return t
   }
   return t.copy().(*mppTask).enforceExchangerImpl(prop)
}
// t.partTp 表明当时算子已有的 partition type,prop 表明父算子要求的 partition type
func (t *mppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool {
   switch prop.MPPPartitionTp {
   case property.AnyType:
      return false
   case property.BroadcastType:
      return true
   case property.SinglePartitionType:
      return t.partTp != property.SinglePartitionType
   default:
      if t.partTp != property.HashType {
         return true
      }
      if len(prop.MPPPartitionCols) != len(t.hashCols) {
         return true
      }
      for i, col := range prop.MPPPartitionCols {
         if !col.Equal(t.hashCols[i]) {
            return true
         }
      }
      return false
   }
}

Property 关于分区特点的要求(MPPPartitionTp)有以下几种:

  1. AnyType,对基层算子没有要求,所以并不需求增加 exchange;
  2. BroadcastType,用于 broadcast join,要求基层节点仿制数据并播送到一切节点中,此刻必定需求增加一个 broadcast exchange;
  3. SinglePartitionType,要求基层节点将数据汇总到同一台节点中,此刻假如现已在同一台节点上,则不必再进行 exchange。
  4. HashType,要求基层节点按特定列的哈希值进行分区,假如现已按要求分好区了,则不必再进行 exchange.

在优化器的生成查询方案的探索中,每个算子都会对基层有 property 要求,一起也需求满意上层传下来的 property;当上下两层的 property 无法匹配时,就刺进一个 exchange 算子交流数据。依靠这些 property,可以不重不漏的刺进 exchange 算子。

MPP 算法

是否挑选 MPP 算法是在 TiDB 优化器生成物理方案时决议,即 CBO(Cost-Based Optimization) 阶段。优化器会遍历一切可挑选的方案路径,包含含有 MPP 算法的方案与不含有 MPP 算法的方案,估量它们的价值,并挑选其间总价值最小的一个查询方案。

关于当时的 TiDB repo 代码,有四个位置可以触发 MPP 方案的生成,别离对应于 join、agg、window function、projection 四个算子:

  1. func (p *LogicalJoin) tryToGetMppHashJoin
  2. func (la *LogicalAggregation) tryToGetMppHashAggs
  3. func (lw *LogicalWindow) tryToGetMppWindows
  4. func (p *LogicalProjection) exhaustPhysicalPlans

这儿只描绘具有代表性的 join 和 agg 算子,其他算子同理。

Join

当时 TiDB 支持两种 MPP Join 算法,别离是:

  • Shuffle Hash Join,将两张表的数据各自按 hash key 分区后 shuffle 到各个节点上,然后做 hash join,如上一节中举出的查询方案图所示。
  • Broadcast Join,将小表播送到大表所在的每个节点,然后做 hash join,如下图所示。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

tryToGetMppHashJoin 函数在构建 join 算子时给出了对子算子的 property 要求:

if useBCJ { // broadcastJoin
    …
    childrenProps[buildside] = {MPPPartitionTp: BroadcastType}
    childrenProps[1-buildside] = {MPPPartitionTp: AnyType}
    …
} else { // shuffle hash join
    …
    childrenProps[0] = {MPPPartitionTp: HashType, key: leftKeys}
    childrenProps[1] = {MPPPartitionTp: HashType, key: rightKeys}
    …
}

如代码所示,broadcast join 要求 buildside(这儿指要播送的小表)具有一个 BroadcastType 的 property,对大表侧则没有要求。而 shuffle hash join 则要求两边都具有 HashType 的分区特点,分区列别离是 left keys 和 right keys。

Aggregation

当时 tryToGetMppHashAggs 或许生成三种 MPP Aggregation 方案:

1.“一阶段 agg”,要求数据先按 group by key 分区,然后再进行聚合。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

2.“两阶段 agg”,首先在本地节点进行第一阶段聚合,然后按 group by key 分区,再进行一次聚合(用 sum 汇总成果)。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

3.“scalar agg”,没有分区列的特定状况,在本地节点进行第一阶段聚合,然后汇总到同一台节点上完结第二阶段聚合。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

一阶段 agg 和两阶段 agg 的区别是是否先在本地节点做一次预聚合,优化器会依据 SQL 与价值估算来挑选履行哪种方式。关于重复值许多的状况,两阶段 agg 可以在网络传输前减少许多数据量,从而减少大量的网络耗费;而假如重复值很少的状况下,这次预聚合并不会减少许多数据量,反而白白增大了 cpu 与内存耗费,此刻就不如运用一阶段 agg。

这儿留一个小思考题,这三种 agg 各自对下方有什么 property 要求?在聚合做完之后又满意了怎样的 property?

答案是:

一阶段 agg 要求 hash,做完满意 hash;二阶段 agg 无要求,做完满意 hash;scalar agg 无要求,做完满意 singlePartition.

编译与履行

履行方案构建好之后,TiDB-server 会将 dag(履行方案的片段)下发给对应的 TiFlash 节点。在 TiFlash 节点中,需求首先解析这些履行方案,这个过程咱们称作“编译”,编译的成果是 BlockInputStream,它是 TiFlash 中的可履行结构;而最终一步就是在 TiFlash 中履行这些 BlockInputStream.

下图是一个 BlockInputStream DAG 的比方,每个 BlockInputStream 都有三个办法:readPrefix, read 和 readSuffix;类似于其他火山模型调用 open、next 和 close。

下图的来源是 TiFlash 履行器线程模型 – 知乎专栏 (zhihu.com),关于履行模型更多的内容,可以参阅这篇文章或许 TiFlash Overview,这儿不再赘述。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

Join 的编译与履行

TiDB-server 节点会将查询方案按 Exchange 会作为分界,将查询切分为不同的方案片段(task),作为 dag 发给 TiFlash 节点。比方关于下图中所示的查询方案,会切分为这三个红框。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

TiFlash 节点在编译完结后生成的 BlockInputStream 如下,可以在 debug 日志中看到:

task 1
ExchangeSender
 Expression: <final projection>
  Expression: <projection after push down filter>
   Filter: <push down filter>
    DeltaMergeSegmentThread
task 2
ExchangeSender
 Expression: <final projection>
  Expression: <projection after push down filter>
   Filter: <push down filter>
    DeltaMergeSegmentThread
task 3
CreatingSets
 Union: <for join>
  HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_15>, join_kind = Inner
   Expression: <append join key and join filters for build side>
    Expression: <final projection>
     Squashing: <squashing after exchange receiver>
      TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
 Union: <for mpp>
  ExchangeSender x 20
   Expression: <final projection>
    Expression: <remove useless column after join>
     HashJoinProbe: <join probe, join_executor_id = HashJoin_34>
      Expression: <final projection>
       Squashing: <squashing after exchange receiver>
        TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}

其间 task1 和 task2 是将数据从存储层读出,经过简单的处理之后,发给 ExchangeSender. 在 task3 中,有三个 BlockInpuStream 值得重视,别离是:CreatingSets, HashJoinBuild, HashJoinProbe.

CreatingSetsBlockInputStream

接受一个数据 BlockInputStream 表明 joinProbe,还有若干个代表 JoinBuild 的 Subquery。CreatingSets 会并发发动这些 Subquery, 等待他们履行结束后在开端发动数据 InputStream. 下面两张图别离是 CreatingSets 的 readPrefix 和 read 函数的调用栈。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

为什么 CreatingSets 或许一起创立多张哈希表?因为在一个多表 join 中,同一个方案片段或许紧接着做屡次 join porbe,如下图所示:

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

task:4
CreatingSets
 Union x 2: <for join>
  HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_22>, join_kind = Left
   Expression: <append join key and join filters for build side>
    Expression: <final projection>
     Squashing: <squashing after exchange receiver>
      TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
 Union: <for mpp>
  ExchangeSender x 20
   Expression: <final projection>
    Expression: <remove useless column after join>
     HashJoinProbe: <join probe, join_executor_id = HashJoin_50>
      Expression: <final projection>
       Expression: <remove useless column after join>
        HashJoinProbe: <join probe, join_executor_id = HashJoin_14>
         Expression: <final projection>
          Squashing: <squashing after exchange receiver>
           TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}

Join Build

留意,join 在此处仅代表 hash join,现已与网络通信和 MPP 级别的算法无关。

关于 join 的代码都在 dbms/src/Interpreters/Join.cpp 中;咱们以下面两张表进行 join 为例来阐明:

left_table l join right_table r
on l.join_key=r.join_key
where l.b>=r.c 

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

默许右表做 build 端,左表做 probe 端。哈希表的值运用链式存储:

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

Join Probe

这儿首要描绘的是 JoinBlockImpl 这个函数的流程:

1.block 包含了左表的内容;创立 added_columns, 即要增加到 block 中的右表的列;然后创立相应的过滤器 replicate_offsets:表明当时共匹配了几行,之后可以用于挑选未匹配上的行,或仿制匹配了多行的行。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

2.依次查找哈希表,依据查找成果调用相应的 addFound 或 addNotFound 函数,填充 added_columns 和过滤器。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

从填充的过程中也可以看到,replicate_offsets 左表表明到当时行为止,一共能匹配上的右表的行数。并且 replicate_offsets[i] – replicate_offsets[i-1] 就表明左表第 i 行匹配到的右表的行数。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

3.将 added_column 直接拼接到 block 上,此刻会有短暂的 block 行数不一致。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

4.依据过滤器的内容,仿制或过滤掉原先左表中的行。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

5.最终在 block 上处理 other condition,则得到了 join 的成果。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

上文中描绘的是关于正常的 “all” join 的状况,需求回来左右表的数据。与之相对的则是 “any” join,表明半衔接,无需回来右表,只需回来左表的数据,则无需运用 replicate_offsets 这个辅助数组,读者可以自行阅览代码。 仍然在 dbms/src/intepreters/Join.cpp 中。

Aggregation 的编译与履行

还是以一个查询方案以及对应的 BlockInputStream 为例:

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

task:1
ExchangeSender
 Expression: <final projection>
  Expression: <before order and select>
   Aggregating
    Concat
     Expression: <before aggregation>
      Expression: <projection>
       Expression: <before projection>
        Expression: <final projection>
         DeltaMergeSegmentThread
task:2
Union: <for mpp>
 ExchangeSender x 20
  Expression: <final projection>
   Expression: <projection>
    Expression: <before projection>
     Expression: <final projection>
      SharedQuery: <restore concurrency>
       ParallelAggregating, max_threads: 20, final: true
        Expression x 20: <before aggregation>
         Squashing: <squashing after exchange receiver>
          TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Int64>, <exchange_receiver_1, Nullable(Int64)>}

从查询方案中可以看到这是一个两阶段 agg,第一阶段对应 task1,履行聚合的 BlockInputStream 是 Aggregating。第二阶段对应 task2,履行聚合的 BlockInputStream 是 ParallelAggragating。两个 task 经过 Exchange 进行网络数据传输。

在 aggregation 的编译期,会检查当时 pipeline 可以提供的并行度,假如只要 1,则运用 AggregatingBlockInputStream 单线程履行,假如大于 1 则运用 ParallelAggragating 并行履行。

DAGQueryBlockInterpreter::executeAggregation(){
    if (pipeline.streams.size() > 1){
        ParallelAggregatingBlockInputStream
    }else {
        AggregatingBlockInputStream
    }
}

AggregatingBlockInputStream 的调用栈如下:

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

ParallelAggregatingBlockInputStream 内部会分两阶段操作(这儿的两阶段是内部履行中的概念,发生在同一台节点上,和查询方案中的两阶段不是一个概念)。partial 阶段别离在 N 个线程构建 HashTable,merge 阶段则将 N 个 HashTable 合并起来,对外输出一个流。调用栈如下:

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现

假如 result 是空,那么会单独调用一次 executeOnBlock 办法,来生成一个默许数据,类似于 count() 没有输入时,会回来一个 0.

两种履行方式都用到了 Aggregator 的 executeOnBlock 办法和 mergeAndConvertToBlocks 办法,他们的调用栈如图所示。前者是实践履行聚合函数的地方,会调用聚合函数的 add 办法,将数据值加入;后者的首要目的是将 ParallelAggregating 并行生成的哈希表合并。

TiFlash 源码阅读(九)TiFlash 中常用算子的设计与实现