
7、DWD Layer
To join a stream table with a dimension table, use a lookup join: when the dimension table stored in HBase or MySQL changes, the join picks up the change dynamically.
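A minimal sketch of such a lookup join, using the payment fact table and the HBase region dimension table that are built later in this section (proc_time is the processing-time attribute declared on the fact table):

-- Each fact row probes the dimension table at its processing time,
-- so updates to the HBase table are reflected dynamically.
select
    a.id,
    b.info.pro_name          -- read from the HBase column family `info`
from gma_dwd.dwd_kafka_payment_info as a
left join gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time as b
    on a.province_id = b.pro_id;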

1、Payment fact table
Data warehouse modeling notes:

Note: the Upsert Kafka connector supports reading data from and writing data to a Kafka topic in upsert fashion, i.e. Kafka acts both as the source and as the sink here.
A new function to learn: LISTAGG() concatenates the values of string expressions and places a separator between them. The separator is not appended at the end of the string; its default value is ','.
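A tiny sketch of LISTAGG, using the ODS order-detail table referenced below (a second argument overrides the default ',' separator, e.g. LISTAGG(x, '|')):

select
    order_id,
    LISTAGG(cast(sku_id as STRING)) as skus   -- e.g. '1,7,9'
from gma_ods.ods_mysql_kafka_order_detail
group by order_id;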
--1. Create the Kafka sink table

CREATE TABLE gma_dwd.dwd_kafka_payment_info (
    id bigint,
    user_id BIGINT,
    payment_time STRING,
    payment_type STRING,
    province_id BIGINT,
    skus STRING,
    payment_price decimal(10,2),
    sku_num BIGINT,
    proc_time as PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED -- define the unique primary key
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd_payment_info',
    'properties.bootstrap.servers' = 'master:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

--2. Join the payment stream, the order table and the order-detail table to build the payment fact table

insert into gma_dwd.dwd_kafka_payment_info
select
    id,
    cast(user_id as BIGINT) as user_id,
    payment_time,
    payment_type,
    cast(province_id as BIGINT) as province_id,
    LISTAGG(cast(sku_id as STRING)) as skus,
    sum(order_price*sku_num) as payment_price,
    sum(sku_num) as sku_num
from (
    select a.id,a.user_id,a.payment_time,a.payment_type,b.province_id,c.sku_id,c.order_price,cast(c.sku_num as bigint) as sku_num
    from
    gma_ods.ods_mysql_kafka_payment_info
    /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
    inner join
    gma_ods.ods_mysql_kafka_order_info
    /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
    on cast(a.order_id as bigint)=b.id
    inner join
    gma_ods.ods_mysql_kafka_order_detail
    /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as c
    on b.id=c.order_id
) as d
group by
    id,user_id,payment_time,payment_type,province_id;

-- Consume the Kafka topic to check the output
kafka-console-consumer.sh --bootstrap-server master:9092,node2:9092,node2:9092 --from-beginning --topic dwd_payment_info

-- Query the data in Flink
select * from gma_dwd.dwd_kafka_payment_info /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
2、Order fact table

-- 1. Create the Kafka sink table

CREATE TABLE gma_dwd.dwd_kafka_order_info (
    id BIGINT,
    consignee STRING,
    consignee_tel STRING,
    delivery_address STRING,
    order_status STRING,
    user_id BIGINT,
    payment_way STRING,
    create_time TIMESTAMP(3),
    operate_time TIMESTAMP(3),
    expire_time TIMESTAMP(3),
    province_id BIGINT,
    skus STRING,
    total_amount decimal(10,2),
    proc_time as PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED -- define the unique primary key
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd_order_info',
    'properties.bootstrap.servers' = 'master:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- 2. Run the INSERT: attach the aggregated sku list to each order
insert into gma_dwd.dwd_kafka_order_info
select
    a.id,
    a.consignee,
    a.consignee_tel,
    a.delivery_address,
    a.order_status,
    a.user_id,
    a.payment_way,
    a.create_time,
    a.operate_time,
    a.expire_time,
    cast(a.province_id as bigint),
    b.skus,
    a.total_amount
from
    gma_ods.ods_mysql_kafka_order_info
    /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
join
    (
        select order_id,LISTAGG(cast(sku_id as STRING)) as skus
        from gma_ods.ods_mysql_kafka_order_detail
        /*+ OPTIONS('scan.startup.mode'='earliest-offset') */
        group by order_id
    ) as b
on a.id=b.order_id;
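Verification can follow the same pattern as the payment fact table above (a sketch reusing the broker and topic from the DDL):

-- Consume the Kafka topic
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic dwd_order_info

-- Query the data in Flink
select * from gma_dwd.dwd_kafka_order_info /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;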
8、ADS Layer
Methodology for statistical metrics:
Atomic metrics: order amount, payment amount
Derived metric = atomic metric + statistical period + business qualifier + statistical dimension
For example, "the payment amount of each user per day" = payment amount (atomic metric) + day (statistical period) + user (statistical dimension).
First create the database gma_ads in MySQL.
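A minimal sketch of that step, run in the MySQL client (the character set is an assumption, chosen to match the table DDLs below):

CREATE DATABASE IF NOT EXISTS gma_ads DEFAULT CHARACTER SET utf8;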
1、Payment amount
- Real-time payment amount per user per day
- Real-time payment amount per province per day
- Real-time payment amount per payment method per day
- Real-time payment amount per region per day
- Real-time payment amount per gender per day
1、Real-time payment amount per user per day
-- 1. Create the MySQL sink table
-- Flink SQL JDBC sink table
CREATE TABLE gma_ads.ads_mysql_user_day_sum_payment_price (
    user_id BIGINT,
    day_id STRING,
    sum_payment_price decimal(10,2),
    PRIMARY KEY (user_id,day_id) NOT ENFORCED -- rows are upserted by primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_mysql_user_day_sum_payment_price', -- the table must be created manually in MySQL
    'username' = 'root',
    'password' = '123456'
);

-- Create the table in MySQL
CREATE TABLE `ads_mysql_user_day_sum_payment_price` (
    `user_id` BIGINT NOT NULL,
    `day_id` varchar(255) NOT NULL,
    `sum_payment_price` decimal(10,2),
    PRIMARY KEY (`user_id`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Real-time aggregation
insert into gma_ads.ads_mysql_user_day_sum_payment_price
select
    user_id,
    substr(payment_time,1,10) as day_id,
    sum(payment_price) as sum_payment_price
from gma_dwd.dwd_kafka_payment_info
group by user_id,substr(payment_time,1,10);
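The result can be checked directly in MySQL (a sketch):

-- In the MySQL client
select * from gma_ads.ads_mysql_user_day_sum_payment_price order by day_id, user_id;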
2、Real-time payment amount per province per day
-- 1. Create the MySQL sink table
-- Flink SQL JDBC sink table
CREATE TABLE gma_ads.ads_mysql_proc_day_sum_payment_price (
    pro_name STRING,
    day_id STRING,
    sum_payment_price decimal(10,2),
    PRIMARY KEY (pro_name,day_id) NOT ENFORCED -- rows are upserted by primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_mysql_proc_day_sum_payment_price', -- the table must be created manually in MySQL
    'username' = 'root',
    'password' = '123456'
);

-- Create the table in MySQL
CREATE TABLE `ads_mysql_proc_day_sum_payment_price` (
    `pro_name` varchar(255) NOT NULL,
    `day_id` varchar(255) NOT NULL,
    `sum_payment_price` decimal(10,2),
    PRIMARY KEY (`pro_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Real-time aggregation
-- Look up the dimension table at processing time to get the province name
insert into gma_ads.ads_mysql_proc_day_sum_payment_price
select
    b.info.pro_name as pro_name,
    substr(payment_time,1,10) as day_id,
    sum(payment_price) as sum_payment_price
from gma_dwd.dwd_kafka_payment_info as a
LEFT JOIN
    gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time as b
    on a.province_id=b.pro_id
group by b.info.pro_name,substr(payment_time,1,10);
2、Number of orders placed
- Real-time number of orders per province per day
- Real-time number of orders per region per day
- Real-time number of orders per brand per day
- Real-time number of orders per user per day
1、Real-time number of orders per province per day
-- Create the MySQL sink table
CREATE TABLE gma_ads.ads_mysql_proc_day_order_num (
    pro_name STRING,
    day_id STRING,
    num bigint,
    PRIMARY KEY (pro_name,day_id) NOT ENFORCED -- rows are upserted by primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_mysql_proc_day_order_num', -- the table must be created manually in MySQL
    'username' = 'root',
    'password' = '123456'
);


-- Create the table in MySQL
CREATE TABLE `ads_mysql_proc_day_order_num` (
    `pro_name` varchar(255) NOT NULL,
    `day_id` varchar(255) NOT NULL,
    `num` bigint,
    PRIMARY KEY (`pro_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2、Real-time number of orders per region per day
-- Create the MySQL sink table
CREATE TABLE gma_ads.ads_mysql_region_day_order_num (
    region_name STRING,
    day_id STRING,
    num bigint,
    PRIMARY KEY (region_name,day_id) NOT ENFORCED -- rows are upserted by primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_mysql_region_day_order_num', -- the table must be created manually in MySQL
    'username' = 'root',
    'password' = '123456'
);


-- Create the table in MySQL
CREATE TABLE `ads_mysql_region_day_order_num` (
    `region_name` varchar(255) NOT NULL,
    `day_id` varchar(255) NOT NULL,
    `num` bigint,
    PRIMARY KEY (`region_name`,`day_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

The INSERT statements that feed these two sink tables are executed together in the statement set at the end of this section.
3、Real-time number of orders per brand per day
-- Load the Hive module so that Hive built-in functions (split, explode) can be used
LOAD MODULE hive WITH ('hive-version' = '1.2.1');


select c.info.tm_name as tm_name,day_id,count(1) as num from (
    select
        sku_id,
        DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
        PROCTIME() as proc_time
    from
        gma_dwd.dwd_kafka_order_info as a,
        LATERAL TABLE(explode(split(skus,','))) t(sku_id) -- split the skus string into one row per sku_id
) as b
left join
    gma_dim.dim_hbase_item_info FOR SYSTEM_TIME AS OF b.proc_time as c
    on cast(b.sku_id as bigint)=c.sku_id
group by c.info.tm_name,day_id;
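The query above only prints the counts; to persist them, a JDBC sink can be added following the same pattern as the other ADS tables. A sketch (the table name ads_mysql_tm_day_order_num and its MySQL DDL are assumptions, not part of the original design):

-- Hypothetical Flink JDBC sink table for brand-level daily order counts
CREATE TABLE gma_ads.ads_mysql_tm_day_order_num (
    tm_name STRING,
    day_id STRING,
    num bigint,
    PRIMARY KEY (tm_name,day_id) NOT ENFORCED -- rows are upserted by primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_mysql_tm_day_order_num', -- must also be created manually in MySQL
    'username' = 'root',
    'password' = '123456'
);
-- The SELECT above can then be wrapped in: insert into gma_ads.ads_mysql_tm_day_order_num select ... ;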
Running multiple SQL statements in a single job
EXECUTE STATEMENT SET
BEGIN

    -- Real-time number of orders per province per day
    insert into gma_ads.ads_mysql_proc_day_order_num
    select
        b.info.pro_name,
        DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
        count(1) as num
    from
        gma_dwd.dwd_kafka_order_info as a
    left join
        gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time as b
        on a.province_id=b.pro_id
    group by b.info.pro_name,DATE_FORMAT(create_time,'yyyy-MM-dd');

    -- Real-time number of orders per region per day
    insert into gma_ads.ads_mysql_region_day_order_num
    select
        b.info.region_name,
        DATE_FORMAT(create_time,'yyyy-MM-dd') as day_id,
        count(1) as num
    from
        gma_dwd.dwd_kafka_order_info as a
    left join
        gma_dim.dim_hbase_region FOR SYSTEM_TIME AS OF a.proc_time as b
        on a.province_id=b.pro_id
    group by b.info.region_name,DATE_FORMAT(create_time,'yyyy-MM-dd');

END;
9、Issues
1、Data loss when Flink writes updated data to HBase in batches (BUG)
- Workaround: change batched writes to row-by-row writes
- 'sink.buffer-flush.max-rows'='0'
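A sketch of where this option goes, using the region dimension table from this section as an example (the connector version, HBase table name and ZooKeeper address are assumptions):

CREATE TABLE gma_dim.dim_hbase_region (
    pro_id BIGINT,
    info ROW<pro_name STRING, region_name STRING>,
    PRIMARY KEY (pro_id) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',          -- assumed connector version
    'table-name' = 'dim_hbase_region',  -- assumed HBase table name
    'zookeeper.quorum' = 'master:2181', -- assumed ZooKeeper address
    'sink.buffer-flush.max-rows' = '0'  -- 0 disables batching, so each row is written immediately
);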
