• 大数据采集工具Flume


    一.概述

    Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单

    Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS或者Kafka,最主要的,flume是实时采集的
    在这里插入图片描述

    1.基础架构

    在这里插入图片描述

    • Agent

    Agent 是一个 JVM 进程,主要有 3 个部分组成,Source、Channel、Sink,它以事件的形式将数据从源头送至目的

    • Source

    Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy

    doPut:将数据写进临时缓冲区putList
    doCommit:检查channel内存队列是否足够合并
    doRollback:内存队列不足,回滚数据

    • Channel

    Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作

    Flume 自带两种 Channel:Memory Channel 和 File Channel

    Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据

    • Sink

    Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent,Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义

    doTake:先将数据取到临时缓冲区takeList
    doCommit:提交到Sink,成功提交清空takeList
    doRollback:发送出现异常,将takeList数据归还给内存队列

    • Event

    传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组
    在这里插入图片描述

    2.拓扑结构

    简单串联

    这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统

    在这里插入图片描述

    复制和多路复用

    单 source,多 channel、sink,Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地

    在这里插入图片描述

    负载均衡和故障转移

    Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能

    在这里插入图片描述

    聚合

    这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志

    在这里插入图片描述

    3.Agent内部原理

    在这里插入图片描述

    注意,一个sink最多对应一个channel,而一个channel可以对应更多sink

    Sinkprocessor有三种,第一种defaultsinkprocessor,一个channel只能绑定一个sink,第二种loadbalancingsinkprocessor,负载均衡,多个sink轮流查看channel,第三种failoversinkprocessor,故障转移,对多个sink可以配置优先级,若优先级高的sink挂掉后,转移到优先级低的sink

    二.安装Flume

    官网下载安装包

    http://archive.apache.org/dist/flume/1.9.0/

    在这里插入图片描述

    解压

    tar -zxvf ./apache-flume-1.9.0-bin.tar.gz -C .
    
    • 1

    改名

    mv ./apache-flume-1.9.0-bin/ ./flume-1.9
    
    • 1

    将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

     rm /opt/module/flume-1.9/lib/guava-11.0.2.jar
    
    • 1

    三.入门案例

    1.监控端口

    需求

    使用 Flume 监听一个端口,收集该端口数据,并打印到控制台

    在这里插入图片描述

    实现步骤

    (1)安装 netcat 工具

    sudo yum install -y nc
    
    • 1

    (2)判断 44444 端口是否被占用

    sudo netstat -nlp | grep 44444
    
    • 1

    (3)创建 Flume Agent 配置文件 flume-netcat-logger.conf
    (4)在 flume 目录下创建 job 文件夹并进入 job 文件夹
    (5)在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf

    [gzhu@hadoop102 flume-1.9]$ mkdir job
    [gzhu@hadoop102 flume-1.9]$ cd job/
    [gzhu@hadoop102 job]$ vim flume-netcat-logger.conf
    
    • 1
    • 2
    • 3

    (6)在 flume-netcat-logger.conf 文件中添加如下内容

    # Name the components on this agent
    # a1就是表示当前agent,单台机器里flume下a1要唯一
    a1.sources = r1 # r1表示a1的输入源
    a1.sinks = k1 # k1表示a1的输出目的地
    a1.channels = c1 # c1表示a1的缓冲区channel
    
    # Describe/configure the source
    a1.sources.r1.type = netcat  # source的类型
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink sink的类型 logger打印到控制台
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000 # channel最多放1000个event
    a1.channels.c1.transactionCapacity = 100 # 一个事务的做多放100个event
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 # 一个source可以绑定多个channel
    a1.sinks.k1.channel = c1 # 一个sink最多绑定一个channel
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    (7)先开启 flume 监听端口

    bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
    
    • 1

    参数说明:
    -c:表示配置文件存储在 conf/目录
    -n:表示给 agent 起名为 a1
    -f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件
    -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error

    (8)使用 netcat 工具向本机的 44444 端口发送内容

     nc localhost 44444
    
    • 1

    在这里插入图片描述

    (9)在 Flume 监听页面观察接收数据情况

    在这里插入图片描述

    2.实时监控单个追加文件

    需求

    实时监控日志,并上传到 HDFS 中
    在这里插入图片描述

    实现步骤

    (1)Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关 jar 包,上传到/opt/module/flume-1.9/lib,以下jar包可在Hadoop的依赖下找到

    在这里插入图片描述

    实现source是exec,代表执行一条命令,sink就是HDFS了

    (2)检查/etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确

    #JAVA_HOME
    export JAVA_HOME=/opt/module/jdk1.8.0_212
    export PATH=$PATH:$JAVA_HOME/bin
    
    #HADOOP_HOME
    export HADOOP_HOME=/opt/module/hadoop-3.1.3
    export PATH=$PATH:$HADOOP_HOME/bin
    export PATH=$PATH:$HADOOP_HOME/sbin
    export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
    export HADOOP_CLASSPATH=`hadoop classpath`
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    (2)创建 flume-file-hdfs.conf 文件

    [gzhu@hadoop102 job]$ vim flume-file-hdfs.conf
    
    • 1

    注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件

    添加如下内容

    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2
    
    # Describe/configure the source
    a2.sources.r2.type = exec # 执行命令 tail -f 循环读取日志文件 默认最后10行 可能会重复数据
    a2.sources.r2.command = tail -F /opt/module/flume-1.9/exec.log
    
    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
    
    #上传文件的前缀
    a2.sinks.k2.hdfs.filePrefix = logs
    
    #是否按照时间滚动文件夹
    a2.sinks.k2.hdfs.round = true
    #多少时间单位创建一个新的文件夹   1小时1次
    a2.sinks.k2.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k2.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    
    
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a2.sinks.k2.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a2.sinks.k2.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a2.sinks.k2.hdfs.rollInterval = 60
    #设置每个文件的滚动大小
    a2.sinks.k2.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a2.sinks.k2.hdfs.rollCount = 0
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    (4)在flume-1.9文件下创建exec.log

    (5)运行 Flume

    [gzhu@hadoop102 flume-1.9]$ bin/flume-ng agent --conf conf/ --name  a2 --conf-file job/flume-file-hdfs.conf
    
    • 1

    (5)开启 Hadoop
    在这里插入图片描述

    (6)往文件中追加数据

    [gzhu@hadoop102 flume-1.9]$ echo spark >> exec.log
    [gzhu@hadoop102 flume-1.9]$ echo hive >> exec.log
    
    • 1
    • 2

    (7)在 HDFS 上查看文件
    在这里插入图片描述

    3.读取目录新文件到HDFS

    需求

    使用Flume监听整个目录上传到HDFS

    在这里插入图片描述

    实现source是spooldir,sink是hdfs

    实现步骤

    (1)创建配置文件 flume-dir-hdfs.conf

    创建一个文件

    [gzhu@hadoop102 job]$ vim flume-dir-hdfs.conf
    
    • 1

    添加如下内容

    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    # Describe/configure the source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /opt/module/flume-1.9/upload
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    #忽略所有以.tmp 结尾的文件,不上传
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    (2)启动监控文件夹命令

    [gzhu@hadoop102 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
    
    • 1

    说明:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动

    (3)向 upload 文件夹中添加文件

    在/opt/module/flume-1.9 目录下创建 upload 文件夹

    [gzhu@hadoop102 flume-1.9]$ mkdir upload
    
    • 1

    向 upload 文件夹中添加文件

    [gzhu@hadoop102 upload]$ touch gzhu.log
    [gzhu@hadoop102 upload]$ touch gzhu.tmp
    [gzhu@hadoop102 upload]$ touch gzhu.txt
    
    • 1
    • 2
    • 3

    (4)查看 HDFS 上的数据

    在这里插入图片描述

    注意,当我们监控这个目录时,文件一放进去,就立马被上传到了HDFS,并且已经上传了的文件会被标记成.COMPLETED,这样才不会导致重复上传,这样我们不能修改文件了,因为即使修改了,由于有后缀也不会上传了

    在这里插入图片描述

    4.实时监控多个追加文件

    需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
    在这里插入图片描述

    实现source是taildir(可以断点续传),实现sink是hdfs

    /opt/module/flume-1.9/job
    
    vim flume-taildir-hdfs.conf
    
    • 1
    • 2
    • 3
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    # Describe/configure the source
    a3.sources.r3.type = TAILDIR
    a3.sources.r3.positionFile = /opt/module/flume-1.9/tail_dir.json
    a3.sources.r3.filegroups = f1 f2
    a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9/files/.*file.*
    a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9/files2/.*log.*
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    #设置每个文件的滚动大小大概是 128M`在这里插入代码片`
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
    
    • 1

    在/opt/module/flume-1.9 目录下创建 files 文件夹

    mkdir files
    
    mkdir files2
    
    • 1
    • 2
    • 3

    进入文件夹并创建文件,通过echo进行数据的追加

    cd file1
    
    touch file1.txt
    
    echo hello >> file1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    tail_dir.json记录了每个文件追加到了哪个位置,因此可以准确的追加数据,但是,万一inode或者file有一个变化,那么就代表只是一个新的文件,flume认为该文件从来没有被上传过HDFS
    在这里插入图片描述

    注意点

    1.监控的多个文件,这些文件的追加内容会进入一个文件里(滚动时间范围内)

    2.假如我们监控Hive的日志,当天Hive的日志文件名字是hive.log,此时新增加的数据会被上传,但是当新的一天开始时,当天的hive.log会更名,会加上昨天的日期,例如,hive.log.2022-7-7,这样flume会认为这是一个新的文件,于是就导致了重复上传了

    我们就写死,只监控hive.log这个文件,但是也有问题,比如,在晚上11.30机器挂掉了,而我们第二天才修复,那么此时昨天那半小时的数据已经监控不到了,因为已经改名了,不是hive.log了,所以这样会丢数据

    解决办法有两种,第一种是找管控日志的人协商,生成日志时就带有日期,这样一天生成一本日志就好了。第二种办法就是修改flume的源码,只负责检测inode,这样文件改名后由于inode没变也不会重新上传了

    四.进阶案例

    1.Chanel Selector

    1.1 replicating

    需求

    使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem,这是replicating
    在这里插入图片描述

    很明显,我们使用了3个agent,因此需要3个flume配置文件,我们统一将这个三个配置文件放在job目录下的group1文件夹下

    第一个配置文件 flume-file-flume.conf

    配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir

    vim flume-file-flume.conf
    
    • 1
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # 将数据流复制给所有 channel
    a1.sources.r1.selector.type = replicating
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/flume-1.9/files/exec.log
    a1.sources.r1.shell = /bin/bash -c
    # Describe the sink
    # sink 端的 avro 是一个数据发送者
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    第二个配置文件 flume-flume-hdfs.conf

    配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink

    vim flume-flume-hdfs.conf
    
    • 1
    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    # source 端的 avro 是一个数据接收服务
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    # Describe the sink
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
    #上传文件的前缀
    a2.sinks.k1.hdfs.filePrefix = flume2- 
    #是否按照时间滚动文件夹
    a2.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a2.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a2.sinks.k1.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a2.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a2.sinks.k1.hdfs.rollInterval = 30
    #设置每个文件的滚动大小大概是 128M
    a2.sinks.k1.hdfs.rollSize = 134217700 
    #文件的滚动与 Event 数量无关
    a2.sinks.k1.hdfs.rollCount = 0
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    第三个配置文件 flume-flume-dir.conf

    配置上级 Flume 输出的 Source,输出是到本地目录的 Sink

    vim flume-flume-dir.conf
    
    • 1
    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    # Describe the sink
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /opt/module/flume-1.9/data
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录

    执行配置文件

    先启动服务端,再启动客户端

    bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
    
    • 1
    bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
    
    • 1
    bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
    
    • 1

    向exec.log追加数据

    echo My Name is Jack ! >> exec.log
    
    • 1
    1.2 multiplexing

    我们来看下官方的案例,一个source连接了4个channel,当我们选择器是multiplexing时,首先判断event中的头信息,我们知道,一个event是由头信息和body组成的,而头信息是一个K-V结构,那么下面这段代码应该清楚了,判断state,如果是CZ,发送c1这个channel,如果是US,发往c2和c3,否则发往c4,那现在问题来了,怎么向一个event里头信息里添加state-V这个数据呢?用拦截器

    a1.sources = r1
    a1.channels = c1 c2 c3 c4
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state        #以每个Event的header中的state这个属性的值作为选择channel的依据
    a1.sources.r1.selector.mapping.CZ = c1       #如果state=CZ,则选择c1这个channel
    a1.sources.r1.selector.mapping.US = c2 c3    #如果state=US,则选择c2 和 c3 这两个channel
    a1.sources.r1.selector.default = c4          #默认使用c4这个channel
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    拦截器

    我们想要实现拦截器,需要自定义实现拦截器类

    <dependency>
      <groupId>org.apache.flumegroupId>
      <artifactId>flume-ng-coreartifactId>
      <version>1.9.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    // 注意 Interceptor 接口是flume下的
    public class TypeInterceptor implements Interceptor {
        private List<Event> addList;
    
        public void initialize() {
            addList = new ArrayList<Event>();
        }
        // 单个事件处理方法
        public Event intercept(Event event) {
            // 头信息是个map结构,我们首先获取头信息结构
            Map<String, String> headers = event.getHeaders();
            // body默认是字节数组 我们转成字符串就行了
            String body = new String(event.getBody());
            // 我们拿到body也就是发送过来的数据了,再进行加工就可以了
            // 我们这里假设  将点击行为 和 支付行为分开 将来我们区分的时候,就是根据某一个key的不同value值进行区分的!
            if(body.contains("click")){
                // key随便起名字
                headers.put("type","click");
            }else {
                headers.put("type","pay");
            }
            // event进行了加工,添加了头信息 返回就好了
            return event;
        }
        // 多个事件的处理
        public List<Event> intercept(List<Event> list) {
            // 先清空我们定义的集合 因为不同批次的事件肯定不能再发一遍 清空 保证集合中只有本批次的事件
            addList.clear();
    
            list.forEach(event -> {
                // 单个事件的处理逻辑和上面是一样的,直接调用方法就好了
                Event intercept = intercept(event);
                addList.add(intercept);
            });
    
            return addList;
        }
    
        public void close() {
    
        }
    
        // 这里我们要特别注意 官网的案例要求我们必须有一个Builder
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new TypeInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    打包,将jar包放到flume的lib下

    我们只需要将上面复制的案例改成多路复用就行了
    在这里插入图片描述

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # 区别就在这里
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.gzhu.interceptor.TypeInterceptor$Builder
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.click = c1
    a1.sources.r1.selector.mapping.pay = c2
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4242
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Use a channel which buffers events in memory
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    2.Sink Processors

    你可以把多个sink分成一个组, 这时候Sink组逻辑处理器(Flume Sink Processors)可以对这同一个组里的几个sink进行负载均衡或者其中一个sink发生故障后将输出Event的任务转移到其他的sink上

    Sinkprocessor有三种,第一种defaultsinkprocessor,一个channel只能绑定一个sink,第二种loadbalancingsinkprocessor,负载均衡,多个sink轮流查看channel,第三种failoversinkprocessor,故障转移,对多个sink可以配置优先级,若优先级高的sink挂掉后,转移到优先级低的sink

    2.1 failoversinkprocessor

    我们假设flume-2的优先级高,它挂掉后将数据送到flume-3
    在这里插入图片描述
    在/opt/module/flume-1.9/job 目录下创建 group2 文件夹

    第一个配置文件 flume-file-flume.conf

    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/flume-1.9/files/exec.log
    a1.sources.r1.shell = /bin/bash -c
    
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    第二个配置文件 flume-flume-console1.conf

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    # Describe the sink
    a2.sinks.k1.type = logger
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    第三个配置文件 flume-flume-console2.conf

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    # Describe the sink
    a3.sinks.k1.type = logger
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    执行配置文件

    bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
    
    bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-file-flume.conf
    
    • 1
    • 2
    • 3
    • 4
    • 5

    向exec.log追加数据

    echo zhang >> exec.log
    
    • 1

    可以看到数据到了console2,也就是4142端口,10>5,优先级高
    在这里插入图片描述

    断开4142,再追加数据

    在这里插入图片描述
    数据到了console1,也就是4141端口
    在这里插入图片描述

    2.2 load_balance

    负载均衡Sink 选择器提供了在多个sink上进行负载均衡流量的功能。 它维护一个活动sink列表的索引来实现负载的分配。 默认支持了轮询(round_robin)和随机(random)两种选择机制分配负载。 默认是轮询,可以通过配置来更改。也可以从 AbstractSinkSelector 继承写一个自定义的选择器

    工作时,此选择器使用其配置的选择机制选择下一个sink并调用它。 如果所选sink无法正常工作,则处理器通过其配置的选择机制选择下一个可用sink。 此实现不会将失败的Sink列入黑名单,而是继续乐观地尝试每个可用的Sink

    如果所有sink调用都失败了,选择器会将故障抛给sink的运行器

    如果backoff设置为true则启用了退避机制,失败的sink会被放入黑名单,达到一定的超时时间后会自动从黑名单移除。 如从黑名单出来后sink仍然失败,则再次进入黑名单而且超时时间会翻倍,以避免在无响应的sink上浪费过长时间。 如果没有启用退避机制,在禁用此功能的情况下,发生sink传输失败后,会将本次负载传给下一个sink继续尝试,因此这种情况下是不均衡的

    我们只需要将上面flume-file-flume.conf改一点参数就可以了

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random # 也可以round_robin
    
    • 1
    • 2
    • 3
    • 4
    • 5

    a1.sinkgroups.g1.processor.backoff = true

    失败的sink是否成倍地增加退避它的时间。 如果设置为false,负载均衡在某一个sink发生异常后,下一次选择sink的时候仍然会将失败的这个sink加入候选队列; 如果设置为true,某个sink连续发生异常时会成倍地增加它的退避时间,在退避的时间内是无法参与负载均衡竞争的。退避机制只统计1个小时(可以设置)发生的异常,超过1个小时没有发生异常就会重新计算

    3.聚合

    采集多台服务器的日志文件,聚合到HDFS
    在这里插入图片描述

    由于涉及了多台机器,首先要分发 Flume

    xsync /opt/module/flume-1.9
    
    • 1
    [gzhu@hadoop102 job]$ mkdir group3
    
    [gzhu@hadoop103 job]$ mkdir group3
    
    [gzhu@hadoop104 job]$ mkdir group3
    
    • 1
    • 2
    • 3
    • 4
    • 5

    hadoop102编写配置文件

    [gzhu@hadoop102 job]$ cd group3
    
    vim flume1-exec-flume.conf
    
    • 1
    • 2
    • 3
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/tmp/exec.log
    a1.sources.r1.shell = /bin/bash -c
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop104
    a1.sinks.k1.port = 4141
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    hadoop103编写配置文件

    vim flume2-exec-flume.conf
    
    • 1
    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    a2.sources.r1.type = exec
    a2.sources.r1.command = tail -F /opt/module/tmp/input.log
    a2.sources.r1.shell = /bin/bash -c
    # Describe the sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop104
    a2.sinks.k1.port = 4141
    
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    hadoop104收集

    vim flume3-flume-hdfs.conf
    
    • 1
    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4141
    
    a3.sinks.k1.type = hdfs
    a3.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume4/upload/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k1.hdfs.filePrefix = upload- 
    #是否按照时间滚动文件夹
    a3.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k1.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k1.hdfs.rollInterval = 60
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k1.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a3.sinks.k1.hdfs.rollCount = 0
    
    
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    分别启动

    [gzhu@hadoop104 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-hdfs.conf 
    
    
    [gzhu@hadoop103 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-exec-flume.conf
    
    
    [gzhu@hadoop102 flume-1.9]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-exec-flume.conf
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    测试

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    五.Flume数据流监控

    Ganglia 由 gmond、gmetad 和 gweb 三部分组成

    gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等

    gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务

    gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据

    ==安装 ganglia ==

    (1)规划

    hadoop102: web gmetad gmod
    hadoop103: gmod
    hadoop104: gmod
    
    • 1
    • 2
    • 3

    (2)在 102 103 104 分别安装 epel-release

    [gzhu@hadoop102 flume-1.9]$ sudo yum -y install epel-release

    (3)在 102 安装

    [gzhu@hadoop102 flume-1.9]$ sudo yum -y install ganglia-gmetad
    [gzhu@hadoop102 flume-1.9]$ sudo yum -y install ganglia-web
    [gzhu@hadoop102 flume-1.9]$ sudo yum -y install ganglia-gmond

    (4)在 103 和 104 安装
    [gzhu@hadoop102 flume-1.9]$ sudo yum -y install ganglia-gmond

    (5)在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf

    [gzhu@hadoop102 flume-1.9]$ sudo vim /etc/httpd/conf.d/ganglia.conf

    # Ganglia monitoring system php web frontend
    #
    Alias /ganglia /usr/share/ganglia
    <Location /ganglia>
     # Require local
     # 通过 windows 访问 ganglia,需要配置 Linux 对应的主机(windows)ip 地址
     Require ip windows主机ipv4,ipconfig查看
     # Require ip 10.1.2.3
     # Require host example.org
    </Location>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述

    (6)在 102 修改配置文件/etc/ganglia/gmetad.conf

    [gzhu@hadoop102 flume-1.9]$ sudo vim /etc/ganglia/gmetad.conf

    修改为:

    data_source “my cluster” hadoop102

    在这里插入图片描述
    (7)在 102 103 104 修改配置文件/etc/ganglia/gmond.conf

    [gzhu@hadoop102 flume-1.9]$ sudo vim /etc/ganglia/gmond.conf

    修改为: 修改三个地方 name host bind

    cluster {
     name = "my cluster"   # 修改1
     owner = "unspecified"
     latlong = "unspecified"
     url = "unspecified"
    }
    udp_send_channel {
     #bind_hostname = yes # Highly recommended, soon to be default.
     # This option tells gmond to use a source 
    address
     # that resolves to the machine's hostname. 
    Without
     # this, the metrics may appear to come from 
    any
     # interface and the DNS names associated with
     # those IPs will be used to create the RRDs.
     # mcast_join = 239.2.11.71
     # 数据发送给 hadoop102
     host = hadoop102  # 修改2 注意每台机器不同
     port = 8649
     ttl = 1
    }
    
    udp_recv_channel {
     # mcast_join = 239.2.11.71
     port = 8649
    # 接收来自任意连接的数据
     bind = 0.0.0.0  # 修改3
     retry_bind = true
     # Size of the UDP buffer. If you are handling lots of metrics 
    you really
     # should bump it up to e.g. 10MB or even higher.
     # buffer = 10485760
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    (8)在 102 修改配置文件/etc/selinux/config

    [gzhu@hadoop102 flume-1.9]$ sudo vim /etc/selinux/config

    修改为:

    # This file controls the state of SELinux on the system.
    # SELINUX= can take one of these three values:
    # enforcing - SELinux security policy is enforced.
    # permissive - SELinux prints warnings instead of enforcing.
    # disabled - No SELinux policy is loaded.
    SELINUX=disabled
    # SELINUXTYPE= can take one of these two values:
    # targeted - Targeted processes are protected,
    # mls - Multi Level Security protection.
    SELINUXTYPE=targeted
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    临时生效

    sudo setenforce 0
    
    • 1

    (9)启动 ganglia (1)在 102 103 104 启动

    [gzhu@hadoop102 flume-1.9]$ sudo systemctl start gmond
    
    • 1

    在 102 启动

    sudo systemctl start httpd
    sudo systemctl start gmetad
    
    • 1
    • 2

    (10)打开网页浏览 ganglia 页面
    http://hadoop102/ganglia

    如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录的权限:

    [gzhu@hadoop102 flume-1.9]$ sudo chmod -R 777 /var/lib/ganglia

    在这里插入图片描述
    启动 Flume 任务 时加上参数

    bin/flume-ng agent \
    -c conf/ \
    -n a1 \ 
    -f job/flume-netcat-logger.conf \ 
    -Dflume.root.logger=INFO,console \ 
    -Dflume.monitoring.type=ganglia \ 
    -Dflume.monitoring.hosts=hadoop102:8649
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    【C++】运算符重载 ① ( 运算符重载简介 | 运算符重载推衍 | 普通类型数据相加 | 对象类型数据相加 - 普通函数实现 / 运算符重载实现 | 运算符重载调用 - 函数名调用 / 运算符调 )
    配置 clangd 使用指定标准和编译器的规则
    基于Android的日程管理工具
    数据加密的常见方法
    漏洞分析丨HEVD-0x8.IntegerOverflow[win7x86]
    【力扣刷题】Day11——栈和队列专题
    Maven install 报错程序包不存在问题的解决
    荧光染料BDP FL carboxylic acid,BDP FL COOH/ACID/羧基
    three.js实现管道漫游
    Linux进程【1】进程概念(超详解哦)
  • 原文地址:https://blog.csdn.net/ks_1998/article/details/125574442