1.1 案例演示:avro+memory+logger
Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger
1.1.1 编写采集方案
- [root@hadoop01 ~]# mkdir flumeconf
- [root@hadoop01 ~]# cd flumeconf/
- [root@hadoop01 flumeconf]# vim avro-logger.conf
- # 定义各个组件的名字
- a1.sources=s1
- a1.channels=c1
- a1.sinks=sk1
-
- # 对组件进行绑定操作
- a1.sources.s1.channels=c1
- a1.sinks.sk1.channel=c1
-
- # 定义sourse组件的属性
- a1.sources.s1.type=avro
- a1.sources.s1.bind=hadoop01
- a1.sources.s1.port=9999
-
- # 定义channel组件的属性
- a1.channels.c1.type=memory
-
- # 定义sink组件的属性
- a1.sinks.sk1.type=logger
- a1.sinks.sk1.maxBytesToLog=100
1.1.2 启动Agent 开始测试
[root@hadoop01 flumeconf]# flume-ng agent -c $FLUME_HOME/conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
敲几个换行,方便一会区分。这是一个前台进程。
1.1.3 测试数据
新开一个窗口,准备数据文件
- [root@hadoop01 ~]# echo "hello flume" >> test
- [root@hadoop01 ~]# cat test
- hello flume
启动客户端
- [root@hadoop01 ~]# flume-ng avro-client -c $FLUME_HOME/conf -H hadoop01 -p 9999 -F ./test
-
- -c 指定配置文件所在路径
- -H 指定主机
- -p 指定监听端口
- -F 指定要发送的文件
然后回到第一个窗口
发现已经捕获到我发送的数据了
1.2 实时采集(监听文件):exec+memory+hdfs
Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容memory:传输数据的Channel为Memory
hdfs 是输出目标为Hdfs
1.2.1 配置方案
- [root@hadoop01 flumeconf]# ll
- 总用量 4
- -rw-r--r-- 1 root root 404 8月 31 16:47 avro-logger.conf
- drwxr-xr-x 2 root root 23 8月 31 16:50 logs
- [root@hadoop01 flumeconf]# vim exec-hdfs.conf
- # 定义组件的名字
- a1.sources=s1
- a1.channels=c1
- a1.sinks=sk1
-
- # 组件之间的绑定
- a1.sources.s1.channels=c1
- a1.sinks.sk1.channel=c1
-
- # 配置source的属性
- a1.sources.s1.type=exec
- a1.sources.s1.command=tail -F /root/flume-test-exec-hdfs
-
- # 配置channel的属性
- a1.channels.c1.type=memory
-
- # 配置sink的属性
- a1.sinks.sk1.type=hdfs
- a1.sinks.sk1.hdfs.path=hdfs://hadoop01:8020/flume/exec-mem-hdfs/%Y-%m-%d
- a1.sinks.sk1.hdfs.useLocalTimeStamp=true
- a1.sinks.sk1.hdfs.rollInterval=30
- a1.sinks.sk1.hdfs.rollSize=10240
- a1.sinks.sk1.hdfs.rollCount=10
- a1.sinks.sk1.hdfs.writeFormat=Text
- a1.sinks.sk1.hdfs.fileType=DataStream
1.2.2 启动Agent
- [root@hadoop01 flumeconf]# flume-ng agent -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
-
- -f 指定采集方案
- -n 指定agent
敲几个换行,方便一会区分。这是一个前台进程。
1.2.3 测试数据
新开一个窗口准备数据文件
- [root@hadoop01 ~]# echo "hello world" >> flume-test-exec-hdfs
- [root@hadoop01 ~]# cat flume-test-exec-hdfs
- hello world
回到第一个窗口
通过hdfs发现目录文件已经生成好了
当我在第二个窗口再次对 flume-test-exec-hdfs
文件写入数据时,那么flume就会又新的文件产生
1.3 实时采集(监听目录):spool+ mem+logger
spool: Source来源于目录,有文件进入目录就摄取。
mem:通过内存来传输数据
logger:是传送数据到日志
1.3.1 配置方案
- [root@hadoop01 flumeconf]# vim spool-logger.conf
- # 设置组件
- a1.sources=s1
- a1.channels=c1
- a1.sinks=sk1
-
- # 组件绑定
- a1.sources.s1.channels=c1
- a1.sinks.sk1.channel=c1
-
- # 设置source的属性
- a1.sources.s1.type=spooldir
- a1.sources.s1.spoolDir=/root/flumeData
- a1.sources.s1.deletePolicy=never
- a1.sources.s1.fileSuffix=.COMPLETED
- a1.sources.s1.fileHeader=true
- a1.sources.s1.fileHeaderKey=file
- a1.sources.s1.basenameHeader=true
- a1.sources.s1.basenameHeaderKey=basename
-
- # 设置channel的属性
- a1.channels.c1.type=memory
-
- # 设置sink的属性
- a1.sinks.sk1.type=logger
- a1.sinks.sk1.maxBytesToLog=100
-
1.3.2 启动Agent
[root@hadoop01 flumeconf]# flume-ng agent -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
启动后报告如下日常:文件不存在
去第二个窗口创建文件
[root@hadoop01 ~]# mkdir flumeData
然后回到第一个窗口再次启动agent
[root@hadoop01 flumeconf]# flume-ng agent -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
敲几个换行,方便一会区分。这是一个前台进程。
1.3.3 测试
切换到第二个窗口
- [root@hadoop01 ~]# cd flumeData/
- [root@hadoop01 flumeData]# echo "test" >> test
回到第一个窗口,发现已经捕获到test
文件
1.4 案例演示:http+ mem+logger
http: 表示数据来源是http网络协议,一般接收的请求为get或post请求. 所有的http请求会通过插件格式的Handle转化为一个flume的Event数据.
mem:表示用内存传输通道
logger:表示输出格式为Logger格式
1.4.1 配置方案
- [root@hadoop01 flumeconf]# vim http-logger.conf
- # 定义组件
- a1.sources=s1
- a1.channels=c1
- a1.sinks=sk1
-
- # 绑定组件
- a1.soucres.s1.channels=c1
- a1.sinks.sk1.channel=c1
-
- # 设置source属性
- a1.sources.s1.type=http
- a1.sources.s1.bind=hadoop01
- a1.sources.s1.port=6666
- a1.sources.s1.handler=org.apache.flume.source.http.JSONHandler
-
- # 设置channel属性
- a1.channels.c1.type=memory
-
- # 设置sink的属性
- a1.sinks.sk1.type=Logger
- a1.sinks.sk1.maxBytesToLog=100
-
1.4.2 启动Agent
[root@hadoop01 flumeconf]# flume-ng agent -f ./http-logger.conf -n a1 -Dflume.root.logger=INFO,console
敲几个换行,方便一会区分。这是一个前台进程。
1.4.3 测试
切换到第二个窗口,我们用curl
这个命令模拟一个访问
[root@hadoop01 flumeData]# curl -X POST -d '[{"headers":{"name":"zhangsan","pwd":"123456"},"body":"this is my content"}]' http://hadoop01:6666
切换到第一个窗口,查看效果