如何更好地使用Kafka?

如何更好地使用Kafka?

导言| 要确保Kafka在运用进程中的稳定性,需求从kafka在事务中的运用周期进行依次确保。首要能够分为:事前防备(经过标准的运用、开发,防备问题产生)、运行时监控(确保集群稳定,出问题能及时发现)、毛病时处理(有完整的应急预案)这三阶段。

如何更好地使用Kafka?

事前防备

事前防备即经过标准的运用、开发,防备问题产生。首要包括集群/出产端/消费端的一些最佳实践、上线前测验以及一些针对紧迫状况(如音讯积压等)的暂时开关功用。

Kafka调优原则:

1.确认优化方针,而且定量给出方针(Kafka 常见的优化方针是吞吐量、延时、持久性和可用性)。

2.确认了方针之后,需求清晰优化的维度。

通用性优化:操作体系、JVM 等。

针对性优化:优化 Kafka 的 TPS、处理速度、延时等。

如何更好地使用Kafka?

(一)出产端最佳实践

  • 参数调优

  • 运用 Java 版的 Client;

  • 运用 kafka-producer-perf-test.sh 测验你的环境;

  • 设置内存、CPU、batch 紧缩;

  • batch.size:该值设置越大,吞吐越大,但推迟也会越大;

  • linger.ms:表明 batch 的超时时刻,该值越大,吞吐越大、但推迟也会越大;

  • max.in.flight.requests.per.connection:默以为5,表明 client 在 blocking 之前向单个衔接(broker)发送的未确认恳求的最大数,超越1时,将会影响数据的次序性;

  • compression.type:紧缩设置,会进步吞吐量;

  • acks:数据 durability 的设置;

  • 防止大音讯(占用过多内存、下降broker处理速度);

  • broker调整:添加 num.replica.fetchers,进步 Follower 同步 TPS,防止 Broker Full GC 等;

  • 当吞吐量小于网络带宽时:添加线程、进步 batch.size、添加更多 producer 实例、添加 partition 数;

  • 设置 acks=-1 时,假如推迟增大:能够增大 num.replica.fetchers(follower 同步数据的线程数)来调解;

  • 跨数据中心的传输:添加 socket 缓冲区设置以及 OS tcp 缓冲区设置。

  • 开发实践

a.做好Topic阻隔

依据详细场景(是否允许必定推迟、实时音讯、守时周期使命等)区别kafka topic,防止抢占或堵塞实时事务音讯的处理。

b.做好音讯流控

假如下流音讯消费存在瓶颈或许集群负载过高等,需求在出产端(或音讯网关)实施流量出产速率的操控或许延时/暂定音讯发送等战略,防止短时刻内发送大量音讯。

c.做好音讯补推

手动去查询丢掉的那部分数据,然后将音讯从头发送到mq里边,把丢掉的数据从头补回来。

d.做好音讯次序性确保

假如需求在确保Kafka在分区内严厉有序的话(即需求确保两个音讯是有严厉的先后次序),需求设置key,让某类音讯依据指定规矩路由到同一个topic的同一个分区中(能处理大部分消费次序的问题)。

可是,需求防止分区内音讯倾斜的问题(例如,依照店铺Id进行路由,简单导致音讯不均衡的问题)。

1.出产端:音讯发送指定key,确保相同key的音讯发送到同一个partition。

2.消费端:单线程消费或许写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后关于 N 个线程,每个线程别离消费一个内存 queue。

e.恰当进步音讯发送功率

批量发送:kafka先将音讯缓存在内存中的双端行列(buffer)中,当音讯量到达batch size指定巨细时进行批量发送,减少了网络传输频次,进步了传输功率;

端到端紧缩音讯:将一批音讯打包后进行紧缩,发送给 Broker 服务器后,但频频的紧缩和解压也会下降功用,终究还是以紧缩的办法传递到顾客的手上,在 Consumer 端进行解压;

异步发送:将出产者改造为异步的办法,能够进步发送功率,可是假如音讯异步产生过快,会导致挂起线程过多,内存不足,终究导致音讯丢掉;

索引分区并行消费:当一个时刻相对长的使命在履行时,它会占用该音讯地点索引分区被锁定,后边的使命不能及时派发给闲暇的客户端处理,若服务端假如启用索引分区并行消费的特性,就能够及时的把后边的使命派发给其他的客户端去履行,一起也不需求调整索引的分区数(但此类音讯仅适用于无需确保音讯次序联系的音讯)。

f.确保音讯发送牢靠性

Producer:假如对数据牢靠性要求很高的话,在发送音讯的时候,需求挑选带有 callBack 的api进行发送,并设置 acks、retries、factor等等些参数来确保Producer发送的音讯不丢掉。

Broker:kafka为了得到更高的功用和吞吐量,将数据异步批量的存储在磁盘中,并选用了批量刷盘的做法,假如对数据牢靠性要求很高的话,能够修正为同步刷盘的办法进步音讯的牢靠性。

(二)消费端最佳实践

  • 参数调优

  • 吞吐量:调整partition 数、OS page cache(分配足够的内存来缓存数据);

  • offset topic(__consumer_offsets):offsets.topic.replication.factor(默以为3)、offsets.retention.minutes(默以为1440,即 1day);

  • offset commit较慢:异步 commit 或 手动 commit;

  • fetch.min.bytes 、fetch.max.wait.ms;

  • max.poll.interval.ms:调用 poll() 之后推迟的最大时刻,超越这个时刻没有调用 poll() 的话,就会以为这个 consumer 挂掉了,将会进行 rebalance;

  • max.poll.records:当调用 poll() 之后回来最大的 record 数,默以为500;

  • session.timeout.ms;

  • Consumer Rebalance:check timeouts、check processing times/logic、GC Issues;

  • 网络装备。

  • 开发实践

a.做好音讯消费幂等

音讯消费的幂等首要依据事务逻辑做调整。

以处理订单音讯为例

1.由订单编号+订单状况仅有的幂等key,并存入redis;

2.在处理之前,首先会去查Redis是否存在该Key,假如存在,则阐明现已处理过了,直接丢掉;

3.假如Redis没处理过,则将处理过的数据刺进到事务DB上,再到最后把幂等Key刺进到Redis上;

简而言之,即经过Redis做前置处理 + DB仅有索引做终究确保来完成幂等性。

b.做好Consumer阻隔

在音讯量非常大的状况下,实时和离线顾客一起消费一个集群,离线数据深重的磁盘 IO 操作会直接影响实时事务的实时性和集群的稳定性。

依据消费的实时功用够将音讯顾客行为划分两类:实时顾客和离线顾客。

实时顾客:对数据实时性要求较高;在实时消费的场景下,Kafka 会运用体系的 page cache 缓存,直接从内存转发给实时顾客(热读),磁盘压力为零,适宜广告、引荐等事务场景。

离线顾客(守时周期性顾客):通常是消费数分钟前或是数小时前的音讯,这类音讯通常存储在磁盘中,消费时会触发磁盘的 IO 操作(冷读),适宜报表计算、批量计算等周期性履行的事务场景。

c.防止音讯消费堆积

  • 推迟处理、操控速度,时刻范围内分摊音讯(针对实时性不高的音讯);

  • 出产速度大于消费速度,这样能够恰当添加分区,添加consumer数量,进步消费TPS;

  • 防止很重的消费逻辑,优化consumer TPS:

是否有大量DB操作;

下流/外部服务接口调用超时;

是否有lock操作(导致线程堵塞);

需求特别重视kafka异步链路中的触及音讯放大的逻辑。

  • 假如有较重的消费逻辑,需求调整xx参数,防止音讯没消费完时,消费组退出,造成reblance等问题;

  • 确保consumer端没有因为反常而导致消费hang住;

  • 假如运用的是顾客组,确保没有频频地产生rebalance;

  • 多线程消费,批量拉取处理。

注:批量拉取处理时,需留意下kafka版别,spring-kafka 2.2.11.RELEASE版别以下,假如装备kafka.batchListener=true,可是将音讯接纳的元素设置为单个元素(非批量List),或许会导致kafka在拉取一批音讯后,只是消费了头部的第一个音讯。

d.防止Rebalance问题

  • 触发条件:

1.顾客数量改变:新顾客参加、顾客下线(未能及时发送心跳,被“踢出”Group)、顾客主动退出消费组(Consumer 消费时刻过长导致);

2.消费组内订阅的主题或许主题的分区数量产生改变;

3.消费组对应的 GroupCoorinator 节点产生改变。

  • 怎么防止非必要rebalance(顾客下线、顾客主动退出消费组导致的reblance):

1.需求仔细地设置session.timeout.ms(决定了 Consumer 存活性的时刻距离)和heartbeat.interval.ms(操控发送心跳恳求频率的参数) 的值。

2.max.poll.interval.ms参数装备:操控 Consumer 实际消费才能对 Rebalance 的影响,束缚了 Consumer 端运用程序两次调用 poll 办法的最大时刻距离。默许值是 5 分钟,表明 Consumer 程序假如在 5 分钟之内无法消费完 poll 办法回来的音讯,那么 Consumer 会主动发起“离开组”的恳求,Coordinator 也会敞开新一轮 Rebalance。详细能够计算下历史的时刻花费,把最长的时刻为参阅进行设置。

e.确保音讯消费牢靠性

一般状况下,还是client 消费 broker 丢音讯的场景比较多,想client端消费数据不能丢,肯定是不能运用autoCommit的,所以有必要是手动提交的。

Consumer主动提交的机制是依据必定的时刻距离,将收到的音讯进行commit。commit进程和消费音讯的进程是异步的。也便是说,或许存在消费进程未成功(比方抛出反常),commit音讯现已提交了,则此时音讯就丢掉了。

如何更好地使用Kafka?

f.确保音讯消费次序性

1.不同topic(乱序音讯):假如付出与订单生成对应不同的topic,只能在consumer层面去处理了。

2.同一个topic(乱序音讯):一个topic能够对应多个分区,别离对应了多个consumer,与“不同topic”没什么本质上的不同。(能够理解为咱们的服务有多个pod,出产者次序发送音讯,但被路由到不同分区,就或许变得乱序了,服务消费的便是无序的音讯)。

3.同一个topic,同一个分区(次序音讯):Kafka的音讯在分区内是严厉有序的,例如把同一笔订单的一切音讯,依照生成的次序一个个发送到同一个topic的同一个分区。

针对乱序音讯

例如:订单和付出别离封装了各自的音讯,可是消费端的事务场景需求按订单音讯->付出音讯的次序依次消费音讯。

宽表(事务主题相关的目标、维度、特点关联在一起的一张数据库表):消费音讯时,只更新对应的字段就好,音讯只会存在短暂的状况不共同问题,可是状况终究是共同的。例如订单,付出有自己的状况字段,订单有自己的状况字段,售后有自己的状况字段,就不需求确保付出、订单、售后音讯的有序,即便音讯无序,也只会更新自己的状况字段,不会影响到其他状况;

音讯补偿机制:将音讯与DB进行对比,假如发现数据不共同,再从头发送音讯至主进程处理,确保终究共同性;

MQ行列:一个中间方(比方redis的行列)来保护MQ的次序;

事务确保:经过事务逻辑确保消费次序;

针对次序音讯

两者都是经过将音讯绑定到定向的分区或许行列来确保次序性,经过添加分区或许线程来进步消费才能。

1.Consumer单线程次序消费

出产者在发送音讯时,已确保音讯在分区内有序,一个分区对应了一个顾客,确保了音讯消费的次序性。

如何更好地使用Kafka?

2.Consumer多线程次序消费(详细战略在后边章节)

单线程次序消费的扩展才能很差。为了进步顾客的处理速度,除了横向扩展分区数,添加顾客外,还能够运用多线程次序消费。

将接纳到的kafka数据进行hash取模(留意:假如kafka分区接受音讯现已是取模的了,这儿必定要对id做一次hash再取模)发送到不同的行列,然后敞开多个线程去消费对应行列里边的数据。

此外,这儿经过装备中心进行开关、动态扩容/缩容线程池。

g.处理Consumer的事务

经过事务音讯,能够很好的确保一些事务场景的事务逻辑,不会因为网络不可用等原因呈现体系之间状况不共同。

当更新任何一个服务呈现毛病时就抛出反常,事务音讯不会被提交或回滚,音讯服务器会回调发送端的事务查询接口,确认事务状况,发送端程序能够依据音讯的内容对未做完的使命从头履行,然后告知音讯服务器该事务的状况。

(三)集群装备最佳实践

  • 集群装备

Broker 评价:每个 Broker 的 Partition 数不应该超越2k、操控 partition 巨细(不要超越25GB)。

集群评价(Broker 的数量依据以下条件装备):数据保存时刻、集群的流量巨细。

集群扩容:磁盘运用率应该在 60% 以下、网络运用率应该在 75% 以下。

集群监控:坚持负载均衡、确保 topic 的 partition 均匀分布在一切 Broker 上、确保集群的阶段没有耗尽磁盘或带宽。

  • Topic 评价

1.Partition 数:

Partition 数应该至少与最大 consumer group 中 consumer 线程数共同;

关于运用频频的 topic,应该设置更多的 partition;

操控 partition 的巨细(25GB 左右);

考虑运用未来的增长(能够运用一种机制进行主动扩容);

2.运用带 key 的 topic;

3.partition 扩容:当 partition 的数据量超越一个阈值时应该主动扩容(实际上还应该考虑网络流量)。

  • 分区装备

设置多个分区在必定程度上是能够进步顾客消费的并发度,可是分区数量过多时或许会带来:句柄开销过大、出产端占用内存过大、或许添加端到端的推迟、影响体系可用性、毛病恢复时刻较长等问题。

依据吞吐量的要求设置 partition 数:

1.假定 Producer 单 partition 的吞吐量为 P

2.consumer 消费一个 partition 的吞吐量为 C

3.而要求的吞吐量为 T

4.那么 partition 数至少应该大于 T/P、T/c 的最大值

(四)功用调优

调优方针:高吞吐量、低延时。

  • 分层调优

自上而下分为运用程序层、结构层、JVM层和操作体系层,层级越靠上,调优的作用越显着。

| 调优类型

|

主张

| | — | — | |

操作体系

|

挂载文件体系时禁掉atime更新;挑选ext4或XFS文件体系;swap空间的设置;页缓存巨细

| |

JVM(堆设置和GC收集器)

|

将JVM 堆巨细设置成 6~8GB;主张运用 G1 收集器,便利省劲,比 CMS 收集器的优化难度小

| |

Broker端

|

坚持服务器端和客户端版别共同

| |

运用层

|

要频频地创立Producer和Consumer目标实例;用完及时封闭;合理运用多线程来改善功用

|

  • 吞吐量(TPS)调优

|
| 参数列表
| | — | — | | Broker端

|

恰当添加num.replica.fetchers参数值,但不超越CPU核数

| |
|

调优GC参数以防止经常性的Full GC

| |

Producer端

|

恰当添加batch.size参数值,比方从默许的16KB添加到512KB或1MB

| |
|

恰当添加linger.ms参数值,比方10~100

| |
|

设置compression.type=lz4或zstd

| |
|

设置acks=0或1

| |
|

设置retries=0

| |
|

假如多线程同享同一个Producer实例,则添加buffer.memory参数值

| |

Consumer端

|

选用多Consumer进程或线程一起消费数据

| |
|

添加fetch.min.bytes参数值,比方设置成1KB或更大

|

  • 延时调优

|
| 参数列表 | | — | — | | Broker端

|

恰当设置num.replica.fetchers值

| |

Producer端

|

设置linger.ms=0

| |
|

不启用紧缩,即设置compression.type=none

| |
|

设置ackes=1

| |

Consumer端

|

设置fetch.min.bytes=1

|

(五)稳定性测验

kafka的稳定性测验首要在事务上线前针对Kafka实例/集群健康性、高可用性的测验。

  • 健康性查看

1.查看实例:查看Kafka 实例目标中拿到一切的信息(例如 IP、端口等);

2.测验可用性:拜访出产者和顾客,测验衔接。

  • 高可用测验

单节点反常测验:重启Leader副本或Follower副本地点Pod

步骤:

1.查看topic的副本信息

2.删去相应pod

3.脚本检测Kafka的可用性

预期:对出产者和顾客的可用性均无影响。

集群反常测验:重启一切pod

步骤:

1.删去一切pod

2.脚本检测Kafka的可用性

预期:一切broker ready后服务正常。

如何更好地使用Kafka?

运行时监控

运行时监控首要包括集群稳定性装备与Kafka监控的最佳实践,旨在及时发现Kafka在运行时产生的相关问题与反常。

如何更好地使用Kafka?

(一)集群稳定性监控

  • 腾讯云CKafka集群装备

合理进行kafka实例配,首要重视这几个数据:

  1. 磁盘容量和峰值带宽

  2. 音讯保存时长;

  3. 动态保存战略;

a.磁盘容量和峰值带宽

可依据实际事务的音讯内容巨细、发送音讯qps等进行预估,能够尽量设置大点;详细数值可依据实例监控查看,假如短时刻内磁盘运用百分比就到达较高值,则需扩容。

峰值带宽=最大出产流量*副本数

b.音讯保存时长

音讯即便被消费,也会持久化到磁盘存储保存时长的时刻。该设置会占用磁盘空间,假如每天音讯量很大的话,可恰当缩短保存时刻。

c.动态保存战略

引荐敞开动态保存设置。当磁盘容量到达阈值,则删去最早的音讯,最多删去到保底时长范围外的音讯(淘汰战略),能够很大程度防止磁盘被打满的状况。

但有调整时不会主动通知,但咱们能够经过装备告警感知磁盘容量的改变。

  • 自建Kafka集群装备

1.设置日志装备参数以使日志易于办理;

2.了解 kafka 的(低)硬件需求;

3.充分运用 Apache ZooKeeper;

4.以正确的办法设置仿制和冗余;

5.留意主题装备;

6.运用并行处理;

7.带着安全性思维装备和阻隔 Kafka;

8.经过进步束缚防止停机;

9.坚持低网络推迟;

10.运用有用的监控和警报。

  • 资源阻隔

a.Broker级别物理阻隔

假如不同事务线的 topic 会同享一块磁盘,若某个consumer 呈现问题而导致消费产生 lag,进而导致频频读盘,会影响在同一块磁盘的其他事务线 TP 的写入。

处理:Broker级别物理阻隔:创立Topic、搬迁Topic、宕机恢复流程

b.RPC行列阻隔

Kafka RPC 行列缺少阻隔,一旦某个 topic 处理慢,会导致一切恳求 hang 住。

处理:需求依照操控流、数据流别离,且数据流要能够依照 topic 做阻隔。

1.将 call 行列依照拆解成多个,而且为每个 call 行列都分配一个线程池。

2.一个行列独自处理 controller 恳求的行列(阻隔操控流),其余多个行列依照 topic 做 hash 的分散开(数据流之距离离)。

假如一个 topic 呈现问题,则只会堵塞其间的一个 RPC 处理线程池,以及 call 行列,能够确保其他的处理链路是畅通的。

  • 智能限速

整个限速逻辑完成在 RPC 作业线程处理的末端,一旦 RPC 处理完毕,则经过限速操控模块进行限速检测。

1.装备等待时刻,之后放入到 delayed queue 中,否则放到 response queue 中。

2.放入到 delayed queue 中的恳求,等待时刻到达后,会被 delayed 线程放入到 response queue 中。

3.终究在 response queue 中的恳求被回来给 consumer。

(二)Kafka监控

白盒监控:服务或体系本身目标,如CPU 负载、堆栈信息、衔接数等;

黑盒监控:一般是经过模仿外部用户对其可见的体系功用进行监控的一种监控办法,相关目标如音讯的推迟、过错率和重复率等功用和可用性目标。

| 监控

|

功用/目标

|

概况

| | — | — | — | |

黑盒监控

|

操作

|

主题操作:创立、预览、查看、更新、删去

| |
|

服务

|

数据写入、是否消费成功

| |
|

体系

|

CPU 负载、堆栈信息、衔接数等

| |

白盒监控

|

容量

|

总存储空间、已用存储空间、最大分区运用、集群资源、分区数量、主题数量;

| |
|

流量

|

音讯写入、消费速率、集群网络进出;

| |
|

推迟

|

音讯写入、消费耗时(均匀值、99分位、最大耗时)、主题消费推迟量(offset lag)

| |
|

过错

|

集群反常节点数量、音讯写入拒绝量、音讯消费失利量、依靠zookeeper的相关过错

|

  • 腾讯云CKafka告警

针对CKafka,需求装备告警(此类告警一般为音讯积压、可用性、集群/机器健康性等查看)。

a.目标

如:实例健康状况、节点数量、健康节点数量、问题分区数、出产音讯数、消费恳求数、jvm内存运用率、均匀出产呼应时刻、分区消费偏移量等。

详细目标能够参阅:cloud.tencent.com/document/pr…

b.装备

装备文档:cloud.tencent.com/document/pr…

挑选监控实例,装备告警内容和阈值。

一般会对当时服务本身的kafka集群做告警装备,可是假如是依靠本身音讯的下流服务呈现消费问题,咱们是感知不到了;而且针对消费端服务不共用同一个集群的状况,呈现音讯重复发送的问题,服务本身是很难发现的。

c.预案

在事务上线前,最好整理下本身服务所触及的topic音讯(上游出产端和下流消费端),并细化告警装备,假如呈现上游kafka反常或许下流kafka音讯堆积能够及时感知。特别需求把或许有瞬时大量音讯的场景(如批量数据导入、守时全量数据同步等)做必定的告警或许预案,防止服务不可用或许影响正常事务音讯。

  • 自建告警渠道

经过自建告警渠道装备对服务本身的反常告警,其间包括对结构在运用kafka组件时抛出与kafka消费逻辑进程中抛出的事务反常。

其间,或许需求反常升级的状况(因为)独自做下处理(针对spring kafka):

1.自界说kafka反常处理器:完成KafkaListenerErrorHandler接口的办法,注册自界说反常监听器,区别事务反常并抛出;

2.消费Kafka音讯时,将@KafkaListener的errorHandler参数设置为界说的Kafka反常处理器;

3.尔后,指定的事务反常会被抛出,而不会被封装成Spring kafka的结构反常,导致不能清晰地了解详细反常信息。

  • Kafka监控组件

现在业界并没有公认的处理计划,各家都有各自的监控之道。

Kafka Manager:应该算是最有名的专属 Kafka 监控结构了,是独立的监控体系。

Kafka Monitor:LinkedIn 开源的免费结构,支撑对集群进行体系测验,并实时监控测验成果。

CruiseControl:也是 LinkedIn 公司开源的监控结构,用于实时监测资源运用率,以及供给常用运维操作等。无 UI 界面,只供给 REST API。

JMX 监控:因为 Kafka 供给的监控目标都是依据 JMX 的,因此,市面上任何能够集成 JMX 的结构都能够运用,比方 Zabbix 和 Prometheus。已有大数据渠道自己的监控体系:像 Cloudera 供给的 CDH 这类大数据渠道,天然就供给 Kafka 监控计划。

JMXTool:社区供给的命令行工具,能够实时监控 JMX 目标。答上这一条,属于肯定的加分项,因为知道的人很少,而且会给人一种你对 Kafka 工具非常了解的感觉。假如你暂时不了解它的用法,能够在命令行以无参数办法履行一下kafka-run-class.sh kafka.tools.JmxTool,学习下它的用法。

  • Kafka Monitor

其间,Kafka Monitor经过模仿客户端行为,出产和消费数据并收集音讯的推迟、过错率和重复率等功用和可用性目标,能够很好地发现下流的音讯消费状况进而能够动态地调整音讯的发送。(运用进程中需留意对样本掩盖率、功用掩盖率、流量、数据阻隔、时延的操控)

Kakfa Monitor 优势

1.经过为每个 Partition 发动独自的出产使命,确保监控掩盖一切 Partition。

2.在出产的音讯中包括了时刻戳、序列号,Kafka Monitor 能够依据这些数据对音讯的推迟、丢掉率和重复率进行计算。

3.经过设定音讯生成的频率,来到达操控流量的意图。

4.出产的音讯在序列化时指定为一个可装备的巨细(验证对不同巨细数据的处理才能、相同音讯巨细的功用比较)。

5.经过设定独自的 Topic 和 Producer ID 来操作 Kafka 集群,可防止污染线上数据,做到必定程度上的数据阻隔。

依据Kafka Monitor的规划思维,能够针对事务特点引入对音讯的推迟、过错率和重复率等功用的监控告警目标。

如何更好地使用Kafka?

毛病时处理

防微杜渐,遇到问题/毛病时有完整的应急预案,以快速定位并处理问题。

如何更好地使用Kafka?

(一)Kafka音讯堆积紧迫预案

问题描述:消费端产生音讯积压,导致依靠该音讯的服务不能及时感知事务改变,导致一些事务逻辑、数据处理呈现推迟,简单产生事务堵塞和数据共同性问题。

计划:问题排查、扩容升配战略、音讯Topic转化战略、可装备多线程的消费战略。

  • 问题排查

遇到音讯积压时,详细能够从以下几个视点去定位问题原因:

1.音讯出产端数据量是否存在陡升的状况。

2.音讯消费端消费才能是否有下降。

3.音讯积压是产生在一切的partition还是一切的partition都有积压状况。

关于第1、2点导致的音讯积压:为暂时性的音讯积压,经过扩分区、扩容升配、多线程消费、批量消费等办法进步消费速度能在必定程度上处理这类问题。

关于第3点导致的音讯积压:能够选用音讯Topic中转战略。

  • 扩容升配战略

1.查看出产端消费发送状况(首要查看是否持续有音讯产生、是否存在逻辑缺点、是否有重复音讯发送);

2.调查消费端的消费状况(预估下堆积音讯的处理整理以及是否有下降趋势);

3.若为出产端问题,则评价是否能够经过添加分区数、调整偏移量、删去topic(需求评价影响面)等处理;

4.消费端新增机器及依靠资源,进步消费才能;

5.假如触及数据共同性问题,需求经过数据比对、对账等功用进行校验。

  • 装备多线程的消费战略

简而言之,即线程池消费+动态线程池装备战略:将接纳到的kafka数据进行hash取模(假如kafka分区接受音讯现已是取模的了,这儿必定要对id做一次hash再取模)发送到不同的行列,然后敞开多个线程去消费对应行列里边的数据。

规划思路:

1.在运用发动时初始化对应事务的次序消费线程池(demo中为订单消费线程池);

2.订单监听类拉取音讯提交使命至线程池中对应的行列;

3.线程池的线程处理绑定行列中的使命数据;

4.每个线程处理完使命后添加待提交的offsets标识数;

5.监听类中校验待提交的offsets数与拉取到的记载数是否相等,假如相等则;

6.手动提交offset(封闭kafka的主动提交,待本次拉取到的使命处理完成之后再提交位移)

如何更好地使用Kafka?

别的,能够依据事务流量调整的线程装备与pod的装备,如高峰期设置一个相对较高的并发级别数用来快速处理音讯,平峰期设置一个较小的并发级别数来让出体系资源。这儿,能够参阅美团供给的一种装备中心修正装备动态设置线程池参数的思路,完成动态的扩容或许缩容。

完成了动态扩容与缩容

1.经过装备中心改写OrderKafkaListener监听类中的装备concurrent的值。

2.经过set办法修正concurrent的值时,先修正stopped的值去中止当时正在履行的线程池。

3.履行完毕后经过新的并发级别数新建一个新的线程池,完成了动态扩容与缩容。

如何更好地使用Kafka?

此外,还能够新增开关,它设置为true是能够中断发动中的线程池,毛病时进行功用开关。

留意:假如触及数据共同性问题,需求经过数据比对、对账等功用进行校验。

  • Topic中转战略

当音讯积压是产生在一切的partition还是一切的partition都有积压状况时,只能操作暂时扩容,以更快的速度去消费数据了。

规划思路:

1.暂时树立好原先10倍或许20倍的queue数量(新建一个topic,partition是本来的10倍);

2.然后写一个暂时分发音讯的consumer程序,这个程序布置上去消费积压的音讯,消费之后不做耗时处理,直接均匀轮询写入暂时建好分10数量的queue里边;

3.紧接着征用10倍的机器来布置consumer,每一批consumer消费一个暂时queue的音讯;

4.这种做法相当于暂时将queue资源和consumer资源扩展10倍,以正常速度的10倍来消费音讯。

5.等快速消费完了之后,恢复本来的布置架构,从头用本来的consumer机器来消费音讯。

如何更好地使用Kafka?

改善

1.consumer程序能够写在服务里边;

2.指定一个“预案topic”,在服务中预先写好对“预案topic”;

3.选用战略形式进行”事务topic“->“预案topic”的转化。

留意

1.假如触及数据共同性问题,需求经过数据比对、对账等功用进行校验;

2.需求有个独自的topic转化服务,或修正服务代码,或在事前将多线程逻辑写好。

(二)Kafka消费反常导致消费堵塞

问题描述:某个音讯消费反常或许某个操作较为耗时,导致单个pod的消费才能下降,乃至产生堵塞。

计划:设置偏移量;开关多线程的消费战略。

  • 设置偏移量

1.调整偏移量:联系运维,将offset后移一位;

2.音讯补推:针对跳过的音讯或某个时刻段内的数据进行音讯补推;

3.假如触及数据共同性问题,需求经过数据比对、对账等功用进行校验。

  • 开关多线程的消费战略

参阅上面的“可装备多线程的消费战略”,在产生堵塞时敞开多线程消费开关。

注:需求修正代码或许在事前将多线程逻辑写好

(三)Kafka音讯丢掉预案

问题描述:服务没有依照预期消费到kafka音讯,导致事务产生问题。

计划:根因剖析;音讯补推。

  • 根因剖析

1.出产端是否成功发送消费(源头丢掉)

Broker丢掉音讯:Kafka为了得到更高的功用和吞吐量,将数据异步批量的存储在磁盘中,异步刷盘有肯能造成源头数据丢掉;

Producer丢掉音讯:发送逻辑存在Bug,导致音讯为发送成功。

处理:需求查看出产端与集群健康性;音讯补发。

2.是否被成功消费

Consumer主动提交的机制是依据必定的时刻距离,将收到的音讯进行commit。commit进程和消费音讯的进程是异步的。也便是说,或许存在消费进程未成功(比方抛出反常),commit音讯现已提交了。

此外,假如消费逻辑有bug,也导致音讯丢掉的假象。

处理:修正问题,视状况修正消费确认机制。

3.是否有其他服务共用了同一个消费组

多服务误用同一个消费组会导致音讯必定比率或规律性丢掉。

例如,创立用户的kafka音讯,或许价格中心和促销服务误用了一个消费组,导致每个服务都是消费了部分音讯,导致一些问题呈现偶现的状况。

处理:修正装备,重启服务,各种树立的消费组;事前需求有查看是否有多个服务共用一个消费的状况(检测+比对)。

  • 音讯补推

1.经过事务影响查询影响的数据信息;

2.构建kafka音讯,进行音讯补偿;

3.假如触及数据共同性问题,需求经过数据比对、对账等功用进行校验。

针对每个对外发送的服务,出产端一般都需求有较为完善的音讯补推接口,而且消费端也需求确保音讯消费的幂等。

如何更好地使用Kafka?

其他

如何更好地使用Kafka?

(一)Kafka本钱操控

机器、存储和网络

  • 机器

需求从头评价你的实例类型决策:你的集群是否饱满?在什么状况下饱满?是否存在其他实例类型,或许比你第一次创立集群时挑选的类型更适宜?EBS 优化实例与 GP2/3 或 IO2 驱动器的混合是否真的比 i3 或 i3en 机器(及其带来的优势)有更好的性价比?

  • 存储与网络

紧缩在 Kafka 中并不新鲜,大多数用户现已知道了自己能够在 GZIP、Snappy 和 LZ4 之间做出挑选。但自从KIP-110被合并进 Kafka,并添加了用于 Zstandard 紧缩的紧缩器后,它已完成了明显的功用改善,而且是下降网络本钱的完美办法。

以出产者端略高的 CPU 运用率为代价,你将取得更高的紧缩率并在线上“挤进”更多信息。

Amplitude在他们的帖子中介绍,在切换到 Zstandard 后,他们的带宽运用量减少了三分之二,仅在处理管道上就能够节约每月数万美元的数据传输本钱。

如何更好地使用Kafka?

  • 集群

不平衡的集群或许会损害集群功用,导致某些 borker 比其他 broker 的负载更大,让呼应推迟更高,而且在某些状况下会导致这些 broker 的资源饱满,从而导致不必要的扩容,进而会影响集群本钱。

此外,不平衡集群还面对一个危险:在一个 broker 出毛病后呈现更高的 MTTR(例如当该 broker 不必要地持有更多分区时),以及更高的数据丢掉危险(幻想一个仿制因子为 2 的主题,其间一个节点因为发动时要加载的 segment 过多,所以难以发动)。

(二)音讯消费的幂等

界说:

所谓幂等性,数学概念便是: f(f(x)) = f(x) 。f函数表明对音讯的处理。浅显点来讲便是,在顾客收到重复音讯进行重复处理时,也要确保终究成果的共同性。

比方,银行转账、下单等,不论重试多少次,都要确保终究成果必定是共同的。

  • 运用数据库的仅有束缚

将数据库中的多个字段联合,创立一个仅有束缚,即便屡次操作也能确保表里至多存在一条记载(如创立订单、创立账单、创立流水等)。

此外,只要是支撑相似“INSERT IF NOT EXIST”语义的存储类体系(如Redis的SETNX)都能够用于完成幂等消费。

  • 设置前置条件

1.给数据改变设置一个前置条件(版别号version、updateTime);

2.假如满意条件就更新数据,否则拒绝更新数据;

3.在更新数据的时候,一起改变前置条件中的数据(版别号+1、更新updateTime)。

  • 记载并查看操作

1.给每条音讯都记载一个全局仅有 ID;

2.消费时,先依据这个全局仅有 ID 查看这条音讯是否有被消费过;

3.假如没有消费过,则更新数据,并将消费状况置为“已消费”状况。

其间,在“查看消费状况,然后更新数据而且设置消费状况”中,三个操作有必要作为一组操作确保原子性。

如何更好地使用Kafka?

参阅:

[1]iwiki.woa.com/pages/viewp…
[2]www.infoq.cn/article/ucS…
[3]blog.csdn.net/qq\_3217990…
[4]blog.csdn.net/qq\_3217990…
[5]zhuanlan.zhihu.com/p/513559802…
[6]blog.csdn.net/philip502/a…
[7]www.zhihu.com/question/48…
[8]zhuanlan.zhihu.com/p/354772550…
[9]www.infoq.cn/article/con…
[10]www.infoq.cn/article/BF3…
[11]www.infoq.cn/article/wmM…
[12]www.infoq.cn/article/Q0o…

阅览原文