一、概述

Filebeat是一个轻量级的日志数据搜集东西,归于Elastic公司的Elastic Stack(ELK Stack)生态体系的一部分。它的主要功用是从各种来历搜集日志数据,将数据发送到Elasticsearch、Logstash或其他方针,以便进行查找、剖析和可视化。

以下是Filebeat的主要概述和特点:

  • 轻量级:Filebeat是一个轻量级的代理,对体系资源的消耗非常低。它设计用于高性能和低推迟,能够在各种环境中运转,包括服务器、容器和虚拟机。

  • 多源搜集:Filebeat支撑从各种来历搜集数据,包括日志文件、体系日志、Docker容器日志、Windows事情日志等。它具有多个输入模块,能够轻松装备用于不同数据源的数据搜集。

  • 模块化:Filebeat选用模块化的方法组织装备,每个输入类型都能够作为一个模块,易于扩展和装备。这使得增加新的数据源和日志格局变得愈加简略。

  • 主动发现:Filebeat支撑主动发现服务,能够在容器化环境中主动识别新的容器和服务,并开始搜集其日志数据。

  • 安全性:Filebeat支撑安全传输,能够运用TLS/SSL加密协议将数据安全地传输到方针。它还支撑基于令牌的身份验证。

  • 数据处理:Filebeat能够对数据进行简略的处理,如字段切割、字段重命名和数据过滤,以保证数据合适进一步处理和剖析。

  • 方针输出:Filebeat能够将数据发送到多个方针,最常见的是将数据发送到Elasticsearch,以便进行全文查找和剖析。此外,还能够将数据发送到Logstash、Kafka等方针。

  • 实时性:Filebeat能够以实时方法搜集和传输数据,保证日志数据及时可用于剖析和可视化。

  • 监控和管理:Filebeat具有自身的监控功用,能够监督自身的状态和性能,并与Elasticsearch、Kibana等东西集成,用于管理和监控数据搜集。

作业的流程图如下:

轻量级的日志采集组件 Filebeat 讲解与实战操作
Filebeat的搜集原理的主要步骤

  1. 数据源检测

    • Filebeat首要装备要监督的数据源,这能够是日志文件、体系日志、Docker容器日志、Windows事情日志等。Filebeat能够经过输入模块装备来界说数据源。
  2. 数据搜集

    • 一旦数据源被界说,Filebeat会定期轮询这些数据源,检查是否有新的数据发生。
    • 如果有新数据,Filebeat将读取数据并将其发送到后续处理阶段。
  3. 数据处理

    • Filebeat能够对搜集到的数据进行一些简略的处理,例如字段切割、字段重命名、数据解析等。这有助于保证数据格局合适进一步的处理和剖析。
  4. 数据传输

    • 搜集到的数据将被传输到一个或多个方针方位,通常是Elasticsearch、Logstash或Kafka等。
    • Filebeat能够装备多个输出方针,以便将数据复制到多个地方以增加冗余或分发数据。
  5. 安全性和可靠性

    • Filebeat支撑安全传输,能够运用TLS/SSL协议对数据进行加密。 它还具有数据重试机制,以保证数据能够成功传输到方针方位。
  6. 数据目的地:

    • 数据被传输到方针方位后,能够被进一步处理、索引和剖析。方针方位通常是Elasticsearch,用于全文查找和剖析,或许是Logstash用于进一步的数据处理和转化,也能够是Kafka等其他音讯行列。
  7. 实时性和监控:

    • Filebeat能够以实时方法监督数据源,保证新数据能够快速传输和处理。
    • Filebeat还能够与监控东西集成,以监控其自身的性能和状态,并将这些数据发送到监控体系中。

总的来说,Filebeat搜集原理是经过轮询监督数据源,将新数据搜集并发送到方针方位,一起保证数据的安全传输和可靠性。它供给了一种高效且灵敏的方法来处理各种类型的日志和事情数据,以便进行后续的剖析和可视化。

二、Kafka 装置

为了快速布置,这儿挑选经过docker-compose布置,能够参阅我这篇文章:【中间件】经过 docker-compose 快速布置 Kafka 保姆级教程

# 先装置 zookeeper
git clone https://gitee.com/hadoop-bigdata/docker-compose-zookeeper.git
cd docker-compose-zookeeper 
docker-compose -f docker-compose.yaml up -d
# 装置kafka
git clone https://gitee.com/hadoop-bigdata/docker-compose-kafka.git
cd docker-compose-kafka
docker-compose -f docker-compose.yaml up -d

如果仅仅只是为测验也能够布置一个单机kafka 官方下载地址:kafka.apache.org/downloads

### 1、下载kafka
wget https://downloads.apache.org/kafka/3.4.1/kafka_2.12-3.4.1.tgz --no-check-certificate
### 2、解压
tar -xf kafka_2.12-3.4.1.tgz
### 3、装备环境变量
# ~/.bashrc增加如下内容:
export PATH=$PATH:/opt/docker-compose-kafka/images/kafka_2.12-3.4.1/bin
### 4、装备zookeeper 新版Kafka已内置了ZooKeeper,如果没有其它大数据组件需求运用ZooKeeper的话,直接用内置的会更方便保护。
# vi kafka_2.12-3.4.1/config/zookeeper.properties
#注释掉
#maxClientCnxns=0
#设置衔接参数,增加如下装备
#为zk的基本时刻单元,毫秒
tickTime=2000
#Leader-Follower初始通信时限 tickTime*10
initLimit=10
#Leader-Follower同步通信时限 tickTime*5
syncLimit=5
#设置broker Id的服务地址
#hadoop-node1对应于前面在hosts里面装备的主机映射,0是broker.id, 2888是数据同步和音讯传递端口,3888是推举端口
server.0=local-168-182-110:2888:3888
### 5、装备kafka
# vi kafka_2.12-3.4.1/config/server.properties
#增加以下内容:
broker.id=0
listeners=PLAINTEXT://local-168-182-110:9092
# 上面容器的zookeeper
zookeeper.connect=local-168-182-110:2181
# topic不存在的,kafka就会创立该topic。
#auto.create.topics.enable=true
### 6、发动服务
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties
### 7、测验验证
#创立topic
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --create --topic topic1 --partitions 8 --replication-factor 1
#列出一切topic
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --list
#列出一切topic的信息
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe
#列出指定topic的信息
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe --topic topic1
#生产者(音讯发送程序)
kafka-console-producer.sh --broker-list local-168-182-110:9092 --topic topic1
#顾客(音讯接收程序)
kafka-console-consumer.sh --bootstrap-server local-168-182-110:9092 --topic topic1

三、Filebeat 装置

1)下载 Filebeat

官网地址:www.elastic.co/cn/download…

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.6.2-linux-x86_64.tar.gz
tar -xf filebeat-7.6.2-linux-x86_64.tar.gz

2)Filebeat 装备参数解说

Filebeat的装备文件通常是YAML格局,包括各种装备参数,用于界说数据源、输出方针、数据处理和其他选项。以下是一些常见的Filebeat装备参数及其意义:

  1. filebeat.inputs:指定要监督的数据源。能够装备多个输入,每个输入界说一个数据源。每个输入包括以下参数:

    • type:数据源的类型,例如日志文件、体系日志、Docker日志等。
    • paths:要监督的文件途径或许运用通配符指定多个文件。
    • enabled:是否启用该输入。

示例:

filebeat.inputs:
  - type: log
    paths:
      - /var/log/*.log
  - type: docker
    enabled: true
  1. filebeat.modules:界说要加载的模块,每个模块用于解析特定类型的日志或事情数据。每个模块包括以下参数:

    • module:模块称号。
    • enabled:是否启用模块。
    • var:自界说模块变量。

示例:

filebeat.modules:
  - module: apache
    access:
      enabled: true
    error:
      enabled: true
  1. output.elasticsearch:指定将数据发送到Elasticsearch的装备参数,包括Elasticsearch主机、索引称号等。

    • hosts:Elasticsearch主机列表。
    • index:索引称号模板。
    • username和password:用于身份验证的用户名和密码。
    • pipeline:用于数据预处理的Ingest节点管道。

示例:

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
  username: "your_username"
  password: "your_password"
  1. output.logstash:指定将数据发送到Logstash的装备参数,包括Logstash主机和端口等。

    • hosts:Logstash主机列表。
    • index:索引称号模板。
    • ssl:是否运用SSL/TLS加密传输数据。

示例:

output.logstash:
  hosts: ["localhost:5044"]
  index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
  ssl.enabled: true
  1. processors:界说对数据的预处理步骤,包括字段切割、重命名、增加字段等。

    • add_fields:增加字段到事情数据。
    • decode_json_fields:解码JSON格局的字段。
    • drop_fields:删去指定字段。
    • rename:重命名字段。

示例:

processors:
  - add_fields:
      target: "my_field"
      value: "my_value"
  - drop_fields:
      fields: ["field1", "field2"]
  1. filebeat.registry.path:指定Filebeat用于盯梢现已读取的文件和方位信息的注册文件的途径。

  2. filebeat.autodiscover:主动发现数据源,特别是用于容器化环境,装备主动检测新容器的战略。

  3. logging.level:指定Filebeat的日志等级,可选项包括info、debug、warning等。

这些是 Filebeat 的一些常见装备参数,具体的装备取决于您的运用场景和需求。您能够依据需求自界说装备文件,以满意您的数据搜集和处理需求。详细的装备文档能够在Filebeat官方文档中找到。

3)filebeat.prospectors 推送kafka完好装备

这儿主要用到几个中心字段:filebeat.prospectorsprocessorsoutput.kafka

1、filebeat.prospectors

filebeat.prospectors:用于界说要监督的数据源和搜集规则。每个 prospector 包括一个或多个输入规则,它们指定要监督的文件或数据源以及如何搜集和解析数据。

以下是一个示例 filebeat.prospectors 部分的装备:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
  exclude_files:
    - "*.gz"
  multiline.pattern: '^\['
  multiline.negate: false
  multiline.match: after
  tags: ["tag1", "tag2"]
  tail_files: true
  fields:
    app: myapp
    env: production

在上述示例中,我们界说了一个 filebeat.prospectors 包括一个 type: log 的 prospector,下面是各个字段的解说:

  • type(必需):数据源的类型。在示例中,类型是 log,表明监督一般文本日志文件。Filebeat支撑多种类型,如 log、stdin、tcp、udp 等。

  • enabled:是否启用此 prospector。如果设置为 true,则启用,不然禁用。默以为 true。

  • paths(必需):要监督的文件或文件形式,能够运用通配符指定多个文件。在示例中,Filebeat将监督 /var/log/ 目录下的一切以 .log 结束的文件。

  • exclude_files:要排除的文件或文件形式列表。这儿排除了一切以 .gz 结束的文件。可选字段。

  • multiline.pattern:多行日志的起始形式。如果您的日志事情跨过多行,此选项可用于兼并多行日志事情。例如,设置为 ‘pattern’ 将依据以 ‘pattern’ 最初的行来兼并事情。

  • multiline.negate:是否取反多行日志形式。如果设置为 true,则表明匹配不包括多行日志形式的行。可选字段,默以为 false。

  • multiline.match:多行匹配形式,能够是 before(与上一行兼并)或 after(与下一行兼并)。如果设置为 before,则当前行与上一行兼并为一个事情;如果设置为 after,则当前行与下一行兼并为一个事情。可选字段,默以为 after

  • tags:为搜集的事情增加标签,以便后续的数据处理。标签是一个字符串数组,能够包括多个标签。在示例中,事情将被标记为 “tag1” 和 “tag2″。可选字段。

  • tail_files:用于控制Filebeat是否应该盯梢正在写入的文件(tail文件)。当 tail_files 设置为 true 时,Filebeat将监督正在被写入的文件,即便它们还没有完结。这对于实时监督日志文件非常有用,由于它答应Filebeat当即处理新的日志行。默许情况下tail_files启用的,因此只有在特殊情况下才需求显式设置为 false。

  • fields:为事情增加自界说字段。这是一个键值对,答应您增加额外的信息到事情中。在示例中,事情将包括 “app” 字段和 “env” 字段,别离设置为 “myapp” 和 “production”。可选字段。

这些字段答应您装备Filebeat以满意特定的数据源和搜集需求。您能够依据需求界说多个 prospector 来监督不同类型的数据源,每个 prospector 能够包括不同的参数。经过灵敏装备 filebeat.prospectors,Filebeat能够适应各种日志和数据搜集场景。

2、processors

processors 是Filebeat装备中的一个部分,用于界说在事情传输到输出方针之前对事情数据进行预处理的操作。您能够运用 processors 来修正事情数据、增加字段、删去字段,以及履行其他自界说操作。以下是一些常见的 processors 装备示例和说明:

  • 增加字段(Add Fields): 能够运用 add_fields 处理器将自界说字段增加到事情中,以丰厚事情的信息。例如,将应用程序称号和环境增加到事情中:
processors:
  - add_fields:
      fields:
        app: myapp
        env: production
  • 删去字段(Drop Fields): 运用 drop_fields 处理器能够删去事情中的指定字段。以下示例删去名为 “sensitive_data” 的字段:
processors:
  - drop_fields:
      fields: ["sensitive_data"]
  • 解码 JSON 字段(Decode JSON Fields): 如果事情中包括JSON格局的字段,您能够运用 decode_json_fields 处理器将其解码为结构化数据。以下示例将名为 “json_data” 的字段解码为结构化数据:
processors:
  - decode_json_fields:
      fields: ["json_data"]
      target: ""
  • 字段重命名(Rename Fields): 能够运用 rename 处理器重命名事情中的字段。例如,将 “old_field” 重命名为 “new_field”:
processors:
  - rename:
      fields:
        - from: old_field
          to: new_field
  • 条件处理(Conditional Processing): 运用 if 条件能够依据事情的特定字段或属性来挑选是否应用某个处理器。以下示例依据事情中的 “log_level” 字段,仅在 “error” 日志等级时增加 “error” 标签:
processors:
  - add_tags:
      tags: ["error"]
    when:
      equals:
        log_level: "error"
  • 多个处理器(Multiple Processors): 您能够装备多个处理器,它们将依照顺序顺次应用于事情数据。例如,您能够先增加字段,然后删去字段,最后重命名字段。

processors 部分答应您对事情数据进行杂乱的处理和转化,以适应特定的需求。您能够依据需求组合不同的处理器来履行多个操作,以保证事情数据在传输到输出方针之前满意您的要求。

3、output.kafka

output.kafka 是Filebeat装备文件中的一个部分,用于装备将事情数据发送到Kafka音讯行列的相关设置。以下是 output.kafka 部分的常见参数及其解说:

output.kafka:
  hosts: ["kafka-broker1:9092", "kafka-broker2:9092"]
  topic: "my-log-topic"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

以下是各个参数的详细解说:

  • hosts(必需):Kafka broker 的地址和端口列表。在示例中,我们指定了两个Kafka broker:kafka-broker1:9092 和 kafka-broker2:9092。Filebeat将运用这些地址来衔接到Kafka集群。

  • topic(必需):要发送事情到的Kafka主题(topic)的称号。在示例中,主题称号为 “my-log-topic”。Filebeat将会将事情发送到这个主题。

  • partition.round_robin:事情分区战略的装备。这儿的装备是将事情平均分布到一切分区,不仅仅是可达的分区。reachable_only 设置为 false,表明即便分区不可达也会发送数据。如果设置为 true,则只会发送到可达的分区。

  • required_acks:Kafka的承认机制。指定要等候的承认数,1 表明只需求得到一个分区的承认就以为音讯现已成功发送。更高的值表明更多的承认。通常,1 是常见的设置,由于它具有较低的推迟。

  • compression:数据的紧缩方法。在示例中,数据被gzip紧缩。这有助于减小传输数据的巨细,下降网络带宽的运用。

  • max_message_bytes:Kafka音讯的最大字节数。如果事情的巨细超越此限制,Filebeat会将事情拆分为多个音讯。

以上是常见的 output.kafka 参数,您能够依据您的Kafka集群装备和需求来调整这些参数。保证装备正确的Kafka主题和分区战略以满意您的数据传输需求。一起,要保证Filebeat服务器能够衔接到指定的Kafka broker地址。

以下是一个完好的Filebeat装备文件示例,其间包括了 filebeat.prospectorsprocessorsoutput.kafka 的装备部分,以用于从日志文件搜集数据并将其发送到Kafka音讯行列:

4)filebeat.inputs 与 filebeat.prospectors差异

Filebeat 从 7.x 版别开始引入了新的装备方法 filebeat.inputs,以供给更灵敏的输入装备选项,一起保留了向后兼容性。以下是 filebeat.inputsfilebeat.prospectors 之间的主要差异:

  • filebeat.inputs

    • filebeat.inputs 是较新版别的装备方法,用于界说输入装备。
    • 答应您以更灵敏的方法装备不同类型的输入。您能够在装备文件中界说多个独立的输入块,每个块用于装备不同类型的输入。
    • 每个输入块能够包括多个字段,用于定制不同输入类型的装备,如 type、enabled、paths、multiline 等。
    • 使装备更具可读性,由于每个输入类型都有自己的装备块。

示例:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/app/*.log
- type: syslog
  enabled: true
  port: 514
  protocol.udp: true
  • filebeat.prospectors

    • filebeat.prospectors 是旧版装备方法,用于界说输入装备。
    • 一切的输入类型(如日志文件、体系日志、stdin 等)都需求放在同一个部分中。
    • 需求在同一个装备块中界说不同输入类型的途径等细节。
    • 旧版装备方法,不如 filebeat.inputs 装备方法那么灵敏和可读性好。

以下是一些常见的 type 值以及它们的意义:

  1. log常用):用于监督和搜集文本日志文件,例如应用程序日志。
- type: log
  paths:
    - /var/log/*.log
  1. stdin:用于从标准输入(stdin)搜集数据。
- type: stdin
  1. syslog:用于搜集体系日志数据,通常是经过UDPTCP协议从远程或本地 syslog 服务器接收。
- type: syslog
  port: 514
  protocol.udp: true
  1. filestream:用于搜集 Windows 上的文件日志数据。
- type: filestream
  enabled: true
  1. httpjson:用于经过 HTTP 请求从 JSON API 搜集数据。
- type: httpjson
  enabled: true
  urls:
    - http://example.com/api/data
  1. tcp 和 udp:用于经过 TCP 或 UDP 协议搜集网络数据。
- type: tcp
  enabled: true
  host: "localhost"
  port: 12345
- type: udp
  enabled: true
  host: "localhost"
  port: 12345

总的来说,filebeat.inputs 供给了更灵敏的方法来装备不同类型的输入,更简单组织和管理装备。如果您运用的是较新版别的 Filebeat,推荐运用 filebeat.inputs 装备方法。但对于向后兼容性,旧版的 filebeat.prospectors 依然能够运用。

5)filebeat.yml 装备

filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
  multiline.pattern: '^\['
  multiline.negate: false
  multiline.match: after
  tail_files: true
  fields:
    app: myapp
    env: production
    topicname: my-log-topic
- type: log
  enabled: true
  paths:
    - /var/log/messages
  multiline.pattern: '^\['
  multiline.negate: false
  multiline.match: after
  tail_files: true
  fields:
    app: myapp
    env: production
    topicname: my-log-topic
processors:
  - add_fields:
      fields:
        app: myapp
        env: production
  - drop_fields:
      fields: ["sensitive_data"]
output.kafka:
  hosts: ["local-168-182-110:9092"]
  #topic: "my-log-topic"
  # 这儿也能够应用上面filebeat.prospectors.fields的值
  topic: '%{[fields][topicname]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

6)发动 Filebeat 服务

nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
# -e 将发动信息输出到屏幕上
# filebeat自身运转的日志默许方位${install_path}/logs/filebeat

要修正filebeat的日子途径,能够增加一下内容在filebeat.yml装备文件:

#logging.level :debug 日志等级
path.logs: /var/log/

运用 systemctl 发动 filebeat

# vi /usr/lib/systemd/system/filebeat.service
[Unit]
Description=filebeat server daemon
Documentation=/opt/filebeat-7.6.2-linux-x86_64/filebeat -help
Wants=network-online.target
After=network-online.target
[Service]
User=root
Group=root
Environment="BEAT_CONFIG_OPTS=-c /opt/filebeat-7.6.2-linux-x86_64/filebeat.yml"
ExecStart=/opt/filebeat-7.6.2-linux-x86_64/filebeat $BEAT_CONFIG_OPTS
Restart=always
[Install]
WantedBy=multi-user.target

【温馨提示】记住替换自己的 filebeat 目录。

systemctl 发动 filebeat 服务

#改写一下装备文件
systemctl daemon-reload
# 发动
systemctl start filebeat
# 检查状态
systemctl status filebeat
# 检查进程
ps -ef|grep filebeat
# 检查日志
vi logs/filebeat

7)检测日志是否现已搜集到 kafka

# 设置环境变量
export KAFKA_HOME=/opt/docker-compose-kafka/images/kafka_2.12-3.4.1
# 检查topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --list
# 检查topic列表概况
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe
# 指定topic
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe --topic my-log-topic
# 检查kafka数据
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092
#上述指令会衔接到指定的Kafka集群并打印my_topic主题上的一切音讯。如果要检查特定数量的最新音讯,则应将“--from-beginning”增加到指令中。
# 在较高版别的 Kafka 中(例如 Kafka 2.4.x 和更高版别),顾客默许需求清晰指定要消费的分区。
#以下是检查特定最新音讯数量的示例:
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092 --from-beginning --max-messages 10 --partition 0
# 检查kafka数据量,在较高版别的 Kafka 中(例如 Kafka 2.4.x 和更高版别),顾客默许需求清晰指定要消费的分区。
${KAFKA_HOME}/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list local-168-182-110:9092 --topic my-log-topic --time -1
# 消费数据检查数据,这儿指定一个分区
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server local-168-182-110:9092 --topic my-log-topic --partition 0 --offset 100
# 也能够经过消费组消费,能够不指定分区
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092 --from-beginning --group my-group

这将回来主题 <topic_name> 的分区和偏移量信息,您能够依据这些信息计算出数据量。

轻量级的日志搜集组件 Filebeat 解说与实战操作就先到这儿了,有任何疑问也可重视我公众号:大数据与云原生技术分享,进行技术交流,如本篇文章对您有所协助,费事帮忙一键三连(点赞、转发、保藏)~

轻量级的日志采集组件 Filebeat 讲解与实战操作