canal监听mysql实践

 canal是用java开发的根据数据库增量日志解析,供给增量数据订阅&消费的中心件。目前,canal首要支撑了MySQL的binlog解析,解析完成后才运用canal client 用来处理取得的相关数据。(数据库同步需求阿里的otter中心件,根据canal)。运用场景包含:

1.缓存更新

2.异步数据库或者同步到联系型数据库的中心媒介

canal介绍及作业原理

根据日志增量订阅&消费支撑的事务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 事务cache改写
  6. 价格变化等重要事务音讯

这儿也介绍了事务cache改写和价格变化等重要数据改变音讯的监听。

Canal原理相对比较简单:

img

  1. canal模仿mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump恳求,开端推送binary log给slave(也便是canal)
  3. canal解析binary log目标(原始为byte流)

img

Canal架构及作业原理

  1. server 代表一个 canal 运转实例,对应于一个 jvm
  2. instance 对应于一个数据行列 (1个 canal server 对应 1..n 个 instance )
  3. instance 下的子模块
  4. eventParser: 数据源接入,模仿 slave 协议和 master 进行交互,协议解析
  5. eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的作业
  6. eventStore: 数据存储
  7. metaManager: 增量订阅 & 消费信息管理器 img
  • EventSink起到一个类似channel的功用,能够对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是衔接EventParser和EventStore的桥梁。
  • EventStore实现形式是内存形式,内存结构为环形行列,由三个指针(Put、Get和Ack)标识数据存储和读取的方位。
  • MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包含get/ack/rollback,别离为:
  • Message getWithoutAck(int batchSize),答应指定batchSize,一次能够获取多条,每次回来的目标为Message,包含的内容为:batch id[唯一标识]和entries[详细的数据目标]
  • void rollback(long batchId),望文生义,回滚前次的get恳求,从头获取数据。根据get获取的batchId进行提交,防止误操作
  • void ack(long batchId),顾名思议,承认已经消费成功,告诉server删去数据。根据get获取的batchId进行提交,防止误操作

docker canal建立

先在Docker Hub中下载canal-server镜像

docker pull canal/canal-server:latest

先启动Canal,用于仿制properties装备文件

docker run -p 11111:11111 --name canal -d canal/canal-server:latest

初次启动Canal镜像后,将instance.properties文件仿制到宿主机,用于后续挂载运用

docker cp canal:/home/admin/canal-server/conf/example/instance.properties  /mydata/canal/conf/

修改instance.properties,该文件首要装备监听的mysql实例

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false 未开启gtid主从同步
canal.instance.gtidon=false# position info 在同一宿主机内 若有主从数据库,填写主数据库地址
canal.instance.master.address=172.17.0.1:3306
#需求读取的开端的binlog文件 不填写的话默许应该是从最新的Binlog开端监听
canal.instance.master.journal.name=
#需求读取的开端的binlog文件的偏移量
canal.instance.master.position=
#需求读取的开端的binlog的时刻戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
​
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
​
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal# 从数据库地址 主备切换时运用
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\..*
# table black regex 不需求监听的名单
canal.instance.filter.black.regex=mysql\..*,sys\..*,performance_schema\..*,information_schema\..*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config 默许的sql存储行列
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\..*
#################################################

Canal为我们供给了canal.instance.filter.regex与canal.instance.filter.black.regex选项参数来过滤数据库表数据解析,类似是非名单。常见比如有: ●一切表:.* or ... ●canal schema下一切表:canal..* ●canal下的以canal打头的表:canal.canal.* ●canal schema下的一张表:canal.test1 ●多个规矩组合运用:canal..*,mysql.test1,mysql.test2 (逗号分隔)

修改canal.properties,该文件首要时装备canal server

#################################################
#########     destinations      #############
#################################################
##装备监听大都据实例的地方 单数据库监听的话这儿装备example就能够
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = falsecanal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 挑选的消费行列
canal.serverMode = tcp

消费行列形式与Server-client形式共同,首要区别如下:

  • 不需求CanalServerWithNetty,改为CanalMQProducer投递音讯给音讯行列
  • 不运用CanalClient,改为MqClient获取音讯行列的音讯进行消费

这种形式比较于Server-client形式

  • 下游解耦,运用音讯行列的特性,能够支撑多个客户端广播消费、集群消费、重复消费等

  • 会添加体系的复杂度,添加一些推迟

image.png

#本地的instance.properties:容器的instance.properties  将容器的instance.properties装备文件挂载到宿主机,方便后续改变
docker stop canal;docker rm canal; 从头生成容器
docker run -p 11111:11111 --name canal -v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -d canal/canal-server:latest

image.png

检查消费实例example的日志能够看出canal监听的binlog方位正好是衔接时的binlog方位,前提是未指定了Binlog的方位。客户端开端衔接后便能够从指定方位开端消费增量的binlog。binlog-format=ROW # 挑选 ROW 形式

java客户端实例消费

1.引入pom文件

            <!--canal-->
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.5</version>
            </dependency>
​
            <!-- Message、CanalEntry.Entry等来自此安装包 -->
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.protocol</artifactId>
                <version>1.1.5</version>
            </dependency>

2.application.yml装备文件canal

canal:
  serverAddress: 42.192.183.193
  serverPort: 11111
  instance: #多个instance
    - example

对应的properties文件

@Component
@ConfigurationProperties(prefix = "canal")
@Data
public class CanalInstanceProperties {
​
    /**
     * canal server地址
     */
    private String serverAddress;
​
    /**
     * canal server端口
     */
    private Integer serverPort;
​
    /**
     * canal 监听实例
     */
    private Set<String> instance;
​
}

3.监听数据库变动代码

@Component
@Slf4j
public class MysqlDataListening {
​
    private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
​
    private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
​
    @Autowired
    private CanalInstanceProperties canalInstanceProperties;
​
​
    @PostConstruct
    private void startListening() {
        canalInstanceProperties.getInstance().forEach(
            instanceName -> {
                executors.submit(() -> {
                    connector(instanceName);
                });
            }
        );
    }
​
    /**
     * 消费canal的线程池
     */
    public void connector(String instance){
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalInstanceProperties.getServerAddress(), canalInstanceProperties.getServerPort()),
                instance, "", "");
        canalConnector.connect();
        //订阅一切音讯
        canalConnector.subscribe(".*\..*");
        // canalConnector.subscribe("test1.*"); 只订阅test1数据库下的一切表
        //恢复到之前同步的那个方位
        canalConnector.rollback();
​
        for(;;){
            //获取指定数量的数据,可是不做承认标记,下一次取还会取到这些信息。 注:不会堵塞,若不行100,则有多少回来多少
            Message message = canalConnector.getWithoutAck(100);
            //获取音讯id
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (size == 0 || batchId == -1) {
                try{
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                }
            }
            if(batchId != -1){
                log.info("instance -> {}, msgId -> {}", instance, batchId);
                printEnity(message.getEntries());
                //提交承认
                canalConnector.ack(batchId);
                //处理失败,回滚数据
                //canalConnector.rollback(batchId);
            }
        }
    }
​
    private  void printEnity(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
​
            CanalEntry.RowChange rowChange = null;
            try{
                // 序列化数据
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            assert rowChange != null;
            CanalEntry.EventType eventType = rowChange.getEventType();
            log.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                     eventType));
​
            if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
                log.info("sql ------------>{}" ,rowChange.getSql());
            }
​
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    switch (rowChange.getEventType()){
                        //如果希望监听多种事情,能够手动添加case
                        case UPDATE:
                            printColumn(rowData.getAfterColumnsList());
                            printColumn(rowData.getBeforeColumnsList());
                            break;
                        case INSERT:
                            printColumn(rowData.getAfterColumnsList());
                            break;
                        case DELETE:
                            printColumn(rowData.getBeforeColumnsList());
                            break;
                        default:
                    }
                }
​
        }
    }
​
    private void printColumn(List<CanalEntry.Column> columns) {
        StringBuilder sb = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            sb.append("[");
            sb.append(column.getName()).append(" : ").append(column.getValue()).append("    update=").append(column.getUpdated());
            sb.append("]");
            sb.append("    ");
        }
        log.info(sb.toString());
    }

数据库变动效果

image.png

留意的问题canal client: 为了确保有序性,一份实例(instance)同一时刻只能由一个canal client进行get/ack/rollback操作,否则客户端接纳无法确保有序。canal server 上的一个 instance 只能有一个 client 消费。clientId是固定的,Binlog文件落入文件保存。

image.png
因为确保了有序性,出产过快而消费慢的问题,如何解决消费堆积问题

其次在运用Canal自带客户端进行同步时需求自己手动调用get()或者getWithoutAck()进行拉取 拉取日志后进行同步只能一条一条处理,功率比较低 为了解决上面的问题打算在日志同步过程中引入MQ来作为中心同步,Canal支撑RocketMQ和Kafka两种,最终选用Kafka来进行

总结

canal的原理是借助mysql主从仿制的协议,模仿从数据库拉取增量Binlog日。canal经过Instance作为一个从数据库实例,客户端衔接实例后有序消费增量的Binlog日志。有几点特别留意的是,一是canal的出产消费模型是一个带指针的数组,别离指向出产方位、消费方位和ack方位,来操控消费和出产的行列。二是Binlog的装备需求时row格局,canal的解析针对row格局做了适配。三是canal经过client竞争的方法确保消费时只有一个client消费,确保binlog的有序性。四是,出产端数据量大的时候canal会存在消费不及时的问题,存在一定延时性。功能分析时事务binlog入库到canal client拿到数据,根本能够达到10~20w的TPS。详细事务解析时肯定要低于这个,不过对于一般事务来说,已足够用。

参阅

github.com/luozijing/s… 代码仓

blog.csdn.net/gudejundd/a… 缓存删去解决方案

zhuanlan.zhihu.com/p/345736518… canal详解

github.com/alibaba/can… canal详解

github.com/alibaba/can… canal功能