【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

欢迎关注,​分享更多原创技能内容~

微信大众号:ByteRaccoon、知乎:一只大狸花啊、稀土:浣熊say

微信大众号海量Java、数字孪生、工业互联网电子书免费送~

​概述

Kafka 体系在本公司的才智工厂 IOT 项目中得到了广泛应用。​现在,全国包含海外有上千个矿山工厂、光伏工厂、Cableway工厂等运用咱们的体系,尽管这些工厂的事务层逻辑各不相同,而且对应的服务体系也不完全一样。可是作为音讯中间件和缓存中间件的Kafka集群和MQTT集群整个公司都是运用的同一套底层服务,所以也面临着巨大的负载压力。跟着事务的迅猛发展, Kafka 集群规划快速增长。现在,公司的 Kafka 集群每日处理音讯总量现已抵达抵达数十亿等级,峰值音讯处理速度在千万条/s,跟着事务规划的扩大,咱们也面临着新的问题和技能应战也逐步添加。

事务场景

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

从事务场景上来说,Kafka集群在本项目中首要被分为了3类集群,分别是音讯集群、Log集群和耐久化集群,以满意不同事务的不同需求。

可是关于IOT项目来说,咱们的项目与互联网项目有所不同,咱们还有一个由MQTT集群构成的IOT数据搜集集群,之所以不选用Kafka的原因首要是针对物联网场景下网络信号不安稳和数据包巨细约束的原因。

此外,作为全体解决计划,咱们还引入了数据Mirror服务(Web Service),专门担任将数据从音讯集群、LOG集群传输、IOT数据集群传送到耐久化集群以及分发到web前端进行实时展现。

因而,本才智工厂IOT项目的集群构成如下:

  1. 音讯集群(Kafka):Kafka作为音讯中间件,为不同的在线事务供给异步的音讯通知服务。

  2. LOG 集群(Kafka):事务程序将生成的日志直接发送至Kafka,经过Kafka进行高效的传输与搜集。因为数据不落地,LOG集群对Kafka的可用性要求极高。这个集群还为重要的实时核算和模型练习供给了安稳而高效的数据源。

  3. 耐久化集群(Kafka):LOG数据的最终汇聚点,实时将数据转储到MongDB集群傍边,用于离线处理。除了供给耐久化数据,耐久化集群还为非必须的实时核算和实时练习供给了关键的数据支撑。

  4. IOT数据集群(MQTT):该集群的音讯来源比较杂乱,有工厂现场的嵌入式设备直接发送的IOT数据包,也有工厂现场服务处理之后发送的数据包。

对Kafka进行物理集群区分的首要目的在于确保服务质量,以及约束Kafka集群问题对全体体系的影响。这种集群区分的规划有助于更好地掌控和保护Kafka在不同事务场景中的功能和可用性。

事务规划

参数 数值
总机器数量 200 台
集群数量 8 个
Topic 数量 1200 个
TP(Topic Partition)数量 2 万个
每日处理音讯数 数十亿条
峰值音讯处理才能 百万条

如上表所示,咱们具有一个相对庞大的 Kafka 集群,具体参数如下:

  • 总机器数量: 200台,为集群供给了充足的核算资源。

  • 集群数量: 8个独立的集群,每个集群在处理特定使命和数据流时有必定的独立性。

  • Topic数量: 体系中定义了1200个主题,每个主题代表着一类特定的数据流或信息分类。

  • TP(Topic Partition)数量: 2万个分区,散布在整个集群中以完成更有用的数据处理和负载均衡。

  • 每日处理音讯数: 集群每天处理着数十亿条音讯,这些音讯或许来自不同的事务流程和数据源。

  • 峰值音讯处理才能: 具有百万条每秒的音讯处理峰值,显现了其在处理高峰时期的出色功能。

这个规划的 Kafka 集群为百亿等级的每日音讯处理和百万音讯处理才能的峰值状况下,供给了强壮的支撑,确保了数据的高效办理和分发。 ​​

Kafka集群音讯积压优化

跟着用户规划不断扩大,现在全国包含海外有上千个矿山工厂、光伏工厂、​Cableway工厂等运用咱们的体系,尽管这些工厂的事务端逻辑各不相同,而且对应的服务体系也不完全一样。可是作为音讯中间件和缓存中间件的Kafka集群和MQTT集群整个公司都是运用的同一套底层服务。工厂规划扩大的一起,随之而来的是音讯量越来越来多,导致经常产生顾客处理不过来,音讯积压的状况时有产生。

因为工厂IOT设备时时刻刻都在发送IOT数据到Kafka集群,而Kafka集群担任将数据耐久化到MongoDB中,在数据的峰值时刻(IOT项目的数据峰值一般都比较安稳),可以抵达百万条/s。这些数据都需求依照时刻次序被耐久化到MongoDB集群中,给算法团队进行剖析以及进行工业大数据预测。

当音讯积压的状况产生的时分,因为Kafka Broker的磁盘空间对错常有限的,而许多音讯出现堆积的状况时或许会形成日志被删去,导致音讯丢失的状况产生。

尽管说关于音讯积压的问题,直接扩容添加TP数目是一个很好的解决计划,可是公司许多采购设备的批阅流程杂乱,而且本着省钱的常规,先做体系优化。以下是一些在出产实践进程中所遇到的关于Kafka音讯积压的相关问题和解决计划,这儿做一个简略且汇总。

IOT数据包过大

问题原因:

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

如上图所示,尽管Kafka声称支撑百万级的TPS,然而,在音讯从出产到消费的进程中,涉及到屡次网络IO和磁盘IO操作,关于音讯体过大的状况,这些IO操作的耗时会进一步添加,直接影响了Kafka的出产和消费速度。此外,慢速的顾客会导致音讯积压,而音讯体过大还或许糟蹋服务器磁盘空间,乃至引发磁盘空间缺乏问题。

在面临这些问题时,需求有针对性地优化音讯体过大的状况,总的来说,当数据包巨细过大的时分会有以下问题:

  1. IO操作导致的功能影响: Kafka的出产者发送音讯到Broker,Broker写数据到磁盘,以及顾客从Broker获取音讯,这三个环节分别涉及一次网络IO和一次磁盘IO(写或读操作)。关于一次简略音讯的完整进程,共需经过2次网络IO和2次磁盘IO,增大音讯体将进一步加剧IO操作的耗时,对全体功能产生负面影响。

  2. 顾客速度缺乏导致积压: 慢速的顾客会导致音讯积压,使体系处理音讯的才能受到约束,然后影响实时性和功能。

  3. 磁盘空间糟蹋及缺乏问题: 大音讯体会占用更多的磁盘空间,或许导致服务器磁盘空间的糟蹋。若不加以注意,或许会引发磁盘空间缺乏的问题,影响体系的正常运行。

优化战略:以​Cableway工厂为例

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

  • IOT数据音讯拆分:

    每条索道上存在着上百个吊厢,每个吊厢有其单独的IOT设备来实时获取吊厢的定位信息,监控信息等数据。

    在项目的初始阶段,因为数据量不大,是将每条索道的一切吊厢数据经现场服务搜集处理之后再组合成一个Json数据包发送给云端的MQTT集群的,数据再从MQTT集群依照原格局发送给Kafka集群进行耐久化和音讯分发。

而现场的监控数据信息会经过工厂现场布置的微服务进行处理和本地耐久化之后,再依照时刻距离采样之后传送到Kafka服务器最终在云端进行耐久化操作。

因而,咱们将这些过大的IOT数据包进行了拆分,拆分成若干个小的IOT数据包再发送到MQTT集群,最终转发到Kafka集群。

  • IOT数据音讯压缩:

    IOT设备自身供给的是串口服务器编码的字节省数据,项目开始为了满意现场数据的实时展现,会在本地对IOT的字节省数据进行解码再转发到MQTT服务器,这一进程IOT数据的编码格局由字节省编码变成了Json格局的数据,大大添加了传送数据包的巨细。

    因而,优化计划中,将字节省IOT数据直接丢给MQTT集群并进行转发,由耐久化Kafka集群拿到数据之后,再改集群的处理流中进行数据的解码和耐久化操作。

经过以上2个音讯包优化战略,在必定程度上解决了音讯积压的问题。

​顾客线程模型问题

接着上面的问题来说,咱们在出产者端将数据进行压缩处理和缩小包巨细之后,Producer端的确磁盘的IO压力降低了,出产者的音讯推送速度也随之变快了,一起,顾客端的磁盘IO压力也变小了,接收音讯的速度也更快了,理论上来说,整个体系的音讯积压问题会得到必定程度上的缓解。

可是,经过Kafka Manager监控发现,此时整个体系的音讯积压程度并没有得到改善,反而愈加严峻了。剖析顾客的日志咱们发现,尽管IOT数据包巨细变小了,可是咱们其实是将本因由出产者执行的音讯解析的逻辑丢给了顾客端来执行。这样一来,顾客端在进行数据解析的进程中耗费了许多的时刻,是的消费的速度相关于曾经降低了许多,这就使得音讯积压反而更严峻了。

所以,咱们团队考虑经过优化顾客端线程模型的办法来添加顾客消费音讯的速度,缓解音讯积压的压力。

Kafka的音讯拉取模型

在介绍咱们的线程优化计划之前,先了解一下Kafka顾客的音讯拉取模型。

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

如上图所示,Kafka是选用poll形式对Broker中的音讯进行拉取的:

  • 当客户端恳求音讯的时分Kafka Consumer会先尝试从本地缓存获取,假如获取到了音讯则直接回来。

  • 假如本地缓存中没有音讯,则真正调用网络恳求从Kafka集群中拉取,而且直接回来。

  • 当网络恳求完毕,Kafka Consumer经过回调的办法来获取数据。

可是,因为出于顾客功能上的考虑,Kafka Consumer的线程形式对错线程安全的,用户无法在多个线程中共享一个Kafka Consumer实例,当Kafka Consumer检测到多线程访问的时分会直接抛出反常。

本来的顾客线程模型 —— 每个线程保护一个 Kafka Consumer

项目中本来的Kafka的消费模型尽管也是选用的线程池的办法,可是选用的是每一个线程保护一个​Kafka Consumer的办法来完成的,如下图所示:

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

这种消费形式下,每个线程办理一个 Kafka Consumer,然后完成线程隔离消费。因为每个分区在同一时刻只能由一个顾客处理,因而这种模型天然支撑次序消费。可是这种模型的缺陷在于无法有用进步单个分区的消费才能。

在咱们的项目中一个主题具有许多分区,只能经过添加 Kafka Consumer 实例的办法来进步全体消费才能,可是这也导致了线程数量的添加,也会产生了项目 Socket 衔接的巨大开支。

优化后的线程模型——单Kafka Consumer实例 + 多 worker 线程

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

这种消费模型将 Kafka Consumer 实例与音讯消费逻辑解耦,然后完成多线程消费而无需创立多个 Kafka Consumer 实例。它还具备动态调整 worker 线程的才能,使其依据消费负载状况进行灵活调整。这种模型在进步并发功能方面表现出色,却无法确保音讯的次序性。

假如需求确保音讯依照特定次序被消费,就需求在选用第二种消费模型的基础上进行修正,这儿咱们选用了如下计划来解决音讯的次序消费问题:

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

​1. 消费线程池初始化: 在初始化阶段,对消费线程池进行初始化。创立若干个单线程线程池,数量由 threadsNumMax 决议。每个线程池的目的是为了确保每个分区取模后能获取到一个线程池,以完成串行消费。

  1. 音讯分配到线程池: 对音讯分区进行取模,依据取模后的序号从线程池列表缓存中获取一个线程池。相同分区的音讯分配到相同线程池中执行,确保相同分区的音讯串行执行,完成音讯的次序性。

  2. 位移提交机制: 在消费前,创立一个 CountDownLatch,计数为本次音讯数量。消费逻辑中,在 finally 中执行 countDown 操作,主线程堵塞等候本次音讯消费完成。为了确保手动提交位移的正确性,只有当本次拉取的音讯悉数消费完毕后才进行位移提交。

  3. 防止音讯积压:

假如某些分区的音讯堆积量少于500条,会继续从其他分区拉取音讯,以抵达每次拉取500条音讯的方针。当每个分区的积压超越500条时,每次拉取的音讯中很或许只包含一个分区。在这种状况下,使用分区取模将同一个分区的音讯放到指定的线程池中进行消费,确保音讯的次序性。

以上机制确保了在各种状况下,可以有用完成次序消费线程模型。

MongDB单调集数据量过大

当MongoDB单表数据量过大的时分会影响数据刺进功率,即使选用了sharding形式,刺进功能也会收到很大的影响,然后影响咱们Kafka整个集群的消费功率。为此,本文先对MongoDB单表刺进功能受调集巨细的影响程度进行展现,然后剖析解决办法。因为咱们首要评论刺进功率导致的Kafka音讯堵塞,因而这儿只列出MongDB刺进功能的测验数据。

MongoDB单调集刺进测验

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

如上图所示,为测验MongoDB集群的刺进功能开支示意图,测验服务器运用的是CentOS操作体系,24GB内存。这儿分别测验了MongDB的单条记载刺进和批量刺进在单机形式和Sharding形式下的刺进功率。可以看出,当MongDB单调集数据量抵达2000万条的时分,无论是单机形式还是sharding形式刺进功能都出现了陡降。

关于单机形式来说,抵达2000万条功能骤减是因为服务器内存刚好悉数占满,MongDB的内存映射办法使妥当一切数据都在内存上的时分刺进速度飞快,可是当部分数据需求移动到磁盘之后刺进的功能下降严峻。因为当调集中数据量过大时,MongDB会不断将磁盘的数据换入内存,形成IO压力十分大。

MongoDB 单调集索引测验

【Kafka实战】才智工厂IOT项目Kafka音讯积压优化计划

如上图所示,跟着MongDB单调集的数据量添加,MongoDB的索引巨细简直也是直线型上升的,当数据量大概在2亿的时分,简直占满了整个机器的内存。

显然易见的是,当MongoDB单表数据量过大的时分,索引更新愈加困难,此时,MongoDB不仅要处理数据写入磁盘的作业还需求更新巨大的索引,这无疑也会对数据刺进功能形成严峻影响。

优化办法

理解了音讯刺进功率变低的原因之后,优化办法其实十分简略,只要控制MongoDB集群中的单表巨细即可。

咱们的项目中,一般一个主题是对应的一个工厂中的某类IOT数据,这些数据一般会被写入同一个姓名的MongoDB 调集傍边,而这个调集一般依照时刻来拆分指令。

在优化之前,关于单表大概是以月为周期来进行表的拆分,这样的好处是方便依据时刻查询历史数据的时分快速定位到相应的调集傍边。可是跟着项目的扩大,现在单表的数据量现已抵达了数亿条的等级,无疑这样的数据量的状况下再进行数据的刺进是很不正确的行为。

因而,咱们对MongoDB的表拆分的时刻距离进行了调整,大概一周会拆分出一个新的表来进行数据的存储。关于历史数据,咱们选用其它的办法来做查询优化,因为和Kafka的音讯堆积无关这儿咱们不做评论。优化之后,单表的数据量控制在几千万~1亿条之间,可以较好的进步刺进功率。

在进步了刺进功能的一起,Kafka的消费功率得到了必定程度上的进步,有用的缓解了音讯积压的状况产生。

总结

​这篇文章深入探讨了在才智工厂IOT项目中,Kafka音讯积压的优化计划。

首要,文章从事务场景、事务规划等方面介绍了该项目的背景,并具体说明晰Kafka集群的构成,包含音讯集群、Log集群、耐久化集群和IOT数据集群。

在面临音讯积压的应战时,文章提出了一系列优化计划。经过IOT数据包的拆分和压缩,降低了音讯体的巨细,减轻了IO操作的担负,然后进步了Kafka的出产和消费功率。其次,经过优化顾客线程模型,选用单Kafka Consumer实例与多worker线程的形式,进步了全体的消费才能。文章还具体介绍了Kafka的音讯拉取模型,并说明晰为了确保音讯的次序性而选用的线程模型调整计划。

在处理MongoDB单表数据量过大的问题上,文章剖析了刺进功能受调集巨细的影响,经过单表拆分和合理的索引办理等办法,有用进步了刺进功率,缓解了Kafka集群的音讯堵塞问题。

最终,文章总结了优化计划的核心思想,即经过合理的数据处理和办理战略,控制数据量巨细,进步体系全体功能。这篇文章为处理大规划项目中Kafka音讯积压问题供给了有力的实践经验和解决思路。