SeaTunnel StarRocks 连接器的使用及原理介绍

Apache SeaTunnel 现已支撑 StarRocks Connector,使其“Connector 方阵”进一步扩展。StarRocks 是一个用户基础广泛的 MPP 数据库,SeaTunnel 将 StarRocks 纳入支撑列表,将便利用户更好地处理数据同步问题。

在 2023 年 3 月 30 日 的 SeaTunnel 线上活动上,贡献者 毕博 为咱们共享了《SeaTunnel StarRocks 连接器的运用及原理介绍》,内容精华整理如下。

贡献者简介

SeaTunnel StarRocks 连接器的使用及原理介绍

毕博,马蜂窝数据渠道负责人,Apache SeaTunnel 贡献者

共享纲要:

● Seatunnel StarRocks 连接器简介

● StarRocks Connector 功用特性

● StarRocks Connector 数据读取解析

● StarRocks Connector 数据写入解析

● StarRocks Connector 的运用示例

● StarRocks Connector的后续规划

SeaTunnel StarRocks 连接器简介

首要介绍一下数据同步渠道 Apache SeaTunnel 的基本架构。

SeaTunnel StarRocks 连接器的使用及原理介绍

上图为 SeaTunnel 架构图,它供给了一套笼统的 API,包含 Source 、Transform、Sink API等。依据这些笼统 API 可扩展出各种各样 Connector,其间依据Source API 完结的 Source Connector 能够从左边很多的数据源中读取数据,Transform Connector 用于完结数据 Pineline 中的数据转化处理,而 Sink Connector 能够将数据写入到右侧多种异构的数据源中。

一起,在运转时,SeaTunnel 供给了还一个翻译层,会将 Connector 完结的 Source 和 Sink API 翻译成引擎内部可运转的 API,使连接器的能够在不同引擎上运转。现在 SeaTunnel 支撑三种履行引擎,Spark,Flink,以及 SeaTunnel 自研履行引擎 Zeta Engine。

以上就是 SeaTunnel 的全体架构,能够看出,SeaTunnel 经过 Souce Connector 和 Sink Connector 完结与不同数据源进行链接。据了解,现在 SeaTunnel 现已支撑 100+ 种丰厚的连接器,其间包含 StarRocks Connector。

SeaTunnel StarRocks 连接器的使用及原理介绍

上图描绘了 Seatunnel 和 StarRocks 结合的全体状况,SeaTunnel 供给了Source Connector 和 Sink Connector。在 Source 部分 StarRocks 的表作为数据源,经过 Seatunel Source Connector 散布式地去提取 StarRocks 的数据;中间经过 SeaTunnel 供给的 Transform Connector 做一些散布式的数据处理和转化;后面的 StarRocks Sink 连接器则主要是把 SeaTunnel 内存里的数据,经过 StarRocks 供给的 Stream Load API,将数据导入到StarRocks。

StarRocks Connector 功用特性

Source 功用特性

SeaTunnel StarRocks 连接器的使用及原理介绍

这张表格展现了现在 SeaTunnel Source 连接器当时支撑的核心功用和特性,包含:

● 字段投影: 假定待读取表有多个字段,可是整个的数据处理的 Pipline中 只用到部分字段,那么针对这些部分列做数据同步就是字段投影的运用;

● 谓词下推: 谓词下推能够在数据扫描的时分过滤大量用不到的数据,经过下推到引擎,能够削减数据传输的数据量;

● 数据类型的主动映射:是关于从 StarRocks 读取的数据类型与 Seatunnel 内部数据类型的映射,后面会介绍现在支撑的数据类型;

● 用户自定义分片: 是经过将待读取数据源的整个数据集拆分多个分片,每一个分片能够独自查询,并且在分片生成的阶段用户能够经过装备参数去控制分片生成的数量;

● 并行读取:首要 StarRocks 是支撑并行的读取数据源,一起依据上面的数据源分片的切分,在读取时多个分片一起独立进行,终究经过并行读取加快读取的速率;

● 状况的康复:Source 连接器读取阶段切分多个分片之后,连接器在读取进程中会守时将未进行读取分片信息保存在 State 中;这样在毛病康复的时分,结合 state 中分片的方位信息进行从头读取;

● 至少一次:得益于状况康复,所以在读取端供给至少一次的语义;

● batch形式:现在 StarRocks 连接器 Source 部分只支撑批形式。

Sink 功用特性

SeaTunnel StarRocks 连接器的使用及原理介绍

Sink 的数据导入是依据 StarRocks 的 Stream Load 完结,由于 Stream Load 支撑 Csv 和Json 两种文件格局,所以连接器在 sink 端能够指定 csv 和 json 两种文件格局进行导入。

写入时,考虑到写入的效率,所以会触及数据赞批,进行批量数据写入,而不是单条提交。

假设写入呈现了反常 ,程序会主动判别是不是可康复的反常,再依据一定的战略进行重试。

关于 CDC, 现在 Seatunnel 支撑数据库的 changelog 捕捉,再结合 StarRocks 的 Stream Load 接口,能够对 StarRocks 的主键模型表进行数据改变,包含刺进、更新和删除数据,所以连接器当时支撑将 Seatunnel 获取的 cdc 数据导入到 StarRocks 中来。

上述列表展现了现在 Source 和 Sink 连接器现已支撑的功用和特性,希望在实际应用中能够给咱们供给一些参阅。

StarRocks Connector读取原理

接下来我会着重介绍 StarRocks Connector 的读取原理,协助咱们更好地运用连接器功用。

字段投影

咱们在读 StarRocks 表的时分,是能够选择部分字段读取的,比方这里咱们有一个 StarRocks 表,有 4 个字段。可是实际同步运用到的字段只要 lo_orderkey、lo_number 两个字段,关于指定部分列的提取数据场景,能够在装备 Source 连接器的时分,经过 fields 参数来指定要查询的字段和数据读到 Seatunnel 上面的字段数据类型。

SeaTunnel StarRocks 连接器的使用及原理介绍

这样,在 Seatunnel 真实履行的时分,就能只同步指定的字段,终究同步到 StarRocks 数据如下图。

SeaTunnel StarRocks 连接器的使用及原理介绍

经过削减投影字段能够下降同步进程网络、内存资源耗费,提高同步性能。

谓词下推

在实际运用中,咱们可能需求过滤掉部分行的数据,如获取表中 linenum< 3 的部分数据。这时,咱们能够在装备 Source 连接器的时分,经过装备 scan-filter 参数来过滤指定的部分行。

在实际履行中会将条件下推到 StarRocks,在 StarRocks 引擎内进行分区剪裁或分桶剪裁等优化处理。

SeaTunnel StarRocks 连接器的使用及原理介绍

这样,在读取数据阶段越过全表扫描,能够大大削减数据处理的数据量,提高读取数据的效率。

字段投影&谓词下推完结

在详细完结上,经过用户在连接器中装备中指定的 fields 和 scan-filter 参数,连接器在程序中会主动生成适用于 StarRocks 的查询语句。如图,经进程序转化,终究生成 SQL。

SeaTunnel StarRocks 连接器的使用及原理介绍

并行读取:完结方案

并行读取 StarRocks 数据主要有两种方案,以 Flink 引擎读取 StarRocks 为例。

第一个方案是直接经过 JDBC 协议读取数据,数据终究需求经过 fe 单节点将数据抽取上来,读取效率较低;

SeaTunnel StarRocks 连接器的使用及原理介绍

第二个方案是进行散布式的设计,先经过 fe 查询对应 StarRocks 表的分片的元数据信息, 获取待读取数据的数据散布状况,再用散布式并行的方式直接从多个 be 节点读取数据。

SeaTunnel StarRocks 连接器的使用及原理介绍
这样做让全体的吞吐能力得到很大的提高,现在 StarRocks Connector 依据第二种方案。

并行读取:获取SR的数据散布

Source 连接器完结并行读取,首要要知道 StarRocks 表的存储的数据散布状况。当时 StarRocks 的 FE 供给了获取单表查询方案接口,经过指定要查询的表及 sql 进行 api 接口的调用。

SeaTunnel StarRocks 连接器的使用及原理介绍

右侧是 fe 接口回来的成果经过序列化后对应的数据结构,query plane 为查询方案的字符串。

parrtions 是一个 map, key 是 starrocks tablet id ,value 为分桶的散布在 be 节点的地址,由于 starrocks 表的数据是多副本管理,所以会有多个 be 地址。

经过以上信息信息,就能够知道表中要查询数据的数据散布状况。

并行读取:split切分(依据数据散布)

要完结并行读取,就需求要对待查询的方针表的数据规模区分,再进行分片切分,让并行的线程读取特定分片的数据。

SeaTunnel StarRocks 连接器的使用及原理介绍

在 Souce Connector 完结中,分片切分是依据 StarRocks 表的数据散布进行数据规模区分。

左边则是描绘了 StarRocks 数据散布。StarRocks 运用列式存储,选用分区分桶机制进行数据管理。对应图中表 A 依照日期“月”区分分区,进一步的 2023-01 月份的分区切分为 5 个 分桶(A、B、C、D、E.)。

分桶是 StarRocks 中最小的数据管理单元,每个分桶运用多副本进行安排, 对应图中分别为分桶 A,有 A-1 、A-2、A-3 个副本;分桶 B 有 B-1,B-2,B-3 等,这些分桶副本终究会存储在不同的 BE 节点中。

假设咱们要同步表 A 中 2023年 1月份的数据,首要要知道这部分数据的数据散布状况。之前介绍了经过 Fe Api 能够获取 StarRocks 表的数据散布状况,对应图中,分桶 A 数据保存在BE-1、BE-2、 BE-3 上。

下一步,经过 一定战略,为每一个 tablet 选择最优的 be 查询节点,原则是终究成果中每一个be 节点有相对平等数量的分桶等候被查询,这样能够保证在并行查询时,每一个 be 节查询负载相对均衡。

最后,依据前面为每一个分桶选择查询 be 节点信息生成的 split分片。

并行读取:用户自定义分片

Source Connector 支撑自定义分片,也就是用户能够控制分片生成, 经过 request_tablet-size这个装备参数制。

SeaTunnel StarRocks 连接器的使用及原理介绍

刚刚咱们介绍了生成 split 分片切分的进程,StarRocks 表 A 的 5 个分桶 A\B\C\D\E, 终究生成了 3 个分片对图中上半部分。假加咱们想让查询数据的并发度更高,就需求生成更多的分片。这时,咱们能够设置 request_tablet-size,约束每个分片中 tablet 的数量。比方咱们装备request_tablet-siz=1, 表明每个分片的分桶最多为 1, 那么终究将会生成 5 个分片 效果如上图。

并行读取:分配split到reader

Split 切分好了,需求分配给每一个并行的 Reader。

Reader 数量的指定是经过在使命的 env 装备并行度(图左边),装备好就会有几个并行的 Reader 去读取数据源。

SeaTunnel StarRocks 连接器的使用及原理介绍
图中右侧是详细分片分配给 Reader 的进程:Split 经过 split 中的特点 id 向 Reader 数取模,使每一个 Reader 上分配的分片数相对一致。

并行读取:reader读取数据

将 split 分片配给 Reader 之后,每一个 Reader 就开端实际的数据读取,该进程是每个 Reader 经过 be 供给的一组 thrift 协议向 be 节点扫描。分桶对应的数据如图中所示,每个分片包含了需求向哪个 be 节点查询及需求扫描 be 上的哪些分桶数据。

SeaTunnel StarRocks 连接器的使用及原理介绍

下图是 be 供给 thrift 协议详细接口。

SeaTunnel StarRocks 连接器的使用及原理介绍
有三个重要的办法,首要创建一个scanner ,经过相似游标的方式,屡次调用 getnext 获取悉数数据,终究数据都完结回来后,经过 close scaner 开释资源。

并行读取:arrow -> seatunnel row的数据转化

Reader 经过 thrift 协议向 be节点扫描数据,终究从 be 获取到的数据是 apache arrrow 的数据格局。

由于 StarRocks 表的数据经过 Seatunnel 读取出来之后首要要转化为 seatunnel 自己的数据结构 SeatunnelRow,之后才能够在 Seatunnel 内部进行数据转化及写出,因而需求将 apache arrow 的数据类型转化为 Seatunnel 的数据类型。

整个转化进程如图所示:

SeaTunnel StarRocks 连接器的使用及原理介绍
SeaTunnel StarRocks 连接器的使用及原理介绍

其间 apache arrow 的 vachar 能够依据用户在 souece 连接器装备数据投影的数据类型转化为 Date、Timestamp 和 String。

数据类型映射

终究 从StarRocks 读取的数据类型、从 be 节点获取的 apache-arrrow 格局的数据类型 以及 转化后Seatunnel 的数据类型之间的映射关系如上图,也是现在 StarRocks 连接器支撑的数据类型映射,基本上覆盖了所有的数据类型,但 ARRAY、HLL、BITMAP 等暂时还不支撑。

SeaTunnel StarRocks 连接器的使用及原理介绍

在运用中咱们只需求关心 StarRocks 的 Datatype 和 Seatunnel Datatype 的映射就能够,apache-arrrow 部分的转化是程序主动完结的。

并行读取: 状况康复

在读取的时分还会触及到状况康复,由于假设使命读取的数据量比较大,读取的时刻会较长,中间假设呈现过错或许反常,需求从犯错的方位从头读取,相似于断点续传。

这里边有两个比较重要的进程:

● 状况的保存: 经过 Reader 把 未读取的 split 信息存到 state里,引擎在读取进程会守时对 state 做快照,如 snapshotState 办法的逻辑;

● 状况康复:Reader 的状况康复主要是经过最后一次快照,进行康复后继续读取。在开端读取数据的时分,从未读取的分片调集中里边去消费,之后开端实际读取,对应 pollNext 办法逻辑。

StarRocks Connector 写入原理

介绍完 Source Connector 的写入原理,咱们再来看 Sink 连接器的写入原理。

StarRocks Sink 写入是依据 Stream Load 接口,在写入时需求做处理批量和重试。关于批量,数据是在写入之后,先缓存在内存中,达到一定阈值之后再进行批量数据的提交。

阈值现在包含批数据的大小、数据条数约束,一起连接器也支撑守时提交,一守时刻间隔下提交一次。

注意,在 sink 的时分,需求留意”too many tablet versions” 报错,呈现问题是由于导入频率太快,数据没能及时兼并 (Compaction) ,从而导致版本数超越支撑的最大未兼并版本数。

除了优化 BE 的装备,调整兼并战略,如 cumulative_compaction_num_threads_per_disk、base_compaction_num_threads_per_disk 等来加快兼并,也能够在 sink 端控制批量的提交的阈值,增大单次导入的数据量,下降导入频率。

关于重试,SeaTunnel 支撑装备重试战略,如重试次数,等候间隔与最大重试次数等。

CDC 数据写入支撑

现在,SeaTunnel 已支撑数据库改变数据捕获(CDC github.com/apache/incu… 将捕获到的数据更改分为以下 4 种类型:iNSERT(数据刺进)、UPDATE_BEFORE(数据更改前的旧值)、UPDATE_AFTER(数据更改后的新值)、DELETE (数据删除)。

SeaTunnel StarRocks 连接器的使用及原理介绍

在写入的方针数据上面,StarRocks 数据源的主键模型支撑经过 Stream Load 导入作业,一起对 StarRocks 表进行数据改变,包含刺进、更新和删除数据。

因而,将Seatunnel changelog 数据的改变类型转化成 StarRocks 支撑的改变类型,使Seatunnel Connector 能够支撑 cdc 写入 StarRocks。

假设上图所示,在Seatunnel中假设 cdc 数据如图所示,分别刺进主键为1\2\ 3的数据,对主键 1 进行 UPDATE, 会生成 update_before、update_after 、dedete 的 cdc changelog event, 经过 sink 连接器装备中 enable_upsert_delete = true,开启将 cdc 数据写入 StarRocks 的支撑。

StarRocks Connector 运用示例

以在 StarRocks与 StarRocks 之间同步数据这个运用场景为例,介绍怎么装备运用连接器。假定在 StarRocks 有一张数据表 customer_1,有四个数据列,咱们方针将数据同步到一个张表 customer_2 ,首要在seatunnel 使命装备文件中装备 Source Connector,数据表有 4 个字段列,咱们只需求 2 个字段,所以装备数据投影。

在 Transform Connector 装备中咱们进一步进行数据处理,希望将 c_name 字段中 customer前缀去除,保存数字部分一起导入数据字段称号跟方针表称号表不一致,需求经过 sql 重命名。

最后装备 Sink Connector,装备方针数据源的链接信息,指定 Stream Load 数据导入的文件格局为 Json。

在 env 里边对使命参数进行指定,如使命的全体并行度,当然也能够在 connector 的装备里边独自指定并行度。

终究导入到方针表的 customer_2 如下图:

SeaTunnel StarRocks 连接器的使用及原理介绍

连接器后续规划

至此,咱们能够看到,SeaTunnel 的基本数据同步功用现已十分完善了,但一些数据同步场景对数据可靠性有着更高的要求,在sink侧需求有仅一次和至少一次的语义支撑,这两点现已在社区的支撑方案中了。

其间关于 EOS 语义,StarRocks 2.4 版本供给了 Stream Load 事务接口,为完结高效导入一起统筹 EOS 供给了完结的基础;

别的,社区还方案在 Source 和 Sink 连接器中支撑更多的数据类型映射,如 bitmap、hill、array 等,丰厚连接器的功用。

本文由博客一文多发渠道 OpenWrite 发布!