• (二十)大数据实战——Flume数据采集的基本案例实战


    前言

    本节内容我们主要介绍几个Flume数据采集的基本案例,包括监控端口数据、实时监控单个追加文件、实时监控目录下多个新文件、实时监控目录下的多个追加文件等案例。完成flume数据监控的基本使用。

    正文

    • 监控端口数据

    ①需求说明

    - 使用 Flume 监听一个端口,收集该端口数据,并打印到控制台

    ②需求分析:

    ③安装netcat 工具:sudo yum install -y nc

    ④查看监听端口1111是否被占用:注意测试端口的范围是0-65535

    ⑤在flume安装目录下创建一个job目录:用与存放监听数据的配置文件

    ⑥在job目录下创建监听数据的配置文件:job-netcat-flume-console.conf

    1. # Name the components on this agent
    2. #a1:表示agent的名称,不能重复
    3. a1.sources = r1 #r1:表示a1Source的名称
    4. a1.sinks = k1 #k1:表示a1的Sink的名称
    5. a1.channels = c1 #c1:表示a1的Channel的名称
    6. # Describe/configure the source
    7. a1.sources.r1.type = netcat #表示a1的输入源类型为netcat端口类型
    8. a1.sources.r1.bind = localhost #表示a1的监听的主机
    9. a1.sources.r1.port = 1111 #表示a1的监听的端口号
    10. # Describe the sink
    11. a1.sinks.k1.type = logger #表示a1的输出目的地是控制台logger类型
    12. # Use a channel which buffers events in memory
    13. a1.channels.c1.type = memory #表示a1的channel类型是memory内存型
    14. a1.channels.c1.capacity = 1000 #表示a1的channel总容量1000个event
    15. a1.channels.c1.transactionCapacity = 100 #表示a1的channel传输时收集到了100条event以后再去提交事务
    16. # Bind the source and sink to the channel
    17. a1.sources.r1.channels = c1 #表示将r1和c1连接起来
    18. a1.sinks.k1.channel = c1 #表示将k1和c1连接起来

    ⑦开启 flume服务监听端口:

    bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-console.conf -Dflume.root.logger=INFO,console

    ⑧启动参数说明:

    --conf/-c:表示配置文件存储在 conf/目录

    --name/-n:表示给 agent 起名为 a1

    --conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的job-netcat-flume-console.conf文件

    -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error

    ⑨使用netcat 工具向本机的1111端口发送内容

    • 实时监控单个追加文件

    ①监控需求

    - 实时监控Hive日志,并上传到HDFS

    ②需求分析:

    ③在job目录下创建监听数据的配置文件:job-file-flume-hdfs.conf

    1. # Name the components on this agent
    2. a2.sources = r2
    3. a2.sinks = k2
    4. a2.channels = c2
    5. # Describe/configure the source
    6. a2.sources.r2.type = exec
    7. #hive日志的默认位置
    8. a2.sources.r2.command = tail -F /tmp/hadoop/hive.log
    9. # Describe the sink
    10. a2.sinks.k2.type = hdfs
    11. a2.sinks.k2.hdfs.path = hdfs://hadoop101:8020/flume/%Y%m%d/%H
    12. #上传文件的前缀
    13. a2.sinks.k2.hdfs.filePrefix = logs-
    14. #是否按照时间滚动文件夹
    15. a2.sinks.k2.hdfs.round = true
    16. #多少时间单位创建一个新的文件夹
    17. a2.sinks.k2.hdfs.roundValue = 1
    18. #重新定义时间单位
    19. a2.sinks.k2.hdfs.roundUnit = hour
    20. #是否使用本地时间戳
    21. a2.sinks.k2.hdfs.useLocalTimeStamp = true
    22. #积攒多少个Event才flush到 HDFS一次
    23. a2.sinks.k2.hdfs.batchSize = 100
    24. #设置文件类型,可支持压缩
    25. a2.sinks.k2.hdfs.fileType = DataStream
    26. #多久生成一个新的文件
    27. a2.sinks.k2.hdfs.rollInterval = 60
    28. #设置每个文件的滚动大小
    29. a2.sinks.k2.hdfs.rollSize = 134217700
    30. #文件的滚动与Event数量无关
    31. a2.sinks.k2.hdfs.rollCount = 0
    32. # Use a channel which buffers events in memory
    33. a2.channels.c2.type = memory
    34. a2.channels.c2.capacity = 1000
    35. a2.channels.c2.transactionCapacity = 100
    36. # Bind the source and sink to the channel
    37. a2.sources.r2.channels = c2
    38. a2.sinks.k2.channel = c2

    ④启动hadoop集群

    ⑤启动flume监控任务

    bin/flume-ng agent --conf conf/ --name a2 --conf-file job/job-file-flume-hdfs.conf -Dflume.root.logger=INFO,console

    ⑥启动hive

    ⑦查看hdfs是否有监控日志

    ⑧存在的问题

    - tail命令不能实现断点续传监控的功能,可能会有数据丢失的情况或者数据重复的问题

    - Exec source 适用于监控一个实时追加的文件,不能实现断点续传

    • 实时监控目录下多个新文件

    ①监控需求

    - 使用 Flume 监听整个目录的文件,并上传至 HDFS

    ②需求分析

    ③在job目录下创建监听目录数据的配置文件:job-dir-flume-hdfs.conf

    1. a3.sources = r3
    2. a3.sinks = k3
    3. a3.channels = c3
    4. # Describe/configure the source
    5. a3.sources.r3.type = spooldir
    6. a3.sources.r3.spoolDir = /opt/module/apache-flume-1.9.0/upload
    7. a3.sources.r3.fileSuffix = .COMPLETED
    8. a3.sources.r3.fileHeader = true
    9. #忽略所有以.tmp 结尾的文件,不上传
    10. a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    11. # Describe the sink
    12. a3.sinks.k3.type = hdfs
    13. a3.sinks.k3.hdfs.path = hdfs://hadoop101:8020/flume/upload/%Y%m%d/%H
    14. #上传文件的前缀
    15. a3.sinks.k3.hdfs.filePrefix = upload-
    16. #是否按照时间滚动文件夹
    17. a3.sinks.k3.hdfs.round = true
    18. #多少时间单位创建一个新的文件夹
    19. a3.sinks.k3.hdfs.roundValue = 1
    20. #重新定义时间单位
    21. a3.sinks.k3.hdfs.roundUnit = hour
    22. #是否使用本地时间戳
    23. a3.sinks.k3.hdfs.useLocalTimeStamp = true
    24. #积攒多少个 Event 才 flush 到 HDFS 一次
    25. a3.sinks.k3.hdfs.batchSize = 100
    26. #设置文件类型,可支持压缩
    27. a3.sinks.k3.hdfs.fileType = DataStream
    28. #多久生成一个新的文件
    29. a3.sinks.k3.hdfs.rollInterval = 60
    30. #设置每个文件的滚动大小大概是 128M
    31. a3.sinks.k3.hdfs.rollSize = 134217700
    32. #文件的滚动与 Event 数量无关
    33. a3.sinks.k3.hdfs.rollCount = 0
    34. # Use a channel which buffers events in memory
    35. a3.channels.c3.type = memory
    36. a3.channels.c3.capacity = 1000
    37. a3.channels.c3.transactionCapacity = 100
    38. # Bind the source and sink to the channel
    39. a3.sources.r3.channels = c3
    40. a3.sinks.k3.channel = c3

    ④启动hadoop集群

    ⑤创建upload监控目录

    ⑥启动目录监控任务

    bin/flume-ng agent -c conf/ -n a3 -f job/job-dir-flume-hdfs.conf -Dflume.root.logger=INFO,console

    ⑦在upload中上传文件

    ⑧查看hdfs中是否上传成功

    ⑨存在的问题

    - 相同文件名的文件不能重复上传,只能上传一次,修改了也不会再次上传

    - 忽略的文件和配置后缀.COMPLETED的文件不能重复上传

    - Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

    • 实时监控目录下的多个追加文件

    ①案例需求

    - 使用Flume监听整个目录的实时追加文件,并上传至HDFS

    - 使用Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传

    ②需求分析

    ③在job目录下创建监听目录数据的配置文件:job-taildir-flume-hdfs.conf

    1. a4.sources = r4
    2. a4.sinks = k4
    3. a4.channels = c4
    4. # Describe/configure the source
    5. a4.sources.r4.type = TAILDIR
    6. a4.sources.r4.positionFile = /opt/module/apache-flume-1.9.0/tail_dir.json
    7. a4.sources.r4.filegroups = f1 f2
    8. a4.sources.r4.filegroups.f1 = /opt/module/apache-flume-1.9.0/files/.*file.*
    9. a4.sources.r4.filegroups.f2 = /opt/module/apache-flume-1.9.0/files2/.*log.*
    10. # Describe the sink
    11. a4.sinks.k4.type = hdfs
    12. a4.sinks.k4.hdfs.path = hdfs://hadoop101:8020/flume/upload2/%Y%m%d/%H
    13. #上传文件的前缀
    14. a4.sinks.k4.hdfs.filePrefix = upload-
    15. #是否按照时间滚动文件夹
    16. a4.sinks.k4.hdfs.round = true
    17. #多少时间单位创建一个新的文件夹
    18. a4.sinks.k4.hdfs.roundValue = 1
    19. #重新定义时间单位
    20. a4.sinks.k4.hdfs.roundUnit = hour
    21. #是否使用本地时间戳
    22. a4.sinks.k4.hdfs.useLocalTimeStamp = true
    23. #积攒多少个 Event 才 flush 到 HDFS 一次
    24. a4.sinks.k4.hdfs.batchSize = 100
    25. #设置文件类型,可支持压缩
    26. a4.sinks.k4.hdfs.fileType = DataStream
    27. #多久生成一个新的文件
    28. a4.sinks.k4.hdfs.rollInterval = 60
    29. #设置每个文件的滚动大小大概是 128M
    30. a4.sinks.k4.hdfs.rollSize = 134217700
    31. #文件的滚动与 Event 数量无关
    32. a4.sinks.k4.hdfs.rollCount = 0
    33. # Use a channel which buffers events in memory
    34. a4.channels.c4.type = memory
    35. a4.channels.c4.capacity = 1000
    36. a4.channels.c4.transactionCapacity = 100
    37. # Bind the source and sink to the channel
    38. a4.sources.r4.channels = c4
    39. a4.sinks.k4.channel = c4

    ④启动hadoop集群

    ⑤创建监控目录文件files和files2

    ⑥启动flume监控

    bin/flume-ng agent -c conf/ -n a4 -f job/job-taildir-flume-hdfs.conf -Dflume.root.logger=INFO,console

    ⑦往files和files2目录中的文件写数据

    ⑧在hdfs中查看数据

    结语

    关于Flume数据采集的基本案例实战到这里就结束了,我们下期见。。。。。。

  • 相关阅读:
    青岛建筑模板厂家有哪些?
    Go语言学习笔记——Golang 1.18新特性泛型
    ElasticsearchRestTemplate 和ElasticsearchRepository 的使用
    欧拉计划第265题:二进制圈
    Flutter TextField 点击时如何定位光标位置
    相关性分析热力图(Python&Matlab代码实现)
    HTTP(Hypertext Transfer Protocol)协议
    阿里云上了新闻联播
    将fbx文件转换成gltf格式的模型文件
    【Objective-C】浅析Block及其捕获机制
  • 原文地址:https://blog.csdn.net/yprufeng/article/details/132574553