Zookeeper 界说

Zookeeper是一个开源的分布式的,为分布式结构供给协调服务的Apache项目。

Zookeeper 作业机制

Zookeeper从规划形式视点来了解:是一个依据观察者形式规划的分布式服务办理结构,它担任存储和办理大家都关心的数据,然后承受观察者的注册,一旦这些数据的状况产生改动,Zookeeper就将担任告诉现已在Zookeeper上注册的那些观察者做出相应的反应。

也便是说 Zookeeper = 文件体系 + 告诉机制。

zookeeper+kafka

Zookeeper 特点

(1)Zookeeper:一个领导者(Leader),多个跟从者(Follower)组成的集群。

(2)Zookeepe集群中只需有对折以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合装置奇数台服务器。

(3)大局数据共同:每个Server保存一份相同的数据副本,Client不管衔接到哪个Server,数据都是共同的。

(4)更新恳求次序履行,来自同一个Client的更新恳求按其发送次序顺次履行,即先进先出。

(5)数据更新原子性,一次数据更新要么成功,要么失利。

(6)实时性,在必定时刻范围内,Client能读到最新数据。

zookeeper+kafka

Zookeeper 数据结构

ZooKeeper数据模型的结构与Linux文件体系很类似,整体上能够看作是一棵树,每个节点称做一个ZNode。每一个ZNode默许能够存储1MB的数据,每个ZNode都能够经过其路径仅有标识。

Zookeeper 运用场景

供给的服务包含:共同命名服务、共同装备办理、共同集群办理、服务器节点动态上下线、软负载均衡等。

共同命名服务

在分布式环境下,经常需求对运用/服务进行共同命名,便于识别。例如:IP不简单记住,而域名简单记住。

共同装备办理

(1)分布式环境下,装备文件同步十分常见。一般要求一个集群中,一切节点的装备信息是共同的,比方Kafka集群。对装备文件修正后,希望能够快速同步到各个节点上。

(2)装备办理可交由ZooKeeper完结。可将装备信息写入ZooKeeper上的一个Znode。各个客户端服务器监听这个Znode。一旦 Znode中的数据被修正,ZooKeeper将告诉各个客户端服务器。

共同集群办理

(1)分布式环境中,实时掌握每个节点的状况是必要的。可依据节点实时状况做出一些调整。

(2)ZooKeeper能够完结实时监控节点状况改动。可将节点信息写入ZooKeeper上的一个ZNode。监听这个ZNode可获取它的实时状况改动。

服务器动态上下线

客户端能实时洞悉到服务器上下线的改动。

软负载均衡

在Zookeeper中记载每台服务器的拜访数,让拜访数最少的服务器去处理最新的客户端恳求。

Zookeeper 推举机制

第一次发动推举机制

假定有5台服务器:

(1)服务器1发动,主张一次推举。服务器1投自己一票。此刻服务器1票数一票,不够对折以上(3票),推举无法完结,服务器1状况坚持为LOOKING;

(2)服务器2发动,再主张一次推举。服务器1和2别离投自己一票并交流选票信息:此刻服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此刻服务器1票数0票,服务器2票数2票,没有对折以上成果,推举无法完结,服务器1,2状况坚持LOOKING。

(3)服务器3发动,主张一次推举。此刻服务器1和2都会更改选票为服务器3。此次投票成果:服务器1为0票,服务器2为0票,服务器3为3票。此刻服务器3的票数现已超越对折,服务器3当选Leader。服务器1,2更改状况为FOLLOWING,服务器3更改状况为LEADING;

(4)服务器4发动,主张一次推举。此刻服务器1,2,3现已不是LOOKING状况,不会更改选票信息。交流选票信息成果:服务器3为3票,服务器4为1票。此刻服务器4恪守大都,更改选票信息为服务器3,并更改状况为FOLLOWING;

(5)服务器5发动,和服务器4一样当小弟。

zookeeper+kafka

非第一次发动推举机制

1、当ZooKeeper 集群中的一台服务器出现以下两种状况之一时,就会开端进入Leader推举:

(1)服务器初始化发动。

(2)服务器运行期间无法和Leader坚持衔接。

2、而当一台机器进入Leader推举流程时,当时集群也或许会处于以下两种状况:

(1)集群中本来就现已存在一个Leader。

  • 关于现已存在Leader的状况,机器试图去推举Leader时,会被奉告当时服务器的Leader信息,关于该机器来说,只是需求和 Leader机器树立衔接,并进行状况同步即可。

(2)集群中的确不存在Leader。

  • 假定ZooKeeper由5台服务器组成,SID别离为1、2、3、4、5,ZXID别离为8、8、8、7、7,而且此刻SID为3的服务器是Leader。某一时刻,3和5服务器出现毛病,因此开端进行Leader推举。

  • 推举Leader规则:

    1. EPOCH大的直接胜出
    2. EPOCH相同,事务id大的胜出
    3. 事务id相同,服务器id大的胜出

小贴士:

  • SID:服务器ID。用来仅有标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid共同。
  • ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状况的改动。在某一时刻,集群中的每台机器的ZXID值不必定完全共同,这和ZooKeeper服务器关于客户端“更新恳求”的处理逻辑速度有关。
  • Epoch:每个Leader任期的代号。没有Leader时同一轮投票进程中的逻辑时钟值是相同的。每投完一次票这个数据就会添加。

zookeeper+kafka

布置 Zookeeper 集群

试验环境:

预备 3 台服务器做 Zookeeper 集群:

192.168.20.10

192.168.20.20

192.168.20.30

试验操作:

##1.装置前预备
#封闭防火墙
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
​
#装置 JDK 环境。假如服务器无法衔接外网,需求先搭建本地yum仓库
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
​
#zookeeper下载装置包
#官方下载地址:https://archive.apache.org/dist/zookeeper/#wget命令是Linux体系用于从Web下载文件的命令行工具,服务器联通外网的状况下,可运用此种方法下载软件包。假如服务器无法衔接外网,需求提早预备好软件包,放入/opt/目录下。
cd /opt/
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
​
##2.装置 Zookeeper。提早将zookeeper的装置包传到/opt/目录下。
cd /opt/
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
​
##3.修正装备文件
cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfg
​
vim zoo.cfg
 tickTime=2000   #通讯心跳时刻,Zookeeper服务器与客户端心跳时刻,单位毫秒
 initLimit=10    #Leader和Follower初始衔接时能忍受的最多心跳数(tickTime的数量),这儿表明为10*2s
 syncLimit=5     #Leader和Follower之间同步通讯的超时时刻,这儿表明假如超越5*2s,Leader以为Follwer死掉,并从服务器列表中删去Follwer
 dataDir=/usr/local/zookeeper-3.5.7/data      ●修正,指定保存 Zookeeper中的数据的目录,目录需求单独创立
 dataLogDir=/usr/local/zookeeper-3.5.7/logs   ●添加,指定寄存日志的目录,目录需求单独创立
 clientPort=2181   #客户端衔接端口
 #添加集群信息
 server.1=192.168.20.10:3188:3288
 server.2=192.168.20.20:3188:3288
 server.3=192.168.20.30:3188:3288
-------------------------------------------------------------------------------------
server.A=B:C:D
●A是一个数字,表明这个是第几号服务器。集群形式下需求在zoo.cfg中dataDir指定的目录下创立一个文件myid,这个文件里边有一个数据便是A的值,Zookeeper发动时读取此文件,拿到里边的数据与zoo.cfg里边的装备信息比较然后判断到底是哪个server。
●B是这个服务器的地址。
●C是这个服务器Follower与集群中的Leader服务器交流信息的端口。
●D是假如集群中的Leader服务器挂了,需求一个端口来从头进行推举,选出一个新的Leader,而这个端口便是用来履行推举时服务器相互通讯的端口。
-------------------------------------------------------------------------------------
​
#在每个节点上创立数据目录和日志目录
mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs
​
#在每个节点的dataDir指定的目录下创立一个 myid 的文件,留意每个节点的myid不能相同
echo 1 > /usr/local/zookeeper-3.5.7/data/myid
 echo 2 > /usr/local/zookeeper-3.5.7/data/myid
 echo 3 > /usr/local/zookeeper-3.5.7/data/myid
​
##4.装备 Zookeeper 发动脚本,将zookeeper加入体系服务办理
 vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)
  echo "---------- zookeeper 发动 ------------"
  $ZK_HOME/bin/zkServer.sh start
   ;;
stop)
  echo "---------- zookeeper 中止 ------------"
  $ZK_HOME/bin/zkServer.sh stop
   ;;
restart)
  echo "---------- zookeeper 重启 ------------"
  $ZK_HOME/bin/zkServer.sh restart
   ;;
status)
  echo "---------- zookeeper 状况 ------------"
  $ZK_HOME/bin/zkServer.sh status
   ;;
*)
  echo "Usage: $0 {start|stop|restart|status}"
esac
EOF
​
#为脚本添加履行权限。添加到发动服务,设置为开机自启。
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
#chkconfig --list zookeeper 可检查发动服务
 #别离发动 Zookeeper
 service zookeeper start
 #检查当时状况
 service zookeeper status
仿制代码

1.装置前预备

zookeeper+kafka

zookeeper+kafka
2、装置Zookeeper
zookeeper+kafka

zookeeper+kafka
修正装备文件
zookeeper+kafka

zookeeper+kafka
装备 Zookeeper 发动脚本

zookeeper+kafka

zookeeper+kafka

zookeeper+kafka

音讯行列概述

为什么需求音讯行列(MQ)

首要原因是因为在高并发环境下,同步恳求来不及处理,恳求往往会产生阻塞。比方很多的恳求并发拜访数据库,导致行锁表锁,终究恳求线程会堆积过多,然后触发 too many connection 过错,引发雪崩效应。

我们运用音讯行列,经过异步处理恳求,然后缓解体系的压力。音讯行列常运用于异步处理,流量削峰,运用解耦,音讯通讯等场景。

当时比较常见的 MQ 中间件有:ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。

运用音讯行列的优点

解耦

答应你独立的扩展或修正两头的处理进程,只需确保它们恪守同样的接口束缚。

可康复性

体系的一部分组件失效时,不会影响到整个体系。音讯行列降低了进程间的耦合度,所以即便一个处理音讯的进程挂掉,加入行列中的音讯仍然能够在体系康复后被处理。

缓冲

有助于操控和优化数据流经过体系的速度,解决出产音讯和消费音讯的处理速度不共同的状况。

灵活性 & 峰值处理才能

在拜访量剧增的状况下,运用仍然需求持续发挥效果,可是这样的突发流量并不常见。假如为以能处理这类峰值拜访为规范来投入资源随时待命无疑是巨大的浪费。运用音讯行列能够使关键组件顶住突发的拜访压力,而不会因为突发的超负荷的恳求而完全崩溃。

异步通讯

很多时分,用户不想也不需求当即处理音讯。音讯行列供给了异步处理机制,答使用户把一个音讯放入行列,但并不当即处理它。想向行列中放入多少音讯就放多少,然后在需求的时分再去处理它们。

zookeeper+kafka

音讯行列的两种形式

(1)点对点形式(1对1,顾客自动拉取数据,音讯收到后音讯铲除)

  • 音讯出产者出产音讯发送到音讯行列中,然后音讯顾客从音讯行列中取出而且消费音讯。音讯被消费以后,音讯行列中不再有存储,所以音讯顾客不或许消费到现已被消费的音讯。音讯行列支撑存在多个顾客,可是对一个音讯而言,只会有一个顾客能够消费。

zookeeper+kafka

(2)发布/订阅形式(一对多,又叫观察者形式,顾客消费数据之后不会铲除音讯)

  • 音讯出产者(发布)将音讯发布到 topic 中,一起有多个音讯顾客(订阅)消费该音讯。和点对点方式不同,发布到 topic 的音讯会被一切订阅者消费。
  • 发布/订阅形式是界说目标间一种一对多的依靠关系,使得每当一个目标(方针目标)的状况产生改动,则一切依靠于它的目标(观察者目标)都会得到告诉并自动更新。

zookeeper+kafka

Kafka概述

官方下载地址:kafka.apache.org/downloads.h…

Kafka 界说

Kafka 是一个分布式的依据发布/订阅形式的音讯行列(MQ,Message Queue),首要运用于大数据实时处理范畴。

Kafka 简介

Kafka 是最初由 Linkedin 公司开发,是一个分布式、支撑分区的(partition)、多副本的(replica),依据 Zookeeper 协调的分布式音讯中间件体系,它的最大的特性便是能够实时的处理很多数据以满意各种需求场景,比方依据 hadoop 的批处理体系、低推迟的实时体系、Spark/Flink 流式处理引擎,nginx 拜访日志,音讯服务等等,用 scala 言语编写, Linkedin 于 2010 年贡献给了 Apache 基金会并成为尖端开源项目。

Kafka 的特性

高吞吐量、低推迟

Kafka 每秒能够处理几十万条音讯,它的推迟最低只要几毫秒。每个 topic 能够分多个 Partition,Consumer Group 对 Partition 进行消费操作,进步负载均衡才能和消费才能。

可扩展性

kafka 集群支撑热扩展。

持久性、可靠性

音讯被持久化到本地磁盘,而且支撑数据备份避免数据丢掉。

容错性

答应集群中节点失利(多副本状况下,若副本数量为 n,则答应 n-1 个节点失利)。

高并发

支撑数千个客户端一起读写。

Kafka 体系架构

Broker

一台 kafka 服务器便是一个 broker。一个集群由多个 broker 组成。一个 broker 能够包容多个 topic。

Topic(主题)

能够了解为一个行列,出产者和顾客面向的都是一个 topic。

类似于数据库的表名或许 ES 的 index。

物理上不同 topic 的音讯分隔存储。

Partition(分区,完结数据分片)

  • 为了完结扩展性,一个十分大的 topic 能够分布到多个 broker(即服务器)上,一个 topic 能够切割为一个或多个 partition,每个 partition 是一个有序的行列。Kafka 只确保 partition 内的记载是有序的,而不确保 topic 中不同 partition 的次序。
  • 每个 topic 至少有一个 partition,当出产者产生数据的时分,会依据分配战略挑选分区,然后将音讯追加到指定的分区的行列结尾。

Partation 数据路由规则:

  1. 指定了 patition,则直接运用;
  2. 未指定 patition 但指定 key(相当于音讯中某个属性),经过对 key 的 value 进行 hash 取模,选出一个 patition;
  3. patition 和 key 都未指定,运用轮询选出一个 patition。

留意:

  • 每条音讯都会有一个自增的编号,用于标识音讯的偏移量,标识次序从 0 开端。
  • 每个 partition 中的数据运用多个 segment 文件存储。
  • 假如 topic 有多个 partition,消费数据时就不能确保数据的次序。严格确保音讯的消费次序的场景下(例如商品秒杀、 抢红包),需求将 partition 数目设为 1。

broker、topic、partition三者的关系:

  • broker 存储 topic 的数据。假如某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition。
  • 假如某 topic 有 N 个 partition,集群有 (N+M) 个 broker,那么其中有 N 个 broker 存储 topic 的一个 partition, 剩下的 M 个 broker 不存储该 topic 的 partition 数据。
  • 假如某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实践出产环境中,尽量避免这种状况的产生,这种状况简单导致 Kafka 集群数据不均衡。

分区的原因:

  • 方便在集群中扩展,每个Partition能够经过调整以习惯它所在的机器,而一个topic又能够有多个Partition组成,因此整个集群就能够习惯任意巨细的数据了;
  • 能够进步并发,因为能够以Partition为单位读写了。

Replication(副本)

副本,为确保集群中的某个节点产生毛病时,该节点上的 partition 数据不丢掉,且 kafka 仍然能够持续作业,kafka 供给了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。

Leader

每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当时担任数据的读写的 partition。

Follower

  • Follower 跟从 Leader,一切写恳求都经过 Leader 路由,数据改动会广播给一切 Follower,Follower 与 Leader 坚持数据同步。Follower 只担任备份,不担任数据的读写。
  • 假如 Leader 毛病,则从 Follower 中推举出一个新的 Leader。
  • 当 Follower 挂掉、卡住或许同步太慢,Leader 会把这个 Follower 从 ISR(Leader 维护的一个和 Leader 坚持同步的 Follower 集合) 列表中删去,从头创立一个 Follower。

Producer

  • 出产者即数据的发布者,该角色将音讯 push 发布到 Kafka 的 topic 中。
  • broker 接纳到出产者发送的音讯后,broker 将该音讯追加到当时用于追加数据的 segment 文件中。
  • 出产者发送的音讯,存储到一个 partition 中,出产者也能够指定数据存储的 partition。

Consumer

顾客能够从 broker 中 pull 拉取数据。顾客能够消费多个 topic 中的数据。

Consumer Group

  • 顾客组,由多个 consumer 组成。
  • 一切的顾客都归于某个顾客组,即顾客组是逻辑上的一个订阅者。可为每个顾客指定组名,若不指定组名则归于默许的组。
  • 将多个顾客集中到一起去处理某一个 Topic 的数据,能够更快的进步数据的消费才能。
  • 顾客组内每个顾客担任消费不同分区的数据,一个分区只能由一个组内顾客消费,避免数据被重复读取。
  • 顾客组之间互不影响。

offset 偏移量

  • 能够仅有的标识一条音讯。
  • 偏移量决议读取数据的方位,不会有线程安全的问题,顾客经过偏移量来决议下次读取的音讯(即消费方位)。
  • 音讯被消费之后,并不被马上删去,这样多个事务就能够重复运用 Kafka 的音讯。
  • 某一个事务也能够经过修正偏移量到达从头读取音讯的目的,偏移量由用户操控。
  • 音讯终究仍是会被删去的,默许生命周期为 1 周(7*24小时)。

Zookeeper

  • Kafka 经过 Zookeeper 来存储集群的 meta 信息。
  • 因为 consumer 在消费进程中或许会出现断电宕机等毛病,consumer 康复后,需求从毛病前的方位的持续消费,所以 consumer 需求实时记载自己消费到了哪个 offset,以便毛病康复后持续消费。
  • Kafka 0.9 版别之前,consumer 默许将 offset 保存在 Zookeeper 中;从 0.9 版别开端,consumer 默许将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为 __consumer_offsets。
  • 也便是说,zookeeper的效果便是,出产者push数据到kafka集群,就必须要找到kafka集群的节点在哪里,这些都是经过zookeeper去寻觅的。顾客消费哪一条数据,也需求zookeeper的支撑,从zookeeper取得offset,offset记载上一次消费的数据消费到哪里,这样就能够接着下一条数据进行消费。

五、布置 kafka 集群

试验环境:

三台服务器已搭建好Zookeeper 集群:

192.168.20.10

192.168.20.20

192.168.20.30

试验过程:

## 1.下载装置包 ##
#官方下载地址:http://kafka.apache.org/downloads.html
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
​
#假如服务器无法衔接外网,需求提早下载好装置包,放在/opt/目录下。
## 2.装置 Kafka ##
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
​
## 3.修正装备文件 ##
cd /usr/local/kafka/config/
cp server.properties{,.bak}
vim server.properties
broker.id=0    ●21行,broker的大局仅有编号,每个broker不能重复,因此要在其他机器上装备 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.20.10:9092    ●31行,指定监听的IP和端口,假如修正每个broker的IP需区分隔来,也可坚持默许装备不用修正
num.network.threads=3    #42行,broker 处理网络恳求的线程数量,一般状况下不需求去修正
num.io.threads=8         #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400       #48行,发送套接字的缓冲区巨细
socket.receive.buffer.bytes=102400    #51行,接纳套接字的缓冲区巨细
socket.request.max.bytes=104857600    #54行,恳求套接字的缓冲区巨细
log.dirs=/usr/local/kafka/logs        #60行,kafka运行日志寄存的路径,也是数据寄存的路径
num.partitions=1    #65行,topic在当时broker上的默许分区个数,会被topic创立时的指定参数覆盖
num.recovery.threads.per.data.dir=1    #69行,用来康复和清理data下数据的线程数量
log.retention.hours=168    #103行,segment文件(数据文件)保留的最长时刻,单位为小时,默许为7天,超时将被删去
log.segment.bytes=1073741824    #110行,一个segment文件最大的巨细,默许为 1G,超出将新建一个新的segment文件
zookeeper.connect=192.168.20.10:2181,192.168.20.20:2181,192.168.20.30:2181    ●123行,装备衔接Zookeeper集群地址
## 4.修正环境变量 ##
 vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
​
 source /etc/profile
​
## 5.装备 kafka 发动脚本,将kafka添加进体系服务办理 ##
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
  echo "---------- Kafka 发动 ------------"
  ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon         ${KAFKA_HOME}/config/server.properties
   ;;
stop)
  echo "---------- Kafka 中止 ------------"
  ${KAFKA_HOME}/bin/kafka-server-stop.sh
   ;;
restart)
  $0 stop
  $0 start
   ;;
status)
  echo "---------- Kafka 状况 ------------"
  count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
  if [ "$count" -eq 0 ];then
    echo "kafka is not running"
  else
    echo "kafka is running"
  fi
   ;;
*)
  echo "Usage: $0 {start|stop|restart|status}"
esac#设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka
​
#别离发动 Kafka
service kafka start
​
 3.Kafka 命令行操作
//创立topic
kafka-topics.sh --create --zookeeper 192.168.20.10:2181,192.168.20.20:2181,192.168.20.30:2181 --replication-factor 2 --partitions 3 --topic test
-------------------------------------------------------------------------------------
--zookeeper:界说 zookeeper 集群服务器地址,假如有多个 IP 地址运用逗号切割,一般运用一个 IP 即可
--replication-factor:界说分区副本数,1 代表单副本,主张为 2 
--partitions:界说分区数 
--topic:界说 topic 称号
-------------------------------------------------------------------------------------
//检查当时服务器中的一切 topic
kafka-topics.sh --list --zookeeper 192.168.20.10:2181,192.168.20.20:2181,192.168.20.30:2181 
//检查某个 topic 的概况
kafka-topics.sh  --describe --zookeeper 192.168.20.10:2181,192.168.20.20:2181,192.168.20.30:2181 
//发布音讯
kafka-console-producer.sh --broker-list 192.168.20.10:9092,192.168.20.20:9092,192.168.20.30:9092  --topic test
//消费音讯
kafka-console-consumer.sh --bootstrap-server 192.168.20.10:9092,192.168.20.20:9092,192.168.20.30:9092 --topic test --from-beginning
-------------------------------------------------------------------------------------
--from-beginning:会把主题中以往一切的数据都读取出来
-------------------------------------------------------------------------------------
//修正分区数
kafka-topics.sh --zookeeper 192.168.20.10:2181,192.168.20.20:2181,192.168.20.30:2181 --alter --topic test --partitions 6
//删去 topic
kafka-topics.sh --delete --zookeeper 192.168.20.10:2181,192.168.20.20:2181,192.168.20.30:2181 --topic test
仿制代码

下载装置包

zookeeper+kafka

装置 Kafka

zookeeper+kafka

zookeeper+kafka
修正装备文件

zookeeper+kafka

zookeeper+kafka

zookeeper+kafka

zookeeper+kafka

zookeeper+kafka

zookeeper+kafka
修正环境变量

zookeeper+kafka
zookeeper+kafka
装备 Zookeeper 发动脚本

zookeeper+kafka
设置开机自启

zookeeper+kafka
别离发动 Kafka

zookeeper+kafka
Kafka 命令行操作

创立topic

zookeeper+kafka
检查某个 topic 的概况

zookeeper+kafka
检查某个 topic 的概况

zookeeper+kafka
发布音讯

zookeeper+kafka
消费音讯

zookeeper+kafka
修正分区数

zookeeper+kafka
删去 topic

zookeeper+kafka

Kafka 架构深化

Kafka 作业流程及文件存储机制

Kafka 中音讯是以 topic 进行分类的,出产者出产音讯,顾客消费音讯,都是面向 topic 的。

topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的便是 producer 出产的数据。Producer 出产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 顾客组中的每个顾客,都会实时记载自己消费到了哪个 offset,以便出错康复时,从前次的方位持续消费。

因为出产者出产的音讯会不断追加到 log 文件结尾,为避免 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件坐落一个文件夹下,该文件夹的命名规则为:topic称号+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

index 和 log 文件以当时 segment 的第一条音讯的 offset 命名。

“.index” 文件存储很多的索引信息,“.log” 文件存储很多的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

数据可靠性确保

为确保 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需求向 producer 发送 ack(acknowledgement 承认收到),假如 producer 收到 ack,就会进行下一轮的发送,不然从头发送数据。

数据共同性问题

LEO:指的是每个副本最大的 offset; HW:指的是顾客能见到的最大的 offset,一切副本中最小的 LEO。

(1)follower 毛病

follower 产生毛病后会被暂时踢出 ISR(Leader 维护的一个和 Leader 坚持同步的 Follower 集合),待该 follower 康复后,follower 会读取本地磁盘记载的前次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开端向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就能够从头加入 ISR 了。

(2)leader 毛病

leader 产生毛病之后,会从 ISR 中选出一个新的 leader, 之后,为确保多个副本之间的数据共同性,其他的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后重新的 leader 同步数据。

这只能确保副本之间的数据共同性,并不能确保数据不丢掉或许不重复。

ack 应对机制

关于某些不太重要的数据,对数据的可靠性要求不是很高,能够忍受数据的少量丢掉,所以没必要等 ISR 中的 follower 全部接纳成功。所以 Kafka 为用户供给了三种可靠性级别,用户依据对可靠性和推迟的要求进行权衡挑选。

当 producer 向 leader 发送数据时,能够经过 request.required.acks 参数来设置数据可靠性的级别: ●0:这意味着producer无需等候来自broker的承认而持续发送下一批音讯。这种状况下数据传输效率最高,可是数据可靠性确是最低的。当broker毛病时有或许丢掉数据。

●1(默许装备):这意味着producer在ISR中的leader已成功收到的数据并得到承认后发送下一条message。假如在follower同步成功之前leader毛病,那么将会丢掉数据。

●-1(或许是all):producer需求等候ISR中的一切follower都承认接纳到数据后才算一次发送完结,可靠性最高。可是假如在 follower 同步完结后,broker 发送ack 之前,leader 产生毛病,那么会形成数据重复。

三种机制功能顺次递减,数据可靠性顺次递增。

注:在 0.11 版别以前的Kafka,对此是力不从心的,只能确保数据不丢掉,再在下游顾客对数据做大局去重。在 0.11 及以后版别的 Kafka,引入了一项严重特性:幂等性。所谓的幂等性便是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。

Filebeat+Kafka+ELK

布置 Zookeeper+Kafka 集群

上文中已布置完结。

布置 Filebeat

cd /usr/local/filebeat-6.7.2-linux-x86_64
​
vim filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
  - /var/log/messages
  - /var/log/*.log
......
#添加输出到 Kafka 的装备
output.kafka:
  enabled: true
  hosts: ["192.168.20.10:9092","192.168.20.20:9092","192.168.20.30:9092"]  #指定 Kafka 集群装备
  topic: "filebeat_test"  #指定 Kafka 的 topic
 
#发动 filebeat
./filebeat -e -c filebeat.yml
仿制代码

zookeeper+kafka

zookeeper+kafka
zookeeper+kafka

zookeeper+kafka

布置 ELK

在 Logstash 组件所在节点上新建一个 Logstash 装备文件。

cd /etc/logstash/conf.d/
​
vim filebeat.conf
input {
   kafka {
     bootstrap_servers => "192.168.20.10:9092,192.168.20.20:9092,192.168.20.30:9092"
     topics => "filebeat_test"
     group_id => "test123"
     auto_offset_reset => "earliest"
   }
}
​
output {
   elasticsearch {
     hosts => ["192.168.20.30:9200"]
     index => "filebeat_test-%{+YYYY.MM.dd}"
   }
   stdout {
     codec => rubydebug
   }
}
​
#发动 logstash
logstash -f filebeat.conf
仿制代码

zookeeper+kafka

浏览器拜访测验

浏览器拜访 http://192.168.20.10:5601 登录 Kibana,单击“Create Index Pattern”按钮添加索引“filebeat_test-*”,单击 “create” 按钮创立,单击 “Discover” 按钮可检查图表信息及日志信息。

总结

EFLFK架构:ELK + Filebeat + Kafka。

布置 kafka 需求先布置 zookeeper。(kafka从3.0版别之后,不再依靠zookeeper)


zookeeper

zookeeper : 分布式的体系办理结构, 效果: 文件体系 + 告诉机制

实质: 存储和办理 分布式运用的元数据,假如运用服务状况产生改动则会告诉客户端。


音讯行列 MQ

web运用中间件 : nginx tomcat apache haproxy squid varnish

MQ音讯行列中间件 : redis kafka rabbitMQ rocketMQ activeMQ


kafka 架构

broker: kafka服务器,一个kafka由多个broker组成。

topic: 一个音讯行列,出产者和顾客面向的都是topic。

producer: 出产者push 推送音讯数据到broker 的topic中。

consumer: 顾客pull 从broker的topic中拉取音讯数据。

partition: 分区,一个topic能够被分成一个或许多个partition分区,用来加速音讯的传输(读写)。

  • partition中的音讯数据是有序的,partition之间是无序。在秒杀、红包等要求有序场景中,只能运用一个partition。

副本: 对partition进行备份,leader担任读写,follow担任备份。

offset: 偏移量,记载顾客消费音讯的方位,记载顾客上一次消费的数据到哪里了,这样就能够接着下-a条数据持续进行消费。

zookeeper: 保存kafka集群的元信息,保存offset。 结合kafka,出产者推送数据到kafka集群时需求经过zk去寻觅kafka的方位,顾客消费哪条数据也需求zk的支撑,因为能够从zk中取得offset。