作者介绍:

腾讯云中间件专家工程师

Apache Pulsar PMC,《深化解析Apache Pulsar》作者。

现在专注于中间件范畴,在音讯行列和微服务方向具有丰富的经验。

担任 CKafka、TDMQ的规划与开发作业,现在致力于打造安稳、高效和可扩展的根底组件与服务。

导语

在 Apache Pulsar 中,为了防止音讯的重复投递,顾客进行音讯承认对错常重要的一步。当一条音讯被顾客消费后,需求顾客发送一个Ack恳求给Broker,Broker才会认为这条音讯被真正消费掉。被标记为现已消费的音讯,后续不会再次重复投递给顾客。在这篇文章中,咱们会介绍Pulsar中音讯承认的形式,以及正常音讯承认在Broker侧是怎么完结的。

1 承认音讯的形式

在了解Pulsar音讯承认形式之前,咱们需求先了解一些前置常识 —— Pulsar中的订阅以及游标(Cursor)。Pulsar中有多种消费形式,如:Share、Key_share、Failover等等,无论用户运用哪种消费形式都会创立一个订阅。订阅分为耐久订阅和非耐久订阅,关于耐久订阅,Broker上会有一个耐久化的Cursor,即Cursor的元数据被记载在ZooKeeper。Cursor以订阅(或称为消费组)为单位,保存了当时订阅现已消费到哪个方位了。因为不同顾客运用的订阅形式不同,能够进行的ack行为也不相同。总体来说能够分为以下几种Ack场景:

(1)单条音讯承认(Acknowledge)

和其他的一些音讯体系不同,Pulsar支撑一个Partition被多个顾客消费。假定音讯1、2、3发送给了Consumer-A,音讯4、5、6发送给了Consumer-B,而Consumer-B又消费的比较快,先Ack了音讯4,此刻Cursor中会独自记载音讯4为已Ack状态。假如其他音讯都被消费,但没有被Ack,而且两个顾客都下线或Ack超时,则Broker会只推送音讯1、2、3、5、6,现已被Ack的音讯4不会被再次推送。

(2)累积音讯承认(AcknowledgeCumulative)

假定Consumer承受到了音讯1、2、3、4、5,为了提升Ack的功能,Consumer能够不别离Ack 5条音讯,只需求调用AcknowledgeCumulative,然后把音讯5传入,Broker会把音讯5以及之前的音讯悉数标记为已Ack。

(3)批音讯中的单个音讯承认(Acknowledge)

这种音讯承认形式,调用的接口和单条音讯的承认相同,可是这个能力需求Broker敞开装备项AcknowledgmentAtBatchIndexLevelEnabled。当敞开后,Pulsar能够支撑只Ack一个Batch里面的某些音讯。假定Consumer拿到了一个批音讯,里面有音讯1、2、3,假如不敞开这个选项,咱们只能消费整个Batch再Ack,不然Broker会以批为单位从头悉数投递一次。前面介绍的选项敞开之后,咱们能够经过Acknowledge办法来承认批音讯中的单条音讯。

(4)否定应对(NegativeAcknowledge)

客户端发送一个RedeliverUnacknowledgedMessages指令给Broker,明确奉告Broker,当时Consumer无法消费这条音讯,音讯将会被从头投递。

并不是一切的订阅形式下都能用上述这些ack行为,例如:Shared或许Key_shared形式下就不支撑累积音讯承认(AcknowledgeCumulative)。因为在Shared或许Key_Shared形式下,前面的音讯不一定是被当时Consumer消费的,假如运用AcknowledgeCumulative,会把别人的音讯也一同承认掉。订阅形式与音讯承认之间的联系如下所示:

订阅形式 单条Ack 累积Ack 批量音讯中单个Ack 否定Ack
Exclusive 支撑 支撑 支撑 不支撑
Shared 支撑 不支撑 支撑 支撑
Failover 支撑 支撑 支撑 不支撑
Key_Shared 支撑 不支撑 支撑 支撑

2 Acknowledge与AcknowledgeCumulative的完结

Acknowledge与AcknowledgeCumulative接口不会直接发送音讯承认恳求给Broker,而是把恳求转交给AcknowledgmentsGroupingTracker处理。这是咱们要介绍的Consumer里的第一个Tracker,它仅仅一个接口,接口下有两个完结,一个是耐久化订阅的完结,另一个对错耐久化订阅的完结。因为非耐久化订阅的Tracker完结都是空,即不做任何操作,因而咱们只介绍耐久化订阅的完结——PersistentAcknowledgmentsGroupingTracker。

在Pulsar中,为了确保音讯承认的功能,并防止Broker接收到非常高并发的Ack恳求,Tracker中默许支撑批量承认,即使是单条音讯的承认,也会先进入行列,然后再一批发往Broker。咱们在创立Consumer时能够设置参数AcknowledgementGroupTimeMicros,假如设置为0,则Consumer每次都会立即发送承认恳求。一切的单条承认(individualAck)恳求会先放入一个名为PendingIndividualAcks的Set,默许是每100ms或许堆积的承认恳求超越1000,则发送一批承认恳求。

音讯承认的恳求终究都是异步发送出去,假如Consumer设置了需求回执(Receipt),则会回来一个CompletableFuture,成功或失败都能经过Future感知到。默许都是不需求回执的,此刻直接回来一个现已完结的CompletableFuture。

关于Batch音讯中的单条承认(IndividualBatchAck),用一个名为PendingIndividualBatchIndexAcks的Map进行保存,而不是普通单条音讯的Set。这个Map的Key是Batch音讯的MessageId,Value是一个BitSet,记载这批音讯里哪些需求Ack。运用BitSet能大幅下降保存音讯Id的能存占用,1KB能记载8192个音讯是否被承认。因为BitSet保存的内容都是0和1,因而能够很便利地保存在堆外,BitSet对象也做了池化,能够循环运用,不需求每次都创立新的,对内存非常友好。

如下图所示,只用了8位,就表明了Batch里面8条音讯的Ack状况,下图表明EntryId为0、2、5、6、7的Entry都被承认了,承认的方位会被置为1:

深入解析Apache Pulsar系列(一):客户端消息确认

关于累计承认(CumulativeAck)完结办法就更简略了,Tracker中只保存最新的承认方位点即可。例如,现在Tracker中保存的CumulativeAck方位为5:10,代表该订阅现已消费到LedgerId=5,EntryId=10的这条音讯上了。后续又ack了一个5:20,则直接替换前面的5:10为5:20即可。

最后便是Tracker的Flush,一切的承认终究都需求经过触发flush办法发送到Broker,无论是哪种承认,Flush时创立的都是同一个指令并发送给Broker,不过传参中带的AckType会不相同。

3 NegativeAcknowledge的完结

否定应对和其他音讯承认相同,不会立即恳求Broker,而是把恳求转交给NegativeAcksTracker进行处理。Tracker中记载着每条音讯以及需求推迟的时刻。Tracker复用了PulsarClient的时刻轮,默许是33ms左右一个时刻刻度进行检查,默许推迟时刻是1分钟,抽取出现已到期的音讯并触发从头投递。Tracker主要存在的意义是为了兼并恳求。另外假如推迟时刻还没到,音讯会暂存在内存,假如事务侧有很多的音讯需求推迟消费,还是主张运用ReconsumeLater接口。NegativeAck仅有的好处是,不需求每条音讯都指定时刻,能够全局设置推迟时刻。

4 未承认音讯的处理

假如顾客获取到音讯后一向不Ack会怎么样?这要分两种状况,第一种是事务侧现已调用了Receive办法,或许现已回调了正在异步等候的顾客,此刻音讯的引证会被保存进UnAckedMessageTracker,这是Consumer里的第三个Tracker。UnAckedMessageTracker中维护了一个时刻轮,时刻轮的刻度根据AckTimeoutTickDurationInMs这两个参数生成,每个刻度时刻=AckTimeout / TickDurationInMs。新追踪的音讯会放入最后一个刻度,每次调度都会移除行列头第一个刻度,并新增一个刻度放入行列尾,确保刻度总数不变。每次调度,行列头刻度里的音讯将会被整理,UnAckedMessageTracker会主动把这些音讯做重投递。

重投递便是客户端发送一个RedeliverUnacknowledgedMessages指令给Broker。每一条推送给顾客可是未Ack的音讯,在Broker侧都会有一个调集来记载(PengdingAck),这是用来防止重复投递的。触发重投递后,Broker会把对应的音讯从这个调集里移除,然后这些音讯就能够再次被消费了。留意,当重投递时,假如顾客不是Share形式是无法重投递单条音讯的,只能把这个顾客一切现已接收可是未Ack的音讯悉数从头投递。下图是一个时刻轮的简略示例:

深入解析Apache Pulsar系列(一):客户端消息确认

另外一种状况便是顾客做了预拉取,可是还没调用过任何Receive办法,此刻音讯会一向堆积在本地行列。预拉取是客户端SDK的默许行为,会预先拉取音讯到本地,咱们能够在创立顾客时经过ReceiveQueueSize参数来操控预拉取音讯的数量。Broker侧会把这些现已推送到Consumer本地的音讯记载为PendingAck,而且这些音讯也不会再投递给其他顾客,且不会Ack超时,除非当时Consumer被封闭,音讯才会被从头投递。Broker侧有一个RedeliveryTracker接口,暂时的完结是内存追踪(InMemoryRedeliveryTracker)。这个Tracker会记载音讯到底被从头投递了多少次,每条音讯推送给顾客时,会先从Tracker的哈希表中查询一下重投递的次数,和音讯同时推送给顾客。

由上面的逻辑咱们能够知道,创立顾客时设置的ReceiveQueueSize真的要稳重,防止很多的音讯堆积在某一个Consumer的本地预拉取行列,而其他Consumer又没有音讯可消费。PulsarClient上能够设置启用ConsumerStatsRecorder,启用后,顾客会在固定距离会打印出当时顾客的metrics信息,例如:本地音讯堆积量、承受的音讯数等,便利事务排查功能问题。

结尾

Pulsar中的规划细节非常多,因为篇幅有限,作者会整理一系列的文章进行技能分享,敬请期待。假如各位期望体系性地学习Pulsar,能够购买作者出书的新书《深化解析Apache Pulsar》。

深入解析Apache Pulsar系列(一):客户端消息确认

扫描上方二维码了解概况