【中间件】通过 docker-compose 快速部署 Kafka 保姆级教程

一、概述

Kafka是由Apache基金会开发的分布式流处理渠道,采用发布-订阅形式,支持高吞吐量、低推迟的数据传输。首要用于处理实时数据管道、数据存储和数据剖析等大数据运用场景。Kafka采用高效的数据压缩算法,能够在集群中存储大量的数据,并经过分区机制来完成数据的高牢靠性和可扩展性。Kafka常用于以下场景:

  • 数据管道:在数据搜集和分发进程中构建可扩展的流式数据管道,用于实时数据处理和剖析。例如,数据搜集、日志聚合、网络追踪、用户活动盯梢等。

  • 数据存储:将Kafka作为耐久化存储来存储大量的数据,以便用于后续的批量处理和离线剖析,例如数据发掘、机器学习等运用场景。

  • 实时流处理:经过将Kafka与寻求低推迟的流式处理渠道,例如Apache Storm、Apache Samza和Apache Flink等相结合,能够完成实时数据处理和剖析。这是许多实时数据剖析和日志处理需求的首要场景。

  • 体系日志盯梢:经过Kafka将来自不同体系的日志数据统一搜集和存储,便于进行统一的日志剖析和事情盯梢,在软件开发进程中能够快速定位和处理问题。

总归,Kafka是高功能、牢靠、可扩展的分布式流处理渠道,可用于实时数据管道、数据存储、实时流处理和日志盯梢等多个范畴。它已被广泛运用于各种大数据场景,并成为了大数据架构中的一个重要组成部分。

【中间件】通过 docker-compose 快速部署 Kafka 保姆级教程
这儿只是讲解kafka容器快速布置,用于测验和学习效果,生成不主张运用容器布置,想了解更多的kafka知识点可参阅我这篇文章:Kafka原理介绍+装置+基本操作

二、前期准备

1)布置 docker

# 装置yum-config-manager装备东西
yum -y install yum-utils
# 主张运用阿里云yum源:(引荐)
#yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 装置docker-ce版别
yum install -y docker-ce
# 发动并开机发动
systemctl enable --now docker
docker --version

2)布置 docker-compose

curl -SL https://github.com/docker/compose/releases/download/v2.16.0/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version

三、创立网络

# 创立,留意不能运用hadoop_network,要不然发动hs2服务的时分会有问题!!!
docker network create hadoop-network
# 检查
docker network ls

四、装置 Zookeeper

Zookeeper在Kafka中扮演重要的人物,首要用于办理Kafka集群的元数据和完成Kafka集群的协谐和办理。在Kafka集群中,Zookeeper首要有以下效果:

  • 装备办理:Kafka集群的装备信息存储在ZK节点中,包括Kafka Broker的装备信息、Topic的分区信息、顾客和生产者的相关装备等。Kafka能够经过ZK感知集群状态的改变,并主动重新分配Topic的分区和对应的Broker。

  • Broker控制:Kafka集群中的一切Broker都衔接到ZK中。ZK保护了一切活动Broker的列表和状态信息,包括Leader、Follower等信息。假如某个Broker出现故障,ZK能够主动感知它的下线,并告诉集群中的其他Broker重新分配Leader。

  • 分布式锁:Zookeeper供给群众同步的机制,使得多个Kafka Broker的协谐和办理变得可行。Kafka中的一些操作需求集群中的一切Broker都达到一致意见,因而需求运用ZK和谐器的分布式锁机制来保护这些操作的一致性,并避免数据的意外损坏。

Zookeeper快速布置教程可参阅我上一篇文章:【中心件】经过 docker-compose 快速布置 Zookeeper 保姆级教程

总归,Zookeeper在Kafka集群中发挥了重要的人物,它办理着Kafka的发布/订阅机制、Broker状态信息、Topic的元数据信息等,使得Kafka集群的分布式协同和和谐变得或许。在Kafka写操作(生产者或办理员在Kafka生产或保护上修正了装备)上,ZK用于协作确定。在Kafka读操作(顾客将订阅的主题分区元数据读取到kafka顾客中)上,ZK用于协作。

五、Kafka 编列布置

1)下载 Kafka

wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz --no-check-certificate

留意还需求java环境,能够去官网下载,也能够在我下面供给的地址下载:

链接: pan.baidu.com/s/1o_z3t16v… 提取码: kuac

2)装备

  • config/kafka-node1/server.properties
# 常见装备挂载目录
mkdir config/{kafka-node1,kafka-node2,kafka-node3} -p
# 装备
cat >config/kafka-node1/server.properties<<EOF
#broker的大局仅有编号,不能重复
broker.id=1
#删去topic功用使能
delete.topic.enable=true
#处理网络恳求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区巨细
socket.send.buffer.bytes=102400
#接纳套接字的缓冲区巨细
socket.receive.buffer.bytes=102400
#恳求套接字的缓冲区巨细
socket.request.max.bytes=104857600
#kafka数据的存储方位
log.dirs=/opt/apache/kafka/logs
#指定Topic的分区数量,这儿设置为3。 默许只要一个分区,设置多分区能够支持并发读写和负载均衡
num.partitions=3
#副本,默许只要一个副本,不会进行数据备份和冗余
replication.factor=3
#用来康复和整理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保存的最长时刻,超时将被删去
log.retention.hours=168
#装备衔接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper衔接超时时刻
zookeeper.connection.timeout.ms=60000
EOF
  • config/kafka-node2/server.properties
cat >config/kafka-node2/server.properties<<EOF
#broker的大局仅有编号,不能重复
broker.id=2
#删去topic功用使能
delete.topic.enable=true
#处理网络恳求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区巨细
socket.send.buffer.bytes=102400
#接纳套接字的缓冲区巨细
socket.receive.buffer.bytes=102400
#恳求套接字的缓冲区巨细
socket.request.max.bytes=104857600
#kafka数据的存储方位
log.dirs=/opt/apache/kafka/logs
#指定Topic的分区数量,这儿设置为3。 默许只要一个分区,设置多分区能够支持并发读写和负载均衡
num.partitions=3
#副本,默许只要一个副本,不会进行数据备份和冗余
replication.factor=3
#用来康复和整理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保存的最长时刻,超时将被删去
log.retention.hours=168
#装备衔接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper衔接超时时刻
zookeeper.connection.timeout.ms=60000
EOF
  • config/kafka-node3/server.properties
cat >config/kafka-node3/server.properties<<EOF
#broker的大局仅有编号,不能重复
broker.id=3
#删去topic功用使能
delete.topic.enable=true
#处理网络恳求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区巨细
socket.send.buffer.bytes=102400
#接纳套接字的缓冲区巨细
socket.receive.buffer.bytes=102400
#恳求套接字的缓冲区巨细
socket.request.max.bytes=104857600
#kafka数据的存储方位
log.dirs=/opt/apache/kafka/logs
#指定Topic的分区数量,这儿设置为3。 默许只要一个分区,设置多分区能够支持并发读写和负载均衡
num.partitions=3
#副本,默许只要一个副本,不会进行数据备份和冗余
replication.factor=3
#用来康复和整理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保存的最长时刻,超时将被删去
log.retention.hours=168
#装备衔接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper衔接超时时刻
zookeeper.connection.timeout.ms=60000
EOF

3)发动脚本 bootstrap.sh

#!/usr/bin/env sh
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties

4)构建镜像 Dockerfile

FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/centos:7.7.1908
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
# 创立用户和用户组,跟yaml编列里的user: 10000:10000
RUN groupadd --system --gid=10000 hadoop && useradd --system --home-dir /home/hadoop --uid=10000 --gid=hadoop hadoop -m
# 装置sudo
RUN yum -y install sudo ; chmod 640 /etc/sudoers
# 给hadoop添加sudo权限
RUN echo "hadoop ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
RUN yum -y install install net-tools telnet wget nc less
RUN mkdir /opt/apache/
# 添加装备 JDK
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH
# 添加装备 kafka server
ENV KAFKA_VERSION 2.12-3.4.0
ADD kafka_${KAFKA_VERSION}.tgz /opt/apache/
ENV KAFKA_HOME /opt/apache/kafka
RUN ln -s /opt/apache/kafka_${KAFKA_VERSION}-bin $KAFKA_HOME
# 创立数据存储目录
RUN mkdir -p ${KAFKA_HOME}/data/logs
# copy bootstrap.sh
COPY bootstrap.sh /opt/apache/
RUN chmod +x /opt/apache/bootstrap.sh
RUN chown -R hadoop:hadoop /opt/apache
WORKDIR $KAFKA_HOME

开端构建镜像

# 需求检查构建镜像详细进程则需求加上 --progress=plain 选项
docker build -t registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0 . --no-cache --progress=plain
# 为了便利小伙伴下载即可运用,我这儿将镜像文件推送到阿里云的镜像仓库
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
### 参数解释
# -t:指定镜像名称
# . :当前目录Dockerfile
# -f:指定Dockerfile途径
#  --no-cache:不缓存

5)编列 docker-compose.yaml

version: '3'
services:
  kafka-node1:
    image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
    user: "hadoop:hadoop"
    container_name: kafka-node1
    hostname: kafka-node1
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node1/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE1_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
  kafka-node2:
    image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
    user: "hadoop:hadoop"
    container_name: kafka-node2
    hostname: kafka-node2
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node2/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE2_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
  kafka-node3:
    image: registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:2.12-3.4.0
    user: "hadoop:hadoop"
    container_name: kafka-node3
    hostname: kafka-node3
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node3/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE3_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
# 衔接外部网络
networks:
  hadoop-network:
    external: true

.env 环境变量文件内容如下:

# 对外暴露的端口
cat << EOF > .env
KAFKA_HOME=/opt/apache/kafka
KAFKA_NODE1_SERVER_PORT=39092
KAFKA_NODE2_SERVER_PORT=39093
KAFKA_NODE3_SERVER_PORT=39094
EOF

6)开端布置

docker-compose -f docker-compose.yaml up -d
# 检查
docker-compose -f docker-compose.yaml ps

六、简略测验验证

# 登录zookeeper,在zookeeper检查brokers
${ZOOKEEPER_HOME}/bin/zkCli.sh ls /brokers/ids
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/1
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/2
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/3

七、常用的 Kafka 客户端指令

1)添加topic

# 随便登录
docker exec -it kafka-node1 bash
# 创立topic,1副本,1分区,设置数据过期时刻72小时(-1表明不过期),单位ms,72*3600*1000=259200000
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092  --partitions 1 --replication-factor 1 --config retention.ms=259200000

2)检查topic

# 检查topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list
# 检查topic列表详情
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe
# 指定topic
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002
# 检查顾客组
${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list
kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe  --group test002

3)修正topic

# 修正分区,扩分区,不能减少分区
${KAFKA_HOME}/bin/kafka-topics.sh --alter --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partitions 2
# 修正过期时刻,下面两行都能够
${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --topic test002 --add-config retention.ms=86400000
${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000
# 修正副本数,将副本数修正成3
$ cat >1.json<<EOF
{"version":1,
"partitions":[
{"topic":"test002","partition":0,"replicas":[0,1,2]},
{"topic":"test002","partition":1,"replicas":[1,2,0]},
{"topic":"test002","partition":2,"replicas":[2,0,1]}
]}
EOF
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002

4)扩容分区

#把test002 topic扩容为6个分区。
#留意:目前不支持减少分区,扩容前必须存在这个主题。
${KAFKA_HOME}/bin/kafka-topics.sh -alter --partitions 6 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe

【中间件】通过 docker-compose 快速部署 Kafka 保姆级教程

5)删去topic

${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092

6)生成者和顾客

生产者

${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}

顾客

# 从头开端消费
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --from-beginning
# 指定从分区的某个方位开端消费,这儿只指定了一个分区,能够多写几行或许遍历对应的一切分区
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partition 0 --offset 100

7)消费组

在 Kafka 中,消费组(Consumer Group)是一组独立顾客的集合,它们一起消费一个或多个 Topic 中的数据。消费组内的顾客协同作业,经过分摊该 Topic 中的一切分区,以完成音讯的消费和处理。

消费组在 Kafka 音讯队列中起到了至关重要的效果。它能够供给如下功用:

  • 并发消费:消费组内的每个顾客都能够独登时消费音讯,能够完成高并发处理。

  • 主动负载均衡:消费组内的顾客会主动协作,将消费使命均分到一切顾客上,使得每个顾客都能处理相同数量的音讯。

  • 进步可用性:当消费组内的一个或多个顾客故障退出时,音讯会主动分配到其他顾客上,确保消费使命的不间断履行。

  • 支持多租户:能够经过 Consumer Group 来对不同的租户进行音讯隔离,不同的 Consumer Group 能够读取同一个 Topic 的不同副本,或许读取不同 Topic 的不同分区,完成多个实例同享同一 Topic 或涣散处理不同 Topic。

示例如下:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --group test002

通常状况下,消费组中的顾客都运行在不同的机器上,这样就能够完成分布式消费,以进步音讯处理功能和可用性。Kafka 对消费组的完成也十分简略,经过在顾客在订阅 Topic 时,接受一个 Group ID 参数,就能够主动加入到一个消费组中。Kafka 会将Group ID 相同的顾客映射到同一个 Consumer Group 中,以完成协同消费和分摊消费使命的意图。

8)检查数据积压

${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --group test002

9)kafka 数据积压处理办法

在 Kafka 中,因为音讯的生产和消费速度或许不一致,导致音讯会积压在 Kafka 的分区中,假如这些积压的音讯处理不及时,会导致 Kafka 体系的功能下降和可用性降低一级问题。因而,需求采取一些处理办法来处理数据积压问题:

  • 添加顾客:添加顾客能够使消费使命并行履行,加快音讯的处理速度。能够经过添加顾客的方式将积压的音讯消费掉,进步体系处理速度和功率。

  • 调整顾客组:当一个消费组中的顾客无法处理一切的音讯时,能够考虑调整顾客组。能够添加顾客的数量或许替换顾客组,以适应音讯处理的速度和巨细。

  • 调整音讯分区:Kafka 中Topic 的分区数也会影响数据积压的状况。能够调整分区数以改进数据读取和分发的状况,或许对热点 Topic 进行分区处理,以完成更好的功能和可用性。

  • 调整消费 offset:若积压的音讯都已经被处理过了,却还在 Kafka 中存在,或许是顾客消费 offset 设置错误导致的。能够经过 Kafka 的 offset 操作,重置消费 offset,越过已经处理过的音讯,减少数据积压的问题。

  • 履行音讯清洗:在消费 Kafka 音讯时,能够额定履行一些音讯清洗处理操作,将无用的数据过滤出去,或许将数据进行整理和格式化处理,减少中心处理环节,进步数据消费的功率和可用性。

以上是一些处理 Kafka 数据积压问题的常用办法,需求视具体状况而定,挑选合适的办法来处理。


经过 docker-compose 快速布置 Kafka 教程就先到这儿了,有任何疑问欢迎给我留言或私信,可关注我大众号【大数据与云原生技能分享】加群交流或私信交流~

【中间件】通过 docker-compose 快速部署 Kafka 保姆级教程