本文已参加“新人创造礼”活动,一同敞开创造之路。

之前一篇博客记录了将Oracle的数据实时同步到MySQL中,由于项目需求,将多个数据源的数据先集成到一致的MySQL数据库中,然后再将这些数据传输到Rabbitmq中,最终经过Spark传输到HBASE中。之前也考虑过将这些制作到一个管道中,最终由于Rabbitmq我并没有找到能够作为中间的过程的组件,只能拆分红两个管道,这篇就记录一下将MySQL中的数据传输到Rabbitmq Producer中。关于前面讲Oracle数据同步到MySQL的能够看这篇:运用Streamsets将Oracle数据实时同步到MySQL中

1、首要制作整个的管道

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

2、装备MySQL Binary Log参数

A、装备MySQL Binary Log

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

这儿假如需求从二进制日志的最初开端读取事情则勾上。未挑选时,原点将从上次保存的偏移开端读取事情。

B、填写衔接MySQL的账号与密码

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

C、装备Advanced

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

读取二进制日志文件中的事情时要监听的表的列表。假如是多个表的话格局:

<database name>.<table name>,<database name>.<table name>......

3、装备Rabbitmq Producer参数

A、装备通用参数

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

B、装备RabbitMQ

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

RabbitMQ URI。通常运用以下格局: amqp:< host >:< port >/< virtualhost >

C、装备Queue

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

这儿的Name填写的是运用或新创建的行列的名称。

D、装备Exchange

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

能够挑选为要运用的binding装备以下绑定特点。假如未装备任何binding,则运用默许交流。这些特点直接对应于RabbitMQ特点。有关更多信息,请拜见RabbitMQ文档。

这儿binding的类别有三种:direct exchange、topic exchange与fanout exchange。

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

direct exchange:

此类型的exchange路由规矩很简单,exchange在和queue进行binding时会设置routingkey, 然后我们在将音讯发送到exchange时会设置对应的routingkey。
在direct类型的exchange中,只要这两个routingkey完全相同,exchange才会挑选对应的binging进行音讯路由。

topic exchange:

此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全持平,这儿的routingkey能够有通配符:’‘,’#’.
其间’
‘表示匹配一个单词, ‘#’则表示匹配没有或许多个单词

fanout exchange:

此exchange的路由规矩很简单直接将音讯路由到一切绑定的行列中,无须对音讯的routingkey进行匹配操作。

E、装备Advanced

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

F、装备DataFormat

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

这儿DataFormat支持的音讯的数据格局:

  • Avro
  • Binary
  • Delimited
  • JSON
  • Protobuf
  • SDC Record
  • Text

Text Field Path:包含要写入的文本数据的字段。一切数据必须合并到指定字段中。

4、解决问题

第一次启动时报了个错:

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

报错:

RUNNING_ERROR: MYSQL_006 – MySql server error: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘binlog.000006’ at 1661, the last event read from ‘./binlog.000006’ at 1692, the last byte read from ‘./binlog.000006’ at 1692…………

由于MySQL服务器之前测验具有相同server_uuid / server_id的从服务器已衔接到主服务器,所以这儿需求到 MySQL Binary Log中修改Server ID。

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

5、测验数据

Oracle中增加了数据后:

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

由于之前的管道流装备MySQL中也会增加数据:

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

最终经过这次装备的管道流增加数据到Rabbitmq中。能够打开rabbitmq的管理界面: http://192.168.105.77:15672

登录后挑选Queues,能够看到刚刚新建的:

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中

然后再Streamsets的管道中看到启动成功:

使用Streamsets将MySQL的数据同步到Rabbitmq生产者中
使用Streamsets将MySQL的数据同步到Rabbitmq生产者中