Flume Basic Configuration and Usage

Posted by Jackson on 2017-12-14

A basic Flume configuration file, example.conf:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1 is the name of the Flume agent.

Install NetCat: yum -y install nc

Command:

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/example.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Running flume-ng without arguments prints the command help; the global options and agent options are the ones used most often.

[hadoop@bigdata01 flume-1.6.0-cdh5.16.2-bin]$ bin/flume-ng
Error: Unknown or unspecified command ''

Usage: bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

Startup log analysis:

  • Reads the env information via the configuration files and picks up the jars for the classpath
  • Determines that the agent name is a1
  • Creates the channel: instantiates the memory channel type and creates channel c1
  • Creates source r1 with instance type netcat
  • Creates sink k1 with instance type logger
  • Wires r1 and k1 together through the channel
  • Starts the channel, sleeps 0.5 seconds waiting for it to come up, and channel c1 starts successfully
  • Starts the sink, starts the source, and creates the ServerSocket

After flume-ng is running, connect to port 44444 with telnet and send a few lines of text such as hadoop, hive, spark.
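A minimal sketch of such a session, assuming the agent runs on the same host (the OK lines are the NetCat source acknowledging each event, which it does by default):

telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hadoop
OK
hive
OK
spark
OK
flink
OK

The agent's console then prints each line as an event: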

2017-12-14 20:56:31,342 (SinkRunner-PollingRunner-DefaultSinkProcessor) 
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)]
Event: { headers:{} body: 68 61 64 6F 6F 70 0D hadoop. }
2017-12-14 20:56:32,884 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)]
Event: { headers:{} body: 68 69 76 65 0D hive. }
2017-12-14 20:56:34,604 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)]
Event: { headers:{} body: 73 70 61 72 6B 0D spark. }
2017-12-14 20:56:36,989 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)]
Event: { headers:{} body: 66 6C 69 6E 6B 0D flink. }

Each received Event has two parts: an optional headers:{} map, and a body, which holds the actual data.

NetCat TCP Source

This source listens on a given port and turns each line of received text into an event delivered through the channel, much like nc -k -l host port. In other words, the NetCat source opens the specified port and listens for data.
A basic NetCat source configuration looks like this:

a1.sources.r1.type = netcat     <=== source type
a1.sources.r1.bind = 0.0.0.0    <=== IP address the source binds to
a1.sources.r1.port = 6666       <=== port the source binds to
a1.sources.r1.channels = c1     <=== channel the source is wired to

Logger Sink

Logs events at INFO level. The logger sink is typically used for testing or debugging. Its only required property is type; the optional maxBytesToLog property caps how many bytes of the event body are logged, and defaults to 16.

Example: a1.sinks.k1.type = logger
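If the 16-byte default truncates the body too aggressively, maxBytesToLog can be raised; a sketch (the value 256 is only illustrative, and the property requires a Flume build that supports it):

a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 256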

Memory Channel

Events are stored in an in-memory queue with a configurable maximum size. This channel is ideal for flows that need high throughput and can afford to lose the staged data if the agent dies. Required property: type.

a1.channels.c1.type = memory                        <=== channel type
a1.channels.c1.capacity = 10000                     <=== maximum number of events the channel can hold
a1.channels.c1.transactionCapacity = 10000          <=== maximum number of events per transaction (batch) through the channel
a1.channels.c1.byteCapacityBufferPercentage = 20    <=== percent of buffer between byteCapacity and the estimated total size of all event bodies, to account for data in headers
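byteCapacityBufferPercentage only takes effect together with byteCapacity, which caps the total number of bytes of event bodies the channel may hold (by default roughly 80% of the JVM heap). A sketch with a purely illustrative value:

a1.channels.c1.byteCapacity = 800000                <=== maximum total bytes of event bodies allowed in the channel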

Collecting data and writing it to HDFS

source: exec source
channel: memory channel
sink: hdfs sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/flumeData/word.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/flume/events
a1.sinks.k1.hdfs.filePrefix = bigdata-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Command:

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/exec-hdfs.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Log analysis after writing data to word.log:

  • 1. Creates a file on HDFS named with a .tmp suffix
  • 2. Closes the .tmp file
  • 3. Renames the file to remove the .tmp suffix
2017-12-14 19:29:42,636 (SinkRunner-PollingRunner-DefaultSinkProcessor) 
[INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)]
Creating hdfs://bigdata01:9000/flume/events/bigdata-.1581679782308.tmp <=== a file is created on HDFS with a .tmp suffix
2017-12-14 19:29:42,787 (hdfs-k1-call-runner-0)
[WARN - org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)]
Unable to load native-hadoop library for your platform... using builtin-java classes
where applicable
2017-12-14 19:29:48,915 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO -
org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing
hdfs://bigdata01:9000/flume/events/bigdata-.1581679782308.tmp <=== the .tmp file is closed
2017-12-14 19:29:48,957 (hdfs-k1-call-runner-3) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming
hdfs://bigdata01:9000/flume/events/bigdata-.1581679782308.tmp to
hdfs://bigdata01:9000/flume/events/bigdata-.1581679782308 <=== the file is renamed to drop the .tmp suffix

Problems with this approach:

  • 1. What if tail -F dies? And it can only monitor a single file.
  • 2. There is no notion of an offset, so there is no record of how far the file has been read.
  • 3. Too many small files end up on HDFS.

The HDFS sink therefore provides three roll parameters to mitigate the small-file problem (see the sketch after this list):

  • hdfs.rollInterval (default 30): number of seconds before rolling to a new file; 0 means never roll based on time
  • hdfs.rollSize (default 1024): file size in bytes that triggers a roll; 0 means never roll based on size
  • hdfs.rollCount (default 10): number of events written before rolling; 0 means never roll based on event count
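For example, to roll mainly by size (roughly one HDFS block) and disable time- and count-based rolling, something like the following could be added to the HDFS sink above; the values are illustrative, not recommendations:

a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0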

Collecting data from a monitored directory to HDFS

Spooling Directory Source

This source watches the specified directory for new files and parses events out of them as they appear; the event parsing logic is pluggable.
After a given file has been fully read into the channel, completion is indicated by default by renaming the file, or the file can be deleted, or a trackerDir can be used to keep track of processed files (these options are sketched below).
Unlike the exec source, the Spooling Directory Source is reliable and will not lose data, even when Flume is restarted or killed. The price of this reliability is that only immutable, uniquely named files may be dropped into the spooling directory.
To avoid problems, only add files with unique names to the monitored directory.
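A sketch of those optional spooldir properties, shown here with their default values (they are not used in the config that follows; deletePolicy can be set to immediate to delete files once they are ingested):

a1.sources.r1.fileSuffix = .COMPLETED   <=== suffix appended to a file once it is fully ingested
a1.sources.r1.deletePolicy = never      <=== never | immediate
a1.sources.r1.trackerDir = .flumespool  <=== directory (relative to spoolDir) for processing-state metadata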

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/flumeData/spool
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/flume/spool/%Y%m%d%H%M
a1.sinks.k1.hdfs.filePrefix = bigdata-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Command:

bin/flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/flume-spool-hdfs.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Log analysis:

  • 1. Reads the file contents and, on reaching a file boundary, rolls to the next file if there is one
  • 2. Renames the ingested file by appending the .COMPLETED suffix
  • 3. Creates the HDFS file xxx.tmp
  • 4. Closes the HDFS file xxx.tmp
  • 5. Renames the HDFS file to xxx, dropping the .tmp suffix
2017-12-14 20:36:04,867 (pool-5-thread-1) [INFO - 
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolin
gFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to
the next file, if there is one.
2017-12-14 20:36:04,868 (pool-5-thread-1) [INFO -
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSp
oolingFileEventReader.java:433)] Preparing to move file
/home/hadoop/data/flumeData/spool/word.log to
/home/hadoop/data/flumeData/spool/word.log.COMPLETED
2017-12-14 20:36:04,877 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO -
org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer
= TEXT, UseRawLocalFileSystem = false
2017-12-14 20:36:05,146 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO -
org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating
hdfs://bigdata01:9000/flume/spoool/202002142036/bigdata-.1581683764877.tmp
2017-12-14 20:36:05,285 (hdfs-k1-call-runner-0) [WARN - org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)] Unable to
load native-hadoop library for your platform... using builtin-java classes where
applicable
2017-12-14 20:36:06,582 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO -
org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing
hdfs://bigdata01:9000/flume/spoool/202002142036/bigdata-.1581683764877.tmp
2017-12-14 20:36:06,598 (hdfs-k1-call-runner-3) [INFO -
org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming
hdfs://bigdata01:9000/flume/spoool/202002142036/bigdata-.1581683764877.tmp to
hdfs://bigdata01:9000/flume/spoool/202002142036/bigdata-.1581683764877

Note that the Spooling Directory Source can only monitor a directory, not an individual file.

Taildir Source

The Taildir Source supports both individual files and directories (via file groups with path patterns).

It watches the configured files and picks up newly appended lines in near real time.
If a file is currently being written to, the Taildir Source waits for the write to finish and then retries reading it.
The Taildir Source does not lose data: it periodically writes the last read position of each file, as JSON, to a designated position file.
If Flume stops for any reason, it resumes from the positions recorded in that file; it can also monitor several files at once.
Files are consumed in the order they were written, so the earliest written data is consumed first.
This source neither deletes nor modifies the files it monitors, and it does not support binary files; it reads text files line by line. A sketch of the position file's contents follows.
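A sketch of what the position file configured below (taildir_position.json) typically contains, one entry per tailed file; the inode and pos values, and the test2 file name, are made up for illustration:

[{"inode":1842571,"pos":87,"file":"/home/hadoop/data/flumeData/test1/example.log"},{"inode":1842580,"pos":0,"file":"/home/hadoop/data/flumeData/test2/access.log"}]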

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.writePosInterval = 3000
a1.sources.r1.positionFile = /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/hadoop/data/flumeData/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /home/hadoop/data/flumeData/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Command:

bin/flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/tail-logger.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Error message

Because the HDFS path uses time-based escape sequences, the sink needs a timestamp from the Flume event headers, but no timestamp header is being set here.
The fix is to set a parameter on the HDFS sink, as sketched below.
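A minimal fix on the HDFS sink from the earlier configurations (the alternative, adding a timestamp interceptor on the source, is not shown here):

a1.sinks.k1.hdfs.useLocalTimeStamp = true    <=== use the agent's local time for the %Y%m%d%H%M escapes instead of a timestamp header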

The error:

2017-12-14 23:14:13,824 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:449)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:366)
... 3 more

Collecting IDEA application logs with Flume

Configure log4j.properties in the IDEA project so that log output is sent to port 44444:

log4j.rootCategory=info,console,flume

# Console appender settings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss.SSS} [%t] [%t] [%t] - %m%n
# Flume appender settings
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.52.50
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%m

Add the dependency to pom.xml:

<!-- Flume log4j appender dependency -->
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.6.0</version>
</dependency>

Flume agent configuration for collecting these logs:

  • The source is an avro source that receives the data sent to port 44444
  • The channel is a memory channel
  • The sink is a logger sink, which prints the events to the console (a sample log-producing class is sketched after the config below)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
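To produce test data from IDEA, any class that logs through log4j will do. A minimal sketch (the class name and messages are invented; it assumes the log4j.properties above is on the classpath together with a log4j 1.2 dependency):

import org.apache.log4j.Logger;

public class FlumeLog4jDemo {

    private static final Logger logger = Logger.getLogger(FlumeLog4jDemo.class);

    public static void main(String[] args) throws InterruptedException {
        // Each message goes to the console appender and, through the
        // Log4jAppender configured above, to the avro source on port 44444.
        for (int i = 0; i < 100; i++) {
            logger.info("current value is: " + i);
            Thread.sleep(1000);
        }
    }
}

Start the Flume agent first, then run the class; the logger sink prints each message as an event on the agent's console.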