前语

2006 年诞生的 hadoop 和 她周边的生态, 在曩昔的这些年里为大数据的炽热提供了满足的能量, 十几年曩昔了, 场景在改变, 技能在演化, 咱们对数据的认知现已不再局限于T+1 与 高吞吐高延迟 为首要特征的上一代结构理念, 在实在的场景里,实时, 准确, 多变 的数据也发挥着越来越重要的作用

为满意这些新的需求, 各种结构和中间件如漫山遍野般不断涌出

hive 的呈现让这头大象有了一个精致但呆滞的面庞, hbase 与 impala 开端测验将其提速, spark/flink 作为新的流处理结构, 测验经过实时核算的方法, 将数据更快地输送到事务方面前, presto/dremio 从数据模型入手, 测验经过虚拟化实时调集来自不同数据源的数据, 变相到达实时的目的, 而各种新型的 OLAP 数据库, 以 clickhouse 为代表, 企图提供近实时的海量数据统计分析计划, 在不同的细分范畴, 比方 时序/特征 等范畴, 也各自呈现了赋有特征的产品出来

实时数据引擎系列 (一): 新鲜的数据流

与传统的商业软件发展方法不同, 这个实时数据相关的赛道中, 开源现已逐步成为不谋而合的选择,talk is cheap, show me the code, 咱们各凭本事说话

而基础结构就像是心爱的姑娘, 每个人都觉得自己的才是最好的, TAPDATA 在实时数据计划的落地过程中, 也逐步感觉到了现有的各种技能产品总是在什么地方差点东西, 一个个场景做下来, 一个个客户谈下来, 去完结一个归于自己的流核算结构的主意在脑海中越来越明确。

实时数据引擎系列 (一): 新鲜的数据流

在给客户发生直接价值的同时, 把这些经历累积起来, 去做一个能够影响更多人的技能产品, 可能是一件更有意思的工作

为此, 我前几天登录了好久没用的知乎账号, 在这个人均百万的平台下, 开端了这个系列的分享, 去把 TAPDATA 关于实时核算引擎的一些思考整理成文字, 咱们看了假如觉得有用, 能够默默保藏, 假如觉得哪里写得不对, 能够评论或许私信我, 假如觉得这个东西方向有问题, 或许说便是一些没有价值的垃圾, 也欢迎提醒我, 咱们共同前进

新鲜的, 才是最好的

完结一个实时的数据核算, 第一步是数据来源怎么取得, 根据 JDBC 或许各个数据库驱动的 Query, 能够很便利拿到批量的数据, 可是更实时的数据拿起来, 就不是那么的清楚明了和规范化

实时数据的获取, 有一个名词叫 CDC, 全称是change data capture, 能够想见一个场景假如有一个专门的名词缩写来描绘, 一般都不会很简单

CDC 的完结一般有以下几种方法:

轮询

最直接的主意是经过 Query, 定期轮询最新的数据, 这么做的好处是简直悉数的数据库都能够直接支撑, 开发起来本钱也低, 可是问题也很显着, 首要有:

  1. 轮询需求有条件, 这个条件一般是递加字段, 或许时间属性, 对事务上有侵略
  2. 最小延时 为轮询距离
  3. 轮询对数据库造成了额外的查询压力
  4. 最丧命的是, 轮询无法获取被删去的数据, 也无法得知更新的数据更新了哪些内容, 这些尽管在工程上能够经过各种手段去找一个折衷计划, 但终究会存在各式各样的问题

因为完结简单, 轮询是最早也是现在最广泛被使用于实践场景的计划, 可是也因为缺点许多, 在最近呈现的各种核算结构中, 轮询一般作为保底而不是首选计划呈现

触发器

不少数据库都有触发器(Trigger) 的规划, 在对数据行列进行读写时, 能够触发一个存储过程, 完结一系列的操作, 根据这个前提, 能够对数据库的写操作编写一个自定义触发器, 完结数据获取, 常见的计划有:

  1. 数据触发保存到独自的一张表, 典型的产品化完结有 SQL Server, 其他的数据库也能够自己完结相似的逻辑, 然后经过轮询这张表取得改变
  2. 数据触发到外部音讯行列, 顾客经过音讯行列获取数据
  3. 经过 api 直接发送到方针端

实时数据引擎系列 (一): 新鲜的数据流

比较轮询, 触发器能够更全面地获取更具体的实时数据, 不过问题也有许多, 首要是的问题有:

  1. 没有规范: 用户需求根据每种数据库的触发器去规划自己的数据获取计划
  2. 通用性不够: 部分数据库没有触发器规划
  3. 影响性能: 触发器在数据写入的时候, 在数据处理逻辑里增加了一段逻辑, 尽管有些触发器的规划是异步的, 不影响延时, 可是因为占用了数据库自身的核算资源, 对吞吐有一些影响

比较轮询, 触发器子计划在延时和数据准确性上有了一些打破, 是一种计划的前进

数据库日志

绝大数据数据库都有各式各样的日志, 其间一种日志用来记载每个操作发生的数据改变, 许多数据库都用这份日志来做多副本同步, 或许用来做数据恢复

而外部服务也能够经过这种方法拿到最新的实时改变, 比较轮询, 经过日志拿到的数据延时一般在亚秒内, 并且对数据库的性能影响非常低, 同时支撑的数据库类型比较触发器更多, 只要存在副本, 就存在相似的日志规划

实时数据引擎系列 (一): 新鲜的数据流

因为根据数据库日志的计划具备其他两种计划不行比拟的优势, 现已逐步成为实时核算结构首选的数据获取计划, 可是这种计划因为使用了数据库内部的规划,开发难度和完结本钱是最高的, 这个也约束了计划的使用

音讯行列

除此之外, 还有一些来自使用的音讯, 或许一些其他的事务自定义数据, 大多数都经过各种音讯行列来中转, 典型的有 kafka 和 各种名字的 MQ, 因为更多是事务定制在里面, 这儿各家都有各家的场景, 一致来做是比较困难的

数据库日志的难题

在之前提到的各种 CDC 计划中, 数据库日志具有非常显着的成果优势, 可是因为开发困难, 现在使用规模也不是特别广泛, 数据库日志计划的问题首要有以下几种

数据库品种繁多

数据库日志归于数据库内部完结逻辑, 除了特意为兼容去规划之外, 很少有相同或许相似的对外接口, 不管是从 API, 仍是日志格式上来说, 基本是各家有各家的做法, 对流核算结构来说, 适配起来要一个个做, 没有捷径能够走, 本钱很高

实时数据引擎系列 (一): 新鲜的数据流

当前市面上用的比较多的数据库少说有几十种, 假如想掩盖全, 大概有两百种左右的适配工作量, 放眼看去现在并没有哪个开源或许闭源的计划, 在这方面做得比较全面, 除了开源数据库之外, 还有一些商业数据库, 比方 db2, gaussdb, hana, 文档的缺失, 开源计划的缺失, 导致这些计划完结起来很麻烦

不兼容的版别

即使是同一种数据库, 不同的版别之间也往往有不兼容的情况, 极少有数据库能够在一个副本内运行不同的大版别, 比方 oracle 的 8 到 20 之间的版别, mongodb 的 2 到 5 之间的版别, 会存在许多细节和规划的不同

实时数据引擎系列 (一): 新鲜的数据流

数据库品种现已许多, 加上版别的不兼容, 要完好处理这些场景, 适配的数量一会儿增加到五百种以上, 困难成倍提高

布置架构多种多样

第三种多样性来自于布置架构, 即使是同一个数据库的同一个版别, 也存在各式各样的布置架构, 比方对 Mysql, 有包含 PXC, Myshard, Mycat 在内的各种集群计划, PG 也有 GP, XL, XC, Citus 在内的各种计划, oracle 有 DG, RAC, mongodb 有 副本, 分片

这些多样性与前几种相互组合, 最终的完好的工作量现已到达简直人力不行为的程度

不规范的格式

假如说多样性只是工作量上的问题, 数据库日志的一些规划, 则从理念上造成了一些困难

因为数据库的日志更多是为了主从同步规划, 首要是确保数据的最终一致, 这个与实时核算的场景需求存在一些差异, 比方咱们以 MongoDB 的一个删去日志来做示例

rs0:PRIMARY>usemockswitchedtodbmockrs0:PRIMARY>db.t.insert({a:1,b:1}) WriteResult({"nInserted" : 1 }) rs0:PRIMARY>db.t.remove({}) WriteResult({"nRemoved" : 1 }) rs0:PRIMARY>uselocalswitchedtodblocalrs0:PRIMARY>db.oplog.rs.find({ns:"mock.t"}).pretty() {"op" : "i", "ns" : "mock.t", "ui" : UUID("9bf0197e-0e59-45d6-b5a1-21726c281afd"), "o" : { "_id" : ObjectId("610eba317d24f05b0e9fdb3b"), "a" : 1, "b" : 1 },"ts" : Timestamp(1628355121, 2), "t" : NumberLong(1), "wall" : ISODate("2021-08-07T16:52:01.890Z"), "v" : NumberLong(2) }{"op" : "d", "ns" : "mock.t", "ui" : UUID("9bf0197e-0e59-45d6-b5a1-21726c281afd"), "o" : { "_id" : ObjectId("610eba317d24f05b0e9fdb3b") },"ts" : Timestamp(1628355126, 1), "t" : NumberLong(1), "wall" : ISODate("2021-08-07T16:52:06.191Z"), "v" : NumberLong(2) }

仿制代码

刺进一条数据, 将其删去, 查询一下数据库日志, 重视删去那条记载, 里面只记载将主键删去的信息, 并无法得到原始字段的值

实时核算一个比较典型的场景是多表 JOIN, 假如咱们以 a 为字段进行 JOIN, 来自数据源为 MongoDB 的实时流因为无法拿到被删去的数据中 a 字段的值是多少, 这个会导致实时的 JOIN 无法获取最新的成果

为了完结完好的流核算的需求, 只确保数据同步一致性的日志是不满足的, 咱们往往需求完好的数据库改变数据

一些现存的处理计划

尽管数据库日志有着各式各样的问题, 可是因为其过于显着的优势, 越来越成为实时流结构的当红炸子鸡选型, 那上面的问题, 也逐步有了解法

针对完结工作量的问题, 现在呈现了三种流派 :

一个是专精派, 每个计划只处理一个数据库, 或许只专注处理一个数据库, 比方 oracle 的 ogg, mysql 的 canal, 都专注在自己的范畴去做到很高的深度

一个是容纳万象派, 典型的有 debezium, 经过插件的形式去兼容各个数据库的规范

最终一个是交融派, 他们自己不做完结, 只是将来自一和二的计划再经过一次笼统, 做成交融的一个处理计划(没错的, 说的便是github.com/ververica/f…\

而针对数据日志不规范的问题, 在技能上一般是经过一个完好数据的缓存层来完结日志的二次加工, 尽管在功能上完结了较好的弥补, 可是因为完好保存了数据, 资源消耗也比较高, 并且现在没有看到一致的产品呈现, 更多是停留在一些场景里做计划弥补

TAPDATA 的处理计划

在咱们的计划里, 是依照容纳万象 + 必要的数据缓存 结合的方法去处理的这个问题

比较与 debezium, 咱们在性能上做了很多的优化, 在解析速度上有数倍提高, 同时, 支撑的数据库品种现已扩展到三十种以上

对数据库日志不规范的问题, 也完结了必要的存储笼统, 一个典型的用法如下:

CacheConfigcacheConfig=TapCache.config("source-cache")..setSize("1g").setTtl("3d");DataSource<Record>source=TapSources.mongodb("mongodb-source").setHost("127.0.0.1").setPort(27017).setUser("root").setPassword("xxx").withCdc().formatCdc(cacheConfig).build()

仿制代码

来构建一个完好的实时数据流, 其间流出的数据, 包含了完好的 全量 + 增量数据, 并使用了内存缓存对增量日志做了规整化

对下游来讲, 这便是新鲜的, 实时的数据流了

留一个小问题

仔细的朋友现已现已发现了, 这儿的数据包含了全量与增量, 可是咱们的数据格式, 并没有像 flink 或许 hazelcast jet 这些通用的做法相同, 分成了BatchSource, Record, ChangeRecord这些类别, 是出于什么考虑呢?

重视 Tapdata 微信大众号, 带给你最新的实时核算引擎的思考。本文作者 Tapdata 技能合伙人肖贝贝,更多技能博客:tapdata.net/blog.html