• 【Flume-02】案例演示


    案例演示

    1.1 案例演示:avro+memory+logger

    Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger

    1.1.1 编写采集方案

    1. [root@hadoop01 ~]# mkdir flumeconf
    2. [root@hadoop01 ~]# cd flumeconf/
    3. [root@hadoop01 flumeconf]# vim avro-logger.conf
    4. # 定义各个组件的名字
    5. a1.sources=s1
    6. a1.channels=c1
    7. a1.sinks=sk1
    8. # 对组件进行绑定操作
    9. a1.sources.s1.channels=c1
    10. a1.sinks.sk1.channel=c1
    11. # 定义sourse组件的属性
    12. a1.sources.s1.type=avro
    13. a1.sources.s1.bind=hadoop01
    14. a1.sources.s1.port=9999
    15. # 定义channel组件的属性
    16. a1.channels.c1.type=memory
    17. # 定义sink组件的属性
    18. a1.sinks.sk1.type=logger
    19. a1.sinks.sk1.maxBytesToLog=100

    1.1.2 启动Agent 开始测试

    [root@hadoop01 flumeconf]# flume-ng agent -c $FLUME_HOME/conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
    


     

    敲几个换行,方便一会区分。这是一个前台进程。

    1.1.3 测试数据

    新开一个窗口,准备数据文件

    1. [root@hadoop01 ~]# echo "hello flume" >> test
    2. [root@hadoop01 ~]# cat test
    3. hello flume

    启动客户端

    1. [root@hadoop01 ~]# flume-ng avro-client -c $FLUME_HOME/conf -H hadoop01 -p 9999 -F ./test
    2. -c 指定配置文件所在路径
    3. -H 指定主机
    4. -p 指定监听端口
    5. -F 指定要发送的文件

    然后回到第一个窗口


     

    发现已经捕获到我发送的数据了


    1.2 实时采集(监听文件):exec+memory+hdfs

    Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
    #常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容

    memory:传输数据的Channel为Memory

    hdfs 是输出目标为Hdfs

    1.2.1 配置方案

    1. [root@hadoop01 flumeconf]# ll
    2. 总用量 4
    3. -rw-r--r-- 1 root root 404 831 16:47 avro-logger.conf
    4. drwxr-xr-x 2 root root 23 831 16:50 logs
    5. [root@hadoop01 flumeconf]# vim exec-hdfs.conf
    6. # 定义组件的名字
    7. a1.sources=s1
    8. a1.channels=c1
    9. a1.sinks=sk1
    10. # 组件之间的绑定
    11. a1.sources.s1.channels=c1
    12. a1.sinks.sk1.channel=c1
    13. # 配置source的属性
    14. a1.sources.s1.type=exec
    15. a1.sources.s1.command=tail -F /root/flume-test-exec-hdfs
    16. # 配置channel的属性
    17. a1.channels.c1.type=memory
    18. # 配置sink的属性
    19. a1.sinks.sk1.type=hdfs
    20. a1.sinks.sk1.hdfs.path=hdfs://hadoop01:8020/flume/exec-mem-hdfs/%Y-%m-%d
    21. a1.sinks.sk1.hdfs.useLocalTimeStamp=true
    22. a1.sinks.sk1.hdfs.rollInterval=30
    23. a1.sinks.sk1.hdfs.rollSize=10240
    24. a1.sinks.sk1.hdfs.rollCount=10
    25. a1.sinks.sk1.hdfs.writeFormat=Text
    26. a1.sinks.sk1.hdfs.fileType=DataStream

    1.2.2 启动Agent

    1. [root@hadoop01 flumeconf]# flume-ng agent -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
    2. -f 指定采集方案
    3. -n 指定agent


     

    敲几个换行,方便一会区分。这是一个前台进程。

    1.2.3 测试数据

    新开一个窗口准备数据文件

    1. [root@hadoop01 ~]# echo "hello world" >> flume-test-exec-hdfs
    2. [root@hadoop01 ~]# cat flume-test-exec-hdfs
    3. hello world

    回到第一个窗口


     

    通过hdfs发现目录文件已经生成好了


     

    当我在第二个窗口再次对 flume-test-exec-hdfs文件写入数据时,那么flume就会又新的文件产生

    1.3 实时采集(监听目录):spool+ mem+logger

    spool: Source来源于目录,有文件进入目录就摄取。

    mem:通过内存来传输数据

    logger:是传送数据到日志

    1.3.1 配置方案

    1. [root@hadoop01 flumeconf]# vim spool-logger.conf
    2. # 设置组件
    3. a1.sources=s1
    4. a1.channels=c1
    5. a1.sinks=sk1
    6. # 组件绑定
    7. a1.sources.s1.channels=c1
    8. a1.sinks.sk1.channel=c1
    9. # 设置source的属性
    10. a1.sources.s1.type=spooldir
    11. a1.sources.s1.spoolDir=/root/flumeData
    12. a1.sources.s1.deletePolicy=never
    13. a1.sources.s1.fileSuffix=.COMPLETED
    14. a1.sources.s1.fileHeader=true
    15. a1.sources.s1.fileHeaderKey=file
    16. a1.sources.s1.basenameHeader=true
    17. a1.sources.s1.basenameHeaderKey=basename
    18. # 设置channel的属性
    19. a1.channels.c1.type=memory
    20. # 设置sink的属性
    21. a1.sinks.sk1.type=logger
    22. a1.sinks.sk1.maxBytesToLog=100

    1.3.2 启动Agent

    [root@hadoop01 flumeconf]# flume-ng agent -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
    

    启动后报告如下日常:文件不存在


     

    去第二个窗口创建文件

    [root@hadoop01 ~]# mkdir flumeData
    

    然后回到第一个窗口再次启动agent

    [root@hadoop01 flumeconf]# flume-ng agent -f ./spool-logger.conf -n a1 -Dflume.root.logger=INFO,console
    


     

    敲几个换行,方便一会区分。这是一个前台进程。

    1.3.3 测试

    切换到第二个窗口

    1. [root@hadoop01 ~]# cd flumeData/
    2. [root@hadoop01 flumeData]# echo "test" >> test

    回到第一个窗口,发现已经捕获到test文件


     

    1.4 案例演示:http+ mem+logger

    http: 表示数据来源是http网络协议,一般接收的请求为get或post请求. 所有的http请求会通过插件格式的Handle转化为一个flume的Event数据.

    mem:表示用内存传输通道

    logger:表示输出格式为Logger格式

    1.4.1 配置方案

    1. [root@hadoop01 flumeconf]# vim http-logger.conf
    2. # 定义组件
    3. a1.sources=s1
    4. a1.channels=c1
    5. a1.sinks=sk1
    6. # 绑定组件
    7. a1.soucres.s1.channels=c1
    8. a1.sinks.sk1.channel=c1
    9. # 设置source属性
    10. a1.sources.s1.type=http
    11. a1.sources.s1.bind=hadoop01
    12. a1.sources.s1.port=6666
    13. a1.sources.s1.handler=org.apache.flume.source.http.JSONHandler
    14. # 设置channel属性
    15. a1.channels.c1.type=memory
    16. # 设置sink的属性
    17. a1.sinks.sk1.type=Logger
    18. a1.sinks.sk1.maxBytesToLog=100

    1.4.2 启动Agent

    [root@hadoop01 flumeconf]# flume-ng agent -f ./http-logger.conf -n a1 -Dflume.root.logger=INFO,console
    


     

    敲几个换行,方便一会区分。这是一个前台进程。

    1.4.3 测试

    切换到第二个窗口,我们用curl这个命令模拟一个访问

    [root@hadoop01 flumeData]#  curl -X POST -d '[{"headers":{"name":"zhangsan","pwd":"123456"},"body":"this is my content"}]' http://hadoop01:6666
    

    切换到第一个窗口,查看效果

  • 相关阅读:
    关于视觉重定位(VPS)的工作经验分享
    SpringBoot HTTP接口实战-基础篇
    更换Anaconda的下载源为国内源的办法
    kafka 调优
    多个git提交,只推送其中一个到远程该如何处理
    Linux性能优化--性能工具:网络
    力扣(300,674)补9.11
    Docker安全及日志管理
    pandas学习笔记
    [附源码]JAVA毕业设计课程答疑系统(系统+LW)
  • 原文地址:https://blog.csdn.net/Hao1999_/article/details/126861944