

根据如上数仓分层图和数仓数据走向图,可以看出ods层的主要目的有如下3点:
所以在此次的数仓分层重构中,ods层分别实现了上述3点:
根据上述数仓数据走向图,前面的日志服务器接收日志和通过flume将日志写入到kafka中就不详细介绍了,只介绍数据到达kafka后的后续流向。
数据到达kafka后,可以使用flume来对接kafka,然后将数据写入到文件系统中,并且flume还能设置文件大小时间等滚动,但是由于博主使用的是华为云的DLI服务,没有找到flume组件,所以直接使用FlinkSQL从kafka中获取数据,并按时更新到OBS文件系统中了。
同时,在将数据写入到OBS文件系统中时,还设置了parquet格式,这样只要表中对应分区的元数据有了,就能做到查询数据时分钟级更新(更新时间按照FlinkSQL的Checkpoint时间来算,博主这里使用的是每3分钟更新,即1天会生成480个文件,此更新时间可以根据实际情况来设置,综合考虑小文件和Flink算子反压以及业务需要的数据更新时间等,一般来说在1分钟到5分钟内均可),这为后续实现准实时需求(即要统计当天截止到现在的数据,又没达到完全实时的地步)提供了基础。
具体实现如下代码所示(需要现在OBS并行文件系统中创建对应目录):
-
- -- 创建日期格式化函数,将10位时间戳转换成8位年月日(yyyyMMdd)
- CREATE FUNCTION date_formatted AS 'com.fumi.flink.sql.udf.DateFormattedUDF';
-
- -- ============================================================================
- -- bigdata_new_app_log_store 主题数据 (APP埋点中的点击日志数据)
- -- 创建kafka的source源
- CREATE SOURCE STREAM bigdata_new_app_log_store_source (
- __time__ STRING,
- __topic__ STRING,
- scdata STRING,
- __source__ STRING,
- __receive_time__ STRING,
- `topic` STRING,
- `__client_ip__` STRING
- ) WITH (
- type = "kafka",
- kafka_bootstrap_servers = "xxx1:9092,xxx2:9092,xxx3:9092",
- kafka_group_id = "behavior_log_to_obs",
- kafka_topic = "bigdata_new_app_log_store",
- encode = "json",
- json_config = "__time__=__time__;__topic__=__topic__;scdata=scdata;__source__=__source__;__receive_time__=__tag__:__receive_time__;__client_ip__=__tag__:__client_ip__;topic=topic;"
- );
-
-
- -- 创建obs的sink源
- CREATE SINK STREAM bigdata_new_app_log_store_sink (
- __time__ BIGINT,
- __topic__ STRING,
- scdata STRING,
- __source__ STRING,
- __receive_time__ BIGINT,
- `topic` STRING,
- `__client_ip__` STRING,
- `dt` STRING
- ) PARTITIONED BY(dt) WITH (
- type = "filesystem",
- file.path = "obs://yishou-test/yishou_data_qa_test.db/ods_new_app_log_store_dt",
- encode = "parquet",
- ak = "akxxx",
- sk = "skxxx"
- );
-
-
- -- 将source源数据写入到sink源中
- INSERT INTO
- bigdata_new_app_log_store_sink
- SELECT
- cast(__time__ as BIGINT) as __time__,
- __topic__ ,
- scdata ,
- __source__ ,
- cast(__receive_time__ as BIGINT) as __receive_time__ ,
- `topic`,
- `__client_ip__`,
- date_formatted(__time__, 'yyyyMMdd') as dt
- FROM
- bigdata_new_app_log_store_source;
-
-
-
-
- -- ============================================================================
- -- bigdata_yishou_log_exposure 主题数据 (APP埋点中的曝光日志数据)
- -- 创建kafka的source源
- CREATE SOURCE STREAM bigdata_yishou_log_exposure_source (
- __time__ STRING,
- __topic__ STRING,
- scdata STRING,
- __source__ STRING,
- __receive_time__ STRING,
- `topic` STRING,
- `__client_ip__` STRING
- ) WITH (
- type = "kafka",
- kafka_bootstrap_servers = "xxx1:9092,xxx2:9092,xxx3:9092",
- kafka_group_id = "behavior_log_to_obs",
- kafka_topic = "bigdata_yishou_log_exposure",
- encode = "json",
- json_config = "__time__=__time__;__topic__=__topic__;scdata=scdata;__source__=__source__;__receive_time__=__tag__:__receive_time__;__client_ip__=__tag__:__client_ip__;topic=topic;"
- );
-
-
- -- 创建obs的sink源
- CREATE SINK STREAM bigdata_yishou_log_exposure_sink (
- __time__ BIGINT,
- __topic__ STRING,
- scdata STRING,
- __source__ STRING,
- __receive_time__ BIGINT,
- `topic` STRING,
- `__client_ip__` STRING,
- `dt` STRING
- ) PARTITIONED BY(dt) WITH (
- type = "filesystem",
- file.path = "obs://yishou-test/yishou_data_qa_test.db/ods_yishou_log_exposure_dt",
- encode = "parquet",
- ak = "akxxx",
- sk = "skxxx"
- );
-
-
- -- 将source源数据写入到sink源中
- INSERT INTO
- bigdata_yishou_log_exposure_sink
- SELECT
- cast(__time__ as BIGINT) as __time__,
- __topic__ ,
- scdata ,
- __source__ ,
- cast(__receive_time__ as BIGINT) as __receive_time__ ,
- `topic`,
- `__client_ip__`,
- date_formatted(__time__, 'yyyyMMdd') as dt
- FROM
- bigdata_yishou_log_exposure_source;
-
-
-
- -- ============================================================================
- -- ......
- --
注意:
在华为云的DLI服务中,运行界面如下所示:



- -- ods_new_app_log_store_dt 主题
- DROP TABLE if exists ${yishou_data_dbname}.ods_new_app_log_store_dt;
-
- create table if not exists ${yishou_data_dbname}.ods_new_app_log_store_dt (
- __time__ BIGINT comment '时间',
- __topic__ STRING comment '埋点模块主题名',
- scdata STRING comment '核心数据',
- __source__ STRING comment '数据来源',
- __receive_time__ BIGINT comment '日志服务器接收时间',
- `topic` STRING comment '埋点模块主题名',
- `__client_ip__` STRING comment '客户端IP地址',
- `dt` STRING comment '日期分区'
- ) USING parquet PARTITIONED BY (dt)
- COMMENT 'ods层ods_new_app_log_store主题数据,所有该主题的原始数据都在此表中'
- LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_new_app_log_store_dt'
- ;
-
- -- ods_yishou_log_exposure_dt 主题
- DROP TABLE if exists ${yishou_data_dbname}.ods_yishou_log_exposure_dt;
-
- CREATE TABLE ${yishou_data_dbname}.ods_yishou_log_exposure_dt (
- __time__ BIGINT comment '时间',
- __topic__ STRING comment '埋点模块主题名',
- scdata STRING comment '核心数据',
- __source__ STRING comment '数据来源',
- __receive_time__ BIGINT comment '日志服务器接收时间',
- `topic` STRING comment '埋点模块主题名',
- `__client_ip__` STRING comment '客户端IP地址',
- `dt` STRING comment '日期分区'
- ) USING parquet PARTITIONED BY (dt)
- COMMENT 'ods层ods_yishou_log_exposure主题数据,所有该主题的原始数据都在此表中'
- LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_yishou_log_exposure_dt'
- ;
注意:第一次建表后需要对表的分区进行msck,执行如下命令:
- MSCK REPAIR TABLE ${yishou_data_dbname}.ods_new_app_log_store_dt;
- MSCK REPAIR TABLE ${yishou_data_dbname}.ods_yishou_log_exposure_dt;
之后查看数据条数可以看到每3分钟会更新一次最新数据:

由于这是重构,之前的历史数据是存在旧表里,可以直接从旧表中查出数据,写入新表中即可。
在此层中调度任务只需要在凌晨添加当天分区即可,因为数据会实时进入obs文件系统,所以只要添加分区在元数据中有该分区就能直接查询最新的数据,脚本如下:
- -- 在 ods_new_app_log_store_dt 表中增加对应日期分区(每天凌晨执行,添加当天分区即可)
- ALTER TABLE ${yishou_data_dbname}.ods_new_app_log_store_dt ADD if not exists PARTITION (dt=${gmtdate});
作业调度如下(可以凌晨0点0分调度):

注意:对历史数据,当集群资源空闲时,还可以通过同一张表自己写入自己来合并小文件,但其实感觉没有必要;因为当每3分钟生成一个文件时,能看到一般情况下文件大小为60M左右,这不会造成文件系统小文件过多,并且自己写入自己也存在一定风险;并且可以对更新时间进行调节,控制一定文件数量即可。
注:其他 离线数仓 相关文章链接由此进 -> 离线数仓文章汇总