消息队列之 Kafka + EFLFK集群部署
介绍
Kafka是由Apache软件基金会开发的一个开源流处理渠道,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅音讯体系,它能够处理顾客在网站中的一切动作流数据。 这种动作(网页阅览,查找和其他用户的行动)是在现代网络上的许多社会功用的一个关键因素。 这些数据通常是由于吞吐量的要求而经过处理日志和日志聚合来处理。 关于像Hadoop相同的日志数据和离线分析体系,但又要求实时处理的限制,这是一个可行的处理方案。Kafka的意图是经过Hadoop的并行加载机制来一致线上和离线的音讯处理,也是为了经过集群来供给实时的音讯。
pache公司的软件包官方下载地址:Index of /dist (apache.org)
注:kafka从3.0版别之后,不再依靠zookeeper。
Zookeeper 概述
官方下载地址Index of /dist/zookeeper (apache.org)
Zookeeper 界说
Zookeeper是一个开源的分布式的,为分布式框架供给和谐服务的Apache项目。
Zookeeper 作业机制
Zookeeper从规划形式角度来理解:是一个根据观察者形式规划的分布式服务办理框架,它担任存储和办理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状况产生改变,Zookeeper就将担任告诉现已在Zookeeper上注册的那些观察者做出相应的反应。
也便是说 Zookeeper = 文件体系 + 告诉机制。
Zookeeper 特点
- Zookeeper:一个领导者(Leader),多个跟从者(Follower)组成的集群。
- Zookeepe集群中只需有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper合适装置奇数台服务器。
- 全局数据共同:每个Server保存一份相同的数据副本,Client无论衔接到哪个Server,数据都是共同的。
- 更新恳求次序履行,来自同一个Client的更新恳求按其发送次序顺次履行,即先进先出。
- 数据更新原子性,一次数据更新要么成功,要么失利。
- 实时性,在一定时间范围内,Client能读到最新数据。
Zookeeper 数据结构
ZooKeeper数据模型的结构与Linux文件体系很相似,全体上能够看作是一棵树,每个节点称做一个ZNode。每一个ZNode默许能够存储1MB的数据,每个ZNode都能够经过其路径仅有标识。
Zookeeper 运用场景
供给的服务包含:一致命名服务、一致装备办理、一致集群办理、服务器节点动态上下线、软负载均衡等。
一致命名服务
在分布式环境下,经常需求对运用/服务进行一致命名,便于识别。例如:IP不容易记住,而域名容易记住。
一致装备办理
-
分布式环境下,装备文件同步十分常见。一般要求一个集群中,一切节点的装备信息是共同的,比方Kafka集群。对装备文件修正后,希望能够快速同步到各个节点上。
-
装备办理可交由ZooKeeper完结。可将装备信息写入ZooKeeper上的一个Znode。各个客户端服务器监听这个Znode。一旦 Znode中的数据被修正,ZooKeeper将告诉各个客户端服务器。
一致集群办理
-
分布式环境中,实时把握每个节点的状况是必要的。可根据节点实时状况做出一些调整。
-
ZooKeeper能够完结实时监控节点状况改变。可将节点信息写入ZooKeeper上的一个ZNode。监听这个ZNode可获取它的实时状况改变。
服务器动态上下线
客户端能实时洞悉到服务器上下线的改变。
软负载均衡
在Zookeeper中记载每台服务器的拜访数,让拜访数最少的服务器去处理最新的客户端恳求。
Zookeeper 推举机制
第一次发动推举机制
假设有5台服务器:
-
服务器1发动,主张一次推举。服务器1投自己一票。此刻服务器1票数一票,不够半数以上(3票),推举无法完结,服务器1状况坚持为LOOKING;
-
服务器2发动,再主张一次推举。服务器1和2别离投自己一票并交流选票信息:此刻服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此刻服务器1票数0票,服务器2票数2票,没有半数以上成果,推举无法完结,服务器1,2状况坚持LOOKING。
-
服务器3发动,主张一次推举。此刻服务器1和2都会更改选票为服务器3。此次投票成果:服务器1为0票,服务器2为0票,服务器3为3票。此刻服务器3的票数现已超越半数,服务器3当选Leader。服务器1,2更改状况为FOLLOWING,服务器3更改状况为LEADING;
-
服务器4发动,主张一次推举。此刻服务器1,2,3现已不是LOOKING状况,不会更改选票信息。交流选票信息成果:服务器3为3票,服务器4为1票。此刻服务器4服从大都,更改选票信息为服务器3,并更改状况为FOLLOWING;
-
服务器5发动,和服务器4相同当小弟。
非第一次发动推举机制
- 当ZooKeeper 集群中的一台服务器呈现以下两种状况之一时,就会开端进入Leader推举:
- 服务器初始化发动。
- 服务器运行期间无法和Leader坚持衔接。
- 而当一台机器进入Leader推举流程时,当时集群也或许会处于以下两种状况:
- 集群中本来就现已存在一个Leader。
- 关于现已存在Leader的状况,机器试图去推举Leader时,会被告知当时服务器的Leader信息,关于该机器来说,只是需求和 Leader机器树立衔接,并进行状况同步即可。
- 集群中确实不存在Leader。
- 假设ZooKeeper由5台服务器组成,SID别离为1、2、3、4、5,ZXID别离为8、8、8、7、7,而且此刻SID为3的服务器是Leader。某一时间,3和5服务器呈现毛病,因此开端进行Leader推举。
- 集群中本来就现已存在一个Leader。
推举Leader规矩
- EPOCH大的直接胜出
- EPOCH相同,事务id大的胜出
- 事务id相同,服务器id大的胜出
- SID:服务器ID。用来仅有标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid共同。
- ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状况的变更。在某一时间,集群中的每台机器的ZXID值不一定完全共同,这和ZooKeeper服务器关于客户端“更新恳求”的处理逻辑速度有关。
- Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加。
布置 Zookeeper 集群
准备 3 台服务器做 Zookeeper 集群:
192.168.44.50
192.168.44.100
192.168.44.150
操作过程(3台服务器操作相同)
server.A=B:C:D
●A是一个数字,表明这个是第几号服务器。集群形式下需求在zoo.cfg中dataDir指定的目录下创立一个文
件myid,这个文件里面有一个数据便是A的值,Zookeeper发动时读取此文件,拿到里面的数据与zoo.cfg里
面的装备信息比较然后判别到底是哪个server。
●B是这个服务器的地址。
●C是这个服务器Follower与集群中的Leader服务器交流信息的端口。
●D是万一集群中的Leader服务器挂了,需求一个端口来从头进行推举,选出一个新的Leader,而这个端口
便是用来履行推举时服务器相互通讯的端口。
-
装备 Zookeeper 发动脚本
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)
echo "---------- zookeeper 发动 ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper 停止 ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper 重启 ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper 状况 ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
// 设置开机自启
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
//别离发动 Zookeeper
service zookeeper start
//检查当时状况
service zookeeper status
音讯行列概述
为什么需求音讯行列(MQ)
首要原因是由于在高并发环境下,同步恳求来不及处理,恳求往往会产生阻塞。比方大量的恳求并发拜访数据库,导致行锁表锁,最后恳求线程会堆积过多,然后触发 too many connection 错误,引发雪崩效应。
咱们运用音讯行列,经过异步处理恳求,然后缓解体系的压力。音讯行列常运用于异步处理,流量削峰,运用解耦,音讯通讯等场景。
当时比较常见的 MQ 中间件有:ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
运用音讯行列的优点
解耦
答应你独立的扩展或修正两头的处理过程,只需确保它们遵守同样的接口约束。
可康复性
体系的一部分组件失效时,不会影响到整个体系。音讯行列降低了进程间的耦合度,所以即使一个处理音讯的进程挂掉,参加行列中的音讯仍然能够在体系康复后被处理。
缓冲
有助于操控和优化数据流经过体系的速度,处理出产音讯和消费音讯的处理速度不共同的状况。
灵活性 & 峰值处理才能
在拜访量剧增的状况下,运用仍然需求持续发挥作用,但是这样的突发流量并不常见。假如为以能处理这类峰值拜访为规范来投入资源随时待命无疑是巨大的糟蹋。运用音讯行列能够使关键组件顶住突发的拜访压力,而不会由于突发的超负荷的恳求而完全崩溃。
异步通讯
很多时分,用户不想也不需求当即处理音讯。音讯行列供给了异步处理机制,答运用户把一个音讯放入行列,但并不当即处理它。想向行列中放入多少音讯就放多少,然后在需求的时分再去处理它们。
各MQ比较
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级 | 万级 | 十万级 | 十万级 |
topic数量对吞吐量的影响 | – | – | topic能够到达几百,几千个的级别,吞吐量会有较小幅度的下降 | topic从几十个到几百个的时分,吞吐量会大幅度下降 |
时效性 | 毫秒级}奇妙级 | 毫秒级 | 毫秒级 | |
可用性 | 高 | 高 | 十分高,分布式架构 | 十分高,分布式架构 |
音讯可靠性 | 有较低的概率丢掉数据 | – | 经过参数优化装备,能够做到0丢掉 | 经过参数优化装备,音讯能够做到0丢掉 |
功用支撑 | 完善 | 并发才能强,功用及其好,延时很低 | MQ功用较为完善,还是分布式的,扩展性好 | 功用较为简单,首要支撑简单的MQ功用,在大数据范畴的实时计算以及日志搜集被大规模运用,是事实上的规范 |
优劣势总结 | 十分成熟,功用强大,偶尔会有较低概率丢掉音讯,社区不活跃了 | 功用及其好,延时很低,功用完善,供给办理页面,社区比较活跃,吞吐量较低,运用erlang开发源码阅览不便利 | 接口简单易用,吞吐量高,分布式扩展便利,社区还算活跃,经过双11的考验 | MQ功用比较少,吞吐量高,分布式架构,或许存在信息重复消费问题,首要适用大数据实时计算以及日志搜集 |
音讯行列的两种形式
点对点形式
1对1,顾客自动拉取数据,音讯收到后音讯铲除
- 音讯出产者出产音讯发送到音讯行列中,然后音讯顾客从音讯行列中取出而且消费音讯。音讯被消费以后,音讯行列中不再有存储,所以音讯顾客不或许消费到现已被消费的音讯。音讯行列支撑存在多个顾客,但是对一个音讯而言,只会有一个顾客能够消费。
发布/订阅形式
一对多,又名观察者形式,顾客消费数据之后不会铲除音讯
- 音讯出产者(发布)将音讯发布到 topic 中,一起有多个音讯顾客(订阅)消费该音讯。和点对点方式不同,发布到 topic 的音讯会被一切订阅者消费。
- 发布/订阅形式是界说目标间一种一对多的依靠联系,使得每当一个目标(方针目标)的状况产生改变,则一切依靠于它的目标(观察者目标)都会得到告诉并自动更新。
Kafka概述
Kafka 界说
Kafka 是一个分布式的根据发布/订阅形式的音讯行列(MQ,Message Queue),首要运用于大数据实时处理范畴。
Kafka 简介
Kafka 是最初由 Linkedin 公司开发,是一个分布式、支撑分区的(partition)、多副本的(replica),根据 Zookeeper 和谐的分布式音讯中间件体系,它的最大的特性便是能够实时的处理大量数据以满意各种需求场景,比方根据 hadoop 的批处理体系、低推迟的实时体系、Spark/Flink 流式处理引擎,nginx 拜访日志,音讯服务等等,用 scala 语言编写, Linkedin 于 2010 年贡献给了 Apache 基金会并成为尖端开源项目。
Kafka 的特性
高吞吐量、低推迟
Kafka 每秒能够处理几十万条音讯,它的推迟最低只要几毫秒。每个 topic 能够分多个 Partition,Consumer Group 对 Partition 进行消费操作,进步负载均衡才能和消费才能。
可扩展性
kafka 集群支撑热扩展。
耐久性、可靠性
音讯被耐久化到本地磁盘,而且支撑数据备份避免数据丢掉。
容错性
答应集群中节点失利(多副本状况下,若副本数量为 n,则答应 n-1 个节点失利)。
高并发
支撑数千个客户端一起读写。
Kafka 体系架构
Broker
一台 kafka 服务器便是一个 broker。一个集群由多个 broker 组成。一个 broker 能够包容多个 topic。
Topic
能够理解为一个行列,出产者和顾客面向的都是一个 topic。 相似于数据库的表名或许 ES 的 index 物理上不同 topic 的音讯分开存储
Partition
为了完结扩展性,一个十分大的 topic 能够分布到多个 broker(即服务器)上,一个 topic 能够切割为一个或多个 partition,每个 partition 是一个有序的行列。Kafka 只确保 partition 内的记载是有序的,而不确保 topic 中不同 partition 的次序。
每个 topic 至少有一个 partition,当出产者产生数据的时分,会根据分配策略挑选分区,然后将音讯追加到指定的分区的行列末尾。
Partation 数据路由规矩
- 指定了 patition,则直接运用;
- 未指定 patition 但指定 key(相当于音讯中某个属性),经过对 key 的 value 进行 hash 取模,选出一个 patition;
- patition 和 key 都未指定,运用轮询选出一个 patition。
注意
- 每条音讯都会有一个自增的编号,用于标识音讯的偏移量,标识次序从 0 开端。
- 每个 partition 中的数据运用多个 segment 文件存储。
- 假如 topic 有多个 partition,消费数据时就不能确保数据的次序。严厉确保音讯的消费次序的场景下(例如产品秒杀、 抢红包),需求将 partition 数目设为 1。
broker、topic、partition三者的联系
- broker 存储 topic 的数据。假如某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition。
- 假如某 topic 有 N 个 partition,集群有 (N+M) 个 broker,那么其中有 N 个 broker 存储 topic 的一个 partition, 剩余的 M 个 broker 不存储该 topic 的 partition 数据。
- 假如某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实践出产环境中,尽量避免这种状况的产生,这种状况容易导致 Kafka 集群数据不均衡。
分区的原因
- 便利在集群中扩展,每个Partition能够经过调整以习惯它地点的机器,而一个topic又能够有多个Partition组成,因此整个集群就能够习惯恣意巨细的数据了;
- 能够进步并发,由于能够以Partition为单位读写了。
Replica
副本,为确保集群中的某个节点产生毛病时,该节点上的 partition 数据不丢掉,且 kafka 仍然能够持续作业,kafka 供给了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
Leader
每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当时担任数据的读写的 partition。
Follower
Follower 跟从 Leader,一切写恳求都经过 Leader 路由,数据变更会播送给一切 Follower,Follower 与 Leader 坚持数据同步。Follower 只担任备份,不担任数据的读写。
假如 Leader 毛病,则从 Follower 中推举出一个新的 Leader。
当 Follower 挂掉、卡住或许同步太慢,Leader 会把这个 Follower 从 ISR(Leader 保护的一个和 Leader 坚持同步的 Follower 调集) 列表中删去,从头创立一个 Follower。
Producer
出产者即数据的发布者,该人物将音讯 push 发布到 Kafka 的 topic 中。
broker 接收到出产者发送的音讯后,broker 将该音讯追加到当时用于追加数据的 segment 文件中。
出产者发送的音讯,存储到一个 partition 中,出产者也能够指定数据存储的 partition。
Consumer
顾客能够从 broker 中 pull 拉取数据。顾客能够消费多个 topic 中的数据。
Consumer Group(CG)
- 顾客组,由多个 consumer 组成。
- 一切的顾客都归于某个顾客组,即顾客组是逻辑上的一个订阅者。可为每个顾客指定组名,若不指定组名则归于默许的组。
- 将多个顾客会集到一起去处理某一个 Topic 的数据,能够更快的进步数据的消费才能。
- 顾客组内每个顾客担任消费不同分区的数据,一个分区只能由一个组内顾客消费,避免数据被重复读取。
- 顾客组之间互不影响。
offset 偏移量
- 能够仅有的标识一条音讯。
- 偏移量决定读取数据的方位,不会有线程安全的问题,顾客经过偏移量来决定下次读取的音讯(即消费方位)。
- 音讯被消费之后,并不被立刻删去,这样多个事务就能够重复运用 Kafka 的音讯。
- 某一个事务也能够经过修正偏移量到达从头读取音讯的意图,偏移量由用户操控。
- 音讯最终还是会被删去的,默许生命周期为 1 周(7*24小时)。
Zookeeper
Kafka 经过 Zookeeper 来存储集群的 meta 信息。
由于 consumer 在消费过程中或许会呈现断电宕机等毛病,consumer 康复后,需求从毛病前的方位的持续消费,所以 consumer 需求实时记载自己消费到了哪个 offset,以便毛病康复后持续消费。
Kafka 0.9 版别之前,consumer 默许将 offset 保存在 Zookeeper 中;从 0.9 版别开端,consumer 默许将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
也便是说,zookeeper的作用便是,出产者push数据到kafka集群,就必须要找到kafka集群的节点在哪里,这些都是经过zookeeper去寻找的。顾客消费哪一条数据,也需求zookeeper的支撑,从zookeeper获得offset,offset记载上一次消费的数据消费到哪里,这样就能够接着下一条数据进行消费。
布置 kafka 集群
zookeeper存储kafka集群的元数据
- 要先布置zookeeper集群,在zookeeper集群基础上装置kafka运用,节点数量要是>=3的奇数台
- 出产者推送数据到kafxa集群需求先经过zookeeper确认kafka的方位,音讯者消费的数据到哪里了也要从存储在zookeeper上的offset确认
- offer偏移量记载上一条顾客消费的数据方位,以便在毛病康复后能够接着下一跳数据持续消费
- 几个kaf ka服务器便是几个broker,生成推送数据到topic当中,topie能够被分区多个Partition,一个Partition能够右多个leplicalieplica能够是一个leoder和多个folower,leader担任数据的读写,follower仅担任仿制备份,顾客面向topic进行数据消费
试验环境
准备 3 台服务器做 Zookeeper 集群:
192.168.44.50
192.168.44.100
192.168.44.150
操作过程
-
装备 Zookeeper 发动脚本
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka 发动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状况 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
//设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka
//别离发动 Kafka
service kafka start
Kafka 命令行操作
创立topic
kafka-topics.sh --create --zookeeper 192.168.44.50:2181,192.168.44.100:2181,192.168.44.150:2181 --replication-factor 2 --partitions 3 --topic test
- –zookeeper:界说 zookeeper 集群服务器地址,假如有多个 IP 地址运用逗号切割,一般运用一个 IP 即可
- –replication-factor:界说分区副本数,1 代表单副本,主张为 2
- –partitions:界说分区数
- –topic:界说 topic 称号
检查当时服务器中的一切 topic
kafka-topics.sh --list --zookeeper 192.168.44.50:2181,192.168.44.100:2181,192.168.44.150:2181
检查某个 topic 的详情
kafka-topics.sh --describe --zookeeper 192.168.44.50:2181,192.168.44.100:2181,192.168.44.150:2181
发布音讯
kafka-console-producer.sh --broker-list 192.168.44.50:2181,192.168.44.100:2181,192.168.44.150:2181 --topic test
消费音讯
kafka-console-consumer.sh --bootstrap-server 192.168.44.50:2181,192.168.44.100:2181,192.168.44.150:2181 --topic test --from-beginning
–from-beginning:会把主题中以往一切的数据都读取出来
修正分区数
kafka-topics.sh --zookeeper 192.168.44.50:2181,192.168.44.100:2181,192.168.44.150:2181 --alter --topic test --partitions 6
删去 topic
kafka-topics.sh --delete --zookeeper 192.168.44.50:2181,192.168.44.100:2181,192.168.44.150:2181 --topic test
布置Filebeat+Kafka+ELK
-
布置 Zookeeper+Kafka 集群
-
布置 Filebeat
cd /usr/local/filebeat
vim filebeat.yml
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/httpd/access_log
tags: ["access"]
- type: log
enabled: true
paths:
- /var/log/httpd/error_log
tags: ["error"]
......
#增加输出到 Kafka 的装备
output.kafka:
enabled: true
hosts: ["192.168.44.50:9092","192.168.44.100:9092","192.168.44.150:9092"] #指定 Kafka 集群装备
topic: "httpd" #指定 Kafka 的 topic
#发动 filebeat
./filebeat -e -c filebeat.yml
- 布置 ELK,在 Logstash 组件地点节点上新建一个 Logstash 装备文件
cd /etc/logstash/conf.d/
vim kafka.conf
input {
kafka {
bootstrap_servers => "192.168.44.50:9092","192.168.44.100:9092","192.168.44.150:9092" #kafka集群地址
topics => "httpd" #拉取的kafka的指定topic
type => "httpd_kafka" #指定 type 字段
codec => "json" #解析json格局的日志数据
auto_offset_reset => "latest" #拉取最近数据,earliest为从头开端拉取
decorate_events => true #传递给elasticsearch的数据额定增加kafka的属性数据
}
}
output {
if "access" in [tags] {
elasticsearch {
hosts => ["192.168.44.20:9200"]
index => "httpd_access-%{+YYYY.MM.dd}"
}
}
if "error" in [tags] {
elasticsearch {
hosts => ["192.168.44.20:9200"]
index => "httpd_error-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug }
}
#发动 logstash
logstash -f kafka.conf
- 阅览器拜访 http://192.168.44.40:5601 登录 Kibana,单击“Create Index Pattern”按钮增加索引“filebeat_test-*”,单击 “create” 按钮创立,单击 “Discover” 按钮可检查图表信息及日志信息。