作者:韩山杰

Databend Cloud 研发工程师

github.com/hantmac

使用轻量级 CDC debezium-server-databend 构建实时数据同步

简介

Debezium Server Databend 是一个根据 Debezium Engine 自研的轻量级 CDC 项目,用于实时捕获数据库更改并将其作为事情流传递最终将数据写入目标数据库 Databend。它供给了一种简略的方法来监督和捕获关系型数据库的变化,并支持将这些变化转换为可消费事情。

运用 Debezium server databend 实现 CDC 无须依靠大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一个发动脚本即可敞开实时数据同步

这篇教程将展示怎么根据 Debezium server databend 快速构建 MySQL 到 Databend 的实时数据同步。

假设咱们有电子商务业务,产品的数据存储在 MySQL ,咱们需求实时把它同步到 Databend 中。

接下来的内容将介绍怎么运用 Debezium server databend CDC 来实现这个需求,系统的全体架构如下图所示:

使用轻量级 CDC debezium-server-databend 构建实时数据同步

预备阶段

预备一台现已安装了 Docker ,docker-compose 以及 Java 11 环境 的 Linux 或许 MacOS 。

预备教程所需求的组件

接下来的教程将以 docker-compose 的方法预备所需求的组件。

debezium-MySQL

docker-compose.yaml

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

Debezium Server Databend

  • Clone 项目: git clone ``https://github.com/databendcloud/debezium-server-databend.git

  • 从项目根目录开始:

    • 构建和打包 debezium server: mvn -Passembly -Dmaven.test.skip package
    • 构建完成后,解压服务器分发包: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
    • 进入解压后的文件夹: cd databendDist
    • 创立 application.properties 文件并修正: nano conf/application.properties,将下面的 application.properties 复制进去,根据用户实际情况修正相应的装备。
    • 运用供给的脚本运行服务: bash run.sh
    • Debezium Server with Databend 将会发动

一起咱们也供给了相应的 Docker image,能够在容器中一键发动:

version: '2.1'
services:
  debezium:
    image: ghcr.io/databendcloud/debezium-server-databend:pr-2
    ports:
      - "8080:8080"
      - "8083:8083"
    volumes:
      - $PWD/conf:/app/conf
      - $PWD/data:/app/data

NOTE: 在容器中发动注意所连接数据库的网络。

Debezium Server Databend Application Properties

本文章运用下面供给的装备,更多的参数阐明以及装备能够参考文档。

debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

预备数据

MySQL 数据库中预备数据

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创立数据库 mydb 和表 products,并刺进数据:

CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

在 Databend 中创立 Database

使用轻量级 CDC debezium-server-databend 构建实时数据同步

NOTE: 用户能够不必先在 Databend 中创立表,系统检测到后会主动为用户建表。

发动 Debezium Server Databend

bash run.sh

使用轻量级 CDC debezium-server-databend 构建实时数据同步

首次发动会进入 init snapshot 形式,通过装备的 Batch Size 全量将 MySQL 中的数据同步到 Databend,所以在 Databend 中能够看到 MySQL 中的数据现已同步过来了:

使用轻量级 CDC debezium-server-databend 构建实时数据同步

同步 Insert 数据

咱们持续往 MySQL 中刺进 5 条数据:

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer");

Debezium server databend 日志:

使用轻量级 CDC debezium-server-databend 构建实时数据同步

一起在 Databend 中能够查到 5 条数据现已同步过来了:

使用轻量级 CDC debezium-server-databend 构建实时数据同步

同步 Update 数据

装备文件中 debezium.sink.databend.upsert=true ,所以咱们也能够处理 Update/Delete 的事情。

在 MySQL 中更新 id=10 的数据:

update products set name="from debezium" where id=10;

在 Databend 中能够查到 id 为 10 的数据现已被更新:

使用轻量级 CDC debezium-server-databend 构建实时数据同步

同步 Delete 数据

在装备文件中,有以下的装备,既可敞开处理 Delete 事情的能力:

debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

Debezim Server 对 Delete 的处理比较复杂,在 DELETE 操作下会生成两条事情记录:

  1. 一个包含 “op”: “d”,其他的行数据以及字段;
  2. 一个tombstones记录,它具有与被删去行相同的键,但值为null。

这两条事情会一起宣布,在 Debezium Server Databend 中咱们挑选对 Delete 数据实行软删去,这就要求咱们在 target table 中具有 __deleted 字段,当 Delete 事情过来的时候咱们将该字段置为 TRUE 后刺进到目标表。

这样规划的好处是,有些用户想要保存这些数据,但可能未来会想到将其删去,这样就为用户供给了可选的方案,未来想要删去这些数据的时候,只需求 delete from table where __deleted=true 即可。

关于 Debezium 对删去事情的阐明以及处理方法,详情可参考文档。

在 MySQL 中删去 id=12 的数据:

delete from products where id=12;

在 Databend 中能够观察到 id=12 的值的 __deleted 字段现已被置为 true

环境整理

操作结束后,在 docker-compose.yml 文件地点的目录下履行如下指令停止所有容器:

docker-compose down

定论

以上就是根据轻量级 CDC debezium server databend 构建 MySQL 到 Databend 的 实时数据同步的悉数进程,这种方法不需求依靠 Flink, Kafka 等大型组件,发动和管理十分便利。