• 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源


    04:数据源

    • 目标了解数据源的格式及实现模拟数据的生成

    • 路径

      • step1:数据格式
      • step2:数据生成
    • 实施

      • 数据格式

        image-20210905200540304

        消息时间发件人昵称发件人账号发件人性别发件人IP发件人系统发件人手机型号发件人网络制式发件人GPS收件人昵称收件人IP收件人账号收件人系统收件人手机型号收件人网络制式收件人GPS收件人性别消息类型双方距离消息
        msg_timesender_nickynamesender_accountsender_sexsender_ipsender_ossender_phone_typesender_networksender_gpsreceiver_nickynamereceiver_ipreceiver_accountreceiver_osreceiver_phone_typereceiver_networkreceiver_gpsreceiver_sexmsg_typedistancemessage
        2020/05/08 15:11:33古博易1474787719448.147.134.255Android 8.0小米 Redmi K304G94.704577,36.247553莱优97.61.25.5217832829395IOS 10.0Apple iPhone 104G84.034145,41.423804TEXT77.82KM天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。
      • 数据生成

        • 创建原始文件目录

          mkdir /export/data/momo_init
          
          • 1
        • 上传模拟数据程序

          cd /export/data/momo_init
          rz
          
          • 1
          • 2

          image-20210905142015948

        • 创建模拟数据目录

          mkdir /export/data/momo_data
          
          • 1
        • 运行程序生成数据

          • 语法

            java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间
            
            • 1
          • 测试:每500ms生成一条数据

            java -jar /export/data/momo_init/MoMo_DataGen.jar \
            /export/data/momo_init/MoMo_Data.xlsx \
            /export/data/momo_data/ \
            500
            
            • 1
            • 2
            • 3
            • 4
          • 结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001

          image-20210929100901349

    • 小结

      • 了解数据源的格式及实现模拟数据的生成

    05:技术架构及技术选型

    • 目标掌握实时案例的技术架构及技术选型

    • 路径

      • step1:需求分析
      • step2:技术选型
      • step3:技术架构
    • 实施

      • 需求分析

        • 离线存储计算
          • 提供离线T + 1的统计分析
          • 提供离线数据的即时查询
        • 实时存储计算
          • 提供实时统计分析
      • 技术选型

        • 离线
          • 数据采集:Flume
          • 离线存储:Hbase
          • 离线分析:Hive:复杂计算
          • 即时查询:Phoenix:高效查询
        • 实时
          • 数据采集:Flume
          • 实时存储:Kafka
          • 实时计算:Flink
          • 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化
      • 技术架构

        image-20210905162218286

        • 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase?
          • 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效
          • 保证数据一致性
    • 小结

      • 掌握实时案例的技术架构及技术选型

    06:Flume的回顾及安装

    • 目标回顾Flume基本使用及实现Flume的安装测试

    • 路径

      • step1:Flume回顾
      • step2:Flume的安装
      • step3:Flume的测试
    • 实施

      • Flume的回顾

        • 功能:实时对文件或者网络端口进行数据流监听采集
        • 场景:文件实时采集
        • 开发
          • step1:先开发一个配置文件:properties【K=V】
          • step2:运行这个文件即可
        • 组成
          • Agent:一个Agent就是一个Flume程序
          • Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel
          • Channel:负责临时存储Source发送过来的数据,供Sink来取数据
          • Sink:负责从Channel拉取数据写入目标地
          • Event:代表一条数据对象
            • head:Map集合[KV]
            • body:byte[]
      • Flume的安装

        • 上传安装包

          cd /export/software/
          rz
          
          • 1
          • 2

          image-20210905162948401

        • 解压安装

          tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
          cd /export/server
          mv apache-flume-1.9.0-bin flume-1.9.0-bin
          
          • 1
          • 2
          • 3
        • 修改配置

          #集成HDFS,拷贝HDFS配置文件
          cd /export/server/flume-1.9.0-bin
          cp /export/server/hadoop/etc/hadoop/core-site.xml  ./conf/
          #修改Flume环境变量
          cd /export/server/flume-1.9.0-bin/conf/
          mv flume-env.sh.template flume-env.sh
          vim flume-env.sh 
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          #修改22行
          export JAVA_HOME=/export/server/jdk1.8.0_65
          #修改34行
          export HADOOP_HOME=/export/server/hadoop-3.3.0
          
          • 1
          • 2
          • 3
          • 4
        • 删除Flume自带的guava包,替换成Hadoop的

          cd /export/server/flume-1.9.0-bin 
          rm -rf lib/guava-11.0.2.jar
          cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
          
          • 1
          • 2
          • 3
        • 创建目录

          cd /export/server/flume-1.9.0-bin
          #程序配置文件存储目录
          mkdir usercase
          #Taildir元数据存储目录
          mkdir position
          
          • 1
          • 2
          • 3
          • 4
          • 5
      • Flume的测试

        • 需求:采集聊天数据,写入HDFS

        • 分析

          • Source:taildir:动态监听多个文件实现实时数据采集
          • Channel:mem:将数据缓存在内存
          • Sink:hdfs
        • 开发

          vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
          
          • 1
          # define a1
          a1.sources = s1 
          a1.channels = c1
          a1.sinks = k1
          
          #define s1
          a1.sources.s1.type = TAILDIR
          #指定一个元数据记录文件
          a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
          #将所有需要监控的数据源变成一个组
          a1.sources.s1.filegroups = f1
          #指定了f1是谁:监控目录下所有文件
          a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
          #指定f1采集到的数据的header中包含一个KV对
          a1.sources.s1.headers.f1.type = momo
          a1.sources.s1.fileHeader = true
          
          #define c1
          a1.channels.c1.type = memory
          a1.channels.c1.capacity = 10000
          a1.channels.c1.transactionCapacity = 1000
          
          #define k1
          a1.sinks.k1.type = hdfs
          a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
          a1.sinks.k1.hdfs.fileType = DataStream
          #指定按照时间生成文件,一般关闭
          a1.sinks.k1.hdfs.rollInterval = 0
          #指定文件大小生成文件,一般120 ~ 125M对应的字节数
          a1.sinks.k1.hdfs.rollSize = 102400
          #指定event个数生成文件,一般关闭
          a1.sinks.k1.hdfs.rollCount = 0
          a1.sinks.k1.hdfs.filePrefix = momo
          a1.sinks.k1.hdfs.fileSuffix = .log
          a1.sinks.k1.hdfs.useLocalTimeStamp = true
          
          #bound
          a1.sources.s1.channels = c1
          a1.sinks.k1.channel = c1
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
        • 启动HDFS

          start-dfs.sh
          
          • 1
        • 运行Flume

          cd /export/server/flume-1.9.0-bin
          bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
          
          • 1
          • 2
        • 运行模拟数据

          java -jar /export/data/momo_init/MoMo_DataGen.jar \
          /export/data/momo_init/MoMo_Data.xlsx \
          /export/data/momo_data/ \
          100
          
          • 1
          • 2
          • 3
          • 4
        • 查看结果

          image-20210905171157230

    • 小结

      • 回顾Flume基本使用及实现Flume的安装测试

    07:Flume采集程序开发

    • 目标实现案例Flume采集程序的开发

    • 路径

      • step1:需求分析
      • step2:程序开发
      • step3:测试实现
    • 实施

      • 需求分析

        • 需求:采集聊天数据,实时写入Kafka

        • Source:taildir

        • Channel:mem

        • Sink:Kafka sink

          a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
          a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
          a1.sinks.k1.kafka.producer.acks = 1
          a1.sinks.k1.kafka.topic = mytopic
          a1.sinks.k1.kafka.flumeBatchSize = 20
          a1.sinks.k1.kafka.producer.linger.ms = 1
          a1.sinks.k1.kafka.producer.compression.type = snappy
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
      • 程序开发

        vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
        
        • 1
        # define a1
        a1.sources = s1 
        a1.channels = c1
        a1.sinks = k1
        
        #define s1
        a1.sources.s1.type = TAILDIR
        #指定一个元数据记录文件
        a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
        #将所有需要监控的数据源变成一个组
        a1.sources.s1.filegroups = f1
        #指定了f1是谁:监控目录下所有文件
        a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
        #指定f1采集到的数据的header中包含一个KV对
        a1.sources.s1.headers.f1.type = momo
        a1.sources.s1.fileHeader = true
        
        #define c1
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 1000
        
        #define k1
        a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
        a1.sinks.k1.kafka.topic = MOMO_MSG
        a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
        a1.sinks.k1.kafka.flumeBatchSize = 10
        a1.sinks.k1.kafka.producer.acks = 1
        a1.sinks.k1.kafka.producer.linger.ms = 100
        
        #bound
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
      • 测试实现

        • 启动Kafka

          start-zk-all.sh
          start-kafka.sh 
          
          • 1
          • 2
        • 创建Topic

          kafka-topics.sh --create --topic MOMO_MSG  --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
          
          • 1

          注意:Kafka2.11版本用–zookeeper 替代
          kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092

        • 列举

          kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
          
          • 1
        • 启动消费者

          kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
          
          • 1
        • 启动Flume程序

          cd /export/server/flume-1.9.0-bin
          bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
          
          • 1
          • 2
        • 启动模拟数据

          java -jar /export/data/momo_init/MoMo_DataGen.jar \
          /export/data/momo_init/MoMo_Data.xlsx \
          /export/data/momo_data/ \
          50
          
          • 1
          • 2
          • 3
          • 4
        • 观察结果
          在这里插入图片描述

    • 小结

      • 实现案例Flume采集程序的开发
  • 相关阅读:
    与归并排序相关的一些问题
    现代c++中实现精确延时方法总结
    「C++程序设计 (面向对象进阶)」学习笔记・二
    JUC并发编程(5)(自定义线程池 + 共享模型之工具2)
    直冲云霄,阿里大牛耗时49天整理12W字面试手册,押题准确率直冲95%
    JAVA_多态(面向对象进阶)学习笔记
    MySQL——函数和流程控制
    Trino 与Hive 有差异的函数
    Rust学习----Rust安装
    java-net-php-python-ssm办公自动化计划管理子系统计算机毕业设计程序
  • 原文地址:https://blog.csdn.net/xianyu120/article/details/133811713