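I. Overview
Kafka disk expansion and data balancing are two important aspects of keeping a Kafka cluster available and performing well. In Kafka, how partition data is stored and balanced is critical to the cluster's operation. Here are some recommendations for Kafka disk expansion and data balancing:
1) Overview of Kafka disk expansion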
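- Add a new disk: install a new disk in the server, making sure it has enough capacity and that its performance meets the cluster's needs.
- Update the Kafka configuration: in the Kafka configuration file (server.properties), update the log.dirs property by appending the new disk's path to the existing paths:
log.dirs=/path/to/old/disk,/path/to/new/disk
- Restart the Kafka broker: restart the Kafka broker so that the new configuration takes effect. Before restarting, make sure the key configuration files and data are backed up.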
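Before the restart, it helps to confirm that every directory listed in log.dirs exists and is writable, since a missing or read-only directory is a common reason a broker fails to start after a disk change. The following is a minimal bash sketch, assuming the log.dirs value is passed in as a string; `check_log_dirs` is a hypothetical helper, not a Kafka tool.

```bash
# Hypothetical helper: check each comma-separated entry of a log.dirs value.
# Prints one status line per directory; returns 1 if any entry is unusable.
check_log_dirs() {
  local value="$1" dir status=0
  local -a dirs
  # Split the comma-separated list into an array.
  IFS=',' read -r -a dirs <<< "$value"
  for dir in "${dirs[@]}"; do
    if [ ! -d "$dir" ]; then
      echo "MISSING  $dir"
      status=1
    elif [ ! -w "$dir" ]; then
      echo "READONLY $dir"
      status=1
    else
      echo "OK       $dir"
    fi
  done
  return "$status"
}
```

For example, `check_log_dirs "/path/to/old/disk,/path/to/new/disk"` should print `OK` for both paths before the broker is restarted. Run it as the user that runs the broker, because writability is checked for the current user.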
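2) Overview of Kafka data balancing
- Partition rebalancing: balanced partition data matters in Kafka because it keeps the load on each broker roughly even. You can use the tools or APIs that Kafka provides to rebalance partitions so that each broker handles a similar number of partitions and a similar amount of data.
- Monitor partition status: use Kafka monitoring tools such as Kafka Manager or Burrow to watch partition state and distribution, and make sure no partition is left unbalanced.
- Manual intervention: in some cases you may need to step in manually to fix data balance problems, for example by reassigning partitions or adjusting how partition replicas are distributed.
- Account for workload changes: when deploying new producers or consumers on the Kafka cluster, consider how the workload changes. New producers may write more data, and new consumers may read more data.
- Partition and replica counts: choose appropriate numbers of partitions and replicas. Too many partitions make management and maintenance harder, while too few can overload individual brokers.
- Use the Kafka tools: Kafka provides tools such as kafka-reassign-partitions.sh for manually reassigning partitions, and kafka-preferred-replica-election.sh for running preferred replica election (deprecated since Kafka 2.4 in favor of kafka-leader-election.sh).
When expanding disks and rebalancing data in production, proceed carefully, and test and rehearse in a non-production environment first. Careful planning and execution help keep the Kafka cluster available and performing well.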
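II. K8s cluster deployment
I have written many articles on setting up a k8s environment before; you can refer to the following:
III. Deploying Kafka on k8s
For a quick demonstration, Kafka is deployed on k8s here; deploying on physical machines is of course also possible. I have also written many articles about Kafka before, which you can refer to:
- Kafka principles, installation, and basic operations (kafka on k8s)
- Big Data Hadoop series: Kafka authentication
- Big Data Hadoop series: Kafka security mechanisms (Kafka SSL authentication)
- Big Data Hadoop series: Kafka Streams principles and simple usage examples
- Big Data Hadoop series: the Kafka GUI tool EFAK (EFAK deployment)
- Big Data Hadoop series: EFAK security authentication (kafka zookeeper)
- [Cloud Native] Deploying zookeeper and kafka on k8s
- [Middleware] Quickly deploying Kafka with docker-compose: a step-by-step tutorial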
1) Install helm
# Download the package
wget https://get.helm.sh/helm-v3.7.1-linux-amd64.tar.gz -O /tmp/helm-v3.7.1-linux-amd64.tar.gz
# Extract the archive
tar -xf /tmp/helm-v3.7.1-linux-amd64.tar.gz -C /root/
# Create a symlink
ln -s /root/linux-amd64/helm /usr/local/bin/helm
2) Install zookeeper
1. Add the repo and download the chart
helm repo add bitnami https://charts.bitnami.com/bitnami
helm pull bitnami/zookeeper --version 10.2.1
tar -xf zookeeper-10.2.1.tgz
2. Modify the configuration
- Modify zookeeper/values.yaml
image:
  registry: registry.cn-hangzhou.aliyuncs.com
  repository: bigdata_cloudnative/zookeeper
  tag: 3.8.0-debian-11-r36
...
replicaCount: 3
...
service:
  type: NodePort
  nodePorts:
    # The default NodePort range is 30000-32767
    client: "32181"
    tls: "32182"
...
persistence:
  storageClass: "zookeeper-local-storage"
  size: "10Gi"
  # These directories must be created on the hosts in advance
  local:
    - name: zookeeper-0
      host: "local-168-182-110"
      path: "/opt/bigdata/servers/zookeeper/data/data1"
    - name: zookeeper-1
      host: "local-168-182-111"
      path: "/opt/bigdata/servers/zookeeper/data/data1"
    - name: zookeeper-2
      host: "local-168-182-112"
      path: "/opt/bigdata/servers/zookeeper/data/data1"
...
# Enable Prometheus to access ZooKeeper metrics endpoint
metrics:
  enabled: true
- Add zookeeper/templates/pv.yaml
{{- range .Values.persistence.local }}
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: {{ .name }}
  labels:
    name: {{ .name }}
spec:
  storageClassName: {{ $.Values.persistence.storageClass }}
  capacity:
    storage: {{ $.Values.persistence.size }}
  accessModes:
    - ReadWriteOnce
  local:
    path: {{ .path }}
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - {{ .host }}
---
{{- end }}
- Add zookeeper/templates/storage-class.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: {{ .Values.persistence.storageClass }}
provisioner: kubernetes.io/no-provisioner
- Set the time zone by adding a TZ entry to the container's env list in zookeeper/templates/statefulset.yaml:
env:
  - name: TZ
    value: Asia/Shanghai
3. Install zookeeper
docker pull docker.io/bitnami/zookeeper:3.8.0-debian-11-r36
# Push the image to Alibaba Cloud so it can be pulled quickly next time
docker tag docker.io/bitnami/zookeeper:3.8.0-debian-11-r36 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/zookeeper:3.8.0-debian-11-r36
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/zookeeper:3.8.0-debian-11-r36
# Install
helm install zookeeper ./zookeeper -n zookeeper --create-namespace
NOTES
NAME: zookeeper
LAST DEPLOYED: Sun Nov 12 22:39:36 2023
NAMESPACE: zookeeper
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 10.2.1
APP VERSION: 3.8.0
** Please be patient while the chart is being deployed **
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
zookeeper.zookeeper.svc.cluster.local
To connect to your ZooKeeper server run the following commands:
export POD_NAME=$(kubectl get pods --namespace zookeeper -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh
To connect to your ZooKeeper server from outside the cluster execute the following commands:
export NODE_IP=$(kubectl get nodes --namespace zookeeper -o jsonpath="{.items[0].status.addresses[0].address}")
export NODE_PORT=$(kubectl get --namespace zookeeper -o jsonpath="{.spec.ports[0].nodePort}" services zookeeper)
zkCli.sh $NODE_IP:$NODE_PORT
Check the pod status
kubectl get pods,svc -n zookeeper -owide
4. Test and verify
# Check the status of each zookeeper server, then log into the zookeeper-0 pod
kubectl exec -it zookeeper-0 -n zookeeper -- zkServer.sh status
kubectl exec -it zookeeper-1 -n zookeeper -- zkServer.sh status
kubectl exec -it zookeeper-2 -n zookeeper -- zkServer.sh status
kubectl exec -it zookeeper-0 -n zookeeper -- bash
5. Uninstall
helm uninstall zookeeper -n zookeeper
kubectl delete pod -n zookeeper `kubectl get pod -n zookeeper|awk 'NR>1{print $1}'` --force
kubectl patch ns zookeeper -p '{"metadata":{"finalizers":null}}'
kubectl delete ns zookeeper --force
3) Install kafka
1. Add the repo and download the chart
helm repo add bitnami https://charts.bitnami.com/bitnami
helm pull bitnami/kafka --version 18.4.2
tar -xf kafka-18.4.2.tgz
2. Modify the configuration
- Modify kafka/values.yaml
image:
  registry: registry.cn-hangzhou.aliyuncs.com
  repository: bigdata_cloudnative/kafka
  tag: 3.2.1-debian-11-r16
...
replicaCount: 3
...
service:
  type: NodePort
  nodePorts:
    client: "30092"
    external: "30094"
...
externalAccess:
  enabled: true
  service:
    type: NodePort
    nodePorts:
      - 30001
      - 30002
      - 30003
    useHostIPs: true
...
persistence:
  storageClass: "kafka-local-storage"
  size: "10Gi"
  # These directories must be created on the hosts in advance
  local:
    - name: kafka-0
      host: "local-168-182-110"
      path: "/opt/bigdata/servers/kafka/data/data1"
    - name: kafka-1
      host: "local-168-182-111"
      path: "/opt/bigdata/servers/kafka/data/data1"
    - name: kafka-2
      host: "local-168-182-112"
      path: "/opt/bigdata/servers/kafka/data/data1"
...
metrics:
  kafka:
    enabled: true
    image:
      registry: registry.cn-hangzhou.aliyuncs.com
      repository: bigdata_cloudnative/kafka-exporter
      tag: 1.6.0-debian-11-r8
  jmx:
    enabled: true
    image:
      registry: registry.cn-hangzhou.aliyuncs.com
      repository: bigdata_cloudnative/jmx-exporter
      tag: 0.17.1-debian-11-r1
    annotations:
      prometheus.io/path: "/metrics"
...
zookeeper:
  enabled: false
...
externalZookeeper:
  servers:
    - zookeeper-0.zookeeper-headless.zookeeper
    - zookeeper-1.zookeeper-headless.zookeeper
    - zookeeper-2.zookeeper-headless.zookeeper
- Add kafka/templates/pv.yaml
{{- range .Values.persistence.local }}
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: {{ .name }}
  labels:
    name: {{ .name }}
spec:
  storageClassName: {{ $.Values.persistence.storageClass }}
  capacity:
    storage: {{ $.Values.persistence.size }}
  accessModes:
    - ReadWriteOnce
  local:
    path: {{ .path }}
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - {{ .host }}
---
{{- end }}
- Add kafka/templates/storage-class.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: {{ .Values.persistence.storageClass }}
provisioner: kubernetes.io/no-provisioner
- Set the time zone by adding a TZ entry to the container's env list in kafka/templates/statefulset.yaml:
env:
  - name: TZ
    value: Asia/Shanghai
3. Install
docker pull docker.io/bitnami/kafka:3.2.1-debian-11-r16
# Push the image to Alibaba Cloud so it can be pulled quickly next time
docker tag docker.io/bitnami/kafka:3.2.1-debian-11-r16 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16
# kafka-exporter
docker pull docker.io/bitnami/kafka-exporter:1.6.0-debian-11-r8
docker tag docker.io/bitnami/kafka-exporter:1.6.0-debian-11-r8 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka-exporter:1.6.0-debian-11-r8
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka-exporter:1.6.0-debian-11-r8
# JMX exporter
docker pull docker.io/bitnami/jmx-exporter:0.17.1-debian-11-r1
docker tag docker.io/bitnami/jmx-exporter:0.17.1-debian-11-r1 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/jmx-exporter:0.17.1-debian-11-r1
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/jmx-exporter:0.17.1-debian-11-r1
# Install
helm install kafka ./kafka -n kafka --create-namespace
NOTES
NAME: kafka
LAST DEPLOYED: Sun Nov 12 23:32:49 2023
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 18.4.2
APP VERSION: 3.2.1
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
kafka.kafka.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
kafka-0.kafka-headless.kafka.svc.cluster.local:9092
kafka-1.kafka-headless.kafka.svc.cluster.local:9092
kafka-2.kafka-headless.kafka.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run kafka-client --restart='Never' --image registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16 --namespace kafka --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace kafka -- bash
PRODUCER:
    kafka-console-producer.sh \
        --broker-list kafka-0.kafka-headless.kafka.svc.cluster.local:9092,kafka-1.kafka-headless.kafka.svc.cluster.local:9092,kafka-2.kafka-headless.kafka.svc.cluster.local:9092 \
        --topic test
CONSUMER:
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic test \
        --from-beginning
Check the pod status
kubectl get pods,svc -n kafka -owide
4. Test and verify
# Log into the kafka-0 pod
kubectl exec -it kafka-0 -n kafka -- bash
# 1. Create a topic
kafka-topics.sh --create --topic test001 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1
# Check it
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test001
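Troubleshooting: the command fails with Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 5555; nested exception is:
The cause is that JMX_PORT is set in the container (to 5555 here), and kafka-run-class.sh adds -Dcom.sun.management.jmxremote.port=$JMX_PORT to every JVM it launches, so each CLI tool tries to bind the JMX port that the broker already holds. Modify the /opt/bitnami/kafka/bin/kafka-run-class.sh script as follows:
# Add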
ISKAFKASERVER="false"
if [[ "$*" =~ "kafka.Kafka" ]]; then
ISKAFKASERVER="true"
fi
# Change
# if [ $JMX_PORT ];then
# Bind JMX_PORT only when launching the broker itself (kafka.Kafka), not for CLI tools
if [ $JMX_PORT ] && [ "$ISKAFKASERVER" = "true" ]; then
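The intent of this change is that only the broker process (main class kafka.Kafka) binds JMX_PORT, while CLI tools started through the same script do not. Because ISKAFKASERVER is always set to either "true" or "false", the check has to compare against "true". The sketch below isolates that decision so it can be tried on its own; `jmx_port_opt` is a hypothetical helper that mirrors the patched logic and is not part of Kafka.

```bash
# Hypothetical helper mirroring the patched check in kafka-run-class.sh:
# print the JMX port option only when the arguments launch the broker.
jmx_port_opt() {
  local is_kafka_server="false"
  # Same substring test as the patch: is the broker main class in the arguments?
  if [[ "$*" =~ "kafka.Kafka" ]]; then
    is_kafka_server="true"
  fi
  # Emit the port option only for the broker, and only if JMX_PORT is non-empty.
  if [ -n "${JMX_PORT:-}" ] && [ "$is_kafka_server" = "true" ]; then
    echo "-Dcom.sun.management.jmxremote.port=$JMX_PORT"
  fi
}
```

With `JMX_PORT=5555`, calling it with `kafka.Kafka config/server.properties` prints the port option, while calling it with `kafka.admin.TopicCommand --list` prints nothing, so the tool no longer collides with the broker. Note that the test is a substring match, so any argument containing `kafka.Kafka` counts as the broker.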
The complete modified script:
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
exit 1
fi
# CYGWIN == 1 if Cygwin is detected, else 0.
if [[ $(uname -a) =~ "CYGWIN" ]]; then
CYGWIN=1
else
CYGWIN=0
fi
if [ -z "$INCLUDE_TEST_JARS" ]; then
INCLUDE_TEST_JARS=false
fi
# Exclude jars not necessary for running commands.
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
should_include_file() {
if [ "$INCLUDE_TEST_JARS" = true ]; then
return 0
fi
file=$1
if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
return 0
else
return 1
fi
}
ISKAFKASERVER="false"
if [[ "$*" =~ "kafka.Kafka" ]]; then
ISKAFKASERVER="true"
fi
base_dir=$(dirname $0)/..
if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.13.6
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
fi
if [ -z "$SCALA_BINARY_VERSION" ]; then
SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
fi
# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
shopt -s nullglob
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;
do
CLASSPATH="$CLASSPATH:$dir/*"
done
fi
for file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
clients_lib_dir=$(dirname $0)/../clients/build/libs
streams_lib_dir=$(dirname $0)/../streams/build/libs
streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
else
clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
streams_lib_dir=$clients_lib_dir
streams_dependant_clients_lib_dir=$streams_lib_dir
fi
for file in "$clients_lib_dir"/kafka-clients*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for file in "$streams_lib_dir"/kafka-streams*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
else
VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$file":"$CLASSPATH"
fi
done
if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
fi
if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
fi
fi
for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
for file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
for file in "$base_dir"/shell/build/libs/kafka-shell*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}*;
do
CLASSPATH="$CLASSPATH:$dir/*"
done
for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
do
CLASSPATH="$CLASSPATH:$dir/*"
done
for file in "$base_dir"/trogdor/build/libs/trogdor-*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for dir in "$base_dir"/trogdor/build/dependant-libs-${SCALA_VERSION}*;
do
CLASSPATH="$CLASSPATH:$dir/*"
done
for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
do
for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"
fi
done
# classpath addition for release
for file in "$base_dir"/libs/*;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
shopt -u nullglob
if [ -z "$CLASSPATH" ] ; then
echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"
exit 1
fi
# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
# JMX port to use
if [ $JMX_PORT ] && [ "$ISKAFKASERVER" = "true" ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
LOG_DIR="$base_dir/logs"
fi
# Log4j settings
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
# Log to console. This is a tool.
LOG4J_DIR="$base_dir/config/tools-log4j.properties"
# If Cygwin is detected, LOG4J_DIR is converted to Windows format.
(( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
else
# create logs directory
if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"
fi
fi
# If Cygwin is detected, LOG_DIR is converted to Windows format.
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS=""
fi
# Set Debug options if enabled
if [ "x$KAFKA_DEBUG" != "x" ]; then
# Use default ports
DEFAULT_JAVA_DEBUG_PORT="5005"
if [ -z "$JAVA_DEBUG_PORT" ]; then
JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
fi
# Use the defaults if JAVA_DEBUG_OPTS was not set
DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
if [ -z "$JAVA_DEBUG_OPTS" ]; then
JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
fi
echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi
# Which java to use
if [ -z "$JAVA_HOME" ]; then
JAVA="java"
else
JAVA="$JAVA_HOME/bin/java"
fi
# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; then
KAFKA_HEAP_OPTS="-Xmx256M"
fi
# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi
while [ $# -gt 0 ]; do
COMMAND=$1
case $COMMAND in
-name)
DAEMON_NAME=$2
CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
shift 2
;;
-loggc)
if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
GC_LOG_ENABLED="true"
fi
shift
;;
-daemon)
DAEMON_MODE="true"
shift
;;
*)
break
;;
esac
done
# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
# The first segment of the version number, which is '1' for releases before Java 9
# it then becomes '9', '10', ...
# Some examples of the first line of `java --version`:
# 8 -> java version "1.8.0_152"
# 9.0.4 -> java version "9.0.4"
# 10 -> java version "10" 2018-03-20
# 10.0.1 -> java version "10.0.1" 2018-04-17
# We need to match to the end of the line to prevent sed from printing the characters that do not match
JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"
else
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
fi
fi
# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)
# Syntax used on the right side is native Bash string manipulation; for more details see
# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"
CLASSPATH=${CLASSPATH#:}
# If Cygwin is detected, classpath is converted to Windows format.
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
fi
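Copy the modified script over the one in the container. This patches only kafka-0, and because the file is not on a persistent volume, the change is lost when the pod is recreated: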
kubectl cp kafka-run-class.sh kafka-0:/opt/bitnami/kafka/bin/kafka-run-class.sh -n kafka
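Then create the topic again:
# Log into the kafka-0 pod
kubectl exec -it kafka-0 -n kafka -- bash
# Parameters:
# --create: create a topic
# --topic: name of the new topic
# --bootstrap-server: Kafka connection address
# --config: topic-level settings for this topic; see "Topic-level configuration" in the Kafka documentation for the list
# --partitions: number of partitions for the topic, defaults to 1
# --replication-factor: number of replicas per partition, defaults to 1
# 1. Create a topic with 3 partitions and 3 replicas and a data retention of 72 hours. retention.ms is in milliseconds: 72*3600*1000=259200000 (-1 means data never expires; if unset, the broker default of 168 hours, i.e. 7 days, applies)
kafka-topics.sh --create --topic test001 --bootstrap-server kafka.kafka:9092 --partitions 3 --replication-factor 3 --config retention.ms=259200000
# Check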
kafka-topics.sh --list --bootstrap-server kafka.kafka:9092
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test001
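Since retention.ms is given in milliseconds, converting from hours by hand is easy to get wrong. A small bash sketch of the conversion used above (the function name `hours_to_retention_ms` is hypothetical):

```bash
# Hypothetical helper: convert a retention period in hours to a retention.ms value.
hours_to_retention_ms() {
  echo $(( $1 * 3600 * 1000 ))
}

# 72 hours, as used for test001 above
hours_to_retention_ms 72   # prints 259200000
```

It can be used inline, for example `--config retention.ms=$(hours_to_retention_ms 72)`.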
Producer/consumer test
# [Producer]
kafka-console-producer.sh --broker-list kafka.kafka:9092 --topic test001
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}
# [Consumer]
# Consume from the beginning
kafka-console-consumer.sh --bootstrap-server kafka.kafka:9092 --topic test001 --from-beginning
# Start consuming from a given offset in a partition; only one partition is specified here, so repeat the command (or loop) for the other partitions
kafka-console-consumer.sh --bootstrap-server kafka.kafka:9092 --topic test001 --partition 0 --offset 100 --group test001
# Check consumer lag (message backlog)
kafka-consumer-groups.sh --bootstrap-server kafka.kafka:9092 --describe --group test001
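The describe output has one row per partition with a LAG column (log-end offset minus committed offset). To reduce it to a single backlog number for the group, the column can be summed. The sketch below locates the LAG column by its header name rather than by position and skips non-numeric entries such as `-`; the function name `total_lag` is hypothetical.

```bash
# Hypothetical helper: sum the LAG column of
# `kafka-consumer-groups.sh --describe` output read from stdin.
total_lag() {
  awk '
    # Remember which column holds LAG when the header row appears.
    { for (i = 1; i <= NF; i++) if ($i == "LAG") { col = i; next } }
    # Add numeric LAG values; entries such as "-" are ignored.
    col && $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}
```

Usage: `kafka-consumer-groups.sh --bootstrap-server kafka.kafka:9092 --describe --group test001 | total_lag`.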
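Delete a topic
# Delete the topic (topic deletion is not enabled by default in this Helm chart)
kafka-topics.sh --delete --topic test001 --bootstrap-server kafka.kafka:9092
# To allow topic deletion, set delete.topic.enable=true in the broker configuration; in the k8s Helm chart, enable this parameter:
deleteTopicEnable: true
5. Uninstall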
helm uninstall kafka -n kafka
kubectl delete pod -n kafka `kubectl get pod -n kafka|awk 'NR>1{print $1}'` --force
kubectl patch ns kafka -p '{"metadata":{"finalizers":null}}'
kubectl delete ns kafka --force
IV. Kafka partitions and replicas
Partitions and replicas are key concepts in Kafka that provide high availability, fault tolerance, and scalability. The basic concepts are as follows:
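1) Partitions:
Definition: a partition is the basic unit of message storage in Kafka. Each topic can be divided into one or more partitions, and every message is written to exactly one of them.
1. Purpose:
- Horizontal scaling: by splitting a topic into multiple partitions, Kafka can scale horizontally, allowing messages to be processed in parallel with better performance.
- Ordering: messages stay ordered within each partition; within a single partition, messages are read in exactly the order they were written. There is no ordering guarantee across partitions.
2. Partition properties:
- Numbering: each partition has a unique number within its topic (starting from 0) that identifies it.
- Persistence: partition data is persisted to disk and can be replicated across multiple brokers to improve availability and fault tolerance.
- Replica count: each partition can have one or more replicas.
3. Producers and consumers:
- Producers can send messages to a specific partition.
- Consumers that subscribe to a topic consume messages from all of its partitions; within a consumer group, each partition is assigned to exactly one consumer.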
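2) Replicas:
Definition: a replica is a copy of a partition. Each partition can be configured with multiple replicas, which are placed on different brokers in the Kafka cluster.
1. Purpose:
- High availability: replicas provide failover. When a broker becomes unavailable, the data can still be served from replicas on other brokers.
- Fault tolerance: because the same data is stored on multiple brokers, it remains available even if one broker fails.
2. Replica properties:
- Replication and acknowledgements: followers replicate data from the leader, and the producer's acks setting controls whether a write waits for them. With acks=all, the leader acknowledges a write only after all replicas in the ISR have received it.
- Leader and followers: each partition has one leader and zero or more followers. Producers and consumers normally interact with the partition's leader.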
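3. ISR (In-Sync Replicas):
- The ISR is the set of replicas that are in sync with the partition leader. Only replicas in the ISR can become the new leader (unless unclean leader election is enabled). When a replica can no longer keep up, it is removed from the ISR.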
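A partition whose ISR is smaller than its replica set is under-replicated. kafka-topics.sh can list these directly with `--describe --under-replicated-partitions`; the sketch below only makes the ISR-versus-replicas comparison explicit, assuming the `Key: value` layout that `kafka-topics.sh --describe` prints per partition. The function name `isr_shrunk` is hypothetical.

```bash
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and print
# "topic partition replicas isr" for partitions whose ISR is smaller than the replica set.
isr_shrunk() {
  awk '
    {
      topic = part = reps = isr = ""
      # Fields come as "Key: value" pairs; look values up by key, not by position.
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") part  = $(i + 1)
        if ($i == "Replicas:")  reps  = $(i + 1)
        if ($i == "Isr:")       isr   = $(i + 1)
      }
      # Only per-partition lines carry a Partition: field; compare list lengths.
      if (part != "" && split(isr, a, ",") < split(reps, b, ","))
        print topic, part, reps, isr
    }
  '
}
```

Usage: `kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test001 | isr_shrunk`. Empty output means every partition's ISR matches its replica list.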
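How producers and consumers relate to partitions and replicas:
- Producers can send messages to a specific partition, or let the partition be chosen from the message key.
- Consumers that subscribe to a topic consume the messages in its partitions, fetching from the partition leader by default (since Kafka 2.4 a consumer can be configured to fetch from a nearby follower).
Overall, Kafka's partition and replica mechanisms provide a high degree of scalability, availability, and fault tolerance, making it a powerful platform for processing large-scale real-time data streams.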
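V. Kafka disk expansion
Scenario: as data volume grows, disk capacity has to be expanded. Here one new disk is added to each broker and its directory is appended to log.dirs in the Kafka configuration. Kafka does not move existing partitions onto the new disk; only newly created partitions (for example, those of new topics) can be placed there, so unless new topics are created, no data is written to it.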
# log.dirs configures multiple root directories (comma-separated)
log.dirs=/data1,/data2
# Restart kafka after changing the configuration
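When the change has to be made on several brokers, a small script avoids typos and accidentally adding the same directory twice. The sketch below assumes a server.properties-style file with a single `log.dirs=` line and GNU sed; `add_log_dir` is a hypothetical helper. In a containerized deployment the file is typically regenerated at startup, so there the equivalent change belongs in the chart values or environment instead.

```bash
# Hypothetical helper: append a directory to the log.dirs line of a
# server.properties-style file; does nothing if the directory is already listed.
add_log_dir() {
  local file="$1" new_dir="$2" current
  # Current value of log.dirs (everything after the first "=").
  current=$(grep -E '^log\.dirs=' "$file" | head -n 1 | cut -d= -f2-)
  if [ -z "$current" ]; then
    echo "log.dirs not found in $file" >&2
    return 1
  fi
  # Already present as a complete comma-separated entry? Nothing to do.
  case ",$current," in
    *",$new_dir,"*) return 0 ;;
  esac
  # Rewrite the line in place (GNU sed); "|" is the delimiter because paths contain "/".
  sed -i "s|^log\.dirs=.*|log.dirs=$current,$new_dir|" "$file"
}
```

For example, `add_log_dir /path/to/server.properties /data2` turns `log.dirs=/data1` into `log.dirs=/data1,/data2`, and running it again leaves the file unchanged. The broker still has to be restarted afterwards.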
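VI. Data balancing (partition migration)
Scenario: several directories have been added to log.dirs in the Kafka configuration, but no partition data is being written to the new directories, so the partitions are reassigned.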
1) Check the topic partitions
# Log into the kafka-0 pod
kubectl exec -it kafka-0 -n kafka -- bash
# Create a few more topics for testing
kafka-topics.sh --create --topic test002 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
kafka-topics.sh --create --topic test003 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
kafka-topics.sh --create --topic test004 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
kafka-topics.sh --create --topic test005 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000
kafka-topics.sh --create --topic test006 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000
kafka-topics.sh --create --topic test007 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000
# Check the partitions
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test001
2) Check partition sizes
# Show details for all topics
kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092
# Show only test001
kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092 --topic-list test001
Formatted output (size is in bytes):
{
  "version": 1,
  "brokers": [
    {
      "broker": 2,
      "logDirs": [
        {
          "logDir": "/bitnami/kafka/data",
          "error": null,
          "partitions": [
            {
              "partition": "test001-0",
              "size": 380,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test001-2",
              "size": 198,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test001-1",
              "size": 190,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    },
    {
      "broker": 1,
      "logDirs": [
        {
          "logDir": "/bitnami/kafka/data",
          "error": null,
          "partitions": [
            {
              "partition": "test001-0",
              "size": 380,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test001-2",
              "size": 198,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test001-1",
              "size": 190,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    },
    {
      "broker": 0,
      "logDirs": [
        {
          "logDir": "/bitnami/kafka/data",
          "error": null,
          "partitions": [
            {
              "partition": "test001-0",
              "size": 380,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test001-2",
              "size": 198,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test001-1",
              "size": 190,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    }
  ]
}
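3) Write move-json-file.json and generate an execution plan
move-json-file.json tells the tool which topics should be reassigned. Save it as /tmp/move-json-file.json, the path used by the command below.
[Example 1] Partition migration between brokers
{
  "topics": [{
    "topic": "test002"
  }],
  "version": 1
}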
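When many topics need to move, this file can be generated instead of written by hand. Kafka topic names may only contain ASCII letters, digits, `.`, `_` and `-`, so they can be embedded in JSON without escaping. A minimal sketch; the function name `topics_to_move_json` is hypothetical.

```bash
# Hypothetical helper: build the --topics-to-move-json-file input for
# kafka-reassign-partitions.sh --generate from a list of topic names.
topics_to_move_json() {
  local out='{"topics":[' sep='' t
  for t in "$@"; do
    out+="${sep}{\"topic\":\"${t}\"}"
    sep=','
  done
  printf '%s],"version":1}\n' "$out"
}
```

Usage: `topics_to_move_json test002 > /tmp/move-json-file.json` produces the same content as the example above in compact form; pass several names (for example `test005 test006 test007`) to move them together.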
# Check the partitions
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test002
# Check the partition size
kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092 --topic-list test002
Generate the plan:
# The topic currently lives on broker 0; migrate it to broker 1
kafka-reassign-partitions.sh --bootstrap-server kafka.kafka:9092 --topics-to-move-json-file /tmp/move-json-file.json --broker-list "1" --generate
# Output: two assignments are printed; the first is the current assignment, the second is the proposed change
# Current: Current partition replica assignment
{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[0],"log_dirs":["any"]}]}
# Proposed: Proposed partition reassignment configuration
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[1],"log_dirs":["any"]}]}
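Copy the proposed reassignment into a second JSON file, named reassignment-json-file.json here:
{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[1],"log_dirs":["any"]}]}
[Note] Nothing has actually been migrated yet; --generate only prints the plan. You can confirm this with kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test002
[Tip] --broker-list "1": the broker.id values of the brokers the topics should be distributed across; after a cluster expansion, list all brokers, including the new ones.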
4) Start the migration
Run kafka-reassign-partitions.sh to apply the plan generated above:
echo '{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[1],"log_dirs":["any"]}]}' >/tmp/reassignment-json-file.json
kafka-reassign-partitions.sh --bootstrap-server kafka.kafka:9092 --reassignment-json-file /tmp/reassignment-json-file.json --execute
[Example 2] Migrating partitions between disks (different paths on the same broker)
{
  "version": 1,
  "partitions": [{
    "topic": "test01",
    "partition": 2,
    "replicas": [0],
    "log_dirs": ["/data1"]
  }, {
    "topic": "test01",
    "partition": 1,
    "replicas": [0],
    "log_dirs": ["/data2"]
  }]
}
version: always 1
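In this format, log_dirs lists one entry per broker in replicas, in the same order, giving the absolute directory that replica should live in ("any" means no preference). Each target directory must already be one of that broker's log.dirs. Writing many entries by hand is error-prone; the sketch below builds the file for the single-replica case shown above. The function name `log_dir_move_json` and its TOPIC:PARTITION:DIR argument format are hypothetical.

```bash
# Hypothetical helper: build a reassignment JSON that moves single-replica
# partitions to a chosen log directory on one broker.
# Arguments: BROKER_ID, then one or more TOPIC:PARTITION:DIR triples.
log_dir_move_json() {
  local broker="$1" sep='' spec topic part dir out='{"version":1,"partitions":['
  shift
  for spec in "$@"; do
    # Split "topic:partition:dir" (topic names cannot contain ":").
    topic=${spec%%:*}
    part=${spec#*:}; part=${part%%:*}
    dir=${spec#*:*:}
    out+="${sep}{\"topic\":\"${topic}\",\"partition\":${part},\"replicas\":[${broker}],\"log_dirs\":[\"${dir}\"]}"
    sep=','
  done
  printf '%s]}\n' "$out"
}
```

Usage: `log_dir_move_json 0 test01:2:/data1 test01:1:/data2 > config/move-json-file.json` produces the example above in compact form. For partitions with several replicas, log_dirs would need one entry per replica, which this sketch does not handle.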
Start the migration:
kafka-reassign-partitions.sh --bootstrap-server kafka.kafka:9092 --reassignment-json-file config/move-json-file.json --execute --replica-alter-log-dirs-throttle 10000 --throttle 50000000
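Parameter notes:
- --replica-alter-log-dirs-throttle 10000: if the migration includes moving replicas between paths on the same broker (a broker with multiple log directories), --throttle does not apply to those moves. Add --replica-alter-log-dirs-throttle, which limits the rate of moves between directories on the same broker, here to 10000 B/s.
- --throttle 50000000: limits the traffic of partition moves between brokers to 50000000 B/s (about 50 MB/s).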
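Both throttles are in bytes per second, so they directly bound how long a move takes. For example, moving 10 GB (10000000000 bytes) at 50000000 B/s takes at least 200 seconds, while the same amount at 10000 B/s takes at least 1000000 seconds (over 11 days). The figure is only a lower bound, since the throttle caps the rate and new writes to the partition also have to be copied. A small sketch of the arithmetic; the function name `min_move_seconds` is hypothetical.

```bash
# Hypothetical helper: lower-bound estimate of how long moving BYTES takes
# at a throttle of RATE bytes/second, rounded up to whole seconds.
min_move_seconds() {
  local bytes="$1" rate="$2"
  echo $(( (bytes + rate - 1) / rate ))
}
```

Sizes from the kafka-log-dirs.sh output above can be fed in directly, since it reports sizes in bytes. Once the move finishes, running the same kafka-reassign-partitions.sh command with --verify instead of --execute checks that it completed and removes the throttle.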
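That is all for this hands-on walkthrough of Kafka disk expansion and data balancing. If you have any questions, feel free to follow my WeChat official account 大数据与云原生技能分享 (Big Data and Cloud Native Tech Sharing) for technical discussion. If this article helped you, please like, share, and bookmark it.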