• 大数据之Flume


    Flume概述

    一个高可用(稳定),高可靠(稳定),分布式的海量日志采集,聚合和传输的系统。Flume基于流式架构,灵活简单。日志文件即txt文件,不能传输音频,视频,office等其他文件。Flume最主要的作用是实时读取服务器本地磁盘的数据,将数据写入到HDFS

    架构

    • Web Server一般为java后台程序

    • Agent

      • Source: 对接各种web server程序,从中读取数据
      • Channel:用来作为缓冲区,平衡读取和写入速度的不一致问题。
        • Memory Channel: 速度快,容易丢失
        • File Channel: 速度慢,不容易丢失
      • Sink: 对接各种传输终点,Sink不断轮询Channel中的事件且批量的移除它们到对应系统中
    • 文件系统:比如HDFS,传输的终点

    • Event: Flume数据传输的基本单元

    安装部署

    1. 官网下载:http://flume.apache.org/
    2. 解压后修改文件名称为flume
    3. 修改conf 下log4j2.xml确定打印的位置,日志会跟随运行路径放置flume日志。
    <Properties>
        <Property name="LOG_DIR">/opt/module/flume/logsProperty>
      Properties>
    
    • 1
    • 2
    • 3

    入门案例

    1. 监控端口数据官方案例

    1. 安装netcat工具,该工具用于端口传话sudo yum install -y nc
    2. 一个窗口使用nc -lk 端口号,监听端口
    3. 另一个窗口使用nc localhost 44444 连接端口
    4. 配置flume configuration文件
    # 给组件命名,默认 agent 叫 a1
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 配置 source 的类型、主机名、端口号
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # 配置 sink 
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000   # 1000个event
    a1.channels.c1.transactionCapacity = 100  # 批次大小,达到该值就发送
    
    # 绑定 the source and sink to the channel
    a1.sources.r1.channels = c1 # source可以连多个channel
    a1.sinks.k1.channel = c1  # sink不能连多个channel
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 创建job目录,创建conf文件。命令时建议为 发送者_flume_接收者.conf
    2. 运行Flume, bin/flume-ng agent -c conf/ -f-file job/nc_flume_log.conf -n a1
    3. 配置log4j2.xml文件,添加一个属性让日志同时打印到控制台,方便查看。
     
    
    • 1

    2. 实时监控目录下的多个追加文件

    1. 创建符合条件的flume配置文件
    2. 执行配置文件taildir_flume_log.conf,开启监控
    3. 向监控文件追加文件,echo >> file1.txt
    4. 运行flume监听,bin/flume-ng agent -c conf/ -f-file job/taildir_flume_log.conf -n a1
    5. 可以关闭该监听服务后,继续追加,然后重写开启监听,会发现之前添加的内容也可以查看到,这个就是断点续传。监听读取后会生成对应的监听日志taildir_position.json
    6. 如果监控文件改名后仍然符合监控正则,该文件的数据会全部重传。这个机制在底层实现上不仅判断了inode的值,还判断了文件名是否改变。后期可以修改源码,改为只判断Inode即可。
    7. HDFS sink存在小文件问题,每10个event一个文件,或者每30秒一个文件。可以通过修改sink的参数来解决改问题:
      • hdfs.rollinterval: 默认是30秒,置为0时表示不判断时间。一般企业中建议设置为3600秒
      • hdfs.rollSize: 一般设置为134217700,超过128M生成一个文件
      • hdfs.rollCount: 设置为0

    Flume原理

    1. 事务

    put事务流程

    在Source和Channel之间有事务来保证put操作的完整性。source能够保证数据没有提交成功时,不会修改当前发送数据的偏移量,只有当source成功将数据放到channel中时,source才会更新当前的偏移量。当channel内存空间不足时,即put失败,会回滚数据,即不更新当前数据的偏移量。

    pull事务流程

    在channel和sink节点之间进行的拉取操作也有可能失败,只有当数据成功放到sink中,才会将channl中的数据进行删除。否则,事务会将数据回滚到数据还未拉取前的状态,来保证数据的不丢失性。

    2. Flume Agent内部原理

    1. 拦截器清除数据,排除错误数据,添加解析部分数据,比如Header头信息
    2. channel选择器:
      • 副本选择器,同一数据发送多份
      • 多路复用选择器, 同一数据拆分发送
    3. sinkProcessor处理器:
      • 默认处理器:由于都在同一机器上运行,后两种处理器没有任何意义
      • 负载均衡处理器
      • 竞争失败处理器

    内部原理图

    如果有flume的连接传输,启动时需要先开启下游flume,再开启上游flume。关闭时需要先关闭上游flume,再关闭下游flume。

  • 相关阅读:
    关于safari浏览器浏览html video标签无法正常播放的问题
    银行家算法——C语言实现
    常用的外贸英语口语汇总
    MySQL备份及恢复
    可托拉拽的WPF选项卡控件,强大好用!
    手摸手带你撸一个拖拽效果
    open62541开发:添加sqlite3 历史数据库
    springboot中没有主清单属性解决办法
    【圆环展开算法剖析与硬核实战--全程高能,做好笔记】
    virtualbox7 虚拟机与宿主机互传文件、共享u盘
  • 原文地址:https://blog.csdn.net/qq_44273739/article/details/133231211