• 离线数仓(8):ODS层实现之导入流量日志


    0. 相关文章链接

     离线数仓文章汇总 

    1. ODS层概述

    1.1. 数仓分层

    1.2. 数仓数据走向图

    1.3. 实现目标

    根据如上数仓分层图和数仓数据走向图,可以看出ods层的主要目的有如下3点:

    1. 保持数据原貌不做任何修改,起到备份数据的作用
    2. 数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右)
    3. 创建分区表,防止后续的全表扫描

    所以在此次的数仓分层重构中,ods层分别实现了上述3点:

    1. 将流量数据不做任何修改的写入到文件系统中
    2. 使用了parquet格式,但由于平台和业务原因没有在parquet上再使用例如lzo、snappy等压缩了,但单独的parquet格式压缩也对文本进行了较好的压缩了
    3. 在建表时创建了分区表

    2. 数据准实时进入文件系统

            根据上述数仓数据走向图,前面的日志服务器接收日志和通过flume将日志写入到kafka中就不详细介绍了,只介绍数据到达kafka后的后续流向。

            数据到达kafka后,可以使用flume来对接kafka,然后将数据写入到文件系统中,并且flume还能设置文件大小时间等滚动,但是由于博主使用的是华为云的DLI服务,没有找到flume组件,所以直接使用FlinkSQL从kafka中获取数据,并按时更新到OBS文件系统中了。

            同时,在将数据写入到OBS文件系统中时,还设置了parquet格式,这样只要表中对应分区的元数据有了,就能做到查询数据时分钟级更新(更新时间按照FlinkSQL的Checkpoint时间来算,博主这里使用的是每3分钟更新,即1天会生成480个文件,此更新时间可以根据实际情况来设置,综合考虑小文件和Flink算子反压以及业务需要的数据更新时间等,一般来说在1分钟到5分钟内均可),这为后续实现准实时需求(即要统计当天截止到现在的数据,又没达到完全实时的地步)提供了基础。

    具体实现如下代码所示(需要现在OBS并行文件系统中创建对应目录):

    1. -- 创建日期格式化函数,将10位时间戳转换成8位年月日(yyyyMMdd)
    2. CREATE FUNCTION date_formatted AS 'com.fumi.flink.sql.udf.DateFormattedUDF';
    3. -- ============================================================================
    4. -- bigdata_new_app_log_store 主题数据 (APP埋点中的点击日志数据)
    5. -- 创建kafka的source源
    6. CREATE SOURCE STREAM bigdata_new_app_log_store_source (
    7. __time__ STRING,
    8. __topic__ STRING,
    9. scdata STRING,
    10. __source__ STRING,
    11. __receive_time__ STRING,
    12. `topic` STRING,
    13. `__client_ip__` STRING
    14. ) WITH (
    15. type = "kafka",
    16. kafka_bootstrap_servers = "xxx1:9092,xxx2:9092,xxx3:9092",
    17. kafka_group_id = "behavior_log_to_obs",
    18. kafka_topic = "bigdata_new_app_log_store",
    19. encode = "json",
    20. json_config = "__time__=__time__;__topic__=__topic__;scdata=scdata;__source__=__source__;__receive_time__=__tag__:__receive_time__;__client_ip__=__tag__:__client_ip__;topic=topic;"
    21. );
    22. -- 创建obs的sink源
    23. CREATE SINK STREAM bigdata_new_app_log_store_sink (
    24. __time__ BIGINT,
    25. __topic__ STRING,
    26. scdata STRING,
    27. __source__ STRING,
    28. __receive_time__ BIGINT,
    29. `topic` STRING,
    30. `__client_ip__` STRING,
    31. `dt` STRING
    32. ) PARTITIONED BY(dt) WITH (
    33. type = "filesystem",
    34. file.path = "obs://yishou-test/yishou_data_qa_test.db/ods_new_app_log_store_dt",
    35. encode = "parquet",
    36. ak = "akxxx",
    37. sk = "skxxx"
    38. );
    39. -- 将source源数据写入到sink源中
    40. INSERT INTO
    41. bigdata_new_app_log_store_sink
    42. SELECT
    43. cast(__time__ as BIGINT) as __time__,
    44. __topic__ ,
    45. scdata ,
    46. __source__ ,
    47. cast(__receive_time__ as BIGINT) as __receive_time__ ,
    48. `topic`,
    49. `__client_ip__`,
    50. date_formatted(__time__, 'yyyyMMdd') as dt
    51. FROM
    52. bigdata_new_app_log_store_source;
    53. -- ============================================================================
    54. -- bigdata_yishou_log_exposure 主题数据 (APP埋点中的曝光日志数据)
    55. -- 创建kafka的source源
    56. CREATE SOURCE STREAM bigdata_yishou_log_exposure_source (
    57. __time__ STRING,
    58. __topic__ STRING,
    59. scdata STRING,
    60. __source__ STRING,
    61. __receive_time__ STRING,
    62. `topic` STRING,
    63. `__client_ip__` STRING
    64. ) WITH (
    65. type = "kafka",
    66. kafka_bootstrap_servers = "xxx1:9092,xxx2:9092,xxx3:9092",
    67. kafka_group_id = "behavior_log_to_obs",
    68. kafka_topic = "bigdata_yishou_log_exposure",
    69. encode = "json",
    70. json_config = "__time__=__time__;__topic__=__topic__;scdata=scdata;__source__=__source__;__receive_time__=__tag__:__receive_time__;__client_ip__=__tag__:__client_ip__;topic=topic;"
    71. );
    72. -- 创建obs的sink源
    73. CREATE SINK STREAM bigdata_yishou_log_exposure_sink (
    74. __time__ BIGINT,
    75. __topic__ STRING,
    76. scdata STRING,
    77. __source__ STRING,
    78. __receive_time__ BIGINT,
    79. `topic` STRING,
    80. `__client_ip__` STRING,
    81. `dt` STRING
    82. ) PARTITIONED BY(dt) WITH (
    83. type = "filesystem",
    84. file.path = "obs://yishou-test/yishou_data_qa_test.db/ods_yishou_log_exposure_dt",
    85. encode = "parquet",
    86. ak = "akxxx",
    87. sk = "skxxx"
    88. );
    89. -- 将source源数据写入到sink源中
    90. INSERT INTO
    91. bigdata_yishou_log_exposure_sink
    92. SELECT
    93. cast(__time__ as BIGINT) as __time__,
    94. __topic__ ,
    95. scdata ,
    96. __source__ ,
    97. cast(__receive_time__ as BIGINT) as __receive_time__ ,
    98. `topic`,
    99. `__client_ip__`,
    100. date_formatted(__time__, 'yyyyMMdd') as dt
    101. FROM
    102. bigdata_yishou_log_exposure_source;
    103. -- ============================================================================
    104. -- ......
    105. --

    注意:

    • 上述是使用华为云的FlinkSQL实现的,跟开源的FlinkSQL有些不同,但大体类似,可以根据实际情况来进行开发;
    • 博主的流量日志一共有9个大的主题,上述只贴出来了2个,同样可以根据实际情况来添加修改;
    • 上述使用了一个自定义函数,将10位时间戳转换成了8位的年月日格式,在开源的FlinkSQL中已有类似函数,不过华为云的FlinkSQL这个版本中还没有,所以添加了自定义函数,具体情况具体开发即可。

    在华为云的DLI服务中,运行界面如下所示:

    • FlinkSQL作业详情配置图

    • FlinkSQL任务列表图 

    • OBS文件系统(正常运行后每3分钟生成一个新的文件)

    3. 创建外部表

    1. -- ods_new_app_log_store_dt 主题
    2. DROP TABLE if exists ${yishou_data_dbname}.ods_new_app_log_store_dt;
    3. create table if not exists ${yishou_data_dbname}.ods_new_app_log_store_dt (
    4. __time__ BIGINT comment '时间',
    5. __topic__ STRING comment '埋点模块主题名',
    6. scdata STRING comment '核心数据',
    7. __source__ STRING comment '数据来源',
    8. __receive_time__ BIGINT comment '日志服务器接收时间',
    9. `topic` STRING comment '埋点模块主题名',
    10. `__client_ip__` STRING comment '客户端IP地址',
    11. `dt` STRING comment '日期分区'
    12. ) USING parquet PARTITIONED BY (dt)
    13. COMMENT 'ods层ods_new_app_log_store主题数据,所有该主题的原始数据都在此表中'
    14. LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_new_app_log_store_dt'
    15. ;
    16. -- ods_yishou_log_exposure_dt 主题
    17. DROP TABLE if exists ${yishou_data_dbname}.ods_yishou_log_exposure_dt;
    18. CREATE TABLE ${yishou_data_dbname}.ods_yishou_log_exposure_dt (
    19. __time__ BIGINT comment '时间',
    20. __topic__ STRING comment '埋点模块主题名',
    21. scdata STRING comment '核心数据',
    22. __source__ STRING comment '数据来源',
    23. __receive_time__ BIGINT comment '日志服务器接收时间',
    24. `topic` STRING comment '埋点模块主题名',
    25. `__client_ip__` STRING comment '客户端IP地址',
    26. `dt` STRING comment '日期分区'
    27. ) USING parquet PARTITIONED BY (dt)
    28. COMMENT 'ods层ods_yishou_log_exposure主题数据,所有该主题的原始数据都在此表中'
    29. LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_yishou_log_exposure_dt'
    30. ;

    注意:第一次建表后需要对表的分区进行msck,执行如下命令:

    1. MSCK REPAIR TABLE ${yishou_data_dbname}.ods_new_app_log_store_dt;
    2. MSCK REPAIR TABLE ${yishou_data_dbname}.ods_yishou_log_exposure_dt;

    之后查看数据条数可以看到每3分钟会更新一次最新数据:

    由于这是重构,之前的历史数据是存在旧表里,可以直接从旧表中查出数据,写入新表中即可。

    4. 创建调度任务

    在此层中调度任务只需要在凌晨添加当天分区即可,因为数据会实时进入obs文件系统,所以只要添加分区在元数据中有该分区就能直接查询最新的数据,脚本如下:

    1. -- 在 ods_new_app_log_store_dt 表中增加对应日期分区(每天凌晨执行,添加当天分区即可)
    2. ALTER TABLE ${yishou_data_dbname}.ods_new_app_log_store_dt ADD if not exists PARTITION (dt=${gmtdate});

    作业调度如下(可以凌晨0点0分调度):

    注意:对历史数据,当集群资源空闲时,还可以通过同一张表自己写入自己来合并小文件,但其实感觉没有必要;因为当每3分钟生成一个文件时,能看到一般情况下文件大小为60M左右,这不会造成文件系统小文件过多,并且自己写入自己也存在一定风险;并且可以对更新时间进行调节,控制一定文件数量即可。


    注:其他 离线数仓 相关文章链接由此进 -> 离线数仓文章汇总


  • 相关阅读:
    Linux操作系统~基于systemV共享内存的进程间通信
    机器学习基础算法--回归类型和评价分析
    NR CSI(三) CQI
    数据挖掘实战(6)——文本分类(今日头条tnews数据集)
    微信小程序员 java地图服务 高速公路服务区充电桩在线预订系统
    传感器_三相-双极性-开关型-霍尔传感器 速度+电角度解算理解
    【TUM公开数据集RGBD-Benchmark工具evaluate_ate.py参数用法原理解读】
    什么是邮件签名证书?
    oracle数据库赋权
    uni-app 介绍及使用
  • 原文地址:https://blog.csdn.net/yang_shibiao/article/details/126607375