本文由同城艺龙大数据开发工程师张军分享,首要介绍同城艺龙 Flink 集成 Iiceberg 的出产实践。内容包含:

  1. 布景及痛点
  2. Flink + Iceberg 的落地
  3. Ice大数据修仙berg 优化实践
  4. 后续作业
  5. 收益及总结

一、布景及痛点

命令行务布景

同程艺龙是一个供给机票、住宿、交通等服务的在线旅游服务途径,现在我地址的部分归于公司的研发部分,首要职责是为公司开源矿工内其他业务部分供给一些基础服务,咱们的大数据体系首要接受的业务是部本分的一些大数据相关的数据核算、剖析作业等。数据来历有网关日志数据、服务器监控数据、K8s 容器的相关日志数据,App 的打点日志, MySQL 的 binlog 日志等。咱们首要的大数据使命是根据上述日志构建实时报表,供给根据 Presto 的报表展现和即时查询服务,一同也会根据 Flink 开发一些实时、批处理使命,为业务方供给准确及时的数据支撑。

原架构方案开源节流是什么意思是什么

因为我命令行参数们悉数的原始数据都是存储在 Kafka 的,所以本来的技能架构就是首先是 Flink 使命消费 Kafka 的数据,通过 Flink SQL 或许 Flink jar 的各种处理之后实时写入 Hive,其间绝大部分使命都是 Flink SQL 使命,因为我认为 SQL 开发相对代码要简单的多运维的薪酬是多少,而且保护方便、好了解,所以能用 SQL 写的都尽量用 SQL 来写。
提交 Flink 的途径运用的是 Zeppe运维工程师是干什么的lin,其间提交开源软件 Flink SQL 使命是 Zeppelin 自带的功用,提交 jar 包使命是我自己根据 Application 办法开发的 Ze命令行参数怎样运用ppelin 插件。
关于落地到 Hive 的数据,运大数据技能与运用专业用开源的报表体系 metabase (底层运用 Presto) 供给实时报表展现、守时发送邮件报表,以及自定义 SQ运维工程师一月多少钱L 查询服务。因为业务对数据的实时性要求比较高,期望数据能尽快的展现出来,所以咱们许多的 Flink 流式使命的 checkpoint 设置为 1 分钟,数据格局选用大数据专业作业远景怎样的是 orc 格局。

痛点

因为选用的是列式存储格局 ORC,无法像行式存储格局那样进行追加操作命令行关机,所以不可避免的产生了一个大数据领域十分常见且十分扎手的问题,即 HDFS 小文件问题。

开始的时分咱们的小文件处理方案运维是自己写的一个小文件紧缩东西,守时去吞并,咱们的 Hive 分区一般都是天等运维宝级的,所以这个东西的原理就是每天gitlab凌晨建议一个守时使命去紧缩昨日的数据,首先把昨日的数据写入一个临时文件夹,紧缩完gitee,和本来的数据命令行进入指定目录进行记载数的比对查验,数据条数一起之后,用紧缩后的数据掩盖原giti轮胎是什么品牌本的数据,可是因为无法保证业务,所以出现了许多问题:

  • 紧缩的一同因为推迟数据的到来导致昨日的 Hive 分区又有数据写入了,查验就会失利,导致吞并小文件失利。
  • 替换旧数据的操作是没有业务保证giti轮胎是什么品牌的,假定替换的进程中旧分区有新的数据写入,就会掩盖新写入的数据,构成数据丢掉。
  • 没有业务的支撑,无法实时吞并当时分区的数据,只能吞并紧缩前一个分区的,最新的分区数据仍然有小文giti件的问题,导致最新开源节流数据查询功能行进不了。

二、Flink+Iceberg 的落命令行参数怎样运用

I命令行参数ceberg 技能调运维宝

所以根据以上的 HDFS 小文件、查大数据技能询慢等问题,结合咱们的现状,我调研了现在市面上的数据giti轮胎是什么品牌湖技能:Delta、Apache Iceberg 和 Apache Hudi,考虑了现在数据湖结构支撑的功用和往后的社区规划,究竟咱们是挑选了 Iceberg,其间考虑github永久回家地址的原因有以下几方面:

■ Iceberg 深度集大数据专业作业远景怎样成 Fl大数据修仙ink

前面讲到,咱们的绝大部分使命都是 Flink 使命,包含批处理使命和流处理使命,现在这三个数据湖结构,Iceberg 是集成 Flink 做的最完善的,假定选用 Iceberg大数据杀熟 代替 Hive命令行 之后,搬迁的本钱十分小,对用户几乎是无感知的,
比方咱们原大数据是什么意思本的 SQL 是这样的:

INSERT INTO hive_catalog.db.hive_table SE开源代码网站githubLECT * FROM kafka_table

搬迁到 Iceberg 往后,只需求批运维的薪酬是多少改 catalog 就行。运维工程师一月多少钱

INSERT INTO iceberg_catalog.db.iIcebergithub敞开私库gceberg_table SELECT * FROM kafka_table

Presto 查询也是和这个相似,只需求修改 catalog 就行了。

■ Iceberg 的规划架构使得查询更快

Flink集成Iceberg在同程艺龙的实践

在 Iceberg 的规划架构中,manifest 文件存储了分区相关信息、data files 的相关核算信息(max/min)等,去查询一些大的分区的数据,就能够直接定位到所要的数据,而不是像 Hive 相同去 list 整个 HDFS 文件夹,时间复杂度从 O(n) 降到了 O(1)命令行是什么意思,使得一些大的查询速度有了明显的前进,在 Iceberg PMC Chair Ryan B命令行怎样切换到d盘lue 的演讲中,咱们看到射中 filter 的使命履行大数据技能与运用时间从 6开源矿工1.5 小时降到了 22 分钟。

■ 运用 Flink SQL 将 C大数据专业作业远景怎样DC 数据写入 Iceberg

Flink CDC 供给了直接读取 MySQL binlog 的办法,相对从前需求运用 canal 读取 binlog 写入 Icebegitirg,然后再去消费 Iceberg 数据。少了两个组件的保护,链路减少了,节省了保护的本钱和犯错的概运维工程师是干什么的率。而且能够结束导入全量数据和增量数据的完美对接,所以运用 Flink SQL 将 MySQL binlog 数据导入 Iceberg 来做 MySQL->Iceberg 的导入将会是一件十分有意义的作业。

此外关于咱们开始的紧缩小git教程文件的需求,虽然 Iceberg 现在还无法结束自动紧缩,可是它供给了一个批处理使命,现已能满意咱们的需求。

■ Hive 表搬迁 Iceberg 表

搬迁预备作业

现在咱们的悉数数据都是存储在 Hive 表的,在验证完 Iceberg 之后,咱们决定将 Hive 的数据搬迁到 I运维ceberg,所以我写了一个东西,能够运用运维工程师需求把握什么技能 Hive 的数据,然后新建一个 Icebe大数据rg 表,为其建立相应的元数据,可是查验的时分发现,假定选用这种办法,需求把写入 Hive 的程序间断,因为假定 I开源阅览ceberg 和 Hive 运用同一个数据文件,而紧缩程序会不断地紧命令行关机缩 Iceberg 表的github永久回家地址小文件,紧缩运维方与股东交流的途径是完之后,不会立刻删去旧数据,所以 Hive 表就会查到双份的数据,故咱们开源是什么意思选用双写的战略,本来写入 Hive 的程序不动,新建议一套程序写入 Iceb开源代码网站githuberg,这样能对 Igithub永久回家地址ceberg 表查询一段时间。还能和本来 Hive 中的数据进行比对,来验证程序的正确性。

通过一段时间查询,每天将近几十亿条数据、紧缩后几个 T 巨细的 Hive 表和 Iceberg 表,一条数据也不差。所以在究竟比照数据没有问题之后,把 Hive 表间断写入,运用新的 Iceberg 表。

搬迁东西

我将这个 Hive 表搬迁 Iceberg 表的东西做成了一个根据 Flink batch job 的 Iceber运维是做什么的g Action,提交了社区,不过现在还没吞并:github.com/apache/iceb… Hive 原始的数据不动,然后新建一个 Iceberg table,再为这个新的 Iceberg table 生成对应的元数据,咱们有需求的话能够先看看。

此外,Iceb运维erg 社区,还有一个把现有的数据搬迁到已存在的 Iceberg table 的东西,相似 Hive 的 LOAD DATA INPATH开源节流是什么意思是什么 … INTO TABLE ,是用 Spark 的存储进程做的,咱们也能够注重下:gi开源节流thub.com/apache/iceb…

三、Iceberg 优化实践

紧缩小文件

现在紧缩小文件是选用的一个额定批使命来进行的,Icebegithubrg 供给了一个 Spark 版其他 action,我在做功用查验的时分发现了一些问运维工程师一月多少钱题,此外我对命令行界面 Spark 也不是十分了解,担忧出了问题欠好排查,所以参照 Spark 版其他自己结束了一个 Flink 版别,并修正了一些 bug,进行了一些功用的优化。

因为咱们的 Iceberg 的元数据都是存储在 Hive 中的,也就是咱们运用了 HiveCatalog,所以紧缩大数据技能与运用作业方向及远景程序的逻辑是把 Hive 中悉数的 Iceberg 表悉数都查出来,顺次紧缩。紧github缩没有过滤条件,不管是分区表仍是非分区表,都进行全表的紧缩,这样做是为了处理某些运用 eventtime 的 Flink 任命令行界面务。假定有推迟的数据的到来,就会把数据写入从前的分区,假定不是全表紧缩只紧缩当天分区的话,新写入的其他天的数据就不会被紧缩。

之所以没有敞开守时使命来紧缩,命令行进入指定目录是因为比方守时五分钟紧缩一个表,假定五分钟之内开源众包这个紧缩使命没结束,没有提交新的 snapshot,下一个守时使命又敞命令行进入指定目录开了,就会把上一个没有结束的紧缩使射中的数据从头紧缩一次,所以每个表顺次紧缩的战略能够开源矿工保证某一时间一个表只需一个使命在紧缩。

代码示例参看:

StreamExecutio开源阅览app下载安装nEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDa大数据与会计taFiles()大数据是什么意思 //.maxParallelism(parallelism) //.filter(Expressions.e命令行参数qual("da大数据与会计y", day)运维) //.targetSizeInBytes(targetSizeInBytes) .execute();

现在体系工作安稳,现已结束了几万次使命的紧缩。

Flink集成Iceberg在同程艺龙的实践

命令行窗口快捷键意:
不过现在关于新发布的 Ic大数据是什么意思eberg 0.11 来说,还有一个已知的 bug,即当紧缩前的文件开源代码网站github巨细大于要紧缩的巨细(targetSizeInBy命令行窗口快捷键tes)时,会构成数据丢掉,其实这个开源问题我在最开始查验小文件紧缩的时分就命令行窗口快捷键发现了,而且提了一github永久回家地址个 pr,我的战略是大于政策文件的开源数据文件不参与紧缩,不过这个 pr 没有吞并到 0.11 版别中,后来社区其他一个兄弟也发现了相同的问题,提交了一个 pr( github.com/apache/iceb… ) ,战略是将这个大文件拆分到政策文件巨细,现在现已吞并到 master,会在下一个 bug fix 版别 0.11.1 中发布。

查询优化

■ 批处理守时使命

现在关于守时调度中的批处理使命,Flink 的 SQL 客户端还没 Hive 那样做的很完善,比方履行 hive-f 来履行一个文件。而且不同的使命需求不同大数据技能与运用的资源,并行度等。

所以我自己封装了一个 Flink 程序,通过调用这命令行窗口怎样打开个程序命令行进入指定目录来进行处理,读取一个指定文件里面的 SQL,来提交批任gitee务。在命令行操控使命的资源和并行度等。

/home/flink/bin/fFlinklinklink run -p 10 -m yarn-c运维工程师需求把握什么技能luster /home/work/大数据技能iceberg-s大数据技能与运用cheduler.jar my运维宝.sql

■ 优化

批使命的查询这github块,我做了一些优化作业,比方 limit 下推,filter 下推命令行参数怎样运用,查询并行度揣度等,能够大大行进查询的速度,这些优化都现已推回给社区,而且在 Iceberg 0.11 版别中发布。

运维处理

■ 拾掇 orphan 文件

  1. 守时使命删去

在运用 Iceberg 的进程中,有时分会有这样的情况大数据技能与运用作业方向及远景,我提交了一个 Flink 使命,因为各种原因,把它停了,这个时运维工程师考什么证书分 Iceberg 还没提交相应的快照。此外因为一些反常导致程序失利,会产生一些不在 Iceb大数据erg 元数据里面的孤立的数据文件,这些文件对 Iceberg 来说是不可达开源的,也是没用的。所以咱们需求像 jvm 的废物回收相同来拾掇这些文件Git

现在 Iceberg 供给了一个 Spark 版其他 action 来运维工程师考什么证书处理这些没用的文件,咱们采用的github战略和紧运维工程师考什么证书缩小文件相同,获取 Hive 中的悉数的 Iceberg 表。每隔一个小时履行一giti次守时使命来删去这些没用的文件。

SparkSess运维的薪酬是多少ion spark = ...... Actions.forTable(spark, table) .remove大数据技能与运用OrphanFiles() //.deleteWith(...) .开源矿工execute();
  1. 踩坑

咱们在程序工作进程中出现了正常的数据文件被删去的问题,通过调研,因为快照保存设置是一小时,这个拾掇程序拾掇时间也是设置一个小时,通过日志发现是这个拾掇程序删去开源是什么意思了正常的数据。查了查代码,应该是设置了相同的时间,在拾掇孤立文件的时分,有其他程序正在读取要 expired 的 snapshot,导致删去了正常的数据。最后把这个拾掇程序的拾掇时间改成默许的三天,没有再出现删去数据文件的问题。
当然,为了保险起见,咱们能够掩盖本来的删去文件的办法,改成将文件到一个备份文件夹,检查没有问题之后,手艺删去。

运维是做什么的 快照过期处理

咱们的快照过期战略,是和紧缩小文件的批处运维是做什么的理使命写在一同的,紧缩完小文件之后,进行表运维工程师需求把握什么技能开源代码网站github快照过期处理,现在保存的时间是一个小时。这是因为关于有一些比较大的表,命令行分区比较多,而且 checkpoint 比较短,假定保存的快照过长的话,仍是会保存过多小文件,咱们暂时没有查询前史快照的需求,所以我将快照的Git保存时间设置了一个小时。

long olderThanT开源代码网站githubimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);table.expireS开源节流是什么意思是什么napshots()// .retainLast(20).expireOlderThan(olderThanTimestamp).运维的薪酬是多少commit();

■ 数据处理

写入了数据之后,当想检查相应的快照有多少数据文件时,直接查询 Spark 无法知道哪个是有用的,哪个是没用的。所以需求有对应的处理东西。现在 Flink 这块还不太老到,咱们大数据修仙能够运用 Spark3 供给的东西来检查。

  1. Dgithub敞开私库DL

现在 create t运维工程师考什么证书able 这些操作咱们是通过 Flink SQL Client 来做的。其他相关的 DDL 的操作能够运用 Spark 来做:iceberg.apache.org/spark/#ddl-…

  1. DML

一些相关的数据的操作,比方删去数据等能够通过 MySQL 来结束,Presto 现在只支撑分区等级的删去功用。

  1. show partitions & show create table

在咱们操作 Hive 的时分,有一些很常用的操作,比方 show partitions、 show create table 等,这些现在 Flink 还没有支撑,所以在操作 Iceberg 的时分开源软件就很不方便,咱们自己根据 Flink 1.12 做 了修改,不过现在还没有彻底提交到社区,后续有时间会提交到 Flink 和 Iceberg 社区。

四、后续作业

  • Flink SQL 接入 CDC 数据到 Iceber命令行怎样切换到d盘g

现在在咱们内部的版别中,我现已查验通过能够运用 Flink SQL 将 CDC开源是什么意思 数据(比方 MySQL binlog)写入 Iceberg,社区的版别中结束该功用还需求做一些作业,开源中国我也提交了一些相关的 PR 来推进这个作业。

  • 运用 SQL 进行删去和更新

关于 copy-on-write 表,咱们能够运用 Spark SQL 来进行行级的删去和更新。详细的支撑的语法能够参看源码中的查验类:

org大数据修仙.apache.iceber命令行界面g.spark.extensi命令行界面ons.TestDelete & org.apache.iceberg.spark.extensions.TestUpdate,这些功用我在查验github中文官网网页环境查验是能够的,可是还没有来得及更新到出产。

  • 运用 Flink SQL 进行 str大数据技能与运用专业e开源阅览app下载安装aming read

在作业中会有一些这样的场景,因为数据比较大,Iceberg 的数据只存了较短的时间,假开源众包设很不幸因为程序写错了等原因,gitee想从更早的时间来消费就力不从心了。
当引入了 Iceberg 的 streaming read 之后,这些问题就能够处理了,因为 Iceberg 存储了悉数的数据,当然这里有一个条件就是关于数据没有要求特别准确,比方抵达秒等级,因为现在 Flink 写入 Iceberg 的业务提交是根据 Flink Checkpointgit命令 距离的。

五、收益及总结

通过对 Iceb开源是什么意思erg 大概一个季度的调研github中文官网网页,查验,优化和 bug 修正,咱们将现有的 Hi大数据技能与运用专业ve 表都搬迁到大数据技能与运用作业方向及远景了 Iceb开源节流erg,完美处理了本来的悉数的痛点问题,现在体系安稳工作,而且相对 Hive 得到了许多的收益:

  • Flink 写入的资源减少

举一个比方,默许装备下,本来一运维工程师需求把握什么技能个 flink运维 读取 kafka 写入 hive 的使命,需求60个并行度才不会让 Kafka 产生积压。改成写入 iceberg 之后,只需求20个并行度就够了。

  • 查询速度变快

前面咱们讲到 Iceberg 查询的Git时分不会像 Hive 相同去 list 整个文件夹来获取gitlab分区数据,而是先从 manifest 文件中获取相关数据,查询的功能得到了明显的前进,一些大的报表的查询速度从 50 秒行进到 30 秒。

  • 并发读写

因为 Iceberg 的业务支撑,咱们能够结束对一个表进行并发读写,Flink 流式数据开源中国实时入湖,紧缩程序一同紧缩小文件,拾掇过开源中国期文件和快照的程命令行关机序一giti同拾掇无用的文件,这样就能更及时的供给数据,做到分钟级的推运维延,查询最新分区数据的速度大大加快了,而且因为 Iceberg 的 ACID 特功能够保证数据的准确性。

  • time travel

能够回溯查询从前某一时间的数据。

总结一下,咱们现在能够结束运用 Flink SQL 对 Iceberg 进行批、流的读写,并能够对小文大数据技能与运用专业件进行实时的紧缩,运用 Spark SQL 做一些 delete 和 u命令行pda运维宝te 作业以及一些 DDL 操作,后续能够运用 Fl运维工程师有出路吗ink SQL 将 CDC 的数据写入git教程 Iceberg。现在对 Iceberg命令行进入指定目录 的悉数的优化和 bug fix,我命令行窗口怎样打开现已贡献给社区。因为笔者水平有限,有时分也难免有错误,还请咱们不吝赐教。

作者介绍:
张军,同程艺龙大数据开发工程师