• Flume笔记


    1. Flume 概述

    1.1 Flume 定义

    Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,灵活简单。

    为什么选用Flume

    1659592180888

    Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。

    1.2 Flume 基础架构

    1659592209010

    1.2.1 Agent

    Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。

    Agent 主要有 3 个部分组成,Source、Channel、Sink。

    1.2.2 Source

    Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种 格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、 sequence generator、syslog、http、legacy。

    1.2.3 Sink

    Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储 或索引系统、或者被发送到另一个 Flume Agent。

    Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定 义。

    1.2.4 Channel

    Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运 作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。

    Flume 自带两种 Channel:Memory ChannelFile Channel

    Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适 用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕 机或者重启都会导致数据丢失。

    File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数 据。

    1.2.5 Event

    传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。 Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构, Body 用来存放该条数据,形式为字节数组。

    1659592321601

    2. Flume 入门

    2.1 Flume 安装部署

    2.1.1 安装地址

    (1)Flume 官网地址:http://flume.apache.org/

    (2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html

    (3)下载地址:http://archive.apache.org/dist/flume/

    2.1.2 安装部署

    (1)将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下

    (2)解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下

    [hhhyixin@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
    
    • 1

    (3)修改 apache-flume-1.9.0-bin 的名称为 flume

    [hhhyixin@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
    
    • 1

    (4)将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

    [hhhyixin@hadoop102 module]$ rm /opt/module/flume/lib/guava-11.0.2.jar
    
    • 1

    2.2 Flume 入门案例

    2.2.1 监控端口数据官方案例

    1)案例需求

    使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

    2)需求分析

    监听数据端口案例分析

    1659592927019

    3)实现步骤

    (1)安装 netcat 工具

    [hhhyixin@hadoop102 module]$ rm /opt/module/flume/lib/guava-11.0.2.jar
    
    • 1

    (2)判断 44444 端口是否被占用

    [hhhyixin@hadoop102 flume]$ sudo netstat -nlp | grep 44444
    
    • 1

    没有输出内容说明端口没有被占用

    (3)创建 Flume Agent 配置文件 flume-netcat-logger.conf

    (4)在 flume 目录下创建 job 文件夹并进入 job 文件夹。

    [hhhyixin@hadoop102 flume]$ mkdir job
    [hhhyixin@hadoop102 flume]$ cd job/
    
    • 1
    • 2

    (5)在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf

    [hhhyixin@hadoop102 job]$ vim flume-netcat-logger.conf
    
    • 1

    (6)在 flume-netcat-logger.conf 文件中添加如下内容。

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html

    配置文件解析

    1659593549562

    (7)先开启 flume 监听端口

    第一种写法:

    [hhhyixin@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
    
    • 1

    第二种写法

    [hhhyixin@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
    
    • 1
    参数说明
    –conf/-c表示配置文件存储在 conf/目录
    –name/-n表示给 agent 起名为 a1
    –conf-file/-fflume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件
    -Dflume.root.logger=INFO,console-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值
    并将控制台日志打印级别设置为 INFO 级别
    日志级别包括:log、info、warn、 error

    (8)使用 netcat 工具向本机的 44444 端口发送内容

    在开启服务端之后,应该另开启102窗口

    [hhhyixin@hadoop102 ~]$ nc localhost 44444
    
    • 1

    (9)在 Flume 监听页面观察接收数据情况

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e3yb7y0C-1668175932652)(https://note-01-1313694416.cos.ap-nanjing.myqcloud.com/1659594365409.png)]

    2.2.2 实时监控单个追加文件

    1)案例需求:实时监控 Hive 日志,并上传到 HDFS 中

    2)需求分析

    实时读取本地文件到HDFS案例

    1659594418575

    3)实现步骤

    (1)Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关 jar 包

    检查/etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确

    JAVA_HOME=/opt/module/jdk1.8.0_212
    HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
    
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    
    export PATH JAVA_HOME HADOOP_HOME
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    (2)创建 flume-file-hdfs.conf 文件

    [hhhyixin@hadoop102 job]$ vim flume-file-hdfs.conf
    
    • 1

    要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive 日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件。

    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
    
    #上传文件的前缀
    a2.sinks.k2.hdfs.filePrefix = logs-
    
    #是否按照时间滚动文件夹
    a2.sinks.k2.hdfs.round = true
    
    #多少时间单位创建一个新的文件夹
    a2.sinks.k2.hdfs.roundValue = 1
    
    #重新定义时间单位
    a2.sinks.k2.hdfs.roundUnit = hour
    
    #是否使用本地时间戳
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a2.sinks.k2.hdfs.batchSize = 100
    
    #设置文件类型,可支持压缩
    a2.sinks.k2.hdfs.fileType = DataStream
    
    #多久生成一个新的文件
    a2.sinks.k2.hdfs.rollInterval = 60
    
    #设置每个文件的滚动大小
    a2.sinks.k2.hdfs.rollSize = 134217700
    
    #文件的滚动与 Event 数量无关
    a2.sinks.k2.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的 key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自 动添加 timestamp)

    1659594641149

    (3)运行 Flume

    [hhhyixin@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
    
    • 1

    (4)开启 Hadoop 和 Hive 并操作 Hive 产生日志

    [hhhyixin@hadoop102 ~]$ start-dfs.sh
    [hhhyixin@hadoop103 ~]$ start-yarn.sh
    [hhhyixin@hadoop102 hive]$ bin/hive
    hive (default)>
    
    • 1
    • 2
    • 3
    • 4

    (5)在 HDFS 上查看文件

    2.2.3 实时监控目录下多个新文件

    1)案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

    2)需求分析

    实时读取目录文件到HDFS案例

    1659679343183

    3)实现步骤

    (1)创建配置文件 flume-dir-hdfs.conf

    创建一个文件

    [hhhyixin@hadoop102 job]$ vim flume-dir-hdfs.conf
    
    • 1

    添加如下内容

    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/module/flume/upload
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    
    #忽略所有以.tmp 结尾的文件,不上传
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = 
    hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H
    
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    
    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    1659679492455

    (2)启动监控文件夹命令

    [hhhyixin@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
    
    • 1

    在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文 件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。

    (3)向 upload 文件夹中添加文件

    在/opt/module/flume 目录下创建 upload 文件夹

    [hhhyixin@hadoop102 flume]$ mkdir upload
    
    • 1

    向 upload 文件夹中添加文件

    1659680557253

    (4)查看 HDFS 上的数据

    2.2.4 实时监控目录下的多个追加文件

    Exec source 适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而 Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传

    1)案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

    2)需求分析

    实时读取目录文件到HDFS案例

    1659680602667

    3)实现步骤

    (1)创建配置文件 flume-taildir-hdfs.conf

    创建一个文件

    [hhhyixin@hadoop102 job]$ vim flume-taildir-hdfs.conf
    
    • 1

    添加如下内容

    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source
    a3.sources.r3.type = TAILDIR
    a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
    a3.sources.r3.filegroups = f1 f2
    a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
    a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
    
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = 
    hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H
    
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    
    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    1659680702182

    (2)启动监控文件夹命令

    [hhhyixin@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
    
    • 1

    (3)向 files 文件夹中追加内容

    在/opt/module/flume 目录下创建 files 文件夹

    [hhhyixin@hadoop102 flume]$ mkdir files
    
    • 1

    向 upload 文件夹中添加文件

    (4)查看 HDFS 上的数据

    Taildir 说明

    Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File 中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下

    {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
    {"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
    
    • 1
    • 2

    注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统 用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来 识别文件

    3. Flume 进阶

    3.1 Flume 事务

    1659680980601

    3.2 Flume Agent 内部原理

    1659681025862

    Channel Selectors 有 两 种 类 型 :Replicating Channel Selector (default)和 Multiplexing Channel Selector 。Replicating 会 将 source过来的events发往所 有channel,而Multiplexing可 以配置发往哪些Channel。

    重要组件

    1)ChannelSelector

    ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型, 分别是 Replicating(复制)和 Multiplexing(多路复用

    ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相 应的原则,将不同的 Event 发往不同的 Channel。

    2)SinkProcessor

    SinkProcessor 共 有 三 种 类 型 , 分 别 是 DefaultSinkProcessor 、 LoadBalancingSinkProcessor 和 FailoverSinkProcessor

    DefaultSinkProcessor 对 应 的 是 单 个 的 Sink , LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负 载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能。

    3.3 Flume 拓扑结构

    3.3.1 简单串联

    1659681085871

    这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的 目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速 率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

    3.3.2 复制和多路复用

    1659681118023

    Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个 channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的 地。

    3.3.3 负载均衡和故障转移

    1659681147944

    3.3.4 聚合

    1659681168186

    这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者 甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式 能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

  • 相关阅读:
    一文讲透DevOps理论体系的演进
    共享模型之无锁
    【无标题】多卡聚合路由器在消防领域的应用
    solid works草图绘制与设置零件特征的使用说明
    离散数学 学习 之 一阶逻辑基本概念 ( 四 )
    ROS 开源项目 TurtleBot3 安装与使用
    计算机组成原理期中考试
    【力扣】83. 删除排序链表中的重复元素
    ICLR 2022最佳论文解读
    深度学习 框架代码(草稿)
  • 原文地址:https://blog.csdn.net/m0_59598325/article/details/127814194