这是我参加「第四届青训营 」笔记创作活动的第9天。

Flink Exactly-once 的完结办法

Checkpoint 机制和 Two-phase commit protocol 是完结Flink Exactly-once语义的两大要害机制

一致性确保语义:

  • At-most-once:每条数据消费至多一次,处理推迟低
  • At-least-once:每条数据消费至少一次,一条数据或许存在重复消费
  • Exactly-once:每条数据都被消费且仅被消费一次,似乎毛病从未产生

Checkpoint机制能够确保作业呈现 fail-over 后能够从最新的快照进行康复,即分布式快照机制能够确保 Flink 体系内部的“精确一次”处理。可是咱们在实践出产体系中,Flink 会对接各式各样的外部体系,比方 Kafka、HDFS 等,一旦 Flink 作业呈现失利,作业会从头消费旧数据,这时候就会呈现从头消费的状况,也就是重复消费。关于Checkpoint机制的详细分析能够参阅这篇博客。

针对这种状况,Flink 1.4 版别引入了一个很重要的功用:两阶段提交,也就是 TwoPhaseCommitSinkFunction。两阶段调配特定的 sourcesink(特别是 0.11 版别 Kafka)使得“精确一次处理语义”成为或许。

在 Flink 中两阶段提交的完结办法被封装到了 TwoPhaseCommitSinkFunction 这个抽象类中,咱们只需求完结其中的beginTransactionpreCommitcommitabort 四个办法就能够完结“Exactly-once”的处理语义,完结的办法咱们能够在官网中查到:

  1. BeginTransaction,在敞开业务之前,咱们在方针文件体系的临时目录中创立一个临时文件,后面在处理数据时将数据写入此文件;

  2. PreCommit,在预提交阶段,刷写(flush)文件,然后封闭文件,之后就不能写入到文件了,咱们还将为属于下一个检查点的任何后续写入发动新业务;

  3. Commit,在提交阶段,咱们将预提交的文件原子性移动到真正的方针目录中,请留意,这会添加输出数据可见性的推迟;

  4. Abort,在中止阶段,咱们删去临时文件。

Checkpoint 和 两阶段提交协议(2PC)简单介绍

Checkpoint

  • Checkpoint barrier 的下发

  • 算子状况制造和 barrier 传递

  • 多个上游的等候 barrier 对齐现象

  • Checkpoint 并不堵塞算子数据处

  • Checkpoint ACK和制造完结

两阶段提交协议(2PC)

  • Coordinator:协作者,同步和和谐一切节点处理逻辑的中心节点
  • Participant:参加者,被中心节点调度的其他履行处理逻辑的业务节点

两阶段提交协议在 Flink 中的运用

  • Flink 中协作者和参加者的人物分配

  • 协作者(JobManager)建议阶段一提交

  • 各算子 Checkpoint 的制造

  • 提交阶段及 Checkpoint 的制造完结

Flink-Kafka Exactly-once语义完结全进程

微信图片_20220426143935.png

如上图所示,咱们用 Kafka-Flink-Kafka 这个事例来介绍一下完结“ 端到端 Exactly-Once”语义的进程,整个进程包含:

  • 从 Kafka 读取数据

  • 窗口聚合操作

  • 将数据写回 Kafka

整个进程能够总结为下面四个阶段:

  1. 一旦 Flink 开端做 checkpoint 操作,那么就会进入 pre-commit 阶段,一起 Flink JobManager 会将检查点 Barrier 注入数据流中 ;

  2. 当一切的 barrier 在算子中成功进行一遍传递,并完结快照后,则 pre-commit 阶段完结;

  3. 等一切的算子完结“预提交”,就会建议一个“提交”动作,可是任何一个“预提交”失利都会导致 Flink 回滚到最近的 checkpoint;

  4. pre-commit 完结,必需求确保 commit 也要成功,上图中的 Sink Operators 和 Kafka Sink 会共同来确保。

Checkpoint 和 两阶段提交协议(2PC)存在的问题

Checkpoint

Flink 的 Checkpoint 机制能够在不中止整个 application 的状况下,从流运用中生成一致性分布式的检查点。然而,它会添加application的处理延时(processing latency)

在一个task对它的状况做检查点时,它会堵塞,并缓存它的输入。由于state能够变的很大,而且检查点的操作需求经过网络写入数据到一个远端存储体系,所以做检查点的操作或许会很简单就花费几秒到几分钟,这关于延时敏感的application来说,延时过长了。

虽然增量 checkpoint 在大状况场景下极大削减了 checkpoint 的制造时刻,但背面存在一些权衡,也就带来一些问题:

  • 每一次 checkpoint 仅生成增量文件,完好状况文件依赖多个 checkpoint
  • 由于要从多个 checkpoint 中读取康复数据,使命康复时刻变久
  • 虽然 checkpoint 自身有清理机制,但由于 checkpoint 之间存在依赖联系,旧的 checkpoint 或许并不会被删去,文件数会膨胀

例如咱们有一个大状况作业,整个 checkpoint 文件大小超过 10T,shared 共享文件数多达 20W 个,单个算子的文件大小超过 1T

image.png

两阶段提交协议(2PC)

  • 功用问题:无论是在第一阶段的进程中,还是在第二阶段,一切的参加者资源和和谐者资源都是被锁住的,只要当一切节点准备结束,业务和谐者才会告诉进行全局提交,参加者进行本地业务提交后才会开释资源。这样的进程会比较绵长,对功用影响比较大。
  • 单节点毛病:由于和谐者的重要性,一旦和谐者产生毛病。参加者会一向堵塞下去。尤其在第二阶段,和谐者产生毛病,那么一切的参加者还都处于锁定业务资源的状况中,而无法持续完结业务操作。(虽然和谐者挂掉,能够从头选举一个和谐者,可是无法处理由于和谐者宕机导致的参加者处于堵塞状况的问题)。

总结咱们上面讨论的问题:2PC在四个方面使体系功用下降:推迟(协议的时刻加上抵触业务的停顿时刻),吞吐量(由于它需求防止在协议期间运转其他抵触的业务,banq注:只能让业务串行履行,区块链其实是一个业务链),扩展性(更大)在体系中,业务变得多分区而且有必要付出2PC的吞吐量和推迟本钱以及可用性(咱们上面讨论的堵塞问题)的或许性越大。

相应的优化方案

针对 Checkpoint 机制

一、设置最小时刻距离

当Flink运用敞开Checkpoint功用,并装备Checkpoint时刻距离,运用中就会依据指定的时刻距离周期性地对运用进行Checkpoint操作。默许状况下Checkpoint操作都是同步进行,也就是说,当时面触发的Checkpoint动作没有完全结束时,之后的Checkpoint操作将不会被触发。在这种状况下,假如Checkpoint进程持续的时刻超过了装备的时刻距离,就会呈现排队的状况。假如有十分多的Checkpoint操作在排队,就会占用额定的体系资源用于Checkpoint,此刻用于使命核算的资源将会削减,进而影响到整个运用的功用和正常履行。

在这种状况下,假如大状况数据确实需求很长的时刻来进行Checkpoint,那么只能对Checkpoint的时刻距离进行优化,能够经过Checkpoint之间的最小距离参数进行装备,让Checkpoint之间依据Checkpoint履行速度进行调整,前面的Checkpoint没有完全结束,后面的Checkpoint操作也不会触发。

streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

仿制

经过最小时刻距离参数装备,能够下降Checkpoint对体系的功用影响,但需求留意的事,关于十分大的状况数据,最小时刻距离只能减轻Checkpoint之间的堆积状况。假如不能有用快速地完结Checkpoint,将会导致体系Checkpoint频次越来越低,当体系呈现问题时,没有及时对状况数据有用地耐久化,或许会导致体系丢失数据。因而,关于十分大的状况数据而言,应该对Checkpoint进程进行优化和调整,例如采用增量Checkpoint的办法等。

用户也能够经过装备CheckpointConfig中setMaxConcurrentCheckpoints()办法设定并行履行的checkpoint数量,这种办法也能有用下降checkpoint堆积的问题,但会进步资源占用。一起,假如开端了并行checkpoint操作,当用户以手动办法触发savepoint的时候,checkpoint操作也将持续履行,这将影响到savepoint进程中对状况数据的耐久化。

二、预估状况容量

除了对现已运转的使命进行checkpoint优化,对整个使命需求的状况数据量进行预估也十分重要,这样才能挑选合适的checkpoint战略。对使命状况数据存储的规划依赖于如下根本规矩:

  1. 正常状况下应该尽或许留有满足的资源来应对频频的反压。
  2. 需求尽或许供给给额定的资源,以便在使命呈现异常中止的状况下处理积压的数据。这些资源的预估都取决于使命中止进程中数据的积压量,以及对使命康复时刻的要求。
  3. 体系中呈现临时性的反压没有太大的问题,可是假如体系中频频呈现临时性的反压,例如下游外部体系临时性变慢导致数据输出速率下降,这种状况就需求考虑给予算子必定的资源。
  4. 部分算子导致下游的算子的负载十分高,下游的算子完全是取决于上游算子的输出,因而对类似于窗口算子的估量也将会影响到整个使命的履行,应该尽或许给这些算子留有满足的资源以应对上游算子产生的影响。

三、异步Snapshot

默许状况下,运用中的checkpoint操作都是同步履行的,在条件允许的状况下应该尽或许地运用异步的snapshot,这样讲大幅度提高checkpoint的功用,尤其是在十分复杂的流式运用中,如多数据源关联、co-functions操作或windows操作等,都会有较好的功用改善。

Flink供给了异步快照(Asynchronous Snapshot)的机制。当实践履行快照时,Flink能够立即向下广播Checkpoint Barrier,表明自己现已履行完自己部分的快照。一起,Flink发动一个后台线程,它创立本地状况的一份复制,这个线程用来将本地状况的复制同步到State Backend上,一旦数据同步完结,再给Checkpoint Coordinator发送确认信息。复制一份数据必定占用更多内存,这时能够运用写入时仿制(Copy-on-Write)的优化战略。Copy-on-Write指:假如这份内存数据没有任何修正,那没必要生成一份复制,只需求有一个指向这份数据的指针,经过指针将本地数据同步到State Backend上;假如这份内存数据有一些更新,那再去恳求额定的内存空间并维护两份数据,一份是快照时的数据,一份是更新后的数据。

在运用异步快照需求确认运用遵从以下两点要求:

  1. 首要有必要是Flink保管状况,即运用Flink内部供给的保管状况所对应的数据结构,例如常用的有ValueState、ListState、ReducingState等类型状况。
  2. StateBackend有必要支撑异步快照,在Flink1.2的版别之前,只要RocksDB完好地支撑异步的Snapshot操作,从Flink1.3版别以后能够在heap-based StateBackend中支撑异步快照功用。

四、紧缩状况数据

Flink中供给了针对checkpoint和savepoint的数据进行紧缩的办法,现在Flink仅支撑经过用snappy紧缩算法对状况数据进行紧缩,在未来的版别中Flink将支撑其他紧缩算法。在紧缩进程中,Flink的紧缩算法支撑key-group层面紧缩,也就是不同的key-group分别被紧缩成不同的部分,因而解紧缩进程能够并发履行,这对大规模数据的紧缩和解紧缩带来十分高的功用提高和较强的可扩展性。Flink中运用的紧缩算法在ExecutionConfig中进行指定,经过将setUseSnapshotCompression办法中的值设定为true即可。

五、观察checkpoint推迟时刻

checkpoint推迟发动时刻并不会直接露出在客户端中,而是需求经过以下公式核算得出。假如改时刻过长,则表明算子在进行barrier对齐,等候上游的算子将数据写入到当时算子中,说明体系正处于一个反压状况下。checkpoint推迟时刻能够经过整个端到端的核算时刻减去异步持续的时刻和同步持续的时刻得出。

针对 两阶段提交协议(2PC)

针对两阶段提交协议(2PC)的优化方案仍在研究,供给一些思路作为参阅:

  • 底存存储需求露出sharding细节,供给以分区为单位的业务上下文管理机制,使得在预处理进程中,行锁和数据修正为内存操作,防止耐久化的代价。
  • 简化和谐者为无状况逻辑
  • 削减2PC履行要害途径上的耐久化和RPC次数

采用三阶段提交协议能够部分处理两阶段提交协议的问题:

履行过程:

  1. CanCommit阶段

    1. 业务问询,和谐者向参加者发送precommit恳求。问询是否能够履行业务。
    2. 呼应反应参加者接到Cancommit恳求。
  2. PreCommit阶段

    1. 和谐者依据参加者的返回来决议是否持续业务的precommit操作。可是会呈现两种状况
    2. 和谐者从一切参加者获得反应都是yes,那么业务会持续履行
    3. 假如有任何一个参加者向和谐者发送了No呼应,或许等候超时,和谐者没有接收到参加者的呼应那么业务中止。
  3. DoCommit阶段

    1. 履行提交

      1. 发送提交恳求和谐接收到参加者发送的ACK呼应,那么他将从预提交状况进入到提交状况。并向一切参加者发送doCommit恳求。
      2. 业务提交 参加者接收到doCommit恳求之后,履行正式的业务提交。并在完结业务提交之后开释一切业务资源。
      3. 呼应反应 业务提交完之后,向和谐者发送Ack呼应。
      4. 完结业务 和谐者接收到一切参加者的ack呼应之后,完结业务。
    2. 业务中止,和谐者没有接收到参加者发送的ACK呼应(或许是接受者发送的不是ACK呼应,也或许呼应超时),那么就会履行中止业务。

      1. 发送中止恳求 和谐者向一切参加者发送abort恳求
      2. 业务回滚 参加者接收到abort恳求之后,运用其在阶段二记录的undo信息来履行业务的回滚操作,并在完结回滚之后开释一切的业务资源。
      3. 反应成果 参加者完结业务回滚之后,向和谐者发送ACK音讯
      4. 中止业务 和谐者接收到参加者反应的ACK音讯之后,履行业务的中止。

        
三阶段与二阶段的不同点在于。

  1. 关于和谐者和参加者都设置了超时机制(2阶段中只要和谐者有)。和谐者假如在必定时刻内没有收到参加者的音讯则默许失利,参加者无法及时收到和谐者的信息,他会默许履行commit,不会一向持有业务。
  2. 处理了单点毛病,并削减了堵塞。可是也会带来数据不一致的问题。假如网络犯错,和谐者发送的恳求没有及时被参加者收到,那么参加者在等候超时之后履行了commit操作。这样就和其他接到abort命令并履行回滚的参加者之间存在数据不一致的状况。