0 前言

DTS是数据传输渠道(Data Transfer Platform的缩写)

跟着得物App的用户流量增长,事务挑选的数据库越来越多样化,异构数据源之间的数据同步需求也逐步增多。为了控制本钱并更好地支撑事务发展,咱们决定自建DTS渠道。本文首要从技能选型、才能支撑与演化的角度动身,分享了在DTS渠道升级过程中取得的经验,并供给一些参考。

1 技能选型

DTS的首要方针是支撑不同类型的数据源之间的数据交互,包含关系型数据库(RDBMS)、NoSQL数据库、OLAP等,一起整合了数据库装备办理、数据订阅、数据同步、数据搬迁、DRC双活数据同步支撑、数据巡检、监控报警、一致权限等多个模块,以构建安全、可扩展、高可用的数据架构渠道。

1.1 才能对比

得物自建DTS平台的技术演进 | 精选

1.2DTS 1.0 – 以 canal/otter/datax 作为履行引擎

得物自建DTS平台的技术演进 | 精选

1.3 为什么要切换到Flink?

为了支撑多种读端数据源和写端数据源,需求一个一致数据处理结构,以削减重复组件和进步开发功率。一起数据源类型和组件的保护难度与杂乱度呈线性增长,现有的组件需求一致保护到一个项目中。

Canal和Otter等组件的社区活泼度低,很长时刻没有得到保护更新。因而,需求挑选一个新的、活泼的结构。此外,现有组件也无法有效支撑全量+增量一体化的操作。

因而,运用一个一致的数据处理结构,能够一起支撑多种读端数据源和写端数据源,以及全量+增量一体化的功用,是必要的。这样能够下降组件的保护难度和杂乱度,进步开发功率。

经过DTS 2.0,咱们期望将canal/otter/datax演化为一个使命履行结构+办理渠道,能够为后续很多数据源迭代提速。

1.4 DTS 2.0 以Flink作为履行引擎

现有的开发流程:

  • 一致的使命履行结构,集成flink并引进connectors根据装备组装出具体的DTS使命

  • 保护并研制新的 connector

当咱们需求支撑新的数据源, 首先将数据源相关插件保护在connector中,接着在履行结构中引进需求的组件,其间存在很多的可复用的功用,这样就做到了connector及功用组件复用的效果。

2 DTS 现有才能

得物自建DTS平台的技术演进 | 精选

3 咱们做了什么?

3.1DTS Connectors结构 – 数据源支撑提速

在Flink CDC基础上完成的全量/增量使命同步结构,根本的架构如下

得物自建DTS平台的技术演进 | 精选

其间Connector中别离完成了Flink供给的SourceFunction和SinkFunction函数,别离担任从读端读取数据,往写端写入数据,因而一个Connector可一起存在于上游或许下游。

使命的发动流程:****
– 指定使命Json装备, 根据类型加载SourceFunction和SinkFunction构建通用才能函数并发动

a. 使命的Main函数如下所示, 根据如下的Json文件加载到对应的Connector中的SourceFactory或许SinkFactory来结构对应的DataStream。

DataStream是Flink中供给的数据流操作类

public class Main {    public static void main(String[] args) throws Exception {
        // 解析参数        ParameterTool parameterTool = ParameterTool.fromArgs(args);        String[] parsedArgs = parseArgs(parameterTool);
        Options options = new OptionParser(parsedArgs).getOptions();        options.setJobName(options.getJobName());
        // 履行使命        StreamExecutionEnvironment environment =                EnvFactory.createStreamExecutionEnvironment(options);        exeJob(environment, options);    }

使命Json装备:


{  "job":{    "content":{      "reader":{        "name":"binlogreader",        "parameter":{          "accessKey":"",          "binlogOssApiUrl":"",          "delayBetweenRestartAttempts":2000,          "fetchSize":1,          "instanceId":"",          "rdsPlatform":"",          "restartAttempts":5,          "secretKey":"",          "serverTimezone":"",          "splitSize":1024,          "startupMode":"LATEST_OFFSET"        }      },      "writer":{        "name":"jdbcwriter",        "parameter":{          "batchSize":10000,          "concurrentWrite":true,          ],          "dryRun":false,          "dumpCommitData":false,          "errorRecord":0,          "flushIntervalMills":30000,          "poolSize":10,          "retries":3,          "smallBatchSize":200        }      }    },
  }}

b. 咱们供给了两个笼统工厂类,SourceFactory, SinkFactory, 其间的createSource, createSink便是子工厂需求完成的办法,不同的数据源完成不同。


public abstract class SourceFactory<T> {    public abstract DataStream<T> createSource();}public abstract class SinkFactory<T> {    public abstract void createSink(DataStream<T> rowData) throws Exception;}

c.接下来,咱们只需求完成对应的子工厂办法就可以了

public class BinlogSourceFactory extends AbstractJdbcSourceFactory {    @Override    public DataStream<TableRowData> createSource() {
        List<String> tables = this.binlogSourceConf.getConnection().getTable();        Set<String> databaseList = new HashSet<>(2);
        // 运用对应的Connector构建DataStream    }}

d.通用才能函数:RateLimitFunction, BinlogPositionFunction 其间别离完成了对应的使命才能,例如限流,使命位点保存等。


public class RateLimiterMapFunction<T> extends RichMapFunction<T, T> {
    private transient FlinkConnectorRateLimiter rateLimiter;
    @Override    public T map(T value) throws Exception {        if (rateLimiterEnabled) {            rateLimiter.acquire(1);        }        return value;    }

当使命所需的函数都创立完成后,使命就真实开始运行了。

收益:

运用一套封装完善且易扩展的结构能够进步开发功率并下降后续代码的保护本钱。比较于DTS1.0、Canal和Otter等项目,该项目的保护本钱大大下降,一起供给了更好的扩展性,使得咱们能够在短期内支撑PostgreSQL、MongoDB、Hbase、StarRocks等不同的数据源。

3.2RDS日志获取

DTS经过供给增量和全量同步才能为事务供给数据同步功用,但在增量订阅/同步使命履行过程中,可能会遇到一些异常情况。其间,以下三种情况需求特别处理:

  • Binlog可用性

云厂商的数据库实例本地binlog有效期8小时,过期部分进行OSS备份。MySQL事务高峰期或许DDL变产生很多的binlog, DTS使命测验获取过期数据失利,使命因而中断。因而,DTS支撑了本地binlog+OSS备份binlog的获取及切换,保障日志可用性。

  • 数据库 实例主从切换

RDS常常会产生主备节点切换,在切换的过程中要确保数据不丢。因为切换前后两个数据库实例 Binlog 文件一般都是不一致的,此时使命位点记载方法是 BinlogPosition 方法,则在切换之后使命需求主动进行 Binlog 对齐操作,进而确保数据的完整性。将新数据实例上的位点查询时刻戳提早1-2分钟即可。

  • 读实例订阅支撑

DTS使命binlog dump连接数过多形成主库压力及影响DDL改变,因而需求支撑读库订阅。云厂商的读库不供给备份,在读库日志过期时需求切换到主库进行读取。

3.3 全量增量一体化功用

得物自建DTS平台的技术演进 | 精选

全量增量一体化是指先同步存量数据,待存量完毕之后再开始同步增量数据。其间也加入了增量阶段的OSS备份日志获取。但存量阶段仍然存在一些问题,需求进一步改造优化。

全量方法下新增表先进行存量数据同步再进行增量数据同步,该使命中已存在的表会因而导致数据推迟。待新增表数据同步完成,使命推迟则会恢复正常。

3.4 数据源接入- starrocks, postgres等

支撑从mysql同步到starrocks和postgres, 在使命履行结构的基础上,只需求开发starrocks-connector, postgres connector支撑对应的数据源即可。其间的其他才能,像多表同步、分库分表等场景都可以到达复用的效果。

3.5 JBDC写入改造

脚本扩展和动态表名路由:

得物自建DTS平台的技术演进 | 精选

数据合并和多线程写入:

得物自建DTS平台的技术演进 | 精选

3.6 监控告警

DTS使命需求采集flink使命指标,首要包含使命推迟、各个算子阶段的写入速率,算子被压及运用率等。其间 使命推迟需求接入告警服务,所以咱们挑选了引进redis来缓存使命的推迟时刻,再上报到告警服务来完成飞书的消息和电话告警。

4 最佳实践

4.1 0000-00-00 00:00:00时刻戳的问题

MySQL的时刻戳允许为0000-00-00 00:00:00, 在Flink使命中通常会被转换为null, 导致写入下游数据源失利, 因而需求做特殊符号对于不同的数据源做不同的转化确保写入的正切行。

4.2 Flink CDC 使命 serverId仅有性

Flink CDC source 会伪装成 MySQL slave节点,为了确保数据的准确性,每个slave必须具有仅有的serverId来符号该slave的仅有性。因而在flink cdc的使命中咱们为每一个使命分配了一个仅有的serverId区间(范围区间是为了支撑多并行度)。

4.3 Flink使命数据序列化瓶颈

在flink使命中运用DataStreamAPI并运用比较杂乱的数据结构进行传输时,算子之间的序列化本钱较高,两个方向,一是树立更为高效的数据结构进行传输,二是敞开flink对象复用,并尽可能削减不同并行度之间的数据传输。

5 未来演进

DTS作为一个数据同步渠道首要功用是尽可能供给高效的数据源同步功用,助力于多变的事务场景。

5.1 基于Flink SQL的ETL使命办理

流式数据处理除了现有的DataStream API还存在SQL的方法,SQL作为一种通用的语言,对于数据相关的事务同学极大的下降了学习本钱。而经过Flink SQL可以做到的ETL流式数据加工也能处理一些杂乱事务场景的处理逻辑,将事务逻辑转化为DAG的流式处理图,经过拖拽的方法也能方便运用,FLINK SQL的演进方向能够和现有的Flink DataStream API互补。

使用场景:ETL强大的流式数据转换处理才能大幅提升数据集成功率,也能建实时报表体系,进步分析功率,一起也可以使用于一些实时大屏的场景。

5.2 一致技能栈

将现有的DTS才能都搬迁到Flink渠道上,坚持一致的技能栈,能够极大的下降保护本钱。现有留传的双向同步、数据比对等才能需求做进一步的改造和搬迁,符合全体技能收敛的趋势。

6 总结

本文首要分享了以下几个方面:Flink比较现有的技能栈带来的收益,切换到Flink以后的迭代方向及架构功用上的改变、带来新的问题怎么处理,以及未来的一些迭代方向,期望能让大家有所收成。

*文/风子

本文属得物技能原创,更多精彩文章请看:得物技能官网

未经得物技能答应严禁转载,否则依法追究法律责任!