• 大数据技术——Flume简介&安装配置&使用案例


    1. Flume 概述

    1.1 Flume简介

        Flume是一种可配置、高可用数据采集工具,主要用于采集来自各种流媒体的数据(Web服务器的日志数据等)并传输到集中式数据存储区域。

        Flume 支持在日志系统中定制各种数据发送方,用于收集数据;并且可以对数据进行简单处理,将其写到可定制的各种数据接收方(如文本、HDFS、HBase等)。

        Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。

    在这里插入图片描述

    1.2 Flume的特点

    (1)具有复杂的流动性。Flume允许用户构建多跳流,允许使用扇入流和扇出流、上下文路由和故障转移。

        ①多跳流:Flume中可以有多个agent【JVM进程】。事件(Event)需要通过多个agent才能到达最终目的地,成为多跳流。
        ②扇入流:从一个源(Source)通过多个通道(Chanel)。
        ③扇出流:多个源通过一个通道到达Sink。

    (2)具有可靠性:Flume的源(Source)和接收器(Slink)封装在事务中,可以确保事件在数据流中从一个点 到另一个点的可靠性传递。

    (3)可恢复性:事件(Event)存储在通道中,当Flume故障时,通道赋值恢复数据

    1.3 Flume的基础架构

    1.3.1 Agent简介

    在这里插入图片描述

        一个 JVM 进程,它以事件的形式将数据从源头送至目的。

        ​Agent 主要有 3 个部分组成,Source、Channel、Sink。


    Source简介

    ​     Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、httplegacy。-----》说白了就是就是数据来源


    Slink简介

    ​     Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

        ​ Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。-----》数据最终写出的位置


    Channel简介:

        ​ Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。------》中间的缓冲区

        ​ Flume 自带两种 Channel:Memory Channel 和 File Channel。

        Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

        File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

    1.3.2 Event简介

    ​    传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。

    ​    Event 由 HeaderBody 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。

    Flume 官网地址:http://flume.apache.org/

    2. Flume安装配置

    2.1 下载地址

    (1)Flume 官网地址:http://flume.apache.org/
    (2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
    (3)下载地址:http://archive.apache.org/dist/flume/

    2.2 安装部署

    (1)将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下

    (2)解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下

     tar -zxf /opt/software/apacheflume-1.9.0-bin.tar.gz -C /opt/module/
    
    • 1

    (3)修改 apache-flume-1.9.0-bin 的名称为 flume

     mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
    
    • 1

    (4)将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

     rm /opt/module/flume/lib/guava-11.0.2.jar
    
    • 1

    3. Flume 使用案例

    3.1 实时监控单个追加文件

    • 需求:
      • 实时监控 Hive 日志,并上传到 HDFS 中(源文件为Hive日志,目标文件为HDFS)
    • 分析:
      在这里插入图片描述

    实战操作:

    准备工作:启动mysql、Hadoop、Hive

    创建flume-file-hdfs.conf 文件【其中配置源文件,目标文件,以及channel的相关信息-----》Flume收集工具就是通过读取此配置来实现实时监控】

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs /hive.log
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
    #上传文件的前缀
    a1.sinks.k1.hdfs.filePrefix = upload-
    #是否按照时间滚动文件夹
    a1.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a1.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a1.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a1.sinks.k1.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a1.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a1.sinks.k1.hdfs.rollInterval = 30
    #设置每个文件的滚动大小大概是 128M
    a1.sinks.k1.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a1.sinks.k1.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    执行操作:
    启动Flume监控:

    bin/flume-ng agent -n a1 -c conf/ -f job/flume-file-hdfs.conf 
    
    • 1

    启动Hive,并且做出相关操作,使hivelog产生新的日志信息,以此作为数据源

    //启动hive
    bin/hive
    
    //执行操作:
    select * from table_name;
    
    select count(*) from table_name;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Hive执行命令:
    在这里插入图片描述

    hive.log日志信息

    在这里插入图片描述

    HDFS存储的信息【source读取上述信息后,将日志读到HDFS】
    在这里插入图片描述

    3.2 实时监控目录下多个新文件

    案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

    需求分析:

    在这里插入图片描述

    创建配置文件:

    vim flume-dir-hdfs.conf
    
    • 1
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/module/flume-1.9.0/upload
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    
    #忽略所有以.tmp 结尾的文件,不上传
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 20
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    配置信息解读:
    在这里插入图片描述
    启动监控命令:

    bin/flume-ng agent -n a3 -c conf/ -f job/flume-dir-hdfs.conf 
    
    • 1

    在这里插入图片描述

    创建指定的源文件目录并且上传数据

    在这里插入图片描述
    HDFS存储结果
    在这里插入图片描述

    3.3 实时监控目录下的多个追加文件

      Exec source 适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而 Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

    • 案例需求:
      • 使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

    需求分析:

    在这里插入图片描述

    创建一个文件

     vim flume-taildir-hdfs.conf
    
    • 1
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    
    # Describe/configure the source
    a3.sources.r3.type = TAILDIR
    a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
    a3.sources.r3.filegroups = f1 f2
    a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9.0/files/.*file.*
    a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*log.*
    
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 20
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    启动监控文件夹命令

    [atguigu@hadoop102 flume]$ bin/flume-ng agent -n a3 -c conf/ -f job/flume-taildir-hdfs.conf

    向 files 文件夹中追加内容
    在这里插入图片描述
    HDFS收到的存储内容
    file目录的存储
    在这里插入图片描述
    log目录的存储内容

    在这里插入图片描述

  • 相关阅读:
    低代码-业务流程引擎
    Linux目录--proc详解
    .net core中前端vue HTML5 History 刷新页面404问题
    HttpClient / Http客户端
    Qt SQLite的创建和使用
    计算机竞赛 : 题目:基于深度学习的水果识别 设计 开题 技术
    pnpm、npm、yarn 包管理工具『优劣对比』及『环境迁移』
    Spring的后处理器
    单层神经网络
    CENTOS下的常用命令(一)
  • 原文地址:https://blog.csdn.net/weixin_44606952/article/details/128087068