• 大数据架构师——音乐数据中心平台离线数仓综合项目(四)


    音乐数据中心平台离线数仓综合项目

    第四个业务:商户营收统计

    需求

    • 公司生产卡拉唱歌机器,这种机器会卖给商户,商户来进行经营收费,但是公司会对商品进行升级和歌曲库升级,这里商户进行经营后的收入是要分给机器生产的公司的,也就是公司和商户进行分成经营,共同盈利。
      • 投资人:一台机器你可以来出资购买,你就是投资人。
      • 代理人:机器给谁代理,谁就是代理人,一般都是一些大型商场和KTV等。
      • 合伙人:合伙人就是与你共同出资进行投资购买机器的人。
    • 这里商户营收统计,指的是统计投资人、代理人、合伙人各部分营收情况。

    模型设计

    统计商户营收需要分为以下四个方面的数据:

    • TW_MAC_BASEINFO_D 机器基础信息日全量表:这个表中的数据是第二个业务统计的机器的详细信息,此表中的数据包含每天统计到的机器的歌库版本、系统版本、所处位置、门店名称、场景情况、投资人分层比例、代理人分层比例、合伙人分层比例、公司分层比例、代理人信息等数据。
    • TW_MAC_LOC_D 机器位置信息日统计表:是由ODS层TO_YCAK_USR_LOC_D 用户位置记录日增量表统计得到,由每天用户上传的经纬度调用高德api获取全量机器对应的最新的位置信息。这里需要TW_MAC_LOC_D 机器位置信息日统计表的数据主要原因是获取每天机器所在的位置。以此表中机器位置信息为主,如果此表中机器的位置为空,则获取对应的TW_MAC_BASEINFO_D 表中的机器位置信息。
    • TW_CNSM_BRIEF_D 消费退款订单流水日增量表:根据消费退款订单流水日增量表可以统计得到每天每个机器的订单、收入、退款情况,后期统计商户营收情况时,需要从此表中获取对应每台机器当天的订单、收入、退款情况。TW_CNSM_BRIEF_D 表中的数据由TO_YCAK_CNSM_D 机器消费订单明细增量表清洗得到。
    • TW_USR_BASEINFO_D 活跃用户基础信息日增量表:由业务三统计得到,在商户营收统计中可以由此表进一步聚合统计得到每天每台机器上的新增用户和登录的用户量。

    经过以上四个方面的分析,在数仓中设计分层如下:

    在这里插入图片描述

    • ODS:

    在这里插入图片描述

    • EDS_USR:

    在这里插入图片描述

    • EDS_MAC:

    在这里插入图片描述

    • DM_USR:

    在这里插入图片描述

    • 在 Hive 中创建 ODS、EDS层的表:
    USE `music`;
    -- 1. TO_YCAK_USR_LOC_D 用户位置记录日增量表
    CREATE EXTERNAL TABLE `TO_YCAK_USR_LOC_D` (
        `ID` int,
        `UID` int,
        `LAT` string,
        `LNG` string,
        `DATETIME` string,
        `MID` string
    ) PARTITIONED BY (data_dt string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TO_YCAK_USR_LOC_D';
    
    -- 2. TW_MAC_LOC_D 机器位置信息日统计表
    CREATE EXTERNAL TABLE `TW_MAC_LOC_D`(
        `MID` int,
        `X` string,
        `Y` string,
        `CNT` int,
        `ADDER` string,
        `PRVC` string,
        `CTY` string,
        `CTY_CD` string,
        `DISTRICT` string,
        `AD_CD` string,
        `TOWN_SHIP` string,
        `TOWN_CD` string,
        `NB_NM` string,
        `NB_TP` string,
        `BD_NM` string,
        `BD_TP` string,
        `STREET` string,
        `STREET_NB` string,
        `STREET_LOC` string,
        `STREET_DRCTION` string,
        `STREET_DSTANCE` string,
        `BUS_INFO` string
    ) PARTITIONED BY (data_dt string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TW_MAC_LOC_D';
    
    -- 3. TO_YCAK_CNSM_D 机器消费订单明细增量表
    CREATE EXTERNAL TABLE `TO_YCAK_CNSM_D`(
        `ID` int,
        `MID` int,
        `PRDCD_TYPE` int,
        `PAY_TYPE` int,
        `PKG_ID` int,
        `PKG_NM` string,
        `AMT` int,
        `CNSM_ID` string,
        `ORDR_ID` string,
        `TRD_ID` string,
        `ACT_TM` string,
        `UID` int,
        `NICK_NM` string,
        `ACTV_ID` int,
        `ACTV_NM` string,
        `CPN_TYPE` int,
        `CPN_TYPE_NM` string,
        `PKG_PRC` int,
        `PKG_DSCNT` int,
        `ORDR_TYPE` int,
        `BILL_DT` int
    ) PARTITIONED BY (data_dt string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TO_YCAK_CNSM_D';
    
    -- 4. TW_CNSM_BRIEF_D 消费退款订单流水日增量表
    CREATE EXTERNAL TABLE `TW_CNSM_BRIEF_D`(
        `ID` int,
        `TRD_ID` string,
        `UID` string,
        `MID` int,
        `PRDCD_TYPE` int,
        `PAY_TYPE` int,
        `ACT_TM` string,
        `PKG_ID` int,
        `COIN_PRC` int,
        `COIN_CNT` int,
        `UPDATE_TM` string,
        `ORDR_ID` string,
        `ACTV_NM` string,
        `PKG_PRC` int,
        `PKG_DSCNT` int,
        `CPN_TYPE` int,
        `ABN_TYP` int
    ) PARTITIONED BY (data_dt string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TW_CNSM_BRIEF_D';
    
    -- 5. TW_MAC_STAT_D 机器日营收情况统计表
    CREATE EXTERNAL TABLE `TW_MAC_STAT_D`(
        `MID` int, 
        `MAC_NM` string, 
        `PRDCT_TYPE` string, 
        `STORE_NM` string,
        `BUS_MODE` string,
        `PAY_SW` string,
        `SCENCE_CATGY` string,
        `SUB_SCENCE_CATGY` string,
        `SCENE` string,
        `SUB_SCENE` string,
        `BRND` string,
        `SUB_BRND` string,
        `PRVC` string,
        `CTY` string,
        `AREA` string,
        `AGE_ID` string,
        `INV_RATE` string,
        `AGE_RATE` string,
        `COM_RATE` string,
        `PAR_RATE` string,
        `PKG_ID` string,
        `PAY_TYPE` int,
        `CNSM_USR_CNT` string,
        `REF_USR_CNT` string,
        `NEW_USR_CNT` string,
        `REV_ORDR_CNT` string,
        `REF_ORDR_CNT` string,
        `TOT_REV` string,
        `TOT_REF` string
    ) PARTITIONED BY (data_dt string)
    ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' 
    LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TW_MAC_STAT_D';
    
    -- 6. TM_USR_MRCHNT_STAT_D 商户日营收统计表
    CREATE EXTERNAL TABLE `TM_USR_MRCHNT_STAT_D`(
        `ADMIN_ID` string,
        `PAY_TYPE` int,
        `REV_ORDR_CNT` int,
        `REF_ORDR_CNT` int,
        `TOT_REV` double,
        `TOT_REF` double,
        `TOT_INV_REV` DECIMAL(10,2),
        `TOT_AGE_REV` DECIMAL(10,2),
        `TOT_COM_REV` DECIMAL(10,2),
        `TOT_PAR_REV` DECIMAL(10,2)
    ) PARTITIONED BY (DATA_DT string)
    ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' 
    LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TM_USR_MRCHNT_STAT_D';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 以上模型设计中,各个ODS层与EDS层表之间的流转关系如下:

    在这里插入图片描述

    数据处理流程

    1. 将数据导入MySQL业务库
    • 在第三个业务中,已经重新执行了 ycak.sql 脚本文件。
    2. 执行第二、三个业务
    • 执行第二个业务获取TW_MAC_BASEINFO_D 机器基础信息日全量表信息。
    • 执行第三个业务获取TW_USR_BASEINFO_D 活跃用户基础信息日增量表信息。
    3. 使用Sqoop抽取mysql数据到ODS层
    • 在 node03 上执行 sqoop 导入数据脚本,将 MySQL 中的表数据导入到 Hive 数仓中
    • 针对 ycak 库下的 user_location 和 machine_consume_detail 两张表,进行增量的导入,脚本 1_incr_extract_mysqldata_to_ods.sh 内容如下:
    #!/bin/bash
    currentDate=`date -d today +"%Y%m%d"`
    if [ x"$1" = x ]; then
      echo "====没有导入数据的日期,输入日期===="
      exit
    else
      echo "====使用导入数据的日期 ===="
      currentDate=$1
    fi
    echo "日期为 : $currentDate"
    
    # 查询hive ODS层表 to_ycak_usr_loc_d 中目前存在的最大的ID
    result=`ssh hadoop@node03 "source /etc/profile;hive -e 'select max(id) from music.to_ycak_usr_loc_d'" | grep _c0 -A 1`
    maxId=`echo ${result} | awk "{print \\$2}"`
    if [ x"${maxId}" = xNULL ]; then
      maxId=0
    fi
    echo "Hive ODS层表 TO_YCAK_USR_LOC_D 最大的ID是${maxId}"
    
    ssh hadoop@node03 > /tmp/logs/music_project/user-loc-info.log 2>&1 <<aabbcc
    hostname
    source /etc/profile
    
    ## user_location 	==>> 	TO_YCAK_USR_LOC_D
    sqoop import --connect jdbc:mysql://node01:3306/ycak?dontTrackOpenResources=true\&defaultFetchSize=10000\&useCursorFetch=true\&useUnicode=yes\&characterEncoding=utf8 --username root --password 123456 --table user_location --target-dir /user/hive/warehouse/music.db/TO_YCAK_USR_LOC_D/data_dt=${currentDate} --num-mappers 1 --fields-terminated-by '\t' --incremental append --check-column id --last-value ${maxId}
    
    # 更新Hive 分区
    hive -e 'alter table music.to_ycak_usr_loc_d add partition(data_dt=${currentDate})'
    
    exit
    aabbcc
    
    echo "all done!"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 脚本 2_incr_extract_mysqldata_to_ods.sh 内容如下:
    #!/bin/bash
    currentDate=`date -d today +"%Y%m%d"`
    if [ x"$1" = x ]; then
      echo "====没有导入数据的日期,输入日期===="
      exit
    else
      echo "====使用导入数据的日期 ===="
      currentDate=$1
    fi
    echo "日期为 : $currentDate"
    
    # 查询hive ODS层表 to_ycak_cnsm_d 中目前存在的最大的ID
    result=`ssh hadoop@node03 "source /etc/profile;hive -e 'select max(id) from music.to_ycak_cnsm_d'" | grep _c0 -A 1`
    maxId=`echo ${result} | awk "{print \\$2}"`
    if [ x"${maxId}" = xNULL ]; then
      maxId=0
    fi
    echo "Hive ODS层表 TO_YCAK_CNSM_D 最大的ID是${maxId}"
    
    ssh hadoop@node03 > /tmp/logs/music_project/machine-consume.log 2>&1 <<aabbcc
    hostname
    source /etc/profile
    
    ## machine_consume_detail 	==>> 	TO_YCAK_CNSM_D
    sqoop import --connect jdbc:mysql://node01:3306/ycak?dontTrackOpenResources=true\&defaultFetchSize=10000\&useCursorFetch=true\&useUnicode=yes\&characterEncoding=utf8 --username root --password 123456 --table machine_consume_detail --target-dir /user/hive/warehouse/music.db/TO_YCAK_CNSM_D/data_dt=${currentDate} --num-mappers 1 --fields-terminated-by '\t' --incremental append --check-column id --last-value ${maxId}
    
    # 更新Hive 分区
    hive -e 'alter table music.to_ycak_cnsm_d add partition(data_dt=${currentDate})'
    
    exit
    aabbcc
    
    echo "all done!"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    4. 使用SparkSQL处理ODS层数据得到EDS层数据
    处理ODS层数据得到EDS层数据
    • 使用SparkSQL清洗 TO_YCAK_USR_LOC_D 用户位置记录日增量表数据到 TW_MAC_LOC_D 机器位置信息日统计表中,这里清洗主要是根据用户上报的经纬度信息通过高德 API 获取对应的机器位置,对应的清洗数据的 scala 文件:GenerateTwMacLocD.scala

    在这里插入图片描述

    • 使用SparkSQL清洗 TO_YCAK_CNSM_D 机器消费订单明细增量表到 EDS 层 TW_CNSM_BRIEF_D 消费退款订单流水日增量表中,这里主要根据用户每天在机器上的消费行为统计每台机器的订单营收情况,对应的清洗数据的 scala 文件:GenerateTwCnsmBriefD.scala

    在这里插入图片描述

    将EDS层数据聚合到TW_MAC_STAT_D机器日统计表数据
    • 根据以下四个 EDS 层数据表聚合得到 TW_MAX_STAT_D 表的数据,基于以下 4 张表的数据统计出机器位置、机器的营收情况、用户登录情况。
      • TW_MAC_BASEINFO_D:机器基础信息日全量表信息
      • TW_USR_BASEINFO_D:活跃用户基础信息日增量表信息
      • TW_MAC_LOC_D:机器位置信息日统计表
      • TW_CNSM_BRIEF_D:消费退款订单流水日增量表
    • 以上数据处理对应的 scala 文件:GenerateTwMacStatD.scala

    在这里插入图片描述

    针对TW_MAC_STAT_D机器日统计表数据得到DM层数据
    • 这里根据 TW_MAC_STAT_D 机器日统计表数据得到 TM_USR_MRCHNT_STAT_D 商户日统计表信息。对应的数据处理 scala 文件:GenerateTmUsrMrchntStatD.scala

    在这里插入图片描述

    • 将工程打包并上传到服务器。

    使用Azkaban配置任务流

    1. 脚本准备
    • ① 针对 ycak 库下的 user_location 和 machine_consume_detail 两张表,增量导入到 Hive ODS 层,脚本如下:1_incr_extract_mysqldata_to_ods.sh2_incr_extract_mysqldata_to_ods.sh

    • ② 根据ODS层表数据,得到 Hive EDS 层机器位置信息日统计表和消费退款订单流水日增量表,脚本如下:

      • 3_generate_tw_mac_loc_d.sh
      #!/bin/bash
      currentDate=`date -d today +"%Y%m%d"`
      if [ x"$1" = x ]; then
      	echo "====使用自动生成的今天日期===="
      else
      	echo "====使用 Azkaban 传入的日期===="
      	currentDate=$1
      fi
      echo "日期为: $currentDate"
      ssh hadoop@node01 > /tmp/logs/music_project/mac_loc.log 2>&1 <<aabbcc
      hostname
      cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
      ./spark-submit --master yarn --class com.yw.musichw.eds.machine.GenerateTwMacLocD \
          /bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
      exit
      aabbcc
      
      echo "all done!"
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 4_generate_tw_cnsm_brief_d.sh
      #!/bin/bash
      currentDate=`date -d today +"%Y%m%d"`
      if [ x"$1" = x ]; then
      	echo "====使用自动生成的今天日期===="
      else
      	echo "====使用 Azkaban 传入的日期===="
      	currentDate=$1
      fi
      echo "日期为: $currentDate"
      ssh hadoop@node01 > /tmp/logs/music_project/cnsm_brief.log 2>&1 <<aabbcc
      hostname
      cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
      ./spark-submit --master yarn --class com.yw.musichw.eds.user.GenerateTwCnsmBriefD \
          /bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
      exit
      aabbcc
      
      echo "all done!"
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
    • ③ 以上两张EDS层表,再结合第二个业务统计的机器详细信息,和第三个业务统计的日活跃用户信息,统计得到机器日营收情况,脚本 5_generate_tw_mac_stat_d.sh 如下:

    #!/bin/bash
    currentDate=`date -d today +"%Y%m%d"`
    if [ x"$1" = x ]; then
    	echo "====使用自动生成的今天日期===="
    else
    	echo "====使用 Azkaban 传入的日期===="
    	currentDate=$1
    fi
    echo "日期为: $currentDate"
    ssh hadoop@node01 > /tmp/logs/music_project/mac_stat.log 2>&1 <<aabbcc
    hostname
    cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
    ./spark-submit --master yarn --class com.yw.musichw.eds.machine.GenerateTwMacStatD \
        /bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
    exit
    aabbcc
    
    echo "all done!"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • ④ 最后,统计DM层商户营收信息,脚本 6_generate_tm_usr_mrchnt_stat_d.sh 如下:
    #!/bin/bash
    currentDate=`date -d today +"%Y%m%d"`
    if [ x"$1" = x ]; then
    	echo "====使用自动生成的今天日期===="
    else
    	echo "====使用 Azkaban 传入的日期===="
    	currentDate=$1
    fi
    echo "日期为: $currentDate"
    ssh hadoop@node01 > /tmp/logs/music_project/usr_mrchnt_stat.log 2>&1 <<aabbcc
    hostname
    cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
    ./spark-submit --master yarn --class com.yw.musichw.dm.content.GenerateTmUsrMrchntStatD \
        /bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
    exit
    aabbcc
    
    echo "all done!"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    2. 编写Azkaban各个Job组成任务流
    • 编写 user-mrchnt.flow,内容如下:
    nodes:
      - name: Job1_ExtractMySQLDataToODS
        type: command
        config:
          command: sh 1_incr_extract_mysqldata_to_ods.sh ${mydate}
          command.1: sh 2_incr_extract_mysqldata_to_ods.sh ${mydate}
    
      - name: Job2_GenerateTwMacLocAndCnsmBrirfToEDS
        type: command
        config:
          command: sh 3_generate_tw_mac_loc_d.sh ${mydate}
          command.1: sh 4_generate_tw_cnsm_brief_d.sh ${mydate}
        dependsOn:
          - Job1_ExtractMySQLDataToODS
    
      - name: Job3_GenerateTwMacStatToEDS
        type: command
        config:
          command: sh 5_generate_tw_mac_stat_d.sh ${mydate}
        dependsOn:
          - Job2_GenerateTwMacLocAndCnsmBrirfToEDS
      
      - name: Job4_GenerateTmUsrMrchntStatToDM
        type: command
        config:
          command: sh 6_generate_tm_usr_mrchnt_stat_d.sh ${mydate}
        dependsOn:
          - Job3_GenerateTwMacStatToEDS
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 将以上6个脚本文件、 user-mrchnt.flowflow20.project 压缩生成 zip 文件 user-mrchnt.zip
    3. 清空数据
    • 在 node03 节点,编写脚本 vim drop_merchant_tables.sql,内容如下:
    drop table `music`.`to_ycak_usr_loc_d`;
    drop table `music`.`to_ycak_cnsm_d`;
    drop table `music`.`tw_mac_loc_d`;
    drop table `music`.`tw_cnsm_brief_d`;
    drop table `music`.`tw_mac_stat_d`;
    drop table `music`.`tm_usr_mrchnt_stat_d`;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 执行命令:hive -f drop_merchant_tables.sql ,删除表。由于这些都是外部表,真正的数据还在 HDFS,所以还需要删除相关的数据。
    • 然后重新创建 hive 表,编写脚本 vim create_merchant_tables.sql,内容在前面模型设计这一小节。
      执行命令 hive -f create_merchant_tables.sql,创建表。
    4.提交Azkaban作业
    • 启动Azkaban服务,并在 Azkaban 的 web server ui界面创建项目,然后上传项目 zip 文件user-mrchnt.zip

    在这里插入图片描述

    • 查看任务,配置任务参数,并执行:

    在这里插入图片描述

    • 执行成功后,查询最终结果

    在这里插入图片描述

    在这里插入图片描述

    第五个业务:地区营收日报统计

    需求

    • 根据“机器日营收情况统计表”,统计每天省市总营收、总退款、总订单数、总退款订单数、总消费用户数、退款用户数。

    模型设计

    • 统计每日省市的总营收、总退款、总订单、总退款订单、总消费用户及总退款用户可以根据业务四中统计的“TW_MAC_STAT_D”机器日营收情况统计表,按照省市字段聚合得到以上各个指标。
    • 在Hive中建表:TM_MAC_REGION_STAT_D 地区营收日统计表
    CREATE EXTERNAL TABLE `TM_MAC_REGION_STAT_D`(
        `PRVC` string,
        `CTY` string,
        `MAC_CNT` int,
        `MAC_REV` DECIMAL(10,2),
        `MAC_REF` DECIMAL(10,2),
        `MAC_REV_ORDR_CNT` int,
        `MAC_REF_ORDR_CNT` int,
        `MAC_CNSM_USR_CNT` int,
        `MAC_REF_USR_CNT` int
    ) PARTITIONED BY (DATA_DT string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TM_MAC_REGION_STAT_D';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    数据处理流程

    • 根据 EDS 层 TW_MAC_STAT_D 机器营收日统计表,得到 TM_MAC_REGION_STAT_D 地区营收日统计表。对应的处理数据的scala文件:GenerateTmMacRegionStatD.scala
    • 将工程打包并上传到服务器。

    使用Azkaban配置任务流

    1. 脚本准备
    • 地区营收日报统计依赖了“TW_MAC_STAT_D”机器日营收情况统计表,所以此业务的任务流调度与业务四中的任务流调度一致,在生成“TW_MAC_STAT_D”表之后,定时执行脚本 1_generate_tm_mac_region_stat_d.sh 如下:
    #!/bin/bash
    currentDate=`date -d today +"%Y%m%d"`
    if [ x"$1" = x ]; then
    	echo "====使用自动生成的今天日期===="
    else
    	echo "====使用 Azkaban 传入的日期===="
    	currentDate=$1
    fi
    echo "日期为: $currentDate"
    ssh hadoop@node01 > /tmp/logs/music_project/mac_region_stat.log 2>&1 <<aabbcc
    hostname
    cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
    ./spark-submit --master yarn --class com.yw.musichw.dm.machine.GenerateTmMacRegionStatD \
        /bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
    exit
    aabbcc
    
    echo "all done!"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    2. 编写Azkaban各个Job组成任务流
    • 编写 mac_region.flow,内容如下:
    nodes:
      - name: Job1_GenerateTmMacRegionStatToDM
        type: command
        config:
          command: sh 1_generate_tm_mac_region_stat_d.sh ${mydate}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 将以上脚本文件、 mac_region.flowflow20.project 压缩生成 zip 文件 mac_region.zip
    3. 提交Azkaban作业
    • 启动Azkaban服务,并在 Azkaban 的 web server ui界面创建项目,然后上传项目 zip 文件mac_region.zip

    在这里插入图片描述

    • 查看任务,配置任务参数,并执行:

    在这里插入图片描述

    • 执行成功后,查询最终结果:

    在这里插入图片描述

    使用Superset数据可视化

    • 添加数据表:依次点击 Data → Datasets → 添加,添加“song_result”库下的表“tm_mac_region_stat_d”。

    在这里插入图片描述

    • 修改表中对应字段显示名称:

    在这里插入图片描述

    • 编辑图表《日报-地区总营收日统计》:

    在这里插入图片描述

    • 编辑图表《日报-总营收订单数日统计》:

    在这里插入图片描述

    • 编辑图表《日报-地区总消费用户数日统计》:

    在这里插入图片描述

    • 编辑图表《日报-地区总退款日统计》:

    在这里插入图片描述

    • 编辑图表《日报-地区总退款订单日统计》:

    在这里插入图片描述

    • 编辑图表《日报-地区总退款用户数日统计》:

    在这里插入图片描述

    • 最终展示图表:

    在这里插入图片描述

    第六个业务:实时统计PV、UV

    需求

    • 全网用户在实时操作机器的同时,可以使用数据采集接口将实时用户登录操作数据进行采集,针对这些数据可以实时统计每台机器实时的pv/uv,以及全网中所有用户的实时pv/uv,并需要实时保存至Redis或者关系型数据库mysql中。

    数据采集接口及数据生产

    • 数据采集接口原理是利用SpringBoot提供日志采集服务接口,在web系统中当用户操作某个需要监控的行为时,调用SpringBoot对应的数据服务接口,通过Log4j日志功能将对应的日志实时写入到指定的目录日志文件中,再通过Flume监控对应的日志目录,将日志实时采集到Kafka中,进而使用流式处理框架进行数据分析处理。
    • 数据采集接口对应的项目地址:https://github.com/shouwangyw/bigdata/tree/master/log-collector
    • 将项目打包并上传至服务器 node02,并启动项目。
    • 生产数据:本地运行 ProduceUserLoginLog.java,调用服务端日志采集接口,查看服务器 node02上日志输出

    在这里插入图片描述

    数据处理流程

    在这里插入图片描述

    # 启动kafka
    bin/kafka-server-start.sh -daemon config/server.properties
    # 创建topic
    bin/kafka-topics.sh --create --bootstrap-server node01:9092,node02:9092,node03:9092 --replication-factor 1 --partitions 3 --topic user_login_log_topic
    # 消费消息
    bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic user_login_log_topic --from-beginning
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 利用 Flume 日志采集,监控日志目录,将日志读取到 kafka 中,配置文件 user_login_log.conf 如下:
    # 设置source名称
    a.sources = r1
    # 设置channel的名称
    a.channels = c1
    # 设置sink的名称
    a.sinks = k1
    
    # For each one of the sources, the type is defined
    # 设置source类型为TAILDIR,监控目录下的文件
    # Taildir Source可实时监控目录一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题
    a.sources.r1.type = TAILDIR
    # 文件的组,可以定义多种
    # a.sources.r1.filegroups = f1 f2
    a.sources.r1.filegroups = f1
    a.sources.r1.filegroups.f1 = /data/logs/log-collector/common/.*log
    
    # The channel can be defined as follows.
    # 设置source的channel名称
    a.sources.r1.channels = c1
    a.sources.r1.max-line-length = 1000000
    # a.sources.r1.eventSize = 512000000
    
    # Each channel's type is defined.
    # 设置channel的类型
    a.channels.c1.type = memory
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    # 设置channel中最大可以存储的event数量
    a.channels.c1.capacity = 1000
    # 每次最大从source获取或者发送到sink中的数据量
    a.channels.c1.transcationCapacity = 100
    
    # Each sink's type must be defined
    # 设置Kafka接收器
    a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # 设置Kafka的broker地址和端口号
    a.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
    # 设置Kafka的Topic
    a.sinks.k1.topic = user_login_log_topic
    # 设置序列化方式
    a.sinks.k1.serializer.class = kafka.serializer.StringEncoder 
    # Specify the channel the sink should use
    # 设置sink的channel名称
    a.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 启动 flume:
    # node02
    bin/flume-ng agent -n a -c conf -f conf/log_collector.conf -Dflume.root.logger=DEBUG,console
    
    • 1
    • 2
    • 查看输出:

    在这里插入图片描述

    • 启动 SparkStreaming 读取 kafka 中的数据,实时统计PV、UV,对应的代码为:RealTimePVUV.scala。查看控制台输出:

    在这里插入图片描述

    • 在 Redis中查看对应的结果。

    在这里插入图片描述

    第七个业务:实时统计歌曲热榜

    需求

    • 实时采集用户在机器上点播歌曲的日志数据,统计每分钟歌曲点播热榜。将结果保存到关系型数据库mysql中。

    数据采集接口及数据生产

    • 数据采集接口对应的项目地址:https://github.com/shouwangyw/bigdata/tree/master/log-collector
    • 生产数据:本地运行ProduceUserPlaySongLog.java,调用服务端日志采集接口。

    数据处理流程

    • 启动 kafka 集群,并创建对应的日志接收 topic。
    # 创建topic
    bin/kafka-topics.sh --create --bootstrap-server node01:9092,node02:9092,node03:9092 --replication-factor 1 --partitions 3 --topic user_play_song_log_topic
    # 消费消息
    bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic user_play_song_log_topic --from-beginning
    
    • 1
    • 2
    • 3
    • 4
    • 利用 Flume 日志采集,监控日志目录,将日志读取到 kafka 中,配置文件 user_play_song_log.conf 如下:
    # 设置source名称
    a.sources = r1
    # 设置channel的名称
    a.channels = c1
    # 设置sink的名称
    a.sinks = k1
    
    # For each one of the sources, the type is defined
    # 设置source类型为TAILDIR,监控目录下的文件
    # Taildir Source可实时监控目录一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题
    a.sources.r1.type = TAILDIR
    # 文件的组,可以定义多种
    # a.sources.r1.filegroups = f1 f2
    a.sources.r1.filegroups = f1
    a.sources.r1.filegroups.f1 = /data/logs/log-collector/user_play_song/.*log
    
    # The channel can be defined as follows.
    # 设置source的channel名称
    a.sources.r1.channels = c1
    a.sources.r1.max-line-length = 1000000
    # a.sources.r1.eventSize = 512000000
    
    # Each channel's type is defined.
    # 设置channel的类型
    a.channels.c1.type = memory
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    # 设置channel中最大可以存储的event数量
    a.channels.c1.capacity = 1000
    # 每次最大从source获取或者发送到sink中的数据量
    a.channels.c1.transcationCapacity = 100
    
    # Each sink's type must be defined
    # 设置Kafka接收器
    a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # 设置Kafka的broker地址和端口号
    a.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
    # 设置Kafka的Topic
    a.sinks.k1.topic = user_play_song_log_topic
    # 设置序列化方式
    a.sinks.k1.serializer.class = kafka.serializer.StringEncoder
    # Specify the channel the sink should use
    # 设置sink的channel名称
    a.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 启动 flume:
    # node02
    bin/flume-ng agent -n a -c conf -f conf/user_play_song_log.conf -Dflume.root.logger=DEBUG,console
    
    • 1
    • 2
    • 查看kafka消费输出:

    在这里插入图片描述

    • 启动 SparkStreaming 读取 kafka 中的数据,实时统计歌曲的点播热度,对应的代码为:RealTimeHotSong.scala。查看控制台输出:

    在这里插入图片描述

    • 在MySQL数据库查看结果。

    在这里插入图片描述

    • 最后利用SuperSet进行展示:

    在这里插入图片描述

  • 相关阅读:
    2023最新electron 进程间通讯的几种方法
    springboot整合SpringSecurity并实现简单权限控制
    linux下CentOS安装mysql-5.7
    08. Java内存模型(JMM)
    【Linux】ping命令详解
    独立站运营的核心——推广分享
    一个非常明显的现象,正在发生——元宇宙正在被越来越多的人所推崇
    java Map集合基本概念
    十六、【分布式微服务企业快速架构】SpringCloud分布式、微服务、云架构之Eclipse 运行程序
    汽车行驶性能的主观评价方法(2)-驾驶员的任务
  • 原文地址:https://blog.csdn.net/yangwei234/article/details/126203187