Merchant revenue statistics draw on four aspects of data. Based on the analysis of these four aspects, the data warehouse layers and tables are designed as follows:





USE `music`;
-- 1. TO_YCAK_USR_LOC_D: daily incremental table of user location reports
CREATE EXTERNAL TABLE `TO_YCAK_USR_LOC_D` (
`ID` int,
`UID` int,
`LAT` string,
`LNG` string,
`DATETIME` string,
`MID` string
) PARTITIONED BY (data_dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TO_YCAK_USR_LOC_D';
-- 2. TW_MAC_LOC_D: daily machine location statistics table
CREATE EXTERNAL TABLE `TW_MAC_LOC_D`(
`MID` int,
`X` string,
`Y` string,
`CNT` int,
`ADDER` string,
`PRVC` string,
`CTY` string,
`CTY_CD` string,
`DISTRICT` string,
`AD_CD` string,
`TOWN_SHIP` string,
`TOWN_CD` string,
`NB_NM` string,
`NB_TP` string,
`BD_NM` string,
`BD_TP` string,
`STREET` string,
`STREET_NB` string,
`STREET_LOC` string,
`STREET_DRCTION` string,
`STREET_DSTANCE` string,
`BUS_INFO` string
) PARTITIONED BY (data_dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TW_MAC_LOC_D';
-- 3. TO_YCAK_CNSM_D: incremental table of machine consumption order details
CREATE EXTERNAL TABLE `TO_YCAK_CNSM_D`(
`ID` int,
`MID` int,
`PRDCD_TYPE` int,
`PAY_TYPE` int,
`PKG_ID` int,
`PKG_NM` string,
`AMT` int,
`CNSM_ID` string,
`ORDR_ID` string,
`TRD_ID` string,
`ACT_TM` string,
`UID` int,
`NICK_NM` string,
`ACTV_ID` int,
`ACTV_NM` string,
`CPN_TYPE` int,
`CPN_TYPE_NM` string,
`PKG_PRC` int,
`PKG_DSCNT` int,
`ORDR_TYPE` int,
`BILL_DT` int
) PARTITIONED BY (data_dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TO_YCAK_CNSM_D';
-- 4. TW_CNSM_BRIEF_D: daily incremental table of consumption and refund order records
CREATE EXTERNAL TABLE `TW_CNSM_BRIEF_D`(
`ID` int,
`TRD_ID` string,
`UID` string,
`MID` int,
`PRDCD_TYPE` int,
`PAY_TYPE` int,
`ACT_TM` string,
`PKG_ID` int,
`COIN_PRC` int,
`COIN_CNT` int,
`UPDATE_TM` string,
`ORDR_ID` string,
`ACTV_NM` string,
`PKG_PRC` int,
`PKG_DSCNT` int,
`CPN_TYPE` int,
`ABN_TYP` int
) PARTITIONED BY (data_dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TW_CNSM_BRIEF_D';
-- 5. TW_MAC_STAT_D: daily machine revenue statistics table
CREATE EXTERNAL TABLE `TW_MAC_STAT_D`(
`MID` int,
`MAC_NM` string,
`PRDCT_TYPE` string,
`STORE_NM` string,
`BUS_MODE` string,
`PAY_SW` string,
`SCENCE_CATGY` string,
`SUB_SCENCE_CATGY` string,
`SCENE` string,
`SUB_SCENE` string,
`BRND` string,
`SUB_BRND` string,
`PRVC` string,
`CTY` string,
`AREA` string,
`AGE_ID` string,
`INV_RATE` string,
`AGE_RATE` string,
`COM_RATE` string,
`PAR_RATE` string,
`PKG_ID` string,
`PAY_TYPE` int,
`CNSM_USR_CNT` string,
`REF_USR_CNT` string,
`NEW_USR_CNT` string,
`REV_ORDR_CNT` string,
`REF_ORDR_CNT` string,
`TOT_REV` string,
`TOT_REF` string
) PARTITIONED BY (data_dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TW_MAC_STAT_D';
-- 6. TM_USR_MRCHNT_STAT_D: daily merchant revenue statistics table
CREATE EXTERNAL TABLE `TM_USR_MRCHNT_STAT_D`(
`ADMIN_ID` string,
`PAY_TYPE` int,
`REV_ORDR_CNT` int,
`REF_ORDR_CNT` int,
`TOT_REV` double,
`TOT_REF` double,
`TOT_INV_REV` DECIMAL(10,2),
`TOT_AGE_REV` DECIMAL(10,2),
`TOT_COM_REV` DECIMAL(10,2),
`TOT_PAR_REV` DECIMAL(10,2)
) PARTITIONED BY (DATA_DT string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TM_USR_MRCHNT_STAT_D';

The content of 1_incr_extract_mysqldata_to_ods.sh is as follows:
#!/bin/bash
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "==== No load date given, please provide a date ===="
exit
else
echo "==== Using the given load date ===="
currentDate=$1
fi
echo "Date: $currentDate"
# Query the current max ID in the Hive ODS table to_ycak_usr_loc_d
result=`ssh hadoop@node03 "source /etc/profile;hive -e 'select max(id) from music.to_ycak_usr_loc_d'" | grep _c0 -A 1`
maxId=`echo ${result} | awk "{print \\$2}"`
if [ x"${maxId}" = xNULL ]; then
maxId=0
fi
echo "Hive ODS层表 TO_YCAK_USR_LOC_D 最大的ID是${maxId}"
ssh hadoop@node03 > /tmp/logs/music_project/user-loc-info.log 2>&1 <<aabbcc
hostname
source /etc/profile
## user_location ==>> TO_YCAK_USR_LOC_D
sqoop import --connect jdbc:mysql://node01:3306/ycak?dontTrackOpenResources=true\&defaultFetchSize=10000\&useCursorFetch=true\&useUnicode=yes\&characterEncoding=utf8 --username root --password 123456 --table user_location --target-dir /user/hive/warehouse/music.db/TO_YCAK_USR_LOC_D/data_dt=${currentDate} --num-mappers 1 --fields-terminated-by '\t' --incremental append --check-column id --last-value ${maxId}
# Add the Hive partition for this date
hive -e "alter table music.to_ycak_usr_loc_d add partition(data_dt='${currentDate}')"
exit
aabbcc
echo "all done!"
The content of 2_incr_extract_mysqldata_to_ods.sh is as follows:
#!/bin/bash
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "==== No load date given, please provide a date ===="
exit
else
echo "==== Using the given load date ===="
currentDate=$1
fi
echo "Date: $currentDate"
# Query the current max ID in the Hive ODS table to_ycak_cnsm_d
result=`ssh hadoop@node03 "source /etc/profile;hive -e 'select max(id) from music.to_ycak_cnsm_d'" | grep _c0 -A 1`
maxId=`echo ${result} | awk "{print \\$2}"`
if [ x"${maxId}" = xNULL ]; then
maxId=0
fi
echo "Hive ODS层表 TO_YCAK_CNSM_D 最大的ID是${maxId}"
ssh hadoop@node03 > /tmp/logs/music_project/machine-consume.log 2>&1 <<aabbcc
hostname
source /etc/profile
## machine_consume_detail ==>> TO_YCAK_CNSM_D
sqoop import --connect jdbc:mysql://node01:3306/ycak?dontTrackOpenResources=true\&defaultFetchSize=10000\&useCursorFetch=true\&useUnicode=yes\&characterEncoding=utf8 --username root --password 123456 --table machine_consume_detail --target-dir /user/hive/warehouse/music.db/TO_YCAK_CNSM_D/data_dt=${currentDate} --num-mappers 1 --fields-terminated-by '\t' --incremental append --check-column id --last-value ${maxId}
# Add the Hive partition for this date
hive -e "alter table music.to_ycak_cnsm_d add partition(data_dt='${currentDate}')"
exit
aabbcc
echo "all done!"
The data in TO_YCAK_USR_LOC_D (daily incremental user location records) is cleansed into TW_MAC_LOC_D (daily machine location statistics). The cleansing mainly takes the latitude/longitude reported by users and calls the AMap (Gaode) API to obtain the corresponding machine location. The Scala file for this step is GenerateTwMacLocD.scala; a minimal sketch of the idea is shown below.
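The project's GenerateTwMacLocD.scala is not reproduced in this article, so the following is only a hypothetical sketch of its core: aggregate each machine's most frequently reported coordinate for the day and call the AMap reverse-geocoding REST endpoint. The AMap key, the assumption that X/Y hold longitude/latitude, and the JSON handling are all placeholders.

import org.apache.spark.sql.SparkSession
import scala.io.Source

object GenerateTwMacLocD {
  def main(args: Array[String]): Unit = {
    val dataDt = args(0)  // e.g. 20230101, passed in by 3_generate_tw_mac_loc_d.sh
    val spark = SparkSession.builder()
      .appName("GenerateTwMacLocD")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // For every machine keep the coordinate reported most often during the day.
    val macLoc = spark.sql(
      s"""SELECT MID, X, Y, CNT FROM (
         |  SELECT MID, LNG AS X, LAT AS Y, COUNT(*) AS CNT,
         |         ROW_NUMBER() OVER (PARTITION BY MID ORDER BY COUNT(*) DESC) AS RN
         |  FROM music.to_ycak_usr_loc_d
         |  WHERE data_dt = '$dataDt'
         |  GROUP BY MID, LNG, LAT
         |) t WHERE RN = 1""".stripMargin)

    // Reverse-geocode each coordinate through the AMap REST API (the key is a placeholder).
    val resolved = macLoc.rdd.map { row =>
      val mid = row.getAs[String]("MID")
      val x   = row.getAs[String]("X")
      val y   = row.getAs[String]("Y")
      val cnt = row.getAs[Long]("CNT")
      val url  = s"https://restapi.amap.com/v3/geocode/regeo?location=$x,$y&key=YOUR_AMAP_KEY"
      val json = Source.fromURL(url, "UTF-8").mkString
      (mid, x, y, cnt, json)
    }.toDF("MID", "X", "Y", "CNT", "REGEO_JSON")

    // The real job parses REGEO_JSON into PRVC/CTY/DISTRICT/... and overwrites the
    // data_dt partition of music.tw_mac_loc_d; that parsing is not reproduced here.
    resolved.show(10, truncate = false)
    spark.stop()
  }
}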
The data in TO_YCAK_CNSM_D (incremental machine consumption order details) is loaded into the EDS-layer table TW_CNSM_BRIEF_D (daily incremental consumption and refund order records), mainly recording each machine's order revenue based on users' daily consumption behaviour on the machines. The Scala file for this step is GenerateTwCnsmBriefD.scala; a sketch is shown below.
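GenerateTwCnsmBriefD.scala is likewise not reproduced in the article, so the block below is only a hedged sketch: deduplicate the day's consumption records by order ID and project them onto the TW_CNSM_BRIEF_D columns. The derivations of COIN_PRC, COIN_CNT, UPDATE_TM and ABN_TYP are not visible in this excerpt, so they are filled with clearly marked placeholders.

import org.apache.spark.sql.SparkSession

object GenerateTwCnsmBriefD {
  def main(args: Array[String]): Unit = {
    val dataDt = args(0)  // passed in by 4_generate_tw_cnsm_brief_d.sh
    val spark = SparkSession.builder()
      .appName("GenerateTwCnsmBriefD")
      .enableHiveSupport()
      .getOrCreate()

    // Keep the latest record per order for the day, then map it onto the EDS columns.
    spark.sql(
      s"""INSERT OVERWRITE TABLE music.tw_cnsm_brief_d PARTITION (data_dt = '$dataDt')
         |SELECT ID, TRD_ID, CAST(UID AS STRING), MID, PRDCD_TYPE, PAY_TYPE, ACT_TM, PKG_ID,
         |       PKG_PRC   AS COIN_PRC,   -- placeholder: the real coin-price mapping is not shown
         |       1         AS COIN_CNT,   -- placeholder
         |       ACT_TM    AS UPDATE_TM,  -- placeholder
         |       ORDR_ID, ACTV_NM, PKG_PRC, PKG_DSCNT, CPN_TYPE,
         |       ORDR_TYPE AS ABN_TYP     -- placeholder: refund/abnormal flag derivation not shown
         |FROM (
         |  SELECT *, ROW_NUMBER() OVER (PARTITION BY ORDR_ID ORDER BY ACT_TM DESC) AS RN
         |  FROM music.to_ycak_cnsm_d
         |  WHERE data_dt = '$dataDt'
         |) t
         |WHERE RN = 1""".stripMargin)

    spark.stop()
  }
}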
The data of TW_MAC_STAT_D is derived from four upstream tables, covering machine location, machine revenue, and user login activity; a sketch of the revenue part of this aggregation is shown below.
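Only TW_MAC_LOC_D and TW_CNSM_BRIEF_D out of the four source tables are listed in this excerpt, so the sketch below covers just the revenue core of GenerateTwMacStatD, under the assumption that ABN_TYP = 2 marks a refund order; the joins to the machine base-info table and the daily active user table are omitted because their names are not shown here.

import org.apache.spark.sql.SparkSession

object GenerateTwMacStatD {
  def main(args: Array[String]): Unit = {
    val dataDt = args(0)
    val spark = SparkSession.builder()
      .appName("GenerateTwMacStatD")
      .enableHiveSupport()
      .getOrCreate()

    // Per machine and pay type: paying users, refund users, order counts, revenue and refunds.
    val macRevenue = spark.sql(
      s"""SELECT MID, PAY_TYPE,
         |       COUNT(DISTINCT CASE WHEN ABN_TYP <> 2 THEN UID END) AS CNSM_USR_CNT,
         |       COUNT(DISTINCT CASE WHEN ABN_TYP = 2  THEN UID END) AS REF_USR_CNT,
         |       SUM(CASE WHEN ABN_TYP <> 2 THEN 1 ELSE 0 END)       AS REV_ORDR_CNT,
         |       SUM(CASE WHEN ABN_TYP = 2  THEN 1 ELSE 0 END)       AS REF_ORDR_CNT,
         |       SUM(CASE WHEN ABN_TYP <> 2 THEN PKG_PRC ELSE 0 END) AS TOT_REV,
         |       SUM(CASE WHEN ABN_TYP = 2  THEN PKG_PRC ELSE 0 END) AS TOT_REF
         |FROM music.tw_cnsm_brief_d
         |WHERE data_dt = '$dataDt'
         |GROUP BY MID, PAY_TYPE""".stripMargin)

    // Attach the province/city resolved earlier by GenerateTwMacLocD.
    val macLoc = spark.sql(
      s"SELECT MID, PRVC, CTY FROM music.tw_mac_loc_d WHERE data_dt = '$dataDt'")

    val joined = macRevenue.join(macLoc, Seq("MID"), "left")
    // The real job also joins machine base info and user login stats, then writes every
    // column of music.tw_mac_stat_d for the data_dt partition; that part is not shown here.
    joined.show(20, truncate = false)
    spark.stop()
  }
}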


① For the user_location and machine_consume_detail tables in the ycak database, incrementally import the data into the Hive ODS layer with the scripts above: 1_incr_extract_mysqldata_to_ods.sh and 2_incr_extract_mysqldata_to_ods.sh.
② From the ODS-layer tables, produce the Hive EDS-layer daily machine location statistics table and the daily incremental consumption/refund order table with the following scripts.
The content of 3_generate_tw_mac_loc_d.sh:
#!/bin/bash
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "==== Using today's date generated automatically ===="
else
echo "==== Using the date passed in from Azkaban ===="
currentDate=$1
fi
echo "Date: $currentDate"
ssh hadoop@node01 > /tmp/logs/music_project/mac_loc.log 2>&1 <<aabbcc
hostname
cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
./spark-submit --master yarn --class com.yw.musichw.eds.machine.GenerateTwMacLocD \
/bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "all done!"
The content of 4_generate_tw_cnsm_brief_d.sh:
#!/bin/bash
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "==== Using today's date generated automatically ===="
else
echo "==== Using the date passed in from Azkaban ===="
currentDate=$1
fi
echo "Date: $currentDate"
ssh hadoop@node01 > /tmp/logs/music_project/cnsm_brief.log 2>&1 <<aabbcc
hostname
cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
./spark-submit --master yarn --class com.yw.musichw.eds.user.GenerateTwCnsmBriefD \
/bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "all done!"
③ Combine the two EDS-layer tables above with the machine detail information from the second business module and the daily active user information from the third business module to compute each machine's daily revenue. The script 5_generate_tw_mac_stat_d.sh is as follows:
#!/bin/bash
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "==== Using today's date generated automatically ===="
else
echo "==== Using the date passed in from Azkaban ===="
currentDate=$1
fi
echo "Date: $currentDate"
ssh hadoop@node01 > /tmp/logs/music_project/mac_stat.log 2>&1 <<aabbcc
hostname
cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
./spark-submit --master yarn --class com.yw.musichw.eds.machine.GenerateTwMacStatD \
/bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "all done!"
④ Finally, aggregate the machine daily statistics by merchant into the DM-layer merchant daily revenue table TM_USR_MRCHNT_STAT_D. The content of 6_generate_tm_usr_mrchnt_stat_d.sh:
#!/bin/bash
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "==== Using today's date generated automatically ===="
else
echo "==== Using the date passed in from Azkaban ===="
currentDate=$1
fi
echo "Date: $currentDate"
ssh hadoop@node01 > /tmp/logs/music_project/usr_mrchnt_stat.log 2>&1 <<aabbcc
hostname
cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
./spark-submit --master yarn --class com.yw.musichw.dm.content.GenerateTmUsrMrchntStatD \
/bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "all done!"
The content of user-mrchnt.flow is as follows:
nodes:
  - name: Job1_ExtractMySQLDataToODS
    type: command
    config:
      command: sh 1_incr_extract_mysqldata_to_ods.sh ${mydate}
      command.1: sh 2_incr_extract_mysqldata_to_ods.sh ${mydate}
  - name: Job2_GenerateTwMacLocAndCnsmBrirfToEDS
    type: command
    config:
      command: sh 3_generate_tw_mac_loc_d.sh ${mydate}
      command.1: sh 4_generate_tw_cnsm_brief_d.sh ${mydate}
    dependsOn:
      - Job1_ExtractMySQLDataToODS
  - name: Job3_GenerateTwMacStatToEDS
    type: command
    config:
      command: sh 5_generate_tw_mac_stat_d.sh ${mydate}
    dependsOn:
      - Job2_GenerateTwMacLocAndCnsmBrirfToEDS
  - name: Job4_GenerateTmUsrMrchntStatToDM
    type: command
    config:
      command: sh 6_generate_tm_usr_mrchnt_stat_d.sh ${mydate}
    dependsOn:
      - Job3_GenerateTwMacStatToEDS
Compress user-mrchnt.flow together with flow20.project into the zip file user-mrchnt.zip.
The drop_merchant_tables.sql referenced below contains:
drop table `music`.`to_ycak_usr_loc_d`;
drop table `music`.`to_ycak_cnsm_d`;
drop table `music`.`tw_mac_loc_d`;
drop table `music`.`tw_cnsm_brief_d`;
drop table `music`.`tw_mac_stat_d`;
drop table `music`.`tm_usr_mrchnt_stat_d`;
Run hive -f drop_merchant_tables.sql to drop the tables. Because they are all external tables, the underlying data still sits in HDFS, so the corresponding directories under /user/hive/warehouse/music.db/ also need to be removed (for example with hdfs dfs -rm -r). Then run hive -f create_merchant_tables.sql to recreate the tables.



TM_MAC_REGION_STAT_D, the DM-layer table of daily machine statistics by region (province/city), is created as follows:
CREATE EXTERNAL TABLE `TM_MAC_REGION_STAT_D`(
`PRVC` string,
`CTY` string,
`MAC_CNT` int,
`MAC_REV` DECIMAL(10,2),
`MAC_REF` DECIMAL(10,2),
`MAC_REV_ORDR_CNT` int,
`MAC_REF_ORDR_CNT` int,
`MAC_CNSM_USR_CNT` int,
`MAC_REF_USR_CNT` int
) PARTITIONED BY (DATA_DT string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'hdfs://node01/user/hive/warehouse/music.db/TM_MAC_REGION_STAT_D';
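GenerateTmMacRegionStatD, submitted by the script below, is not reproduced in the article. Assuming it is a plain roll-up of the per-machine daily statistics to province/city granularity, a minimal sketch could look like this (the CASTs are needed because most TW_MAC_STAT_D columns are declared as string):

import org.apache.spark.sql.SparkSession

object GenerateTmMacRegionStatD {
  def main(args: Array[String]): Unit = {
    val dataDt = args(0)
    val spark = SparkSession.builder()
      .appName("GenerateTmMacRegionStatD")
      .enableHiveSupport()
      .getOrCreate()

    // Roll the per-machine daily stats up to province/city granularity.
    spark.sql(
      s"""INSERT OVERWRITE TABLE music.tm_mac_region_stat_d PARTITION (data_dt = '$dataDt')
         |SELECT PRVC, CTY,
         |       CAST(COUNT(DISTINCT MID) AS INT)                            AS MAC_CNT,
         |       CAST(SUM(CAST(TOT_REV AS DECIMAL(10,2))) AS DECIMAL(10,2))  AS MAC_REV,
         |       CAST(SUM(CAST(TOT_REF AS DECIMAL(10,2))) AS DECIMAL(10,2))  AS MAC_REF,
         |       CAST(SUM(CAST(REV_ORDR_CNT AS INT)) AS INT)                 AS MAC_REV_ORDR_CNT,
         |       CAST(SUM(CAST(REF_ORDR_CNT AS INT)) AS INT)                 AS MAC_REF_ORDR_CNT,
         |       CAST(SUM(CAST(CNSM_USR_CNT AS INT)) AS INT)                 AS MAC_CNSM_USR_CNT,
         |       CAST(SUM(CAST(REF_USR_CNT AS INT)) AS INT)                  AS MAC_REF_USR_CNT
         |FROM music.tw_mac_stat_d
         |WHERE data_dt = '$dataDt'
         |GROUP BY PRVC, CTY""".stripMargin)

    spark.stop()
  }
}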
The content of 1_generate_tm_mac_region_stat_d.sh:
#!/bin/bash
currentDate=`date -d today +"%Y%m%d"`
if [ x"$1" = x ]; then
echo "==== Using today's date generated automatically ===="
else
echo "==== Using the date passed in from Azkaban ===="
currentDate=$1
fi
echo "Date: $currentDate"
ssh hadoop@node01 > /tmp/logs/music_project/mac_region_stat.log 2>&1 <<aabbcc
hostname
cd /bigdata/install/spark-2.3.3-bin-hadoop2.7/bin
./spark-submit --master yarn --class com.yw.musichw.dm.machine.GenerateTmMacRegionStatD \
/bigdata/data/music_project/musicwh-1.0.0-SNAPSHOT-jar-with-dependencies.jar $currentDate
exit
aabbcc
echo "all done!"
The content of mac_region.flow is as follows:
nodes:
  - name: Job1_GenerateTmMacRegionStatToDM
    type: command
    config:
      command: sh 1_generate_tm_mac_region_stat_d.sh ${mydate}
Compress mac_region.flow together with flow20.project into the zip file mac_region.zip.













User login logs are collected in real time: Flume tails the log directory and pushes the events into the Kafka topic user_login_log_topic. The relevant commands and configuration follow.
# Start Kafka
bin/kafka-server-start.sh -daemon config/server.properties
# Create the topic
bin/kafka-topics.sh --create --bootstrap-server node01:9092,node02:9092,node03:9092 --replication-factor 1 --partitions 3 --topic user_login_log_topic
# Consume messages to verify the pipeline
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic user_login_log_topic --from-beginning
The content of user_login_log.conf is as follows:
# name of the source
a.sources = r1
# name of the channel
a.channels = c1
# name of the sink
a.sinks = k1
# For each one of the sources, the type is defined
# Use a TAILDIR source to monitor the files under a directory
# The Taildir source tails a batch of files in real time and records the latest consumed
# position of each file, so restarting the agent does not cause duplicate consumption
a.sources.r1.type = TAILDIR
# File groups; several can be defined
# a.sources.r1.filegroups = f1 f2
a.sources.r1.filegroups = f1
a.sources.r1.filegroups.f1 = /data/logs/log-collector/common/.*log
# The channel can be defined as follows.
# channel that the source writes to
a.sources.r1.channels = c1
a.sources.r1.max-line-length = 1000000
# a.sources.r1.eventSize = 512000000
# Each channel's type is defined.
# channel type
a.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# maximum number of events the channel can hold
a.channels.c1.capacity = 1000
# maximum number of events per transaction between source/sink and the channel
a.channels.c1.transactionCapacity = 100
# Each sink's type must be defined
# Kafka sink
a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker addresses and ports
a.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
# Kafka topic to write to
a.sinks.k1.topic = user_login_log_topic
# serializer
a.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Specify the channel the sink should use
# channel that the sink reads from
a.sinks.k1.channel = c1
# run the Flume agent on node02
bin/flume-ng agent -n a -c conf -f conf/user_login_log.conf -Dflume.root.logger=DEBUG,console



User play-song logs are collected the same way, with a separate topic and Flume agent.
# Create the topic
bin/kafka-topics.sh --create --bootstrap-server node01:9092,node02:9092,node03:9092 --replication-factor 1 --partitions 3 --topic user_play_song_log_topic
# Consume messages to verify the pipeline
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic user_play_song_log_topic --from-beginning
The content of user_play_song_log.conf is as follows:
# name of the source
a.sources = r1
# name of the channel
a.channels = c1
# name of the sink
a.sinks = k1
# For each one of the sources, the type is defined
# Use a TAILDIR source to monitor the files under a directory
# The Taildir source tails a batch of files in real time and records the latest consumed
# position of each file, so restarting the agent does not cause duplicate consumption
a.sources.r1.type = TAILDIR
# File groups; several can be defined
# a.sources.r1.filegroups = f1 f2
a.sources.r1.filegroups = f1
a.sources.r1.filegroups.f1 = /data/logs/log-collector/user_play_song/.*log
# The channel can be defined as follows.
# channel that the source writes to
a.sources.r1.channels = c1
a.sources.r1.max-line-length = 1000000
# a.sources.r1.eventSize = 512000000
# Each channel's type is defined.
# channel type
a.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# maximum number of events the channel can hold
a.channels.c1.capacity = 1000
# maximum number of events per transaction between source/sink and the channel
a.channels.c1.transactionCapacity = 100
# Each sink's type must be defined
# Kafka sink
a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker addresses and ports
a.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
# Kafka topic to write to
a.sinks.k1.topic = user_play_song_log_topic
# serializer
a.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Specify the channel the sink should use
# channel that the sink reads from
a.sinks.k1.channel = c1
# run the Flume agent on node02
bin/flume-ng agent -n a -c conf -f conf/user_play_song_log.conf -Dflume.root.logger=DEBUG,console



