• Flume


    1. 概述

    1.1 定义

    Flume是 Cloucera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flune基于流式架构。灵活简单

    1.2 Flume可以对接的数据源?

    Console、RPC、Text、Tail、Syslog、Exec等

    1.3 Flume接受的数据源输出目标?

    磁盘,hdfs,hbase, 经过网络传输kafka

    data->flume->Kafka->spark streaming/ storm / flink -> HBase, MySQL

    1.4 agent

    Flume中最核心的角色是agent,flume采集系统就是由一个个agent连接起来所形成的一个或简单或复杂的数据传输通道。

    对于每一个Agent来说,它就是一个独立的守护进程(JVM),它负责从数据源接收数据,并发往下一个目的地,如下图所示:
    在这里插入图片描述
    每一个agent相当于一个数据(被封装成Event对象)传递员,内部有3个核心组件:

    Source: 采集组件,用于跟数据源对接,以获取数据;它有各种各样的内置实现;
    Sink: 下沉组件,用于往下一级agent传递数据或者向最终存储系统传递数据
    Channel: 传输通道组件,用于从source将数据传递到sink

    1. file: 保证数据不丢失,速度相对较慢
    2. memory: 数据可能会丢失,速度较快

    当数据传输完成之后,该event才从通道中进行移除–(可靠性)

    1.5 Flume特性

    • Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
    • Flume 可以采集文件,socket 数据包、文件、文件夹、kafka 等各种形式源数据,
    • 又可以将采集到的数据输出到 HDFS 、hbase 、hive 、kafka 等众多外部存储系统中
    • 对一般的采集需求,通过对 flume 的简单配置即可实现
    • Flume 针对特殊场景也具备良好的自定义扩展能力,因此,flume 可以适用于大部分的日常数据采集场景
    • Flume 的管道是基于事务,保证了数据在传送和接收时的一致性.
    • Flume 支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等。

    2. Flume核心

    2.1 Flume 事件

    • Event 对象是Flume内部数据传输的最基本单元

    两部分组成:Event是由一个转载数据的字节数组+一个可选头部构成

    • Event 由零个或者多个header和正文body组成

    • Header 是 key/value 形式的,可以用来制造路由决策或携带其他结构化信息(如事件的时间戳或事件来源的服务器主机名)。你可以把它想象成和 HTTP 头一样提供相同的功能——通过该方法来传输正文之外的额外信息。

    • Body是一个字节数组,包含了实际的内容

    2.2 Flume Agent

    在这里插入图片描述

    • Flume内部有一个或者多个Agent

    • 每一个Agent是一个独立的守护进程(JVM)

    • 从客户端哪儿接收收集,或者从其他的Agent哪儿接收,然后迅速的将获取的数据传给下一个目的节点Agent

    • Agent主要由source、channel、sink三个组件组成。

    在这里插入图片描述

    2.2.1 Agent Source

    • 一个Flume源

    • 负责一个外部源(数据发生器),如一个web服务器传递给他的事件

    • 该外部源将它的事件以Flume可以识别的格式发送到Flume中

    • 当一个Flume源接收到一个事件时,其将通过一个或者多个通道存储该事件

    2.2.2 Agent Channel

    • 通道:采用被动存储的形式,即通道会缓存该事件直到该事件被sink组件处理

    • 所以Channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接

    • 可以通过参数设置event的最大个数

    • Flume通常选择FileChannel,而不使用Memory Channel

    Memory Channel: 内存存储事务,吞吐率极高,但存在丢数据风险
    File Channel: 本地磁盘的事务实现模式,保证数据不会丢失(WAL实现)write ahead log ( 将日志预写先写到磁盘)

    2.2.3 Agent Sink

    • Sink会将事件从Channel中移除,并将事件放置到外部数据介质上

    例如:通过Flume HDFS Sink将数据放置到HDFS中,或者放置到下一个Flume的Source,等到下一个Flume处理。

    • 对于缓存在通道中的事件,Source和Sink采用异步处理的方式

    • Sink成功取出Event后,将Event从Channel中移除

    • Sink必须作用于一个确切的Channel

    • 不同类型的Sink:

    存储Event到最终目的的终端:HDFS、Hbase
    自动消耗:Null Sink
    用于Agent之间通信:Avro

    2. Flume事务

    在这里插入图片描述

    2.1 概念

    2.1.1 事务的介绍

    flume中的事务和关系型数据库中的事务是不同的,只是名称一样!

    ​ Flume中的事务代表的是一批要以原子性写入到channel或要从channel中移除的events

    2. 1.1.1 put事务

    ​ put事务指source将封装好的event,交给ChannelProcessor,ChannelProcessor在处理这批events时,

    doPut: 将批处理数据先写入到临时缓冲区putList
    doCommit : 检查channle内存队列是否足够合并
    doRollback : channle内存队列空间不足,回滚数据

    先把events放入到putList中,放入完成后,一次性commit(),这批events就可以成功写入到channel!

    ​ 写入成功后,执行清空putList操作!

    ​ 如果在过程中,发生任何异常,此时执行rollback(),rollback()会回滚putList,回滚也会直接清空putList!

    2.1.1.2 take事务

    ​ take事务指sink不断从channel中获取event!,每获取一批event中的一个,都会将这个event放入takeList中!

    doTake : 将数据取到临时缓冲区takeList.并将数据发送到HDFS
    doCommit : 如果数据全部发送成功,则清除临时缓冲区takeList
    doRollback :数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channle内存队列

    一般一批evnet全部写入,执行commit()方法,这个方法会清空takeList!如果在此期间,发生了异常,执行rollback(),此时会回滚takeList中的这批event到channel!

    2.1.2 事务的数据重复情况

    2.1.2.1 take事务的数据重复情况

    假如一个事务中,一部分event已经写入到目的地,但是随着事务的回滚,这些event可能重复写入!

    2.1.2.2 put事务的数据重复的情况

    如果一个source对接多个channel , 可能出现,一批数据 某些channelput完了,但是另一些失败了!

    此时channel Processor会重试对此批数据的put,包括之前已经写成功的channel!此时,之前写成功的channel,就会出现重复!

    2.1.3 事务中的数量关系

    //每个source和sink都可以配置batchSize,batchSize代表一批数据的数量。
    batchSize: 
    //putList和takeList的大小!(在channel的参数中配置)
    transcationCapacity: 
    //指channel中存储event的容量!
    capacity: 
    batchSize <= transcationCapacity <= capacity
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.2 消息队列或传输框架的设计语义

    at least once : 至少一次,在异常时有可能出现数据重复!

    at most once : 最多一次,在异常时有可能出现数据丢失!

    exactly once : 精准一次,不管是什么情况,数据只精准一次,不会重复,也不会丢失!

    2. 安装与使用

    1. 下载地址
    https://archive.apache.org/dist/flume/
    
    • 1
    1. 安装过程省略
    2. 在conf文件夹内编写配置文件
    # The configuration file needs to define the sources, 
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent, 
    # in this case called 'agent'
    
    agent.sources = s1
    agent.channels = c1
    agent.sinks = r1
    
    # 设置 source从指定文件夹读取文件
    agent.sources.s1.type =spooldir 
    agent.sources.s1.spoolDir = 指定文件夹
    agent.sources.s1.channels = c1
    agent.sources.s1.deserializer.maxLineLength = 8192
    
    # 设置 sink 下游为kafuka ,并设置kafuka 服务ip和端口
    agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.r1.kafka.bootstrap.servers = IP:port
    agent.sinks.r1.serializer.class=kafka.serializer.StringEncoder
    agent.sinks.r1.kafka.topic = kafka的topic名称
    agent.sinks.r1.request.required.acks=0
    agent.sinks.r1.max.message.size=1000
    
    agent.sinks.r1.channel = c1
    
    # Each channel's type is defined.
    # 设置渠道类别为内存
    agent.channels.c1.type = memory
    # 通道中存储的event事件的最大数量
    agent.channels.c1.capacity = 10000
    agent.channels.c1.transactionCapacity = 10000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    1. 启动脚本
    nohup /data/software/flume/bin/flume-ng agent --conf conf --conf-file 启动的配置文件 --name agent -Dflume.root.logger=INFO,console
    
    • 1

    3.

  • 相关阅读:
    多元函数奇偶性
    2020年全国职业院校技能竞赛网络安全数据包分析capture.pcapng
    CentOS 7 下安装 MySQL 8.x
    azkban设置重试不起作用,且有的任务一直running,无日志
    ceph集群巡检项
    破局数据分析滞后难题,赋能企业高速增长的指标管理解决方案
    使用 RAFT 的光流:第 1 部分
    js 继承
    如何创建默认的docker0网桥
    SpringAop和Redis实现分布式锁限制接口重复提交
  • 原文地址:https://blog.csdn.net/abcd741258358/article/details/127407654