Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统
。Flume 基于流式架构,灵活简单
Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS或者Kafka,最主要的,flume是实时采集的
Agent 是一个 JVM 进程,主要有 3 个部分组成,Source、Channel、Sink,它以事件
的形式将数据从源头送至目的
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy
doPut:将数据写进临时缓冲区putList
doCommit:检查channel内存队列是否足够合并
doRollback:内存队列不足,回滚数据
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作
Flume 自带两种 Channel:Memory Channel 和 File Channel
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent,Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义
doTake:先将数据取到临时缓冲区takeList
doCommit:提交到Sink,成功提交清空takeList
doRollback:发送出现异常,将takeList数据归还给内存队列
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组
简单串联
这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统
复制和多路复用
单 source,多 channel、sink,Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地
负载均衡和故障转移
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能
聚合
这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志
注意,一个sink最多对应一个channel,而一个channel可以对应更多sink
Sinkprocessor有三种,第一种defaultsinkprocessor,一个channel只能绑定一个sink,第二种loadbalancingsinkprocessor,负载均衡,多个sink轮流查看channel,第三种failoversinkprocessor,故障转移,对多个sink可以配置优先级,若优先级高的sink挂掉后,转移到优先级低的sink
官网下载安装包
http://archive.apache.org/dist/flume/1.9.0/
解压
tar -zxvf ./apache-flume-1.9.0-bin.tar.gz -C .
改名
mv ./apache-flume-1.9.0-bin/ ./flume-1.9
将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
rm /opt/module/flume-1.9/lib/guava-11.0.2.jar
需求
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台
实现步骤
(1)安装 netcat 工具
sudo yum install -y nc
(2)判断 44444 端口是否被占用
sudo netstat -nlp | grep 44444
(3)创建 Flume Agent 配置文件 flume-netcat-logger.conf
(4)在 flume 目录下创建 job 文件夹并进入 job 文件夹
(5)在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
[gzhu@hadoop102 flume-1.9]$ mkdir job
[gzhu@hadoop102 flume-1.9]$ cd job/
[gzhu@hadoop102 job]$ vim flume-netcat-logger.conf
(6)在 flume-netcat-logger.conf 文件中添加如下内容
# Name the components on this agent
# a1就是表示当前agent,单台机器里flume下a1要唯一
a1.sources = r1 # r1表示a1的输入源
a1.sinks = k1 # k1表示a1的输出目的地
a1.channels = c1 # c1表示a1的缓冲区channel
# Describe/configure the source
a1.sources.r1.type = netcat # source的类型
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink sink的类型 logger打印到控制台
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000 # channel最多放1000个event
a1.channels.c1.transactionCapacity = 100 # 一个事务的做多放100个event
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 # 一个source可以绑定多个channel
a1.sinks.k1.channel = c1 # 一个sink最多绑定一个channel
(7)先开启 flume 监听端口
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数说明:
-c:表示配置文件存储在 conf/目录
-n:表示给 agent 起名为 a1
-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件
-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error
(8)使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
(9)在 Flume 监听页面观察接收数据情况
需求
实时监控日志,并上传到 HDFS 中
实现步骤
(1)Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关 jar 包,上传到/opt/module/flume-1.9/lib,以下jar包可在Hadoop的依赖下找到
实现source是exec,代表执行一条命令,sink就是HDFS了
(2)检查/etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
(2)创建 flume-file-hdfs.conf 文件
[gzhu@hadoop102 job]$ vim flume-file-hdfs.conf
注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件
添加如下内容
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec # 执行命令 tail -f 循环读取日志文件 默认最后10行 可能会重复数据
a2.sources.r2.command = tail -F /opt/module/flume-1.9/exec.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹 1小时1次
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(4)在flume-1.9文件下创建exec.log
(5)运行 Flume
[gzhu@hadoop102 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(5)开启 Hadoop
(6)往文件中追加数据
[gzhu@hadoop102 flume-1.9]$ echo spark >> exec.log
[gzhu@hadoop102 flume-1.9]$ echo hive >> exec.log
(7)在 HDFS 上查看文件
需求
使用Flume监听整个目录上传到HDFS
实现source是spooldir,sink是hdfs
实现步骤
(1)创建配置文件 flume-dir-hdfs.conf
创建一个文件
[gzhu@hadoop102 job]$ vim flume-dir-hdfs.conf
添加如下内容
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume-1.9/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[gzhu@hadoop102 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
说明:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动
(3)向 upload 文件夹中添加文件
在/opt/module/flume-1.9 目录下创建 upload 文件夹
[gzhu@hadoop102 flume-1.9]$ mkdir upload
向 upload 文件夹中添加文件
[gzhu@hadoop102 upload]$ touch gzhu.log
[gzhu@hadoop102 upload]$ touch gzhu.tmp
[gzhu@hadoop102 upload]$ touch gzhu.txt
(4)查看 HDFS 上的数据
注意,当我们监控这个目录时,文件一放进去,就立马被上传到了HDFS,并且已经上传了的文件会被标记成.COMPLETED,这样才不会导致重复上传,这样我们不能修改文件了,因为即使修改了,由于有后缀也不会上传了
需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
实现source是taildir(可以断点续传),实现sink是hdfs
/opt/module/flume-1.9/job
vim flume-taildir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume-1.9/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9/files2/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M`在这里插入代码片`
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
在/opt/module/flume-1.9 目录下创建 files 文件夹
mkdir files
mkdir files2
进入文件夹并创建文件,通过echo进行数据的追加
cd file1
touch file1.txt
echo hello >> file1
tail_dir.json记录了每个文件追加到了哪个位置,因此可以准确的追加数据,但是,万一inode或者file有一个变化,那么就代表只是一个新的文件,flume认为该文件从来没有被上传过HDFS
注意点
1.监控的多个文件,这些文件的追加内容会进入一个文件里(滚动时间范围内)
2.假如我们监控Hive的日志,当天Hive的日志文件名字是hive.log,此时新增加的数据会被上传,但是当新的一天开始时,当天的hive.log会更名,会加上昨天的日期,例如,hive.log.2022-7-7,这样flume会认为这是一个新的文件,于是就导致了重复上传了
我们就写死,只监控hive.log这个文件,但是也有问题,比如,在晚上11.30机器挂掉了,而我们第二天才修复,那么此时昨天那半小时的数据已经监控不到了,因为已经改名了,不是hive.log了,所以这样会丢数据
解决办法有两种,第一种是找管控日志的人协商,生成日志时就带有日期,这样一天生成一本日志就好了。第二种办法就是修改flume的源码,只负责检测inode,这样文件改名后由于inode没变也不会重新上传了
需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem,这是replicating
很明显,我们使用了3个agent,因此需要3个flume配置文件,我们统一将这个三个配置文件放在job目录下的group1文件夹下
第一个配置文件 flume-file-flume.conf
配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir
vim flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9/files/exec.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
第二个配置文件 flume-flume-hdfs.conf
配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink
vim flume-flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
第三个配置文件 flume-flume-dir.conf
配置上级 Flume 输出的 Source,输出是到本地目录的 Sink
vim flume-flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9/data
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录
执行配置文件
先启动服务端,再启动客户端
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
向exec.log追加数据
echo My Name is Jack ! >> exec.log
我们来看下官方的案例,一个source连接了4个channel,当我们选择器是multiplexing时,首先判断event中的头信息,我们知道,一个event是由头信息和body组成的,而头信息是一个K-V结构,那么下面这段代码应该清楚了,判断state,如果是CZ,发送c1这个channel,如果是US,发往c2和c3,否则发往c4,那现在问题来了,怎么向一个event里头信息里添加state-V这个数据呢?用拦截器
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state #以每个Event的header中的state这个属性的值作为选择channel的依据
a1.sources.r1.selector.mapping.CZ = c1 #如果state=CZ,则选择c1这个channel
a1.sources.r1.selector.mapping.US = c2 c3 #如果state=US,则选择c2 和 c3 这两个channel
a1.sources.r1.selector.default = c4 #默认使用c4这个channel
拦截器
我们想要实现拦截器,需要自定义实现拦截器类
<dependency>
<groupId>org.apache.flumegroupId>
<artifactId>flume-ng-coreartifactId>
<version>1.9.0version>
dependency>
// 注意 Interceptor 接口是flume下的
public class TypeInterceptor implements Interceptor {
private List<Event> addList;
public void initialize() {
addList = new ArrayList<Event>();
}
// 单个事件处理方法
public Event intercept(Event event) {
// 头信息是个map结构,我们首先获取头信息结构
Map<String, String> headers = event.getHeaders();
// body默认是字节数组 我们转成字符串就行了
String body = new String(event.getBody());
// 我们拿到body也就是发送过来的数据了,再进行加工就可以了
// 我们这里假设 将点击行为 和 支付行为分开 将来我们区分的时候,就是根据某一个key的不同value值进行区分的!
if(body.contains("click")){
// key随便起名字
headers.put("type","click");
}else {
headers.put("type","pay");
}
// event进行了加工,添加了头信息 返回就好了
return event;
}
// 多个事件的处理
public List<Event> intercept(List<Event> list) {
// 先清空我们定义的集合 因为不同批次的事件肯定不能再发一遍 清空 保证集合中只有本批次的事件
addList.clear();
list.forEach(event -> {
// 单个事件的处理逻辑和上面是一样的,直接调用方法就好了
Event intercept = intercept(event);
addList.add(intercept);
});
return addList;
}
public void close() {
}
// 这里我们要特别注意 官网的案例要求我们必须有一个Builder
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
打包,将jar包放到flume的lib下
我们只需要将上面复制的案例改成多路复用就行了
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 区别就在这里
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.gzhu.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.click = c1
a1.sources.r1.selector.mapping.pay = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
你可以把多个sink分成一个组, 这时候Sink组逻辑处理器(Flume Sink Processors)可以对这同一个组里的几个sink进行负载均衡或者其中一个sink发生故障后将输出Event的任务转移到其他的sink上
Sinkprocessor有三种,第一种defaultsinkprocessor,一个channel只能绑定一个sink,第二种loadbalancingsinkprocessor,负载均衡,多个sink轮流查看channel,第三种failoversinkprocessor,故障转移,对多个sink可以配置优先级,若优先级高的sink挂掉后,转移到优先级低的sink
我们假设flume-2的优先级高,它挂掉后将数据送到flume-3
在/opt/module/flume-1.9/job 目录下创建 group2 文件夹
第一个配置文件 flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9/files/exec.log
a1.sources.r1.shell = /bin/bash -c
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
第二个配置文件 flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
第三个配置文件 flume-flume-console2.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
执行配置文件
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-file-flume.conf
向exec.log追加数据
echo zhang >> exec.log
可以看到数据到了console2,也就是4142端口,10>5,优先级高
断开4142,再追加数据
数据到了console1,也就是4141端口
负载均衡Sink 选择器提供了在多个sink上进行负载均衡流量的功能。 它维护一个活动sink列表的索引来实现负载的分配。 默认支持了轮询(round_robin)和随机(random)两种选择机制分配负载
。 默认是轮询,可以通过配置来更改。也可以从 AbstractSinkSelector 继承写一个自定义的选择器
工作时,此选择器使用其配置的选择机制选择下一个sink并调用它。 如果所选sink无法正常工作,则处理器通过其配置的选择机制选择下一个可用sink。 此实现不会将失败的Sink列入黑名单,而是继续乐观地尝试每个可用的Sink
如果所有sink调用都失败了,选择器会将故障抛给sink的运行器
如果backoff设置为true则启用了退避机制,失败的sink会被放入黑名单,达到一定的超时时间后会自动从黑名单移除。 如从黑名单出来后sink仍然失败,则再次进入黑名单而且超时时间会翻倍,以避免在无响应的sink上浪费过长时间。 如果没有启用退避机制,在禁用此功能的情况下,发生sink传输失败后,会将本次负载传给下一个sink继续尝试,因此这种情况下是不均衡的
我们只需要将上面flume-file-flume.conf改一点参数就可以了
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random # 也可以round_robin
a1.sinkgroups.g1.processor.backoff = true
失败的sink是否成倍地增加退避它的时间。 如果设置为false,负载均衡在某一个sink发生异常后,下一次选择sink的时候仍然会将失败的这个sink加入候选队列; 如果设置为true,某个sink连续发生异常时会成倍地增加它的退避时间,在退避的时间内是无法参与负载均衡竞争的。退避机制只统计1个小时(可以设置)发生的异常,超过1个小时没有发生异常就会重新计算
采集多台服务器的日志文件,聚合到HDFS
由于涉及了多台机器,首先要分发 Flume
xsync /opt/module/flume-1.9
[gzhu@hadoop102 job]$ mkdir group3
[gzhu@hadoop103 job]$ mkdir group3
[gzhu@hadoop104 job]$ mkdir group3
hadoop102编写配置文件
[gzhu@hadoop102 job]$ cd group3
vim flume1-exec-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/tmp/exec.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
hadoop103编写配置文件
vim flume2-exec-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/module/tmp/input.log
a2.sources.r1.shell = /bin/bash -c
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
hadoop104收集
vim flume3-flume-hdfs.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume4/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k1.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
分别启动
[gzhu@hadoop104 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-hdfs.conf
[gzhu@hadoop103 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-exec-flume.conf
[gzhu@hadoop102 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-exec-flume.conf
测试
Ganglia 由 gmond、gmetad 和 gweb 三部分组成
gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等
gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务
gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据
==安装 ganglia ==
(1)规划
hadoop102: web gmetad gmod
hadoop103: gmod
hadoop104: gmod
(2)在 102 103 104 分别安装 epel-release
[gzhu@hadoop102 flume-1.9]$ sudo yum -y install epel-release
(3)在 102 安装
[gzhu@hadoop102 flume-1.9]$ sudo yum -y install ganglia-gmetad
[gzhu@hadoop102 flume-1.9]$ sudo yum -y install ganglia-web
[gzhu@hadoop102 flume-1.9]$ sudo yum -y install ganglia-gmond
(4)在 103 和 104 安装
[gzhu@hadoop102 flume-1.9]$ sudo yum -y install ganglia-gmond
(5)在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf
[gzhu@hadoop102 flume-1.9]$ sudo vim /etc/httpd/conf.d/ganglia.conf
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
# Require local
# 通过 windows 访问 ganglia,需要配置 Linux 对应的主机(windows)ip 地址
Require ip windows主机ipv4,ipconfig查看
# Require ip 10.1.2.3
# Require host example.org
</Location>
(6)在 102 修改配置文件/etc/ganglia/gmetad.conf
[gzhu@hadoop102 flume-1.9]$ sudo vim /etc/ganglia/gmetad.conf
修改为:
data_source “my cluster” hadoop102
(7)在 102 103 104 修改配置文件/etc/ganglia/gmond.conf
[gzhu@hadoop102 flume-1.9]$ sudo vim /etc/ganglia/gmond.conf
修改为: 修改三个地方 name host bind
cluster {
name = "my cluster" # 修改1
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source
address
# that resolves to the machine's hostname.
Without
# this, the metrics may appear to come from
any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
# 数据发送给 hadoop102
host = hadoop102 # 修改2 注意每台机器不同
port = 8649
ttl = 1
}
udp_recv_channel {
# mcast_join = 239.2.11.71
port = 8649
# 接收来自任意连接的数据
bind = 0.0.0.0 # 修改3
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics
you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
(8)在 102 修改配置文件/etc/selinux/config
[gzhu@hadoop102 flume-1.9]$ sudo vim /etc/selinux/config
修改为:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
临时生效
sudo setenforce 0
(9)启动 ganglia (1)在 102 103 104 启动
[gzhu@hadoop102 flume-1.9]$ sudo systemctl start gmond
在 102 启动
sudo systemctl start httpd
sudo systemctl start gmetad
(10)打开网页浏览 ganglia 页面
http://hadoop102/ganglia
如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录的权限:
[gzhu@hadoop102 flume-1.9]$ sudo chmod -R 777 /var/lib/ganglia
启动 Flume 任务 时加上参数
bin/flume-ng agent \
-c conf/ \
-n a1 \
-f job/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649