• 二百零七、Flume——Flume实时采集5分钟频率的Kafka数据直接写入ODS层表的HDFS文件路径下


    一、目的

    在离线数仓中,需要用Flume去采集Kafka中的数据,然后写入HDFS中。

    由于每种数据类型的频率、数据大小、数据规模不同,因此每种数据的采集需要不同的Flume配置文件。玩了几天Flume,感觉Flume的使用难点就是配置文件

    二、使用场景

    转向比数据是数据频率为5分钟的数据类型代表,数据量很小、频率不高,因此搞定了转向比数据的采集就搞定了这一类低频率数据的实时采集问题

    1台设备每日的转向比数据规模是30KB,25台设备的数据规模则是750KB

    三、转向比数据ODS层建表

    create external table  if not exists  ods_turnratio(
        turnratio_json  string
    )
    comment '转向比数据外部表——静态分区'
    partitioned by (day string)
    row format delimited fields terminated by '\x001'
    lines terminated by '\n'
    stored as SequenceFile
    tblproperties("skip.header.line.count"="1");

    四、转向比数据的配置文件

    ## agent a1
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1

    ## configure source s1
    a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
    a1.sources.s1.kafka.topics = topic_b_turnratio
    a1.sources.s1.kafka.consumer.group.id = turnratio_group
    a1.sources.s1.kafka.consumer.auto.offset.reset = latest
    a1.sources.s1.batchSize = 1000

    ## configure channel c1
    ## a1.channels.c1.type = memory
    ## a1.channels.c1.capacity = 10000
    ## a1.channels.c1.transactionCapacity = 1000
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/turnratio
    a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/turnratio

    ## configure sink k1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_turnratio/day=%Y-%m-%d/
    a1.sinks.k1.hdfs.filePrefix = turnratio
    a1.sinks.k1.hdfs.fileSuffix = .log
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = second
    a1.sinks.k1.hdfs.rollSize = 62500
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.rollInterval = 0
    a1.sinks.k1.hdfs.idleTimeout = 600
    a1.sinks.k1.hdfs.minBlockReplicas = 1

    ## Bind the source and sink to the channel
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

    注意:62500约为61KB

    五、Flume写入HDFS结果

    Flume根据时间戳按照ODS层表的分区,将数据写入对应HDFS文件

    25台设备,50分钟1个文件,文件大小66.18 KB 

    六、ODS表刷新分区后查验数据

    (一)刷新表分区

    MSCK REPAIR TABLE ods_turnratio;

    (二)查看表数据

    select * from ods_turnratio;

    (三)验证数据完整性

    --2023-11-19 数据基本完整  23时297条 标准300  少3条
    --2023-11-20 数据基本完整  23时299条 标准300  少1条

    数据基本完整,尤其是调度文件大小之后

    19日a1.sinks.k1.hdfs.rollSize = 31250        数据基本完整 23时297条 标准300 少3条

    20日a1.sinks.k1.hdfs.rollSize = 62500        数据基本完整 23时299条 标准300 少1条

    七、注意点

    (一)配置文件中的重点是红色标记的几点

    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = second
    a1.sinks.k1.hdfs.rollSize = 62500
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.rollInterval = 0
    a1.sinks.k1.hdfs.idleTimeout = 600
    a1.sinks.k1.hdfs.minBlockReplicas = 1

    (二)任务配置文件中rollSize参数设置可大不可小

    rollSize参数小的话数据会丢失,大的话没问题

    配置文件的参数还是不断调试中,争取调到最优的状态。能够及时、完整的消费Kafka数据,并且能够最大化的利用HDFS资源。

    目前就先这样,如果有问题的话后面再更新!!!

  • 相关阅读:
    自定义hooks之useLastState、useSafeState
    华为云数据治理生产线DataArts,让“数据‘慧’说话”
    安卓逆向案例——X酷APP逆向分析
    微信小程序-WXS脚本
    Linux 命令:head
    Mysql通过binlog恢复误删数据
    CDH大数据平台 28Cloudera Manager Console之superset相关包安装(markdown新版二)
    TMC5160问题,插上步进电机、步进电机一转或步进电机带负载转瞬间,TMC5160就无输出
    高逼格UI-ASD(Android Support Design)
    模型压缩-对模型结构进行优化
  • 原文地址:https://blog.csdn.net/tiantang2renjian/article/details/134525041