Advanced Flume

Posted by Jackson on 2018-01-06

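Case 1: Chaining Two Flume Agents

Two Flume agents are chained together. The first tier collects data with an exec source and sends it to an avro sink; the second tier receives it through an avro source and prints it to the console.

Tier 1: exec source ===> memory channel ===> avro sink
Tier 2: avro source ===> memory channel ===> logger sink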

Agent1 (tier 2: avro source to logger sink)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Agent2 (tier 1: exec source to avro sink)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/flumeData/word.log
a1.sources.r1.shell = /bin/bash -c

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
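
The configs above appear to assume that both agents run on the same machine, which is why the avro sink can point at 0.0.0.0. If the two tiers ran on different hosts, the tier-1 sink would presumably need the tier-2 machine's address instead. A minimal sketch, assuming the tier-2 agent runs on a host named bigdata01 (a hostname borrowed from the HDFS examples later in this post, purely for illustration):

```properties
# Tier-1 avro sink pointing at a tier-2 agent on another machine.
# "bigdata01" is an assumed hostname, not part of the original setup.
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata01
a1.sinks.k1.port = 44444
```

The tier-2 avro source can keep bind = 0.0.0.0 so that it listens on all interfaces.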

Startup scripts

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/exec-avro.conf \
--name a1 \
-Dflume.root.logger=INFO,console


bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/avro-logger.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Case 2: Two Sources, One Sink

This agent uses two sources, exec and netcat (nc), that feed a single logger sink.

Agent1

a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/flumeData/word.log
a1.sources.r1.shell = /bin/bash -c

a1.sources.r2.type = netcat
a1.sources.r2.bind = localhost
a1.sources.r2.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1

Startup script

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/twosource.conf \
--name a1 \
-Dflume.root.logger=INFO,console

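Case 3: One Source, Two Sinks

This agent uses an exec source with two sinks: a logger sink and an HDFS sink. Because both sinks drain the same channel, each event is delivered to only one of them, not to both.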

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/flumeData/word.log
a1.sources.r1.shell = /bin/bash -c

a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://bigdata01:9000/flume/twosink
a1.sinks.k2.hdfs.filePrefix = bigdata-
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
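
The HDFS sink above keeps Flume's default roll settings (hdfs.rollInterval = 30 seconds, hdfs.rollSize = 1024 bytes, hdfs.rollCount = 10 events), which tends to produce a lot of small files. A sketch of overriding them; the values below are illustrative assumptions, not settings from the original setup:

```properties
# Roll to a new file every 60 seconds (assumed value)
a1.sinks.k2.hdfs.rollInterval = 60
# ... or once the file reaches 128 MB; the value is in bytes (assumed value)
a1.sinks.k2.hdfs.rollSize = 134217728
# 0 disables rolling by event count
a1.sinks.k2.hdfs.rollCount = 0
```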

Startup script

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/twosink.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Replicating Channel Selector (default)

If no selector is configured, the replicating channel selector is used by default. It writes a copy of each event to every channel attached to the source (one or more).

nc source ===> channel1 ===> sink1
          ===> channel2 ===> sink2

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.selector.type = replicating

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://bigdata01:9000/flume/twosink
a1.sinks.k2.hdfs.filePrefix = bigdata-
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
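
With the replicating selector, a failed write to any required channel fails the whole source transaction. Flume also lets a channel be marked optional, in which case a failed write to it is simply ignored. A sketch, assuming the HDFS path (c2) is the one that can be treated as best-effort; that choice is an assumption made for illustration:

```properties
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Assumption for illustration: c1 stays required, c2 becomes optional
a1.sources.r1.selector.optional = c2
```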

Startup script:

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/replactingchannelselector.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Multiplexing Channel Selector

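agent1 ===>
agent2 ===> agent4 ===> multiplexing ===> c1 / c2 / c3
agent3 ===>

agent1: port 44441 ===> 55555, interceptor adds state=CN
agent2: port 44442 ===> 55555, interceptor adds state=US
agent3: port 44443 ===> 55555, interceptor adds state=ES

agent4: avro source on port 55555 ===> picks the target channel based on the event header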

Agent1

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44441

# Static interceptor: stamp every event with the header state=CN
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CN

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 55555

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Agent2

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44442

# Static interceptor: stamp every event with the header state=US
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = US

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 55555

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Agent3

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44443

# Static interceptor: stamp every event with the header state=ES
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = ES

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 55555

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Agent4

a1.sources = r1
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55555

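# Route each event by the value of its "state" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CN = c1
a1.sources.r1.selector.mapping.US = c2
# Any other value (ES in this setup) falls through to c3
a1.sources.r1.selector.default = c3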

a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c3.type = memory

a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://bigdata01:9000/flume/sink2
a1.sinks.k2.hdfs.filePrefix = flume-
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text

a1.sinks.k3.type = hdfs
a1.sinks.k3.hdfs.path = hdfs://bigdata01:9000/flume/sink3
a1.sinks.k3.hdfs.filePrefix = flume-
a1.sinks.k3.hdfs.fileType = DataStream
a1.sinks.k3.hdfs.writeFormat = Text

a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
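
Agent3 tags its events with state=ES, which has no mapping in Agent4's selector, so those events reach c3 only through the default rule. If that routing should be explicit, the selector could look roughly like this (the ES mapping line is an addition for illustration, not part of the original config):

```properties
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CN = c1
a1.sources.r1.selector.mapping.US = c2
# Added for illustration: route ES explicitly instead of relying on the default
a1.sources.r1.selector.mapping.ES = c3
# Events with a missing or unmapped state header still go to c3
a1.sources.r1.selector.default = c3
```

Behavior is the same as before for CN, US, and ES; the explicit mapping only makes the intent easier to read.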

Startup scripts

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/multi1.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/multi2.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/multi3.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/multi4.conf \
--name a1 \
-Dflume.root.logger=INFO,console

Flume Sink Processors

There are three sink processor types: default, failover, and load_balance. In production we use failover.

                                   avro sink ---- avro source ===> logger sink
nc source ===> memory channel ===>
                                   avro sink ---- avro source ===> logger sink

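The larger the priority value, the higher the priority. To test, first start the two avro-logger agents, then send some data: all of it goes to port 55552, the higher-priority sink. Next, stop the agent on 55552 by hand and send data again: Agent3 logs an error and then redirects the data to 55551, which confirms that failover works.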

Agent1

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55551

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Agent2

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55552

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Agent3

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

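# Put both avro sinks in one group managed by a failover processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# Higher number = higher priority, so k2 (port 55552) is preferred
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Maximum backoff for a failed sink, in milliseconds
a1.sinkgroups.g1.processor.maxpenalty = 10000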

a1.channels.c1.type = memory

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 55551

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 0.0.0.0
a1.sinks.k2.port = 55552

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
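
For comparison, the same sink group could be switched to the load_balance processor, which spreads events across both sinks instead of keeping one as a standby. A sketch, assuming the rest of Agent3's config stays unchanged; this variant was not part of the test described above:

```properties
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# Temporarily skip a sink after it fails
a1.sinkgroups.g1.processor.backoff = true
# Selection strategy: round_robin or random
a1.sinkgroups.g1.processor.selector = round_robin
```

The priority and maxpenalty settings belong to the failover processor and are dropped here.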

Startup scripts

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/sinkprocessors/avro-logger1.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/sinkprocessors/avro-logger2.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/sinkprocessors/nc-multi-sink.conf \
--name a1 \
-Dflume.root.logger=INFO,console