• 3、构建实时数据仓库-ods和dim层构建


    3、构建实时数据仓库

    项目平台搭建架构及其总体流程

    1、flink整合hive的catalog

    因为本项目中的对应kafka中的表都存在hive的元数据库中,所以需要创建一个hive的catalog,方便直接操作

    1. -- 进入sql客户端
    2. sql-client.sh
    3. -- 创建hive catalog
    4. CREATE CATALOG hive_catalog WITH (
    5. 'type' = 'hive',
    6. 'default-database' = 'default',
    7. 'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
    8. );
    9. -- set the HiveCatalog as the current catalog of the session
    10. USE CATALOG hive_catalog;
    11. -- 创建一个flink_init.sql文件,将hive catalog放进去,后面再启动sql-client时指定sql文件

    2、在hviecatalog中创建数据库

    离线数据仓库:每一个库对应一个用户,每一个库对应hdfs中一个目录

    实时数据仓库每一个层的数据保存在不同位置,ods,dwd,dws保存在kafkadim,ads的数据保存在数据库中

    1. create database gma_ods;
    2. create database gma_dwd;
    3. create database gma_dws;
    4. create database gma_dim;
    5. create database gma_ads;

    4、数据采集

    必须现在mysql中创建数据库,然后打开canal,实时监控数据库,当我们对于这个库做任何操作,kafka中就会实时同步我们的操作(通过json格式)

    1. # 1、在mysql中创建数据库gma,指定编码格式为utf-8
    2. # 2、修改canal配置文件
    3. cd /usr/local/soft/canal/conf/example
    4. # 修改配置文件
    5. vim instance.properties
    6. # 增加动态topic配置,每个表在kafka中生成一个topic
    7. canal.mq.dynamicTopic=gma\\..*
    8. # 3、开启mysqlbinlog日志
    9. vim /etc/my.cnf
    10. # 在配置文件中增加二配置
    11. # 需要将配置放在[mysqld]后面
    12. # 打开binlog
    13. log-bin=mysql-bin
    14. # 选择ROW(行)模式
    15. binlog-format=ROW
    16. # 重启mysql
    17. service mysqld restart
    18. # 4、启动canal
    19. cd /usr/local/soft/canal/bin
    20. ./restart.sh
    21. # 5、在数据库中创建表
    22. # 执行下面这个sql文件
    23. init_mysql_table.sql
    24. # 6、查看topic是否生成
    25. kafka-topics.sh --list --zookeeper master:2181
    26. # 如果topic没有生成,检测前面哪里出了问题,创建表之后必须要有topic
    27. # 7、导入数据到mysql中
    28. # 执行sql文件
    29. load_data.sql
    30. # 8、使用kafka控制台消费者消费数据
    31. kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic gma.base_category1

    5、ODS层

    • 在flink sql中创建ods的表
    1. # 进入sql-client
    2. sql-client.sh -i flink_init.sql
    3. # 创建ods层所有的表
    4. ods_mysql_kafka_base_category1:商品一级分类表
    5. ods_mysql_kafka_base_category2:商品二级分类表
    6. ods_mysql_kafka_base_category3:商品三级分类表
    7. ods_mysql_kafka_base_province:省份配置报表
    8. ods_mysql_kafka_base_region:地区配置表
    9. ods_mysql_kafka_base_trademark:品牌表
    10. ods_mysql_kafka_date_info:时间配置表
    11. ods_mysql_kafka_holiday_info:节假日表
    12. ods_mysql_kafka_holiday_year:节假日对应时间表
    13. ods_mysql_kafka_order_detail:订单详情表,一个订单中一个商品一条数据
    14. ods_mysql_kafka_order_info:订单表。一个订单一条数据,订单表中有订单状态
    15. ods_mysql_kafka_order_status_log:订单转台变化日志记录表
    16. ods_mysql_kafka_payment_info:支付流水表
    17. ods_mysql_kafka_sku_info:商品信息表
    18. ods_mysql_kafka_user_info:用户信息表

    6、DIM层

    每个kafka中的数据对应我在ods层中的一张表,然后我们可以通过sql对ods层进行操作,为dim,dwd层提供数据

    注意:我们在构建dim层的维表并将维表的数据写入到hbase的时候,发现一个错误。就是写入到hbase的数据会变少。这可能是一个bug。

    • 将维度表单独保存到一个hbae的命名空间(相当于库)中

      1. # 进入hbase shell
      2. habse shell
      3. # 创建命名空间
      4. create_namespace 'gma_dim'

    1、地区维度表

    将省份表和地区表合并成地区维度表(维度退化)
    从kafka中读取数据合并两个表,将合并之后的地区维度表保存到hbae中

    1. -- 1、在hbase中创建表
    2. create 'gma_dim.dim_region','info'
    3. -- 2、在flink实时数据仓库的dim层创建地区维度表
    4. -- 创建hbase sink表
    5. CREATE TABLE gma_dim.dim_hbase_region (
    6. pro_id BIGINT,
    7. info ROW<pro_name STRING,region_id BIGINT,region_name STRING,area_code STRING>,
    8. PRIMARY KEY (pro_id) NOT ENFORCED
    9. ) WITH (
    10. 'connector' = 'hbase-1.4',
    11. 'table-name' = 'gma_dim.dim_region',
    12. 'zookeeper.quorum' = 'master:2181'
    13. );
    14. -- 3、编写flinksql 合并地区表和省份表,将数据保存到地区维度表中
    15. insert into gma_dim.dim_hbase_region
    16. select pro_id,ROW(pro_name,region_id,region_name,area_code) from (
    17. select b.id as pro_id,b.name as pro_name,region_id,region_name,area_code from
    18. gma_ods.ods_mysql_kafka_base_region /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
    19. inner join
    20. gma_ods.ods_mysql_kafka_base_province /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
    21. on
    22. a.id=b.region_id
    23. ) as c
    24. --4、到hbase中查看是否有数据
    25. scan 'gma_dim.dim_region'

    2、商品维度表

    商品表、品类表、spu表、商品三级分类、商品二级分类、商品一级分类表退化为商品维度表

    维度退化:将多个维度表退化成一个维度表

    1. -- 1、在hbsae中创建商品维度表
    2. create 'gma_dim.dim_item_info','info'
    3. -- 2、在flink实时数据仓库中创建商品维度表
    4. CREATE TABLE gma_dim.dim_hbase_item_info (
    5. sku_id bigint,
    6. info ROW<spu_id bigint, price decimal(10,0) , sku_name STRING, sku_desc STRING, weight decimal(10,2), tm_id bigint, tm_name STRING, category3_id bigint, category3_name STRING, category2_id bigint, category2_name STRING, category1_id bigint, category1_name STRING, sku_default_img STRING, create_time TIMESTAMP(3) >,
    7. PRIMARY KEY (sku_id) NOT ENFORCED
    8. ) WITH (
    9. 'connector' = 'hbase-1.4',
    10. 'table-name' = 'gma_dim.dim_item_info',
    11. 'zookeeper.quorum' = 'master:2181',
    12. 'sink.buffer-flush.max-rows'='0'
    13. );
    14. --3、关联多张表得到商品维度表
    15. insert into gma_dim.dim_hbase_item_info
    16. select
    17. sku_id,ROW(spu_id , price, sku_name, sku_desc, weight, tm_id, tm_name, category3_id, category3_name, category2_id, category2_name, category1_id, category1_name, sku_default_img, create_time) as info
    18. from (
    19. select
    20. a.id as sku_id,
    21. a.spu_id,
    22. a.price,
    23. a.sku_name,
    24. a.sku_desc,
    25. a.weight,
    26. a.tm_id,
    27. b.tm_name,
    28. a.category3_id,
    29. c.name as category3_name,
    30. d.id as category2_id,
    31. d.name as category2_name,
    32. e.id as category1_id,
    33. e.name as category1_name,
    34. a.sku_default_img,
    35. a.create_time
    36. from
    37. gma_ods.ods_mysql_kafka_sku_info
    38. /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
    39. inner join
    40. gma_ods.ods_mysql_kafka_base_trademark
    41. /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
    42. on a.tm_id=cast(b.tm_id as bigint)
    43. inner join
    44. gma_ods.ods_mysql_kafka_base_category3
    45. /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as c
    46. on a.category3_id=c.id
    47. inner join
    48. gma_ods.ods_mysql_kafka_base_category2
    49. /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as d
    50. on c.category2_id=d.id
    51. inner join
    52. gma_ods.ods_mysql_kafka_base_category1
    53. /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as e
    54. on d.category1_id=e.id
    55. ) as f;

    3、用户维度表

    1. --1、在hbase中创建用户维度表
    2. create 'gma_dim.dim_user_info','info'
    3. --2、在flink实时数据仓库中创建用户维度表
    4. CREATE TABLE gma_dim.dim_hbase_user_info (
    5. user_id BIGINT,
    6. info ROW<login_name STRING , nick_name STRING , passwd STRING , name STRING , phone_num STRING , email STRING , head_img STRING , user_level STRING , birthday DATE , gender STRING , create_time TIMESTAMP(3)>,
    7. PRIMARY KEY (user_id) NOT ENFORCED
    8. ) WITH (
    9. 'connector' = 'hbase-1.4',
    10. 'table-name' = 'gma_dim.dim_user_info',
    11. 'zookeeper.quorum' = 'master:2181'
    12. );
    13. --3、编写flink sql读取ods层用户信息表将数据保存到hbae中构建用户维度表
    14. insert into gma_dim.dim_hbase_user_info
    15. select id as user_id,ROW(login_name,nick_name,passwd,name,phone_num,email,head_img,user_level,birthday,gender,create_time) from
    16. gma_ods.ods_mysql_kafka_user_info
    17. /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
    18. ;
  • 相关阅读:
    阿里云服务器如何购买?三种方式可买(图文教程举例)
    数仓4.0
    MayApps平台为政府机关后勤管理添“智慧”
    php 校园oa办公系统xammp
    java毕业设计基于网络平台个人博客系统Mybatis+系统+数据库+调试部署
    php学习总结
    [MySQL]基本介绍及安装使用详细讲解
    相机标定和双目相机标定标定原理推导及效果展示
    c++ virtual关键字
    用 AWTK 和 AWPLC 快速开发嵌入式应用程序 (3)- 定时器
  • 原文地址:https://blog.csdn.net/weixin_48370579/article/details/126336147