1. 现象

最近在尝试 Flink CDC 写 Hudi 的湖仓一体计划验证,调试过程中发现:当运用 Flink 写入 Hudi MOR 表时,尽管 log files 正常生成, 可是在 Hudi 表触发第一次 compaction 之前,rt 表一向查不到数据,抱负的情况下,关于 MOR 表,rt 表应该能查到 log files 中的数据才对。

2. 复现过程

2.1 Flink Write to Hudi MOR table

Table API 完成方法,装备如下:

 /**
 * MOR
 *  ro:读优化表
 *  rt:近实时表
 */
String hudiSinkDDLMOR = "CREATE TABLE hudi_table_mor(n"  
        "id String,n"  
        "name String,n"  
        "age Int,n"  
        "PRIMARY KEY (id) NOT ENFORCED n"  
        ") WITH (n"  
        // 根本装备
        "'connector' = 'hudi',n"  
        "'write.operation' = 'upsert',n"   
        "'write.precombine' = 'true',n"   // 默许情况下为 false,会有重复项
        "'table.type' = 'MERGE_ON_READ',n"  
        String.format("'path'= '%s',n", basePath)  
        // 并发参数装备
        "'write.tasks' = '2',n"  
        "'write.bucket_assign.tasks' = '2',n"   // 添加该值会导致bucket数量添加,即添加小文件数
        // hudi schema 同步到 Hive metastore
        "'hive_sync.conf.dir'='/opt/apache-hive-3.1.3-bin/conf',n"  
        "'hive_sync.enabled' = 'true',n"   // 将数据集注册并同步到 hive metastore
        "'hive_sync.mode' = 'hms',n"   // 选用 hive metastore 同步
        "'hive_sync.metastore.uris' = 'thrift://localhost:9083',n"  
        String.format("'hive_sync.db' = '%s',n", dbName)  
        String.format("'hive_sync.table' = '%s'n",tableName)  
        ")"  
        ""  
        ""
        ;

2.2 Insert data into MySQL table

insert into t_mor_test(id,name,age) values('0124-24','test19',19);

2.3 Query rt table by Hive

MySQL 数据只更新一次的情况下,rt 表查询结果为空,当 MySQL 数据写入5次,到达 compaction 条件之后(默许 num_commits 到达 5 次触发 compaction),rt 表中才有数据回来。

这儿能够看到,HDFS 目录下有 log files 文件,可是 rt 表无数据回来:

Flink CDC 实战2:Flink CDC 写 Hudi MOR 表生成了 log files 可是 rt 表查询无数据回来处理

然后我新建一张表插入5次数据后,发现伴随着第一次 compaction 触发,rt 表的数据才开始查询正常:

Flink CDC 实战2:Flink CDC 写 Hudi MOR 表生成了 log files 可是 rt 表查询无数据回来处理

3.环境

Flink:1.17.0

Hudi:0.14.0

Hive:3.1.3

Hadoop:3.3.6

4. 处理

今天逛 github 偶尔看到了某个类似 issue 下贴了玉兆大佬的文档,参照文档我修正了这个问题,详细链接放这儿,我们能够自行参考:www.yuque.com/yuzhao-my9f…

issue 贴:github.com/apache/hudi…

按照玉兆文档中所写,Hive 3.1.0 的兼容性问题会导致这种查询异常问题,所以我也尝试重新编译了下 Hive 3.1.3,这儿浅浅记录一下过程:

  1. 下载源码
## 下载
wget https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-src.tar.gz
## 解压
sudo tar -zxvf apache-hive-3.1.3-src.tar.gz
  1. 修正对应源码

记得引进对应 java.util 的包

Flink CDC 实战2:Flink CDC 写 Hudi MOR 表生成了 log files 可是 rt 表查询无数据回来处理

  1. 编译 hive-common 和 hive-exec

确保自己安装了 Maven,在 common 下编译 hive-common ,ql 下编译 hive-exec

当然也能够在解压的包下整个编译,可是平台不同或许编译会不那么顺畅,几个小时都有或许,这个看个人选择哈

比如我这儿是 M1,编译到 standalone-metastore 会由于 protobuf:2.5.0 不支持 M1 而报错

由于这儿其他包不影响,所以只编译其间两个即可

mvn clean package -Pdist -DskipTests -Dmaven.javadoc.skip=true

成功后如下图所示:

Flink CDC 实战2:Flink CDC 写 Hudi MOR 表生成了 log files 可是 rt 表查询无数据回来处理

  1. 编译成功后替换 hive-common-xxx.jar, hive-exec-xxx.jar

替换前注意备份原lib下的包,便利出问题后回滚

cp /opt/apache-hive-3.1.3-src/ql/target/hive-exec-3.1.3.jar /opt/apache-hive-3.1.3-bin/lib/
cp /opt/apache-hive-3.1.3-src/common/target/hive-common-3.1.3.jar /opt/apache-hive-3.1.3-bin/lib/
  1. 包替换好后重启 Hive,再次查询 rt 表结果正常

Flink CDC 实战2:Flink CDC 写 Hudi MOR 表生成了 log files 可是 rt 表查询无数据回来处理