Case 1: Two-Tier Flume Chaining
Two Flume agents are chained in series: the first tier collects with an exec source and forwards through an avro sink; the second tier receives those events with an avro source and prints them to the console.
First tier: exec source ===> memory channel ===> avro sink
Second tier: avro source ===> memory channel ===> logger sink
Agent1 (second tier: avro source ===> logger sink)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
Agent2 (first tier: exec source ===> avro sink)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/flumeData/word.log
a1.sources.r1.shell = /bin/bash -c

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
Startup commands (start the second-tier avro-logger agent first, so its avro source is listening before the first tier's avro sink connects):
bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/avro-logger.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/exec-avro.conf \
--name a1 \
-Dflume.root.logger=INFO,console
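To smoke-test the chain, append a few lines to the tailed file and watch the avro-logger agent's console; the file path below is the one from the exec source config:

# Append test data to the file tailed by the exec source;
# each line should show up as an event on the avro-logger agent's console.
echo "hello flume" >> /home/hadoop/data/flumeData/word.log
echo "two-tier test" >> /home/hadoop/data/flumeData/word.log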
 
Case 2: Dual Source, Single Sink
Uses two sources, exec and netcat, both feeding a single logger sink.
Agent1
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/flumeData/word.log
a1.sources.r1.shell = /bin/bash -c

a1.sources.r2.type = netcat
a1.sources.r2.bind = localhost
a1.sources.r2.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
 
Startup command
bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/twosource.conf \
--name a1 \
-Dflume.root.logger=INFO,console
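Since both sources feed the same channel, events from either path appear interleaved on the one logger console. A quick check (assuming telnet is installed; nc works the same way):

# Feed the netcat source: type a line, press Enter, expect "OK" back
telnet localhost 44444
# Feed the exec source from another shell
echo "from word.log" >> /home/hadoop/data/flumeData/word.log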
 
Case 3: Single Source, Dual Sink
Uses an exec source; the two sinks are a logger sink and an hdfs sink. Note that both sinks pull from the same channel, so each event is taken by only one of them; to deliver every event to both sinks, use two channels with the replicating selector shown in the next section.
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/flumeData/word.log
a1.sources.r1.shell = /bin/bash -c

a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs:
a1.sinks.k2.hdfs.filePrefix = bigdata-
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
 
Startup command
bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/twosink.conf \
--name a1 \
-Dflume.root.logger=INFO,console
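Once events flow, the hdfs sink writes files under the configured path (left truncated as hdfs: above). A sketch of how to inspect them, with <hdfs-path> standing in for whatever you set a1.sinks.k2.hdfs.path to:

# List and read the files produced by the hdfs sink;
# <hdfs-path> is a placeholder for your actual a1.sinks.k2.hdfs.path value.
hdfs dfs -ls <hdfs-path>
hdfs dfs -cat <hdfs-path>/bigdata-*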
 
Replicating Channel Selector (default)
If no selector is configured, the replicating channel selector is the default; it copies each event to every channel the source is wired to (one or more).
nc source ===> channel1 ===> sink1
          ===> channel2 ===> sink2
 
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.selector.type = replicating

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs:
a1.sinks.k2.hdfs.filePrefix = bigdata-
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
 
Startup command:
bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/replactingchannelselector.conf \
--name a1 \
-Dflume.root.logger=INFO,console
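To confirm replication, send a single event through the netcat source and check that it shows up twice: once on the logger console (k1 via c1) and once as a new file in HDFS (k2 via c2):

# One event in, two copies out
telnet localhost 44444
# type: hello replicating   -> prints on the console AND lands in HDFS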
 
Multiplexing Channel Selector
agent1 (netcat 44441, static interceptor adds state=CN) ===> avro 55555
agent2 (netcat 44442, static interceptor adds state=US) ===> avro 55555
agent3 (netcat 44443, static interceptor adds state=ES) ===> avro 55555
agent4 (avro source on 55555) ===> multiplexing selector routes each event to a channel based on its state header
 
Agent1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44441

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CN

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 55555

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
Agent2
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44442

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = US

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 55555

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
Agent3
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44443

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = ES

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 55555

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
Agent4
a1.sources = r1
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55555

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CN = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c3

a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c3.type = memory

a1.sinks.k1.type = logger

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs:
a1.sinks.k2.hdfs.filePrefix = flume-
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text

a1.sinks.k3.type = hdfs
a1.sinks.k3.hdfs.path = hdfs:
a1.sinks.k3.hdfs.filePrefix = flume-
a1.sinks.k3.hdfs.fileType = DataStream
a1.sinks.k3.hdfs.writeFormat = Text

a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
 
Startup commands (start multi4.conf, the downstream avro-source agent, first):
bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/multi4.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/multi1.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/multi2.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/multi3.conf \
--name a1 \
-Dflume.root.logger=INFO,console
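To verify the routing, send one line to each upstream port and watch where it lands on agent4: agent1 stamps state=CN, which maps to c1 and prints on the logger console; agent2 stamps state=US, which maps to c2 (hdfs sink k2); agent3's state=ES matches no mapping and falls through to the default channel c3 (hdfs sink k3):

# state=CN -> c1 -> logger console
telnet localhost 44441
# state=US -> c2 -> hdfs sink k2
telnet localhost 44442
# state=ES (no mapping) -> default channel c3 -> hdfs sink k3
telnet localhost 44443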
 
Flume Sink Processors
There are three sink processor types: default, failover, and load_balance; in production, failover is the usual choice.
nc source ===> memory channel ===> avro sink ---- avro source ===> logger sink (55551)
                               ===> avro sink ---- avro source ===> logger sink (55552)
 
The higher the priority value, the higher the sink's priority (see Agent3's sinkgroup config below). To test: start the two avro-logger agents first, then send data and observe that everything goes to 55552 (priority 10). Manually stop the 55552 agent and send again: Agent3 logs errors, then fails over and delivers the data to 55551, confirming that failover works.
Agent1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55551

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
Agent2
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55552

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
 
Agent3
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

a1.channels.c1.type = memory

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 55551

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 0.0.0.0
a1.sinks.k2.port = 55552

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
 
Startup commands (the two avro-logger agents first, then nc-multi-sink):
bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/sinkprocessors/avro-logger1.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/sinkprocessors/avro-logger2.conf \
--name a1 \
-Dflume.root.logger=INFO,console

bin/flume-ng agent \
--conf /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/conf \
--conf-file /home/hadoop/app/flume-1.6.0-cdh5.16.2-bin/script/sinkprocessors/nc-multi-sink.conf \
--name a1 \
-Dflume.root.logger=INFO,console
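To reproduce the failover test described above:

# With all three agents running, events go to the higher-priority sink (k2 -> 55552)
telnet localhost 44444
# type: hello failover   -> prints on the 55552 agent's console
# Now Ctrl-C the 55552 agent and send again:
# Agent3 logs connection errors, then delivers to 55551 instead
telnet localhost 44444
# type: hello again      -> prints on the 55551 agent's console

For comparison, the load_balance processor mentioned at the start of this section needs only a different sinkgroup block on Agent3; a minimal sketch, using property names from the Flume 1.6 sink-processor documentation:

# Alternative to failover: spread events across k1 and k2 instead of prioritizing one
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin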