• Flume学习笔记(1)—— Flume入门


    Flume 概述

    Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统

    Flume 基于流式架构,灵活简单

    Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS

    基础架构

    Agent

    Agent 是一个 JVM 进程,它事件的形式将数据从源头送至目的

    Agent 主要有 3 个部分组成,Source、Channel、Sink

    Source

    Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy

    Sink

    Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent

    Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义

    Channel

    Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作


    Flume 自带两种 Channel:Memory Channel 和 File Channel

    • Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
    • File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据

    Event

    Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地

    Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组

    Flume 安装部署

    flume官网:Welcome to Apache Flume — Apache Flume

    官方文档:Flume 1.11.0 User Guide — Apache Flume

    下载:Index of /dist/flume


    1.下载tar包:

    版本:1.9.0

    2.上传到服务器,解压至/opt/module/路径下;

    3.重命名:mv /opt/module/apache-flume-1.9.0-bin/opt/module/flume

    4.将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3rm/opt/module/flume/lib/guava-11.0.2.jar

    Flume入门案例

    监控端口数据官方案例

    需求:使用 Flume 监听一个端口收集该端口数据,并打印到控制台

    实现步骤:

    1. 通过netcat工具向本机的44444端口发送数据
    2. Flume监控本机的44444端口,通过Flume的source端读取数据
    3. Flume将获取的数据通过Sink端写出到控制台

    实现流程:

    • 安装 netcat 工具:sudo yum install -y nc
    • 判断 44444 端口是否被占用:netstat -nlp | grep 44444

    netstat:显示网络状态;

    相关参数说明:Linux netstat命令 | 菜鸟教程

    • 在 flume 目录下创建 job 文件夹并进入 job 文件夹;
    • 在 job 文件夹下创建 Flume Agent 配置文件net-flume-logger.conf
    • 配置文件内容如下:
    1. # example.conf: A single-node Flume configuration
    2. # Name the components on this agent(source,channel,sink的名称)
    3. a1.sources = r1
    4. a1.sinks = k1
    5. a1.channels = c1
    6. # Describe/configure the source(source的类型,绑定的ip)
    7. a1.sources.r1.type = netcat
    8. a1.sources.r1.bind = localhost
    9. a1.sources.r1.port = 44444
    10. # Describe the sink(sink的类型)
    11. a1.sinks.k1.type = logger
    12. # Use a channel which buffers events in memory(channel的类型,容量)
    13. a1.channels.c1.type = memory
    14. a1.channels.c1.capacity = 1000
    15. a1.channels.c1.transactionCapacity = 100
    16. # Bind the source and sink to the channel(将source、sink和channel绑定)
    17. a1.sources.r1.channels = c1
    18. a1.sinks.k1.channel = c1

    具体参数详解如下:

    • 开启 flume 监听端口:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/net-flume-logger.conf -Dflume.root.logger=INFO,console或者 bin/flume-ng agent -c conf/ -n a1 -f job/net-flume-logger.conf -Dflume.root.logger=INFO,console

    参数说明:

    --conf/-c:表示配置文件存储在 conf/目录

    --name/-n:表示给 agent 起名为 a1

    --conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件

    -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error

    • 使用 netcat 工具向本机的 44444 端口发送内容:nc localhost 44444

    • 在 Flume 监听页面观察接收数据情况:

    实时监控单个追加文件

    需求:实时监控 Hive 日志,并上传到 HDFS

    实现流程:

    1.flume要想将数据上传到hadoop,需要依赖相关的jar包,所以需要确定hadoop及Java的环境变量已经配置好了:

    2.创建配置文件(在job目录下):flume-file-hdfs.conf

    1. # Name the components on this agent
    2. a2.sources = r2
    3. a2.sinks = k2
    4. a2.channels = c2
    5. # Describe/configure the source
    6. a2.sources.r2.type = exec
    7. a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
    8. # Describe the sink
    9. a2.sinks.k2.type = hdfs
    10. a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
    11. #上传文件的前缀
    12. a2.sinks.k2.hdfs.filePrefix = logs-
    13. #是否按照时间滚动文件夹
    14. a2.sinks.k2.hdfs.round = true
    15. #多少时间单位创建一个新的文件夹
    16. a2.sinks.k2.hdfs.roundValue = 1
    17. #重新定义时间单位
    18. a2.sinks.k2.hdfs.roundUnit = hour
    19. #是否使用本地时间戳
    20. a2.sinks.k2.hdfs.useLocalTimeStamp = true
    21. #积攒多少个 Event 才 flush 到 HDFS 一次
    22. a2.sinks.k2.hdfs.batchSize = 100
    23. #设置文件类型,可支持压缩
    24. a2.sinks.k2.hdfs.fileType = DataStream
    25. #多久生成一个新的文件
    26. a2.sinks.k2.hdfs.rollInterval = 60
    27. #设置每个文件的滚动大小
    28. a2.sinks.k2.hdfs.rollSize = 134217700
    29. #文件的滚动与 Event 数量无关
    30. a2.sinks.k2.hdfs.rollCount = 0
    31. # Use a channel which buffers events in memory
    32. a2.channels.c2.type = memory
    33. a2.channels.c2.capacity = 1000
    34. a2.channels.c2.transactionCapacity = 100
    35. # Bind the source and sink to the channel
    36. a2.sources.r2.channels = c2
    37. a2.sinks.k2.channel = c2

    参数说明:

    ①source为exec sourceFlume 1.11.0 User Guide — Apache Flume

    • exec及execute的意思,执行给定的Uxin系统指令;
    • 相应的command即需要执行的指令,在本示例中即读取Hive日志文件中的内容

    tail -F:循环读取

    Linux tail 命令 | 菜鸟教程

    ②sind为hdfs sinkFlume 1.11.0 User Guide — Apache Flume

    path即hdfs上文件的路径,hdfs sink允许使用一些转义序列,如下:


    根据官网描述:

    The files can be rolled (close current file and create a new one) periodically based on the elapsed time or size of data or number of events

    文件的滚动可以通过时间、文件、events的数量来控制;

    • 时间:主要是round——是否开启时间控制;roundUnit——时间单位;roundValue——时间值

    • 文件:主要是rollInterval——生成新文件的间隔(设置为0表示不按照时间滚动);rollSize——单个文件的最大大小(单位:bytes);rollCount——新文件生成时必须满足的events数量;

    • events数量:通过batchSize来设置

    ③对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)

    3.执行指令:bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

    4.执行hive指令,生成日志文件;为了简单测试这里直接向hive.log文件中写内容:

    echo "test.2023.11.17" > hive.log

    到hdfs中去查看,可以看到对应的文件:

    实时监控目录下多个新文件

    需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

    实现流程:
    1.创建配置文件 flume-dir-hdfs.conf

    配置文件内容如下:

    1. a3.sources = r3
    2. a3.sinks = k3
    3. a3.channels = c3
    4. # Describe/configure the source
    5. a3.sources.r3.type = spooldir
    6. #定义监控目录
    7. a3.sources.r3.spoolDir = /opt/module/flume/upload
    8. a3.sources.r3.fileSuffix = .COMPLETED
    9. a3.sources.r3.fileHeader = true
    10. #忽略所有以.tmp 结尾的文件,不上传
    11. a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    12. # Describe the sink
    13. a3.sinks.k3.type = hdfs
    14. a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
    15. #上传文件的前缀
    16. a3.sinks.k3.hdfs.filePrefix = upload-
    17. #是否按照时间滚动文件夹
    18. a3.sinks.k3.hdfs.round = true
    19. #多少时间单位创建一个新的文件夹
    20. a3.sinks.k3.hdfs.roundValue = 1
    21. #重新定义时间单位
    22. a3.sinks.k3.hdfs.roundUnit = hour
    23. #是否使用本地时间戳
    24. a3.sinks.k3.hdfs.useLocalTimeStamp = true
    25. #积攒多少个 Event 才 flush 到 HDFS 一次
    26. a3.sinks.k3.hdfs.batchSize = 100
    27. #设置文件类型,可支持压缩
    28. a3.sinks.k3.hdfs.fileType = DataStream
    29. #多久生成一个新的文件
    30. a3.sinks.k3.hdfs.rollInterval = 60
    31. #设置每个文件的滚动大小大概是 128M
    32. a3.sinks.k3.hdfs.rollSize = 134217700
    33. #文件的滚动与 Event 数量无关
    34. a3.sinks.k3.hdfs.rollCount = 0
    35. # Use a channel which buffers events in memory
    36. a3.channels.c3.type = memory
    37. a3.channels.c3.capacity = 1000
    38. a3.channels.c3.transactionCapacity = 100
    39. # Bind the source and sink to the channel
    40. a3.sources.r3.channels = c3
    41. a3.sinks.k3.channel = c3

    参数说明:
    source类型是spooldirFlume 1.11.0 User Guide — Apache Flume

    根据官网的描述:This source will watch the specified directory for new files, and will parse events out of new files as they appear

    该类型的source会监控特定目录,当目录下出现新文件的时候,会进行文件的上传

    • spoolDir:文件监控的目录
    • fileSuffix:默认值.COMPLETED,文件后缀
    • fileHeader:是否添加文件头
    • ignorePattern:定义忽略的文件;ignorePattern = ([^ ]*\.tmp):忽略所有以tmp结尾的文件而不上传

    2.启动指令: bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

    3.在/opt/module/flume/upload文件夹中上传文件:

    1. touch why.txt
    2. touch why.tmp
    3. touch why.log

    可以看到添加了文件后缀:

    hdfs中也有了对应的文件:

    实时监控目录下的多个追加文件(断点续传)

    • Exec source 适用于监控一个实时追加的文件,不能实现断点续传
    • Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步
    • Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传

    需求:使用 Flume 监听整个目录的实时追加文件(向文件中写入新的内容),并上传至 HDFS

    实现流程:

    1.创建配置文件 flume-taildir-hdfs.conf

    文件内容如下:

    1. a3.sources = r3
    2. a3.sinks = k3
    3. a3.channels = c3
    4. # Describe/configure the source
    5. a3.sources.r3.type = TAILDIR
    6. a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
    7. a3.sources.r3.filegroups = f1 f2
    8. a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*txt.*
    9. a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
    10. # Describe the sink
    11. a3.sinks.k3.type = hdfs
    12. a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
    13. #上传文件的前缀
    14. a3.sinks.k3.hdfs.filePrefix = upload-
    15. #是否按照时间滚动文件夹
    16. a3.sinks.k3.hdfs.round = true
    17. #多少时间单位创建一个新的文件夹
    18. a3.sinks.k3.hdfs.roundValue = 1
    19. #重新定义时间单位
    20. a3.sinks.k3.hdfs.roundUnit = hour
    21. #是否使用本地时间戳
    22. a3.sinks.k3.hdfs.useLocalTimeStamp = true
    23. #积攒多少个 Event 才 flush 到 HDFS 一次
    24. a3.sinks.k3.hdfs.batchSize = 100
    25. #设置文件类型,可支持压缩
    26. a3.sinks.k3.hdfs.fileType = DataStream
    27. #多久生成一个新的文件
    28. a3.sinks.k3.hdfs.rollInterval = 60
    29. #设置每个文件的滚动大小大概是 128M
    30. a3.sinks.k3.hdfs.rollSize = 134217700
    31. #文件的滚动与 Event 数量无关
    32. a3.sinks.k3.hdfs.rollCount = 0
    33. # Use a channel which buffers events in memory
    34. a3.channels.c3.type = memory
    35. a3.channels.c3.capacity = 1000
    36. a3.channels.c3.transactionCapacity = 100
    37. # Bind the source and sink to the channel
    38. a3.sources.r3.channels = c3
    39. a3.sinks.k3.channel = c3

    参数说明:

    source类型为taildir:Flume 1.11.0 User Guide — Apache Flume

    • positionFile:默认值:~/.flume/taildir_position.json;根据官方的解释,该文件是用于记录inode的值的,inode即linux中储存文件元数据的区域,linux中可以通过inode来识别不同的文件,因此taildir source通过维护positionFile即可记录每个文件读取到的最新的位置,从而实现断点续传;
    • filegroups & filegroups.:定义一个或多个需要监控的目录;

    .*txt.*即表示所有以txt结尾的文件

    3.执行指令: bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf

    4.向/opt/module/flume/files目录下的文件中写入内容:

    1. echo hello1 >> file1.txt
    2. echo hello2 >> file2.txt
    3. echo hello3 >> file3.txt
    4. echo hello4 >> file4.txt

    在files2中:

    echo log1 >> log1.log

    hdfs中出现相应的文件:

    内容如下:

    继续在files中执行:echo log2 >> log1.log

    可以看到已经同步更新:

  • 相关阅读:
    StableAudio-大模型创作音乐的工具
    Neo4j 5的自治图数据库集群
    Java 顺序控制、分支控制、循环控制详解
    视频怎么加水印?这里有你想要的答案
    人工神经网络的应用实例,人工神经网络算法实例
    Android-Framework 时间格式默认使用24小时制、时区为上海
    uniapp自定义权限菜单,动态tabbar
    Ubuntu 虚拟化中Android Studio 不支持HAXM(CPU不支持问题)
    C# net core中的过滤器(Filter)使用及跳过过滤器
    【Redis】Redis 缓存重点解析
  • 原文地址:https://blog.csdn.net/qq_51235856/article/details/134465660