The transaction handling flow is as follows: a Put transaction on the source side (doPut, doCommit, doRollback) writes a batch of events into the channel, while a Take transaction on the sink side (doTake, doCommit, doRollback) removes a batch; if either step fails, the transaction is rolled back.


The ChannelSelector decides which Channel(s) an Event will be sent to.
It comes in two types: Replicating and Multiplexing.
The SinkProcessor comes in three types: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor.
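A replicating selector (the default) copies every event to all of a source's channels, whereas a multiplexing selector routes each event by the value of an event header. A minimal sketch of a multiplexing selector, assuming a hypothetical header named state whose values CZ and US should go to channels c1 and c2:
- a1.sources.r1.selector.type = multiplexing
- a1.sources.r1.selector.header = state
- a1.sources.r1.selector.mapping.CZ = c1
- a1.sources.r1.selector.mapping.US = c2
- a1.sources.r1.selector.default = c1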

Multiple Flume agents are chained one after another, from the very first source through to the destination storage system behind the final sink.
Chaining too many agents is not recommended: it slows down transmission, and if any agent in the chain goes down, the whole pipeline is affected.

(single source, multiple channels and sinks)
Flume supports sending an event flow to one or more destinations.
In this mode the same data can be replicated to multiple channels, or different data can be routed to different channels, and each sink can deliver to a different destination.

Flume supports logically grouping multiple sinks into one sink group; combined with different SinkProcessors, a sink group provides load balancing and failover.
Here agent1 has three sinks connected to agent2, agent3, and agent4 respectively; even if some of those sinks fail, the data can still be synced to HDFS.

This is common in real business scenarios, for example log collection:
A typical web application is spread over hundreds of servers, sometimes thousands or tens of thousands, and the logs they produce are troublesome to process.
With aggregation, each server runs a Flume agent that collects its logs and sends them to a central log-collecting Flume agent, which then uploads them to HDFS, Hive, HBase, etc. for log analysis.
Requirement: Flume-1 monitors a file for changes and replicates every change to two downstream agents: Flume-2 stores the data on HDFS, and Flume-3 writes it to the local file system.
Implementation steps:
1. Under the job directory create a folder group1, and in it create the configuration file flume-file-flume.conf.
The configuration needs 1 source, 2 channels, and 2 sinks.
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
-
- # Replicate the data flow to all channels
- a1.sources.r1.selector.type = replicating
-
- # Describe/configure the source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /opt/module/apache-hive-3.1.2-bin/logs/hive.log
- a1.sources.r1.shell = /bin/bash -c
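- # note: tail -F keeps following the file across log rotations; the exec source itself offers no delivery guarantee if the agent stops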
-
- # Describe the sink
- # An avro sink acts as a data sender
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop102
- a1.sinks.k1.port = 4141
- a1.sinks.k2.type = avro
- a1.sinks.k2.hostname = hadoop102
- a1.sinks.k2.port = 4142
-
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- a1.channels.c2.type = memory
- a1.channels.c2.capacity = 1000
- a1.channels.c2.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1 c2
- a1.sinks.k1.channel = c1
- a1.sinks.k2.channel = c2
This configuration replicates the data to two different sinks, and each sink forwards it to another agent for processing.
2. Create the configuration file flume-flume-hdfs.conf
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
-
- # Describe/configure the source
- # An avro source acts as a data-receiving service
- a2.sources.r1.type = avro
- a2.sources.r1.bind = hadoop102
- a2.sources.r1.port = 4141
-
- # Describe the sink
- a2.sinks.k1.type = hdfs
- a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
- # prefix for uploaded files
- a2.sinks.k1.hdfs.filePrefix = flume2-
- # whether to round the timestamp down (i.e. roll folders by time)
- a2.sinks.k1.hdfs.round = true
- # how many time units per new folder
- a2.sinks.k1.hdfs.roundValue = 1
- # the time unit used for rounding
- a2.sinks.k1.hdfs.roundUnit = hour
- # whether to use the local timestamp
- a2.sinks.k1.hdfs.useLocalTimeStamp = true
- # how many Events to accumulate before flushing to HDFS
- a2.sinks.k1.hdfs.batchSize = 100
- # file type; compression is supported
- a2.sinks.k1.hdfs.fileType = DataStream
- # how often (seconds) to roll to a new file
- a2.sinks.k1.hdfs.rollInterval = 30
- # roll the file at roughly 128 MB
- a2.sinks.k1.hdfs.rollSize = 134217700
- # file rolling is independent of the number of Events
- a2.sinks.k1.hdfs.rollCount = 0
-
- # Describe the channel
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
The avro source binds to the port that the upstream agent's sink k1 sends to, and the data is then uploaded to HDFS.
3. Create the configuration file flume-flume-dir.conf
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c2
-
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop102
- a3.sources.r1.port = 4142
-
- # Describe the sink
- a3.sinks.k1.type = file_roll
- a3.sinks.k1.sink.directory = /home/why/data/flumeDemo/test1
-
- # Describe the channel
- a3.channels.c2.type = memory
- a3.channels.c2.capacity = 1000
- a3.channels.c2.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c2
- a3.sinks.k1.channel = c2
Parameter notes:
The sink type is file_roll (see the Flume 1.11.0 User Guide — Apache Flume).
It saves events to the local file system.
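By default the file_roll sink also rolls to a new output file every 30 seconds. A minimal sketch of the relevant setting, assuming a longer interval is wanted (0 disables time-based rolling and keeps a single file):
- a3.sinks.k1.sink.rollInterval = 600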
4. Start the corresponding Flume processes (the downstream agents a3 and a2 are started first, so their avro sources are listening before a1's avro sinks connect):
nohup bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf &
nohup bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf &
nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf &
5. The corresponding content can now be seen both on HDFS and in the local file system directory.
Requirement: Flume1 monitors a port; the sinks in its sink group connect to Flume2 and Flume3 respectively, and a FailoverSinkProcessor provides failover.
Implementation steps:
1. Under /opt/module/flume/job create a group2 folder, and in it create the configuration file flume-netcat-flume.conf.
Configure 1 netcat source, 1 channel, and 1 sink group (2 sinks) that delivers to flume-flume-console1 and flume-flume-console2.
- # Name the components on this agent
- a1.sources = r1
- a1.channels = c1
- a1.sinkgroups = g1
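- # failover processor: events go to the highest-priority sink that is alive;
- # maxpenalty is the maximum backoff (in ms) applied to a failed sink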
- a1.sinkgroups.g1.processor.type = failover
- a1.sinkgroups.g1.processor.priority.k1 = 5
- a1.sinkgroups.g1.processor.priority.k2 = 10
- a1.sinkgroups.g1.processor.maxpenalty = 10000
- a1.sinks = k1 k2
-
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
-
-
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop102
- a1.sinks.k1.port = 4141
- a1.sinks.k2.type = avro
- a1.sinks.k2.hostname = hadoop102
- a1.sinks.k2.port = 4142
-
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinkgroups.g1.sinks = k1 k2
- a1.sinks.k1.channel = c1
- a1.sinks.k2.channel = c1
Parameter notes: Flume 1.11.0 User Guide — Apache Flume
Sink groups allow multiple sinks to be defined in one agent, with a sink processor configured on top of them: Flume 1.11.0 User Guide — Apache Flume
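For comparison, the same sink group could balance load instead of failing over by switching the processor type; a minimal sketch, assuming the round_robin selector:
- a1.sinkgroups.g1.processor.type = load_balance
- a1.sinkgroups.g1.processor.backoff = true
- a1.sinkgroups.g1.processor.selector = round_robin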
2. Create flume-flume-console1.conf
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
-
- # Describe/configure the source
- a2.sources.r1.type = avro
- a2.sources.r1.bind = hadoop102
- a2.sources.r1.port = 4141
-
- # Describe the sink
- a2.sinks.k1.type = logger
-
- # Describe the channel
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
The sink outputs to the local console.
3. Create flume-flume-console2.conf
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c2
-
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop102
- a3.sources.r1.port = 4142
-
- # Describe the sink
- a3.sinks.k1.type = logger
-
- # Describe the channel
- a3.channels.c2.type = memory
- a3.channels.c2.capacity = 1000
- a3.channels.c2.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c2
- a3.sinks.k1.channel = c2
The sink outputs to the local console.
4. Run the commands:
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
5. Send data with nc localhost 44444.

Because console2 (sink k2, priority 10) is configured with a higher priority than console1 (sink k1, priority 5), the data is received by console2.

Next, kill the console2 process; the data is then received by console1:

Requirement:
Flume-1 on hadoop102 monitors the file /home/why/data/flumeDemo/test3/test3.log.
Flume-2 on hadoop103 monitors the data stream on a port.
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
Implementation steps:
1. First create a group3 directory under the job folder on all three servers.
2. On hadoop102, create the configuration file flume1-logger-flume.conf; the source monitors the log file and the sink sends the data to the next-hop Flume.
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # Describe/configure the source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /home/why/data/flumeDemo/test3/test3.log
- a1.sources.r1.shell = /bin/bash -c
-
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop104
- a1.sinks.k1.port = 4141
-
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
3. On hadoop103, create the configuration file flume2-netcat-flume.conf; the source monitors the data stream on port 44444 and the sink forwards the data to the next-hop Flume.
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
-
- # Describe/configure the source
- a2.sources.r1.type = netcat
- a2.sources.r1.bind = localhost
- a2.sources.r1.port = 44444
-
- # Describe the sink
- a2.sinks.k1.type = avro
- a2.sinks.k1.hostname = hadoop104
- a2.sinks.k1.port = 4141
-
- # Use a channel which buffers events in memory
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
Note that both agents' sinks point at the same server, hadoop104, so their hostname and port are identical.
4. On hadoop104, create the configuration file flume3-flume-logger.conf; the source receives the data streams sent by flume1 and flume2, and the sink prints the data to the console.
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c1
-
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop104
- a3.sources.r1.port = 4141
-
- # Describe the sink
- a3.sinks.k1.type = logger
-
- # Describe the channel
- a3.channels.c1.type = memory
- a3.channels.c1.capacity = 1000
- a3.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c1
- a3.sinks.k1.channel = c1
5. Run the commands on each of the three servers:
hadoop104:
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
hadoop102:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf
hadoop103:
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf
6. On hadoop102, append content to the log file:
echo "hello" >> /home/why/data/flumeDemo/test3/test3.log
On hadoop103, send data to port 44444 with nc localhost 44444 (the netcat source is bound to localhost).
The data then shows up on hadoop104:
