Flink 从0-1实现 电商实时数仓 – ODS & DWD(上)

这是我参加8月更文应战的第5天,活动概略查看:8月更文应战

ODS 层

搜集到 kafka 直接作为 ODS 层,不需求额外处理,坚持数据原貌。

日志数据主题:ods_base_log

事务数据主题:ods_base_db_m

DWD 层

日志 DWD 层

  我们前面搜集的日志数据现已保存到Kafka中,作为java难学吗日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类,页面日志、建议日志和曝光日志。这三类数java难学吗据虽然都是用户行为数据,可是有着彻底纷歧实体类转json样的数据结构,所以要拆分处理。数据结构与算法将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。

  页面日实体类图怎样画志输出到主流,建议日志输出到建议侧输出流,曝光日志输出java名优馆在线六区到曝光侧输出流。

1. 剖析首要任务

  • 接纳Kafka数据,数据结构严蔚敏过滤空值数据

    对Maxwell抓取的数据处理的常用办法有哪些数据进行ETL,保存实体类转map有用的部分,过滤掉没用的

  • 结束 动态 分流功用

     由于MaxWell是把悉数数据统一redistribute写入一个Topic中, 这样明显不利于日后的数据处理。所以需求把各个表拆开处理。可是由于每个表有不同的特征,有些表是维度表,有些表是实践表,有的表既是实践表在某种情况下也是维度表。

     在实时核算中一般把维度数据写入存储容器,一般是便当经过主键查询的数据库比方HBase,Redis,MySQL等。一般把实践数据写入数据处理包含数据的流中,进行进一步处理,终究形成宽表。可是作为Flink实时核算任务,怎样得知哪些表是维度表,哪些是实践表呢?而这些表又应该搜集哪些字段呢?

     我们可以将上面的内容放到某一个当地,会集装备。这样的装备不适合写在装备文件中,由于事务端跟着需求改动每添实体类的作用加一javascript张表,就要修改装备重启核算程序。所以这儿需求一种动态装备计划,把这数据结构有哪些种装备长时间保存起来,一旦装备有改动,实时核算可以自动感知。

    这种可以有两javascript个计划结束

    • redis缓存种是用Zookeeper存储,经过Watch感知数据改动。

    • 另一种是用mysql数据rediscover库存储,周期性的同步,使用 FlinkCDC 读取。

    这儿挑选第二种计划,首要是mysql关于配java怎样读备数据初始化和维护办理,用sql都比较便当。

    如图:

     事务数据保存到 Kafka 的主题中
     维度数据保存到 Hbase 的表中
    Flink 从0-1完成 电商实时数仓 - ODS & DWD(上)

2. 代码结束

1. 建 装备表
CREATETABLE`table_process`(
`source_table`varchar(200)NOTNULLCOMMENT'来历表',
`operate_type`varchJavaar(200)NOTNULLCOMMENT'操作类型insert,update,deRedislete(用于过滤是否处理 某些操作的数据)',
`sink_type`varchar(200)DE数据处理包含哪些内容FAULTN数据处理包含数据的ULLCOMMENT'输出类型hbasekafka',
`sink_table`varcjava怎样读har(200)DEFAULTNULLC数据处理的最小单位是OMMENT'输出表(主题)',
`sink_columns`varchar(2000)DEFAredis面试题ULTNULLCOMMENT'输出字段(用于过滤一些不需求的字段)数据结构与算法剖析',
`sink_pk`varchar(200)DEFAULTNULLCOMMENT'主键字段',数据处理工程师
`sink_extend`varchar(200)DEFAULTNULLCOMMENT'建表扩展(建hbase表时,的表装备)',
PRIMARYKEY(`sou数据处理办法有哪些rce_table`,`operate_type`)
)数据结构有哪些ENGINE=InnoDBDEFAULTredis面试题CHAR数据处理的最小单位是SET=utf8
2. maven 依靠
&ltjava模拟器;dependency>
<groupId>mysql</groupId>
<artifactId>m数据处理的常用办法有哪些ysql-connector-java</artifactId实体类是什么>
<version>5.1.47<数据处理/version数据结构>
</dependency>
<depende数据处理ncy>
<groupId>com.alibaba.ververica</groupId>
<artifac数据处理包含数据的搜集tId>flink-con实体类转mapnector-mysql-cdc</artifactId>
<versi数据结构与算法剖析on>1.2.0</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils<实体类的界说/artifactId&实体类gt;
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifacjavascripttId>phoenix-spark</ar实体类tifactId>
<version>5.0.0-HBase-2.0</version&gt数据结构严蔚敏;
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifaredis集群cjavascripttId>javax.el</artifactId>
</exclusion>
&lt数据处理包含数据的搜集;/excl数据处理usions>
</dependency>
3. 装备表实体类
@Data
public class TableProcess {
/**
* 动态java言语分流Sredis数据结构ink常量   改为小写和脚本一起
*/
pubredistributelic static final String SINK_数据处理软件TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
pjava难学吗ublic static final String SINK_TYPE实体类转json_CK = "clickhouse";
/**
* 来实体类转map历表
*/
prredis数据结构ivate Stri数据结构严蔚敏ng sourceTable;
/**
*数据处理的常用办法有哪些 操作类型 insert,update,delete
*/
private String operateType;数据处理的常用办法有哪些
/**
* 输出类型 hbase kafka
*/
private String sirediscovernkType;
/**
* 输出表(主题)
*/
private String sinkTable;
/**
* 输出字段
*redis的五种数据类型/
private String sinkColumns;
/**
* 主键字段
*/
private String sinkPk;
/*数据结构有哪些*
* 建表扩展
*/
privaJavat数据处理包含哪些内容e String sinkExtend数据结构严蔚敏第二版课后答案;
}
4数据处理办法有哪些. 在MySQLBinlog 添加对装备数据库的监听,并重启MySQL

修改装备文件

sudo实体类的界说 vim /etc/my.cnf

把寄数据处理的常用办法有哪些存装备数据库(tmall_realtimejava言语添加至 binlog-do-db

server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=tmall
binlog-do-db=tmall_realtime

下期预告:DWD层 用到的东西类java就业培训班

注重专栏继续更新

发表评论

提供最优质的资源集合

立即查看 了解详情