
3、构建实时数据仓库
项目平台搭建架构及其总体流程

1、flink整合hive的catalog
因为本项目中的对应kafka中的表都存在hive的元数据库中,所以需要创建一个hive的catalog,方便直接操作
- -- 进入sql客户端
- sql-client.sh
-
- -- 创建hive catalog
- CREATE CATALOG hive_catalog WITH (
- 'type' = 'hive',
- 'default-database' = 'default',
- 'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
- );
- -- set the HiveCatalog as the current catalog of the session
- USE CATALOG hive_catalog;
-
-
- -- 创建一个flink_init.sql文件,将hive catalog放进去,后面再启动sql-client时指定sql文件
2、在hviecatalog中创建数据库
离线数据仓库:每一个库对应一个用户,每一个库对应hdfs中一个目录
实时数据仓库每一个层的数据保存在不同位置,ods,dwd,dws保存在kafka,dim,ads的数据保存在数据库中
- create database gma_ods;
- create database gma_dwd;
- create database gma_dws;
- create database gma_dim;
- create database gma_ads;
4、数据采集
必须现在mysql中创建数据库,然后打开canal,实时监控数据库,当我们对于这个库做任何操作,kafka中就会实时同步我们的操作(通过json格式)
- # 1、在mysql中创建数据库gma,指定编码格式为utf-8
- # 2、修改canal配置文件
- cd /usr/local/soft/canal/conf/example
- # 修改配置文件
- vim instance.properties
- # 增加动态topic配置,每个表在kafka中生成一个topic
- canal.mq.dynamicTopic=gma\\..*
-
-
- # 3、开启mysqlbinlog日志
- vim /etc/my.cnf
- # 在配置文件中增加二配置
- # 需要将配置放在[mysqld]后面
- # 打开binlog
- log-bin=mysql-bin
- # 选择ROW(行)模式
- binlog-format=ROW
- # 重启mysql
- service mysqld restart
-
- # 4、启动canal
- cd /usr/local/soft/canal/bin
- ./restart.sh
-
- # 5、在数据库中创建表
- # 执行下面这个sql文件
- init_mysql_table.sql
-
- # 6、查看topic是否生成
- kafka-topics.sh --list --zookeeper master:2181
- # 如果topic没有生成,检测前面哪里出了问题,创建表之后必须要有topic
-
- # 7、导入数据到mysql中
- # 执行sql文件
- load_data.sql
-
- # 8、使用kafka控制台消费者消费数据
- kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic gma.base_category1
5、ODS层
- 在flink sql中创建ods的表
- # 进入sql-client
- sql-client.sh -i flink_init.sql
-
- # 创建ods层所有的表
- ods_mysql_kafka_base_category1:商品一级分类表
- ods_mysql_kafka_base_category2:商品二级分类表
- ods_mysql_kafka_base_category3:商品三级分类表
- ods_mysql_kafka_base_province:省份配置报表
- ods_mysql_kafka_base_region:地区配置表
- ods_mysql_kafka_base_trademark:品牌表
- ods_mysql_kafka_date_info:时间配置表
- ods_mysql_kafka_holiday_info:节假日表
- ods_mysql_kafka_holiday_year:节假日对应时间表
- ods_mysql_kafka_order_detail:订单详情表,一个订单中一个商品一条数据
- ods_mysql_kafka_order_info:订单表。一个订单一条数据,订单表中有订单状态
- ods_mysql_kafka_order_status_log:订单转台变化日志记录表
- ods_mysql_kafka_payment_info:支付流水表
- ods_mysql_kafka_sku_info:商品信息表
- ods_mysql_kafka_user_info:用户信息表
6、DIM层
每个kafka中的数据对应我在ods层中的一张表,然后我们可以通过sql对ods层进行操作,为dim,dwd层提供数据
注意:我们在构建dim层的维表并将维表的数据写入到hbase的时候,发现一个错误。就是写入到hbase的数据会变少。这可能是一个bug。
-
将维度表单独保存到一个hbae的命名空间(相当于库)中
- # 进入hbase shell
- habse shell
- # 创建命名空间
- create_namespace 'gma_dim'
1、地区维度表
将省份表和地区表合并成地区维度表(维度退化)
从kafka中读取数据合并两个表,将合并之后的地区维度表保存到hbae中
- -- 1、在hbase中创建表
- create 'gma_dim.dim_region','info'
-
- -- 2、在flink实时数据仓库的dim层创建地区维度表
- -- 创建hbase sink表
- CREATE TABLE gma_dim.dim_hbase_region (
- pro_id BIGINT,
- info ROW<pro_name STRING,region_id BIGINT,region_name STRING,area_code STRING>,
- PRIMARY KEY (pro_id) NOT ENFORCED
- ) WITH (
- 'connector' = 'hbase-1.4',
- 'table-name' = 'gma_dim.dim_region',
- 'zookeeper.quorum' = 'master:2181'
- );
-
-
- -- 3、编写flinksql 合并地区表和省份表,将数据保存到地区维度表中
- insert into gma_dim.dim_hbase_region
- select pro_id,ROW(pro_name,region_id,region_name,area_code) from (
- select b.id as pro_id,b.name as pro_name,region_id,region_name,area_code from
- gma_ods.ods_mysql_kafka_base_region /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
- inner join
- gma_ods.ods_mysql_kafka_base_province /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
- on
- a.id=b.region_id
- ) as c
-
- --4、到hbase中查看是否有数据
- scan 'gma_dim.dim_region'
2、商品维度表
商品表、品类表、spu表、商品三级分类、商品二级分类、商品一级分类表退化为商品维度表
维度退化:将多个维度表退化成一个维度表
- -- 1、在hbsae中创建商品维度表
- create 'gma_dim.dim_item_info','info'
-
- -- 2、在flink实时数据仓库中创建商品维度表
- CREATE TABLE gma_dim.dim_hbase_item_info (
- sku_id bigint,
- info ROW<spu_id bigint, price decimal(10,0) , sku_name STRING, sku_desc STRING, weight decimal(10,2), tm_id bigint, tm_name STRING, category3_id bigint, category3_name STRING, category2_id bigint, category2_name STRING, category1_id bigint, category1_name STRING, sku_default_img STRING, create_time TIMESTAMP(3) >,
- PRIMARY KEY (sku_id) NOT ENFORCED
- ) WITH (
- 'connector' = 'hbase-1.4',
- 'table-name' = 'gma_dim.dim_item_info',
- 'zookeeper.quorum' = 'master:2181',
- 'sink.buffer-flush.max-rows'='0'
- );
-
-
- --3、关联多张表得到商品维度表
- insert into gma_dim.dim_hbase_item_info
- select
- sku_id,ROW(spu_id , price, sku_name, sku_desc, weight, tm_id, tm_name, category3_id, category3_name, category2_id, category2_name, category1_id, category1_name, sku_default_img, create_time) as info
- from (
- select
- a.id as sku_id,
- a.spu_id,
- a.price,
- a.sku_name,
- a.sku_desc,
- a.weight,
- a.tm_id,
- b.tm_name,
- a.category3_id,
- c.name as category3_name,
- d.id as category2_id,
- d.name as category2_name,
- e.id as category1_id,
- e.name as category1_name,
- a.sku_default_img,
- a.create_time
- from
- gma_ods.ods_mysql_kafka_sku_info
- /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
- inner join
- gma_ods.ods_mysql_kafka_base_trademark
- /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as b
- on a.tm_id=cast(b.tm_id as bigint)
- inner join
- gma_ods.ods_mysql_kafka_base_category3
- /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as c
- on a.category3_id=c.id
- inner join
- gma_ods.ods_mysql_kafka_base_category2
- /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as d
- on c.category2_id=d.id
- inner join
- gma_ods.ods_mysql_kafka_base_category1
- /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as e
- on d.category1_id=e.id
- ) as f;
3、用户维度表
- --1、在hbase中创建用户维度表
- create 'gma_dim.dim_user_info','info'
-
- --2、在flink实时数据仓库中创建用户维度表
- CREATE TABLE gma_dim.dim_hbase_user_info (
- user_id BIGINT,
- info ROW<login_name STRING , nick_name STRING , passwd STRING , name STRING , phone_num STRING , email STRING , head_img STRING , user_level STRING , birthday DATE , gender STRING , create_time TIMESTAMP(3)>,
- PRIMARY KEY (user_id) NOT ENFORCED
- ) WITH (
- 'connector' = 'hbase-1.4',
- 'table-name' = 'gma_dim.dim_user_info',
- 'zookeeper.quorum' = 'master:2181'
- );
-
- --3、编写flink sql读取ods层用户信息表将数据保存到hbae中构建用户维度表
- insert into gma_dim.dim_hbase_user_info
- select id as user_id,ROW(login_name,nick_name,passwd,name,phone_num,email,head_img,user_level,birthday,gender,create_time) from
- gma_ods.ods_mysql_kafka_user_info
- /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ as a
- ;
