敞开掘金成长之旅!这是我参加「掘金日新计划 12 月更文应战」的第12天,点击检查活动详情

第一章 Flume基础理论

1.1 数据搜集东西产生背景

Hadoop 业务的一般整体开发流程:

浅析数据采集工具Flume

任何完好的大数据渠道,一般都会包括以下的根本处理进程:

数据搜集
数据 ETL 
数据存储 
数据计算/分析 
数据展现 

其间,数据搜集是所有数据体系必不可少的,跟着大数据越来越被重视,数据搜集的应战也变的尤为突出。这其间包括:

数据源多种多样
数据量大,改变快 
怎么确保数据搜集的牢靠性的功能 
怎么防止重复数据 
怎么确保数据的质量 

我们今天就来看看当时可用的一些数据搜集的产品,要点重视一些它们是怎么做到高牢靠, 高功能和高扩展。

总结: 数据的来历大体上包括:

1、业务数据
2、爬取的网络公开数据 
3、购买数据 
4、自行搜集日志数据

1.1 Flume简介

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

Flume 是一个分布式、牢靠、高可用的海量日志聚合体系,支撑在体系中定制各类数据发送 方,用于搜集数据,同时,Flume 供给对数据的简略处理,并写到各种数据接纳方的能力。

1、 Apache Flume 是一个分布式、牢靠、和高可用的海量日志搜集、聚合和传输的体系,和 Sqoop 同属于数据搜集体系组件,但是 Sqoop 用来搜集关系型数据库数据,而 Flume 用来搜集流动型数据。

2、 Flume 姓名来历于原始的近乎实时的日志数据搜集东西,现在被广泛用于任何流事情数 据的搜集,它支撑从很多数据源聚合数据到 HDFS。

3、 一般的搜集需求,经过对 flume 的简略装备即可完成。Flume 针对特别场景也具备良好 的自界说扩展能力,因此,flume 能够适用于大部分的日常数据搜集场景 。

4、 Flume 开始由 Cloudera 开发,在 2011 年贡献给了 Apache 基金会,2012 年变成了 Apache 的顶级项目。Flume OG(Original Generation)是 Flume 开始版别,后升级换代成 Flume NG(Next/New Generation)。

5、 Flume 的优势:可横向扩展、延展性、牢靠性。

1.2 Flume版别

Flume 在 0.9.x and 1.x 之间有较大的架构调整: 1.x 版别之后的改称 Flume NG 0.9.x 版别称为 Flume OG,终究一个版别是 0.94,之后是由 Apache 进行了重构 N是New 和 O是Old

Flume1.7版别要求:

Flume OG  Old/Original Generation
Flume NG  New/Next    Generation   

浅析数据采集工具Flume

留意,上面是flume1.7的要求,其他版别要求可能会不一样!!

本文运用版别链接:flume.apache.org/releases/co…

官网链接:flume.apache.org/

Flume1.9 版别要求:

System Requirements

Java Runtime Environment - Java 1.8 or later
Memory - Sufficient memory for configurations used by sources, channels or sinks
Disk Space - Sufficient disk space for configurations used by channels or sinks
Directory Permissions - Read/Write permissions for directories used by agent

第二章 Flume体系结构/中心组件

agent:能独立履行一个数据搜集使命的JVM进程
source : agent中的一个用来跟数据源对接的服务
channel : agent内部的一个中转组件
sink : agent中的一个用来跟数据意图地对接的服务
event: 音讯流转的一个载体/目标
        header  body
    
​
常见source的类型
    Avro source :接纳网络端口中的数据
    exec source: 监听文件新增内容  tail -f
    spooldir source :监控文件夹的,如果这个文件夹里面的文件发送了改变,就能够搜集
    Taildir source: 多目录多文件实时监控
常见的channel的类型
    memory : 内存中  , 快 , 但不安全
    file : 相对来说安全些,但是效率低些
    jdbc: 运用数据库进行数据的保存
​
​
常见的sink的类型
    logger  做测验运用
    HDFS    离线数据的sink 一般
    Kafka  流式数据的sink 
以上仅仅是常见的一些,官网中有完好的。

2.1 介绍

Flume 的数据流由事情(Event)贯穿始终。事情是 Flume 的根本数据单位,它携带日志数据(字节数组方式)而且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进行特定的格式化,然后 Source 会把事情推入(单个或多个)Channel 中。你能够把 Channel 看作是一个缓冲区,它将保存事情直到 Sink 处理完该事情。Sink 担任耐久化日志或 者把事情面向另一个 Source。

Flume 以 agent 为最小的独立运转单位

一个 agent 就是一个 JVM

单 agent 由 Source、Sink 和 Channel 三大组件构成。

如下面官网图片

浅析数据采集工具Flume

解说:

浅析数据采集工具Flume

2.2 Flume三大中心组件

Event

Event 是 Flume 数据传输的根本单元。 Flume 以事情的方式将数据从源头传送到终究的意图地。 Event 由可选的 header 和载有数据的一个 byte array 构成。 载有的数据对 flume 是不透明的。 Header 是包容了 key-value 字符串对的无序调集,key 在调集内是唯一的。 Header 能够在上下文路由中运用扩展。

Client

Client 是一个将原始 log 包装成 events 而且发送他们到一个或多个 agent 的实体

意图是从数据源体系中解耦 Flume

在 Flume 的拓扑结构中不是有必要的

Agent

一个 Agent 包括 source,channel,sink 和其他组件。

它使用这些组件将 events 从一个节点传输到另一个节点或终究意图地。

Agent 是 flume 流的基础部分。

Flume为这些组件供给了装备,声明周期办理,监控支撑。

Agent 之 Source

Source 担任接纳 event 或经过特别机制产生 event,并将 events 批量的放到一个或多个

包括 event 驱动和轮询两种类型

不同类型的 Source 与体系集成的 Source:

Syslog,Netcat,监测目录池

自动生成事情的 Source:Exec

用于 Agent 和 Agent 之间通讯的 IPC source:avro,thrift

Source 有必要至少和一个 channel 关联

Agent 之 Channel

Channel 位于 Source 和 Sink 之间,用于缓存进来的 event。

当 sink 成功的将 event 发送到下一个的 channel 或终究意图,event 从 channel 删去。

不同的 channel 供给的耐久化水平也是不一样的

Memory Channel:volatile(不稳定的)

File Channel:依据 WAL(预写式日志 Write-Ahead Logging)完成

JDBC Channel:依据嵌入式 database 完成

Channel 支撑业务,供给较弱的顺序确保

能够和任何数量的 source 和 sink 工作

Agent 之 Sink

Sink 担任将 event 传输到下一级或终究意图地,成功后将 event 从 channel 移除

不同类型的 sink ,比如 HDFS,HBase

2.3 Flume经典布置方案

1、单Agent搜集数据

浅析数据采集工具Flume

由一个 agent 担任把从 web server 中搜集数据到 HDFS 。

2、多Agent串联

浅析数据采集工具Flume

在搜集数据的进程中,能够让多个 agent 串联起来,形成一条 event 数据线,进行传输,但 是留意的是:相邻两个 agent 的前一个 agent 的 sink 类型要和后一个 agent 的 source 类型一 致。

3、多Agent合并串联

浅析数据采集工具Flume

多个 agent 串联,并联成一个杂乱的 数据搜集架构。反映了 flume 的布置灵敏。而且针对要害节点,还能够进行高可用装备。

4、多路复用

浅析数据采集工具Flume

一份数据流,能够被复制成多份数据流,交给多个不同组件进行处理。一般用于一边永久存储一边进行计算。

第三章 Flume装置及事例

3.1 装置布置

3.1.1 Flume1.7装置布置

1、将apache-flume-1.7.0-bin.tar.gz上传到hadoop0的/software目录下,并解压

[root@hadoop0 software]# tar -zxvf apache-flume-1.7.0-bin.tar.gz

2、重命名为flume

[root@hadoop0 software]# mv apache-flume-1.7.0-bin flume

3、修正flume-env.sh文件

[root@hadoop0 conf]# mv flume-env.sh.template flume-env.sh

然后vim flume-env.sh,修正jdk途径

export JAVA_HOME=/software/jdk

3.1.2 Flume1.9装置布置

1、将apache-flume-1.9.0-bin.tar.gz上传到hadoop10的/software目录下,并解压

[root@hadoop10 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz

2、重命名为flume

[root@hadoop10 software]# mv apache-flume-1.9.0-bin flume

3、修正flume-env.sh文件

[root@hadoop10 conf]# mv flume-env.sh.template flume-env.sh

然后vim flume-env.sh,修正jdk途径

export JAVA_HOME=/software/jdk

4、看看Flume版别

[root@hadoop10 bin]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@hadoop10 bin]# pwd
/software/flume/bin
[root@hadoop10 bin]# 

3.2 事例

3.2.1 监控端口数据(官方事例)

1、在flume的目录下面创立文件夹
[root@hadoop0 flume]# mkdir job
[root@hadoop0 flume]# cd job
2、界说装备文件telnet-logger.conf
[root@hadoop0 job]# vim telnet-logger.conf
增加内容如下:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3、先敞开flume监听端口
退到flume目录
官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
实际操作:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console
4、履行telnet localhost 44444
telnet localhost 44444
会先报找不到telnet
[root@hadoop10 flume]# telnet localhost 44444
bash: telnet: command not found...
[root@hadoop10 flume]# 
然后履行yum -y install telnet
5、发送指令测验即可

针对于上述装备telnet-logger.conf文件的内容的解说:

# example.conf: A single-node Flume configuration
# Name the components on this agent  #a1: 表明的是agent的姓名
a1.sources = r1		#r1 : 表明的是a1的输入源
a1.sinks = k1  		#k1 : 表明的a1的输出意图地
a1.channels = c1  	#c1 : 表明的a1的缓冲区
# Describe/configure the source	#装备source
a1.sources.r1.type = netcat		#表明a1的输入源r1的类型是netcat类型
a1.sources.r1.bind = localhost  #表明a1监听的主机
a1.sources.r1.port = 44444      #表明a1监听的端口号
# Describe the sink		    #描述sink
a1.sinks.k1.type = logger	#表明a1的输入意图地k1的类型是logger
# Use a channel which buffers events in memory	
a1.channels.c1.type = memory		#表明a1的channel的类型是memory类型
a1.channels.c1.capacity = 1000		#表明a1的channel总容量1000个event
a1.channels.c1.transactionCapacity = 100  #表明a1的channel传输的时分搜集到了100个event今后再去提交业务
# Bind the source and sink to the channel
a1.sources.r1.channels = c1  #表明将r1和c1 连接起来
a1.sinks.k1.channel = c1     #表明将k1和c1 连接起来
3、先敞开flume监听端口
退到flume目录
官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
实际操作:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger2.conf -Dflume.root.logger=INFO,console
参数阐明:
	--conf conf : 表明装备文件在conf目录
	--name a1  :  表明给agent起名为a1 
	--conf-file job/telnet-logger.conf : flume本次发动所要读取的装备文件在job文件夹下面的telnet-logger.conf文件
	-Dflume.root.logger=INFO,console : -D 表明flume运转时分的动态修正flume.root.logger参数值,并将日志打印到控制台,等级是INFO等级。
	日志等级: log、info、warn、error 

3.2.2 监控目录中的文件到HDFS

1、创立装备文件dir-hdfs.conf
在job目录下面 vim dir-hdfs.conf
增加下面的内容:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2、发动监控目录指令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

针对于上述装备dir-hdfs.conf文件的内容的解说:

1、创立装备文件dir-hdfs.conf
在job目录下面 vim dir-hdfs.conf
增加下面的内容:
a3.sources = r3		#界说source为r3
a3.sinks = k3       #界说sink为k3
a3.channels = c3    #界说channel为c3
# Describe/configure the source  #装备source相关的信息
a3.sources.r3.type = spooldir    #界说source的类型是spooldir类型
a3.sources.r3.spoolDir = /software/flume/upload   #界说监控的详细的目录
a3.sources.r3.fileSuffix = .COMPLETED			  #文件上传完了之后的后缀
a3.sources.r3.fileHeader = true					  #是否有文件头
a3.sources.r3.ignorePattern = ([^ ]*.tmp)        #疏忽以tmp结尾的文件,不进行上传
# Describe the sink			#装备sink相关的信息
a3.sinks.k3.type = hdfs		#界说sink的类型是hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H	#文件上传到hdfs的详细的目录
a3.sinks.k3.hdfs.filePrefix = upload-		#文件上传到hdfs之后的前缀
a3.sinks.k3.hdfs.round = true				#是否按照时刻滚动生成文件
a3.sinks.k3.hdfs.roundValue = 1				#多长时刻单位创立一个新的文件
a3.sinks.k3.hdfs.roundUnit = hour			#时刻单位
a3.sinks.k3.hdfs.useLocalTimeStamp = true   #是否运用本地时刻
a3.sinks.k3.hdfs.batchSize = 100			#堆集多少个event才刷写到hdfs一次
a3.sinks.k3.hdfs.fileType = DataStream		#文件类型
a3.sinks.k3.hdfs.rollInterval = 600			#多久生成新文件
a3.sinks.k3.hdfs.rollSize = 134217700		#多大生成新文件
a3.sinks.k3.hdfs.rollCount = 0				#多少event生成新文件
a3.sinks.k3.hdfs.minBlockReplicas = 1		#副本数
# Use a channel which buffers events in memory
a3.channels.c3.type = memory				
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2、发动监控目录指令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

在履行上面的指令进程中遇到的了一点点小问题

......
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
	at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679)
	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221)
	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)

解决方案:将lib文件夹下的guava-11.0.2.jar删去以兼容Hadoop的版别。能够经过重命名的方式注释掉即可(完成删去的作用)。

[root@hadoop10 lib]# mv guava-11.0.2.jar guava-11.0.2.jar.backup

3.2.3 监控文件到HDFS

1、创立一个自动化文件
[root@hadoop0 job]# vim mydateauto.sh
写入:
#!/bin/bash
while true
do
	echo `date`
	sleep 1
done
然后运转测验:
[root@hadoop0 job]# sh mydateauto.sh 
Wed Aug 19 18:34:19 CST 2020
Wed Aug 19 18:34:20 CST 2020
然后修正装备,将输出的日志追加到某个文件中
#!/bin/bash
while true
do
        echo `date` >> /software/flume/mydate.txt
        sleep 1
done
再次履行[root@hadoop0 job]# sh mydateauto.sh 
就会在flume的文件夹下面生成了mydate.txt文件
经过tail -f mydate.txt 检查
再次履行sh mydateauto.sh  检查输出。
2、创立装备vim file-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /software/flume/mydate.txt
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3、发动
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

针对于上述装备file-hdfs.conf文件的内容的解说:

# Name the components on this agent
a2.sources = r2		#界说source为r2
a2.sinks = k2		#界说sink为k2
a2.channels = c2	#界说channel为c2
# Describe/configure the source
a2.sources.r2.type = exec	#界说source的类型是exec 可履行指令
a2.sources.r2.command = tail -F /software/flume/mydate.txt	#详细文件方位
a2.sources.r2.shell = /bin/bash -c  #指令最初
# Describe the sink	#sink相关装备
a2.sinks.k2.type = hdfs		#界说sink的类型是hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H		#详细的方位
a2.sinks.k2.hdfs.filePrefix = logs- 	
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 100
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600   #单位是秒!!
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3、发动
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

进程中遇到的一点点小问题:

浅析数据采集工具Flume

18 Oct 2021 14:32:24,340 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: k2, type: hdfs
18 Oct 2021 14:32:24,348 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:469)  - Sink k2 has been removed due to an error during configuration
java.lang.InstantiationException: Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: k2, batch size = 1000, channel c2, transaction capacity = 100
	at org.apache.flume.node.AbstractConfigurationProvider.checkSinkChannelCompatibility(AbstractConfigurationProvider.java:403)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:462)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

解决方案:

问题原因:原因其实很明了了,就是字面的意思,channel 与 sink的设置不匹配,sink的batch size大于channel的transaction capacity
解决方案:将a2.sinks.k2.hdfs.batchSize设置为小于等于100 。 或者注释掉也能够。

3.2.4 多目录多文件实时监控(Taildir Source)

与前面运用到的Source的对比

Spooldir Source 用于同步新文件,但不合适对实时追加日志的文件进行监听并同步。
Exec source 用于监控一个实时追加的文件,不能完成断点续传;
Taildir Source 用于监听多个实时追加的文件,而且能够完成断点续传。

操作事例:

1、在job下面创立 vim taildir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /software/flume/taildir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /software/flume/taildirtest/filedir/.*file.*
a3.sources.r3.filegroups.f2 = /software/flume/taildirtest/logdir/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/uploadtaildir/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2、创立文件文件夹,留意需要在发动之前创立监控的文件夹
[root@hadoop10 flume]# mkdir taildirtest
[root@hadoop10 flume]# cd taildirtest/
[root@hadoop10 taildirtest]# ll
total 0
[root@hadoop10 taildirtest]# mkdir filedir
[root@hadoop10 taildirtest]# mkdir logdir
[root@hadoop10 taildirtest]# ll
total 0
drwxr-xr-x. 2 root root 6 Oct 18 16:44 filedir
drwxr-xr-x. 2 root root 6 Oct 18 16:45 logdir
[root@hadoop10 taildirtest]# vim file.txt
[root@hadoop10 taildirtest]# vim log.txt
[root@hadoop10 taildirtest]# ll
total 8
drwxr-xr-x. 2 root root  6 Oct 18 16:44 filedir
-rw-r--r--. 1 root root 35 Oct 18 16:45 file.txt
drwxr-xr-x. 2 root root  6 Oct 18 16:45 logdir
-rw-r--r--. 1 root root 35 Oct 18 16:46 log.txt
3、发动监控目录指令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-hdfs.conf
4、测验
[root@hadoop10 taildirtest]# cp file.txt filedir/
[root@hadoop10 taildirtest]# cp log.txt logdir/
[root@hadoop10 taildirtest]# cd filedir/
[root@hadoop10 filedir]# echo hello1 >> file.txt 
[root@hadoop10 filedir]# cd ../logdir/
[root@hadoop10 logdir]# echo hello2 >> log.txt 
[root@hadoop10 logdir]# 


声明:
文章中代码及相关语句为自己依据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技能对应的图片,若有相关贰言,请联络删去。感谢。转载请注明出处,感谢。

落叶飘雪