作者:周波,阿里云智能高级开发工程师, Apache RocketMQ Committer

01 从问题中来的 RocketMQConnect

在电商体系、金融体系及物流体系,咱们常常能够看到 RocketMQ 的身影。原因不难理解,跟着数字化转型规模的扩展及进程的加快,事务体系的数据也在每日暴增,此刻为了确保体系的安稳运转,就需求把运转压力分担出去。RocketMQ 就担任着这样的人物,它的异步音讯处理与高并发读写才干,决定了体系底层的重构不会影响上层应用的功用。而 RocketMQ 的另一个优势——可弹性才干,使体系在面临流量的不确认性时,完成对流量的缓冲处理。此外,RocketMQ 的次序设计特性使其成为一个天然的排队引擎,例如,三个应用一起对一个后台引擎建议请求,确保不引起“撞车”事故。因此,RocketMQ 被用在异步解耦、削峰填谷以及事务音讯等场景中。

但是,数字化转型浪潮也带来了更多用户对数据价值的重视——怎么让数据产生更大运用价值?RocketMQ 本身不具有数据剖析才干,但是有不少用户期望从 RocketMQ Topic 中获取数据并进行在线或离线的数据剖析。但是,运用市面上的数据集成或数据同步东西,将 RocketMQ Topic 数据同步到一些剖析体系中虽然是一种可行计划,却会引进新的组件,造成数据同步的链路较长,时延相对较高,用户体验欠安。

举个例子,假设事务场景中运用 OceanBase 作为数据存储,一起期望将这些数据同步到 Elasticsearch 进行全文查找,有两种可行的数据同步计划。

计划一: 从 OceanBase 中获取数据,写入 Elasticsearch 组件并进行数据同步,在数据源较少时此计划没什么问题,一旦数据增多,开发和保护都非常杂乱,此刻就要用到第二种计划。

基于 RocketMQ Connect 构建数据流转处理平台

计划二: 引进音讯中心件对上下流进行解藕,这能处理第一种计划的问题,但是一些较为杂乱的问题还没有彻底处理。比方,怎么将数据从源数据同步到方针体系并确保高功用,假如确保同步使命的部分节点挂掉,数据同步依然正常进行,节点恢复依然能够断点续传,一起跟着数据管道的增多,怎么办理数据管道也变得十分困难。

基于 RocketMQ Connect 构建数据流转处理平台

总的来说,数据集成过程中的应战首要有五个。

应战一: 数据源多,市面上可能有上百个数据源,且各数据源的体系差异较大,完成恣意数据源之间的数据同步工作量较大,研发周期很长。

应战二: 高功用问题,怎么高效地从源数据体系同步到目的数据体系,并确保其功用。

应战三: 高可用问题,即 Failover 才干,当一个节点挂掉是否这个节点的使命就停止了,使命重新发动是否还能够断点续传。

应战四: 弹性扩缩容才干,根据体系流量动态添加或减少节点数量,既能经过扩容满意高峰期事务,也能在低峰期减缩节点,节约本钱。

应战五: 数据管道的办理运维,跟着数据管道的增多,运维监控的数据管道也会变得越来越杂乱,怎么高效办理监控很多的同步使命。

面临上述应战 RocketMQ 怎么处理?

第一,规范化数据集成 API (Open Messaging Connect API)。在 RocketMQ 生态中添加 Connect 组件,一方面临数据集成过程笼统,笼统规范的数据格式以及描绘数据的 Schema,另一方面临同步使命进行笼统,使命的创立、分片都笼统成一套规范化的流程。

第二,根据规范的 API 完成 Connect Runtime。Runtime 供给了集群办理、装备办理、位点办理、负载均衡相关的才干,拥有了这些才干,开发者或许用户就只需求重视数据怎么获取或怎么写入,然后快速构建数据生态,如与OceanBase、MySQL、Elasticsearc 等快速建立衔接,搭建数据集成渠道。整个数据集成渠道的构建也非常简略,经过 Runtime 供给的 RESTFull API 进行简略调用即可。

第三,供给完善的运维东西,便利办理同步使命,一起供给丰富的 Metrics 信息,便利检查同步使命的 TPS、流量等信息。

02 RocketMQConnect 两大运用场景

这里为大家整理了 RocketMQ Connect 的两大运用场景。

场景一,RocketMQ 作为中心媒介,能够将上下流数据打通,比方在新旧体系搬迁的过程中,假如在事务量不大时运用 MySQL 就能够满意事务需求,而跟着事务的添加,MySQL 功用无法满意事务要求时,需求对体系进行晋级,选用分布式数据库OceanBase 提高体系功用。

怎么将旧体系数据无缝搬迁到 OceanBase 中呢?在这个场景中 RocketMQ Connect 就能够发挥作用,RocketMQ Connect 能够构建一个从 MySQL 到 OceanBase 的数据管道,完成数据的滑润搬迁。RocketMQ Connect 还能够用在搭建数据湖、查找引擎、ETL 渠道等场景。例如将各个数据源的数据集成到 RocketMQ Topic傍边,方针存储只需求对接 Elasticsearch 就能够构建一个查找渠道,方针存储假如是数据湖就能够构建一个数据湖渠道。

除此之外,RocketMQ 本身也能够作为一个数据源,将一个 RocketMQ 集群的数据同步到另一个集群,能够构建 RocketMQ 多活容灾才干,这是社区正在孵化的Replicator能够完成的才干。

基于 RocketMQ Connect 构建数据流转处理平台

场景二,RocketMQ 作为端点。 RocketMQ 的生态中供给了流核算才干组件 -RocketMQ Streams,Connector 将各个存储体系的数据集成到 RocketMQ Topic 傍边,下流运用 RocketMQ Streams 流核算的才干就能够构建一个实时的流核算渠道。当然也能够配合事务体系的 Service 完成事务体系快速从其它存储一致快速获取数据的才干。

基于 RocketMQ Connect 构建数据流转处理平台

还能够将 RocketMQ 作为端点的上游,将事务音讯发到 Topic 中,运用 Connector 对数据做持久化或转存的操作。

基于 RocketMQ Connect 构建数据流转处理平台

如此一来,RocketMQ 就具有数据集成才干,能够完成恣意恣意异构数据源之间的数据同步,一起也具有一致的集群办理、监控才干及装备化搭建数据管道搭建才干,开发者或许用户只需求专注于数据拷贝,简略装备就能够得到一个具有装备化、低代码、低延时、高可用,支撑毛病处理和动态扩缩容数据集成渠道。

那么, RocketMQ Connect 是怎么完成的呢?

03 RocketMQConnect 完成原理

在介绍完成原理前,先来了解两个概念。

概念一,什么是 Connector(衔接器)? 它界说数据从哪仿制到哪,是从源数据体系读取数据写入RocketMQ,这种是 SourceConnector,或从 RocketMQ 读数据写入到方针体系,这种是 SinkConnector。Connector 决定需求创立使命的数量,从 Worker 接收装备传递给使命。

基于 RocketMQ Connect 构建数据流转处理平台

概念二,什么是 Task ? Task 是 Connector 使命分片的最小分配单位,是实践将源数据源数据仿制到 RocketMQ(SourceTask),或许将数据从 RocketMQ 读出写入到方针体系(SinkTask)真正的执行者,Task 是无状态的,能够动态的启停使命,多个 Task 能够并行执行,Connector 仿制数据的并行度首要体现在 Task 上。一个 Task 使命能够理解为一个线程,多个 Task 则以多线程的办法运转。

基于 RocketMQ Connect 构建数据流转处理平台

经过 Connect 的 API 也能够看到 Connector 和 Task 各自的责任,Connector 完成时就已经确认数据仿制的流向,Connector 接收数据源相关的装备,taskClass 获取需求创立的使命类型,经过 taskConfigs 的数量确认使命数量,而且为 Task 分配好装备。Task 拿到装备以后数据源建立衔接并获取数据写入到方针存储。经过下面的两张图能够清楚的看到,Connector 和 Task 处理根本流程。

基于 RocketMQ Connect 构建数据流转处理平台

一个 RocketMQ Connect 集群中会有多个 Connector ,每个 Connector 会对应一个或多个 Task,这些使命运转在 Worker(进程)中。Worker 进程是 Connector 和 Task 运转环境,它供给 RESTFull 才干,接收 HTTP 请求,将获取到的装备传递给 Connector 和Task,它还负责发动 Connector 和 Task,保存 Connector 装备信息,保存 Task 同步数据的位点信息,除此以外,Worker 还供给负载均衡才干,Connect 集群高可用、扩缩容、毛病处理首要依靠 Worker 的负责均衡才干完成的。Worker 供给服务的流程如下:

基于 RocketMQ Connect 构建数据流转处理平台

Worker 供给的服务发现及负载均衡的完成原理如下:

服务发现:

基于 RocketMQ Connect 构建数据流转处理平台

用过 RocketMQ 的开发者应该知道,它的运用很简略,便是发送和接收音讯。消费方式分为集群方式和广播方式两种,集群消费方式下一个 Topic 能够有多个 Consumer 消费音讯,恣意一个 Consumer 的上线或下线 RocketMQ 服务端都有感知,而且还能够将客户端上下线信息通知给其它节点,运用 RocketMQ 这个特性就完成了 Worker 的服务发现。

装备/Offset同步:

基于 RocketMQ Connect 构建数据流转处理平台

Connector 的装备 /Offset 信息同步经过每个 Worker 订阅相同的 Topic,不同 Worker 运用不同的 Consumer Group 完成的, Worker 节点能够经过这种办法消费到相同Topic的所有数据,即 Connector 装备 /Offset 信息,这类似于广播消费方式,这种数据同步方式能够确保任何一个 Worker 挂掉,该Worker上的使命仍旧能够在存活的 Worker 正常拉起运转 ,而且能够获取到使命对应的 Offset 信息完成断点续传, 这是毛病搬运以及高可用才干的基础。

负载均衡:

RocketMQ 消费场景中,消费客户端 与Topic Queue 之间有负载均衡才干,Connector 在这一部分也是类似的,只不过它负载均衡的对象不一样,Connector 是 Worker 节点和 Task 之间的负载均衡,与 RocketMQ 客户端负载均衡一样,能够根据运用场景挑选不同负载均衡算法。

基于 RocketMQ Connect 构建数据流转处理平台

上文提到过 RocketMQ Connect 供给 RESTFull API才干。经过 RESTFull AP 能够创立 Connector,办理 Connector 以及检查 Connector 状态,简略列举:

  • POST /connectors/{connector name}
  • GET /connectors/{connector name}/config
  • GET /connectors/{connector name}/status
  • POST /connectors/{connector name}/stop

目前 Connector 支撑单机、集群两种布置方式。集群方式至少要有两个节点,才干确保它的高可用。而且集群能够动态添加或许减少,做到了动态控制提高集群功用和节约本钱节约的才干。单机方式更多便利了开发者开发测验 Connector 。

怎么怎么完成一个 Connector 呢? 还是结合一个具体的场景看一看,例如事务数据当时是写入 MySQL 数据库中的,期望将 MySQL 中数据实时同步到数据湖 Hudi 傍边。只要完成 MySQL Source Connector 、Hudi Sink Connector 这两个 Connector 即可。

基于 RocketMQ Connect 构建数据流转处理平台

下面就以 MySQLSource Connector 为例,来看一下具体的怎么完成。

完成 Connector 最首要的便是完成两个 API 。第一个是 Connector API ,除了完成它生命周期相关的 API 外,还有使命怎么分配,是经过 Topic、Table 还是经过数据库的维度去分。第二个API 是需求创立的 Task,Connector 经过使命分配将相关的装备信息传递给 Task, Task 拿到这些信息,例如数据库账号,暗码,IP,端口后就会创立数据库衔接,再经过 MySQL 供给的 BINLOG 机智获取到表的数据,将这些数据写到一个阻塞队列中。Task 有个 Poll 办法,完成 Connector 时只要调用到 Poll 办法时能够获取到数据即可,这样 Connector 就根本写完了。然后打包以 Jar 包的方式供给出来,将它加载到 Worker 的节点中。

基于 RocketMQ Connect 构建数据流转处理平台

创立 Connector 使命后, Worker 中会创立一个或许多个线程,不停的轮询 Poll 办法,然后获取到 MySQL 表中的数据,再经过 RocketMQ Producer 发送到 RocketMQ Broker中,这便是 Connector 从完成到运转的全体过程(见下图)。

04

基于 RocketMQ Connect 构建数据流转处理平台

04 RocketMQConnect 现状与未来

RocketMQ Connect 的开展历程分为三个阶段。

第一阶段:Preview 阶段

RocketMQ Connect 开展的初期也即 Preview 阶段,完成了 Open Messaging Connect API 1.0 版别,根据该版别完成了 RocketMQ Connect Runtime ,一起供给了 10+ Connector 完成(MySQL,Redis,Kafka,Jms,MongoDB……)。在该阶段,RocketMQ Connect 能够简略完成端到端的数据源同步,但功用还不够完善,不支撑数据转化,序列化等才干,生态相对还比较匮乏。

第二阶段:1.0 阶段

在 1.0 阶段,Open Messaging Connect API 进行了晋级,支撑 Schema、Transform,Converter 等才干,在此基础上对 Connect Runtime 也进行了严重晋级,对数据转化,序列化做了支撑,杂乱Schema也做了完善的支撑。该阶段的 API、Runtime 才干已经根本完善,在此基础上,还有30+ Connecotor 完成,覆盖了 CDC、JDBC、SFTP、NoSQL、缓存 Redis、HTTP、AMQP、JMS、数据湖、实时数仓、Replicator、等 Connector 完成,还做了 Kafka Connector Adaptor 能够运转 Kafka 生态的 Connector。

第三阶段:2.0 阶段

RocketMQ Connect 当时处于这个阶段,要点开展 Connector 生态,当 RocketMQ 的 Connector 生态达到 100 + 时,RocketMQ 根本上能够与恣意的一个数据体系去做衔接。

目前 RocketMQ 社区正在和 OceanBase 社区合作,进行 OceanBase 到 RocketMQ Connect 的研发工作,供给 JDBC 和 CDC 两种方式接入方式,后续会在社区中发布,欢迎感兴趣的同学试用。

05 总结

RocketMQ 是一个可靠的数据集成组件,具有分布式、弹性性、毛病容错等才干,能够完成 RocketMQ 与其他数据体系之间的数据流入与流出。经过 RocketMQ Connect 能够完成 CDC,构建数据湖,结合流核算可完成数据价值。

参加 Apache RocketMQ 社区

十年铸剑,Apache RocketMQ 的生长离不开全球 800+ 位开发者的积极参与贡献,信任在下个版别你便是 Apache RocketMQ 的贡献者,在社区不仅能够结识社区大牛,提高技术水平,也能够提高个人影响力,促进本身生长。

社区 5.x 版别正在进行着如火如荼的开发,以及 30 +个 SIG(兴趣小组)等你参加,欢迎立志打造世界级分布式体系的同学经过以下办法参加社区: