• 业务数据准实时增量进入数据仓库并保留历史所有变更记录


    目录

    0. 相关文章链接

    1. 为什么要实现将业务数据实时写入到数据仓库中

    2. 架构设计

    3. FlinkSQL将binlog写入到HDFS中

    4. 创建增量外部表(binlog表)

    5. 主作业

    5.1. 概述

    5.2. fmys_goods_ext_start节点说明

    5.3. ods_fmys_goods_ext_dt_CDM任务节点说明

    5.4. ods_fmys_goods_ext_dt_DLI任务节点说明

    5.5. dwd_fmys_goods_ext_视图创建节点说明

    5.6. dwd_fmys_goods_ext_dt_dwd层分区表节点说明

    5.7. dwd_fmys_goods_ext_record_dwd层历史记录表节点说明

    6. 总结

    6.1. 历史数据和增量数据合并的注意事项

    6.2. Java的nanoTime()

    6.3. 各表保存时间以及资源消耗

    6.4. 优化前后各方面对比

    6.5. 注意事项


    注意:博主原有相关博文 FlinkSQL+HDFS+Hive+SparkSQL实现业务数据增量进入数据仓库 ,此方案是在原方案上优化而来,故其中术语名词等说明会跟原博文一致

    0. 相关文章链接

       开发随笔文章汇总  

    1. 为什么要实现将业务数据实时写入到数据仓库中

            在我们的数据仓库中,一般情况下都是通过Spoop和Datax等数据传输框架,将数据按天同步到数据仓库中。而且根据业务库的表的类型,可以选择全量同步(每天全量或者一次拉取,比如省份表等)、增量同步(按照更新时间字段拉取数据,并将数据和原表合并,比如订单表)、拉链表(按照更新时间拉取数据,并将数据和原表进行计算,比如用户表)等类型进行数据同步。

            但是有时当我们需要实时数据的时候(一般是指小于1小时,但是又没有达到秒级),这按天同步就不能满足需求了,而如果每个任务都使用Flink的话,又会照成资源浪费。

            这个时候就可以考虑将业务数据实时同步到数据仓库中,然后后端使用PerstoAPI等框架,每5分钟等计算一次,并将数据进行缓存,这样就可以进行准实时数据协助,并能有效节省资源。

    2. 架构设计

    1. 在Hive表中会存在一张外部表,该表为分区表,存储每天对应业务表的历史数据(有2种来源,1种是手动跑数时通过Spoop从MySQL业务库中拉取全量数据,第2种是昨天的离线表和昨天的增量表合并生成的)
    2. 需要使用FlinkCDC将MySQL中的binlog数据获取,并发送到Kafka对应的topic中
    3. 通过FlinkSQL将Kafka的数据获取出来,并直接将数据通过filesystem写入到HDFS中,通过外部表指定路径(使用华为云的话可以通过FlinkSQL写入到OBS中)
    4. 在每天执行的作业中添加该作业对应的日期和表的分区
    5. 将ods昨天分区+binlog昨天以及以后分区数据合并写入到ods的新分区中(历史分区数据可删除)
    6. 将ods今天分区+binlog今天以及以后分区创建视图
    7. 将dwd今天数据保存到分区中(历史分区数据可删除)
    8. 将dwd历史操作记录表昨天分区+binlog昨天数据合并写入该表今天分区中

    3. FlinkSQL将binlog写入到HDFS中

    注意:博主使用的是华为云产品,所以使用的是华为云DLI中的FlinkSQL和OBS,可以类比开源的FlinkSQL和HDFS

    1. -- kafka的 bigdata_mysql_binlog 主题中示例数据如下
    2. -- 注意:其中的nanoTime是使用Java中的System.nanoTime()函数生成的,也就是说,如果程序失败,需要将分区数据删除,然后在flink作业中设置拉取时间重跑
    3. -- 具体字段描述如下:
    4. -- db: 在MySQL中对应的数据库
    5. -- tb: 在MySQL中对应的表
    6. -- columns: 一个JSONArray,表示所有的字段,如下示例,一个JSONArray中存在该表中所有字段,存储的json的key为字段名,在该json中又有value这个属性,表示该字段对应的值
    7. -- event: event对应操作类型和数值: insert = 0 ; delete = 1 ; update = 2 ; alter = 3 ;
    8. -- sql: 触发该更改或删除等操作的sql语句
    9. -- primary_key: 该更改的主键
    10. -- create_time: 该数据从binlog中获取出来的时间(分区就用此时间)
    11. -- sendTime: 该数据进入到kafka的时间(该时间不用)
    12. -- nano_time: 纳秒级时间戳,用了区分数据的前后,使用此时间可以保证数据唯一
    13. -- {
    14. -- "columns": {
    15. -- "admin_note": {
    16. -- "key": false,
    17. -- "mysqlType": "varchar(255)",
    18. -- "name": "admin_note",
    19. -- "null_val": false,
    20. -- "update": false,
    21. -- "value": ""
    22. -- },
    23. -- "allot_num": {
    24. -- "key": false,
    25. -- "mysqlType": "smallint(5) unsigned",
    26. -- "name": "allot_num",
    27. -- "null_val": false,
    28. -- "update": false,
    29. -- "value": "0"
    30. -- }
    31. -- },
    32. -- "createTime": 1649213973044,
    33. -- "db": "xxx",
    34. -- "event": 2,
    35. -- "nanoTime": 23494049498709146,
    36. -- "primaryKey": "157128418",
    37. -- "sendTime": 1649213973045,
    38. -- "sql": "",
    39. -- "tb": "fmys_order_infos"
    40. -- }
    41. -- source端,对接kafka,从kafka中获取数据
    42. CREATE SOURCE STREAM bigdata_binlog_ingest_source_bigdata_mysql_binlog (
    43. db STRING,
    44. tb STRING,
    45. columns STRING,
    46. event INT,
    47. `sql` STRING,
    48. `primary_key` STRING,
    49. create_time bigint,
    50. send_time bigint,
    51. nano_time bigint
    52. ) WITH (
    53. type = "kafka",
    54. kafka_bootstrap_servers = "bigdata1:9092,bigdata2:9092,bigdata3:9092",
    55. kafka_group_id = "bigdata_mysql_binlog_dim_to_obs_test",
    56. kafka_topic = "bigdata_mysql_binlog_dim",
    57. encode = "json",
    58. json_config = "db=db;tb=tb;columns=columns;event=event;sql=sql;primary_key=primaryKey;create_time=createTime;send_time=sendTime;nano_time=nanoTime;"
    59. );
    60. -- sink端,对接obs,将数据写入到obs中
    61. CREATE SINK STREAM bigdata_binlog_ingest_sink_bigdata_mysql_binlog (
    62. db STRING,
    63. tb STRING,
    64. columns STRING,
    65. event int,
    66. `sql` STRING,
    67. `primary_key` STRING,
    68. create_time bigint,
    69. nano_time bigint,
    70. dt STRING
    71. ) PARTITIONED BY(dt,tb) WITH (
    72. type = "filesystem",
    73. file.path = "obs://xxx-bigdata/xxx/xxx/ods_binlog_data",
    74. encode = "parquet",
    75. ak = "akxxx",
    76. sk = "skxxx"
    77. );
    78. -- 从source表获取数据,写出到sink表中
    79. CREATE FUNCTION date_formatted AS 'com.xxx.cdc.udf.DateFormattedUDF';
    80. INSERT INTO
    81. bigdata_binlog_ingest_sink_bigdata_mysql_binlog
    82. SELECT
    83. db,
    84. tb,
    85. columns,
    86. event,
    87. `sql`,
    88. `primary_key`,
    89. create_time,
    90. nano_time,
    91. date_formatted(
    92. cast(coalesce(create_time, UNIX_TIMESTAMP_MS()) / 1000 as VARCHAR(10)),
    93. 'yyyyMMdd'
    94. ) as dt
    95. FROM
    96. bigdata_binlog_ingest_source_bigdata_mysql_binlog;

    上述FlinkSQL主要通过source获取Kafka主题中的数据,然后什么都不要改变,将数据写入到HDFS中,写入的时候注意,以dt为一级分区,tb为二级分区,主要考虑如下几点:

    1. binlog增量数据总体来说数据量并不大,并是使用了分区,所以所有的数据写入到一张表中即可
    2. dt日期分区为一级分区,tb表名为二级分区,主要考虑的是删除历史数据容易删除

    4. 创建增量外部表(binlog表)

    外部表建表语句如下:

    1. CREATE TABLE yishou_data.ods_binlog_data (
    2. `db` STRING COMMENT '数据库名',
    3. `columns` STRING COMMENT '列数据',
    4. `event` BIGINT COMMENT '操作类型',
    5. `sql` STRING COMMENT '执行的SQL',
    6. `primary_key` STRING COMMENT '主键的值',
    7. `create_time` BIGINT COMMENT '从MySQL中获取binlog的13位时间戳',
    8. `send_time` BIGINT COMMENT '发送binlog到kafka的13位时间戳',
    9. `nano_time` BIGINT COMMENT '纳秒级更新时间戳',
    10. `dt` BIGINT COMMENT '日期分区',
    11. `tb` STRING COMMENT '表名'
    12. ) USING parquet
    13. PARTITIONED BY (dt, tb) COMMENT 'ods层binlog表(所有通过binlog进入数仓的数据均在此表中)'
    14. LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_binlog_data'
    15. ;

    建表完后查询结果如下(注意该表是直接插入数据到文件系统中,所以需要根据实际情况来msck该表):

    select * from yishou_data.ods_binlog_data_table where dt = 20220816 and tb = 'fmys_goods_ext';

    5. 主作业

    5.1. 概述

    架构图如下:

    该作业目的:

    1. 定时创建视图(在dwd的视图中不用指定时间,永远都是最新的数据,并只有1天的小文件)
    2. 定时将昨天的old表数据和昨天的binlog表数据合并,并导入今天的old表中
    3. 当数据不对或者失败时,可以通过重跑从业务库中拉取全量数据(提高抗风险性)
    4. 将历史所有的binlog进行保存,可以获取所有的历史修改记录

    作业大致说明:

    1. 该调度任务时间为凌晨 0点 30分
    2. 在start节点中会添加binlog的分区,并针对其他表执行msck操作(下述会详细说明)
    3. 上面部分CDM任务的IF条件为 #{DateUtil.format(Job.planTime,"HHmmss") != '003000' ? 'true' : 'false'} ,即当调度时间不为 00点30分00秒 时,会执行此任务,从业务库中拉取全量数据到数仓中(即不是自动调度就会执行此选项)
    4. 下面部分DLI任务的IF条件为 #{DateUtil.format(Job.planTime,"HHmmss") == '003000' ? 'true' : 'false'} ,即当调度时间为 00点30分00秒 时,会执行此任务,将ods昨天的数据和binlog昨天的数据合并,并导入到ods今天的数据中(当自动调度就会执行此选项)

    5.2. fmys_goods_ext_start节点说明

    该节点为SQL节点,具体代码如下:

    1. -- 在ods_binlog_data表中增加对应日期和表名的分区
    2. ALTER TABLE yishou_data.ods_binlog_data ADD if not exists PARTITION (dt=${gmtdate}, tb = 'fmys_goods_ext');
    3. -- msck此作业涉及的会删除历史数据的3张表
    4. MSCK REPAIR TABLE yishou_data.ods_fmys_goods_ext_dt;
    5. MSCK REPAIR TABLE yishou_data.dwd_fmys_goods_ext_dt;
    6. MSCK REPAIR TABLE yishou_data.dwd_fmys_goods_ext_record_dt;

    主要目的是新增ods_binlog_data表对应日期和业务表的分区,并对其它表进行msck操作(因为其它表会删除历史数据)。

    5.3. ods_fmys_goods_ext_dt_CDM任务节点说明

    此节点为CMD(即Sqoop)作业,会将业务库中的全量数据拉取到数仓的ods表中(通过overwrite的方式,将数据写入到以今天的分区中),当手动调度时才执行此节点,主要目的是当感觉数据有误时,可以使用此作业来更新替换最新最全最正确的数据。

    ods建表语句如下:

    1. CREATE EXTERNAL TABLE yishou_data.ods_fmys_goods_ext_dt(
    2. `goods_id` BIGINT COMMENT '*',
    3. `limit_day` BIGINT COMMENT '*',
    4. `auto_time` STRING COMMENT '*',
    5. `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
    6. `grade` BIGINT COMMENT '商品档次',
    7. `season` BIGINT COMMENT '季节',
    8. `goods_type` BIGINT COMMENT '商品类型{1:特价,}'
    9. ) COMMENT '商品拓展信息表(ods层分区表,保留7天的数据)'
    10. PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数时的日期,比如今天2022年8月15日0点30分跑数,那这分区就是20220815')
    11. STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_fmys_goods_ext_dt'
    12. ;

    CDM任务配置如下:

    5.4. ods_fmys_goods_ext_dt_DLI任务节点说明

    当正常调度时,会执行此节点,会使用ods昨天的分区然后和昨天以及之后的binlog数据合并,再将数据插入到ods的今天的分区中;因为是跑的SQL作业,不需要拉取数据,所以会执行的较快,正常调度时就使用此种方法运行。

    1. -- DLI sql
    2. -- ******************************************************************** --
    3. -- author: yangshibiao
    4. -- create time: 2022/08/15 10:15:46 GMT+08:00
    5. -- ******************************************************************** --
    6. -- fmys_goods_ext 表在数仓中ods的建表语句(ods层历史数据表,在对应分区内保存着当天0点之前的所有数据,如果0点30分跑数,也会将这30分的数据放入)
    7. -- CREATE EXTERNAL TABLE yishou_data.ods_fmys_goods_ext_dt(
    8. -- `goods_id` BIGINT COMMENT '*',
    9. -- `limit_day` BIGINT COMMENT '*',
    10. -- `auto_time` STRING COMMENT '*',
    11. -- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
    12. -- `grade` BIGINT COMMENT '商品档次',
    13. -- `season` BIGINT COMMENT '季节',
    14. -- `goods_type` BIGINT COMMENT '商品类型{1:特价,}'
    15. -- ) COMMENT '商品拓展信息表(ods层分区表,保留7天的数据)'
    16. -- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数时的日期,比如今天2022年8月15日0点30分跑数,那这分区就是20220815')
    17. -- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_fmys_goods_ext_dt'
    18. -- ;
    19. -- 从ods_fmys_goods_ext_dt表中获取出昨天分区的数据(即相当于昨天0点之前的所有数据)
    20. -- 从ods_binlog_data表中获取出昨天分区以及之后分区的数据(即昨天的增量更新的binlog数据)
    21. -- 然后2份数据对主键开窗,获取最新的,并且不是被删除的数据,然后将数据插入到ods_fmys_goods_ext今天的分区中(即今天0点之前的所有历史数据)
    22. insert overwrite table yishou_data.ods_fmys_goods_ext_dt partition(dt)
    23. select
    24. goods_id
    25. , limit_day
    26. , auto_time
    27. , is_new
    28. , grade
    29. , season
    30. , goods_type
    31. , substr(${bdp.system.cyctime}, 1, 8) as dt
    32. from (
    33. select
    34. goods_id
    35. , limit_day
    36. , auto_time
    37. , is_new
    38. , grade
    39. , season
    40. , goods_type
    41. , nano_time
    42. , event
    43. , row_number() over(partition by goods_id order by nano_time desc) as row_number
    44. from (
    45. select
    46. goods_id
    47. , limit_day
    48. , auto_time
    49. , is_new
    50. , grade
    51. , season
    52. , goods_type
    53. , -987654321012345 as nano_time
    54. , 0 as event
    55. from yishou_data.ods_fmys_goods_ext_dt
    56. where dt = ${bdp.system.bizdate}
    57. union all
    58. select
    59. get_json_object(columns ,'$.goods_id.value') as goods_id
    60. , get_json_object(columns ,'$.limit_day.value') as limit_day
    61. , get_json_object(columns ,'$.auto_time.value') as auto_time
    62. , get_json_object(columns ,'$.is_new.value') as is_new
    63. , get_json_object(columns ,'$.grade.value') as grade
    64. , get_json_object(columns ,'$.season.value') as season
    65. , get_json_object(columns ,'$.goods_type.value') as goods_type
    66. , nano_time as nano_time
    67. , event as event
    68. from yishou_data.ods_binlog_data
    69. where
    70. dt >= ${bdp.system.bizdate}
    71. and tb = 'fmys_goods_ext'
    72. )
    73. )
    74. -- event为1就是删除的数据,当最后一次记录为删除数据的时候,就不用这个主键了,因为已经删除了
    75. where row_number = 1 and event != 1
    76. ;

    5.5. dwd_fmys_goods_ext_视图创建节点说明

    此节点的目的是使用ods今天的分区和binlog今天以及之后的分区生成一张dwd的视图,因为binlog的数据是实时更新的,所以这个dwd的视图每次查询的都是最新的,跟业务库可以做到准实时。

    1. -- DLI sql
    2. -- ******************************************************************** --
    3. -- author: yangshibiao
    4. -- create time: 2022/08/15 10:25:56 GMT+08:00
    5. -- ******************************************************************** --
    6. drop view if exists yishou_data.dwd_fmys_goods_ext;
    7. create view if not exists yishou_data.dwd_fmys_goods_ext(
    8. goods_id
    9. , limit_day
    10. , auto_time
    11. , is_new
    12. , grade
    13. , season
    14. , goods_type
    15. ) comment 'fmys_goods_ext在dwd层视图(根据业务库数据,分钟级别更新)'
    16. as
    17. select
    18. goods_id
    19. , limit_day
    20. , auto_time
    21. , is_new
    22. , grade
    23. , season
    24. , goods_type
    25. from (
    26. select
    27. goods_id
    28. , limit_day
    29. , auto_time
    30. , is_new
    31. , grade
    32. , season
    33. , goods_type
    34. , nano_time
    35. , event
    36. , row_number() over(partition by goods_id order by nano_time desc) as row_number
    37. from (
    38. select
    39. goods_id
    40. , limit_day
    41. , auto_time
    42. , is_new
    43. , grade
    44. , season
    45. , goods_type
    46. , -987654321012345 as nano_time
    47. , 0 as event
    48. from yishou_data.ods_fmys_goods_ext_dt
    49. where dt = substr(${bdp.system.cyctime}, 1, 8)
    50. union all
    51. select
    52. get_json_object(columns ,'$.goods_id.value') as goods_id
    53. , get_json_object(columns ,'$.limit_day.value') as limit_day
    54. , get_json_object(columns ,'$.auto_time.value') as auto_time
    55. , get_json_object(columns ,'$.is_new.value') as is_new
    56. , get_json_object(columns ,'$.grade.value') as grade
    57. , get_json_object(columns ,'$.season.value') as season
    58. , get_json_object(columns ,'$.goods_type.value') as goods_type
    59. , nano_time as nano_time
    60. , event as event
    61. from yishou_data.ods_binlog_data
    62. where
    63. dt >= substr(${bdp.system.cyctime}, 1, 8)
    64. and tb = 'fmys_goods_ext'
    65. )
    66. )
    67. where row_number = 1 and event != 1
    68. ;

    5.6. dwd_fmys_goods_ext_dt_dwd层分区表节点说明

    此节点的主要目的是将业务库的数据完整保存到一个分区中,主要解决的问题是当业务库的数据变化时(比如订单的状态、快递配送的状态),下一层重跑时使用新状态的问题,当dwd层有分区之后,每次重跑都只会使用那一天的状态,但是要注意,下一层的脚本需要设置分区,并且该分区可以只保留少数分区即可。

    1. -- DLI sql
    2. -- ******************************************************************** --
    3. -- author: yangshibiao
    4. -- create time: 2022/08/15 10:43:41 GMT+08:00
    5. -- ******************************************************************** --
    6. -- DWD层分区表建表语句
    7. -- CREATE EXTERNAL TABLE `yishou_data`.`dwd_fmys_goods_ext_dt`(
    8. -- `goods_id` BIGINT COMMENT '*',
    9. -- `limit_day` BIGINT COMMENT '*',
    10. -- `auto_time` STRING COMMENT '*',
    11. -- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
    12. -- `grade` BIGINT COMMENT '商品档次',
    13. -- `season` BIGINT COMMENT '季节',
    14. -- `goods_type` BIGINT COMMENT '商品类型{1:特价,}'
    15. -- ) COMMENT '商品拓展信息表(此表为分区表,保存历史90天的数据)'
    16. -- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数分区,即当脚本调度时间为2022年8月15日0点30分,那分区为20220815')
    17. -- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/dwd_fmys_goods_ext_dt'
    18. -- ;
    19. -- 从dwd视图中查出数据,并插入到dwd的分区表中(注意:可以根据具体情况设置文件个数,一般每个文件大小为30M到120M即可)
    20. INSERT OVERWRITE TABLE `yishou_data`.`dwd_fmys_goods_ext_dt` partition(dt)
    21. select
    22. goods_id
    23. , limit_day
    24. , auto_time
    25. , is_new
    26. , grade
    27. , season
    28. , goods_type
    29. , substr(${bdp.system.cyctime}, 1, 8) as dt
    30. from yishou_data.dwd_fmys_goods_ext
    31. DISTRIBUTE BY floor(rand()*20)
    32. ;

    5.7. dwd_fmys_goods_ext_record_dwd层历史记录表节点说明

    此节点主要目的是将业务库历史记录进行保存,该表因为保存的历史记录,所以使用昨天的分区和昨天的binlog数据聚合即可,注意该表需要初始化,详细说明在如下代码中有说明。

    1. -- DLI sql
    2. -- ******************************************************************** --
    3. -- author: yangshibiao
    4. -- create time: 2022/08/15 11:45:36 GMT+08:00
    5. -- ******************************************************************** --
    6. -- DWD层记录表建表语句
    7. -- CREATE EXTERNAL TABLE `yishou_data`.`dwd_fmys_goods_ext_record_dt`(
    8. -- `goods_id` BIGINT COMMENT '*',
    9. -- `limit_day` BIGINT COMMENT '*',
    10. -- `auto_time` STRING COMMENT '*',
    11. -- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
    12. -- `grade` BIGINT COMMENT '商品档次',
    13. -- `season` BIGINT COMMENT '季节',
    14. -- `goods_type` BIGINT COMMENT '商品类型{1:特价,}',
    15. -- `event` BIGINT comment '数据操作类型(0:插入;1:删除;2:修改)',
    16. -- `record_time` BIGINT comment '从MySQL中获取binlog的13位时间戳',
    17. -- `nano_time` BIGINT comment '纳秒级更新时间戳(当record_time相同时,使用此时间戳来判断先后)'
    18. -- ) COMMENT '商品拓展信息表历史更改记录表(此表为分区表,保存历史7天的数据)'
    19. -- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数分区,即当脚本调度时间为2022年8月15日0点30分,那分区为20220815')
    20. -- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/dwd_fmys_goods_ext_record_dt'
    21. -- ;
    22. -- 执行脚本(记录表昨天的数据和binlog昨天的数据 union all 即可)
    23. insert overwrite table `yishou_data`.`dwd_fmys_goods_ext_record_dt` partition(dt)
    24. select
    25. goods_id
    26. , limit_day
    27. , auto_time
    28. , is_new
    29. , grade
    30. , season
    31. , goods_type
    32. , event
    33. , record_time
    34. , nano_time
    35. , substr(${bdp.system.cyctime}, 1, 8) as dt
    36. from (
    37. select
    38. goods_id
    39. , limit_day
    40. , auto_time
    41. , is_new
    42. , grade
    43. , season
    44. , goods_type
    45. , event
    46. , record_time
    47. , nano_time
    48. from yishou_data.dwd_fmys_goods_ext_record_dt
    49. where dt = ${bdp.system.bizdate}
    50. union all
    51. select
    52. get_json_object(columns ,'$.goods_id.value') as goods_id
    53. , get_json_object(columns ,'$.limit_day.value') as limit_day
    54. , get_json_object(columns ,'$.auto_time.value') as auto_time
    55. , get_json_object(columns ,'$.is_new.value') as is_new
    56. , get_json_object(columns ,'$.grade.value') as grade
    57. , get_json_object(columns ,'$.season.value') as season
    58. , get_json_object(columns ,'$.goods_type.value') as goods_type
    59. , event
    60. , create_time as record_time
    61. , nano_time as nano_time
    62. from yishou_data.ods_binlog_data
    63. where
    64. dt = ${bdp.system.bizdate}
    65. and tb = 'fmys_goods_ext'
    66. )
    67. DISTRIBUTE BY floor(rand()*30)
    68. ;
    69. -- 初始化脚本(任务调度之前手动执行初始化脚本)(将昨天的历史所有数据和昨天的binlog汇总,根据主键去重,并设置event为0)
    70. -- 注意:执行初始化脚本时,需要注意昨天的历史数据是否有(即 yishou_data.ods_fmys_goods_ext_dt 表中昨天分区是否有数据 以及binlog昨天对应分区的数据是否有 )
    71. -- 如果没有,可以在功能上线正常运行的第二天,执行如下初始化脚本,再手动重跑该作业即可,这样会通过CDM重新拉取业务库中的数据(并且其中的分区表只会保存一定日期,所以可以忽略)
    72. -- insert overwrite table `yishou_data`.`dwd_fmys_goods_ext_record_dt` partition(dt)
    73. -- select
    74. -- goods_id
    75. -- , limit_day
    76. -- , auto_time
    77. -- , is_new
    78. -- , grade
    79. -- , season
    80. -- , goods_type
    81. -- , 0 as event
    82. -- , record_time
    83. -- , nano_time
    84. -- , substr(${bdp.system.cyctime}, 1, 8) as dt
    85. -- from (
    86. -- select
    87. -- goods_id
    88. -- , limit_day
    89. -- , auto_time
    90. -- , is_new
    91. -- , grade
    92. -- , season
    93. -- , goods_type
    94. -- , record_time
    95. -- , nano_time
    96. -- , event
    97. -- , row_number() over(partition by goods_id order by nano_time desc) as row_number
    98. -- from (
    99. -- select
    100. -- goods_id
    101. -- , limit_day
    102. -- , auto_time
    103. -- , is_new
    104. -- , grade
    105. -- , season
    106. -- , goods_type
    107. -- , 0 as record_time
    108. -- , -987654321012345 as nano_time
    109. -- , 0 as event
    110. -- from yishou_data.ods_fmys_goods_ext_dt
    111. -- where dt = ${bdp.system.bizdate}
    112. -- union all
    113. -- select
    114. -- get_json_object(columns ,'$.goods_id.value') as goods_id
    115. -- , get_json_object(columns ,'$.limit_day.value') as limit_day
    116. -- , get_json_object(columns ,'$.auto_time.value') as auto_time
    117. -- , get_json_object(columns ,'$.is_new.value') as is_new
    118. -- , get_json_object(columns ,'$.grade.value') as grade
    119. -- , get_json_object(columns ,'$.season.value') as season
    120. -- , get_json_object(columns ,'$.goods_type.value') as goods_type
    121. -- , create_time as record_time
    122. -- , nano_time as nano_time
    123. -- , event
    124. -- from yishou_data.ods_binlog_data
    125. -- where
    126. -- dt = ${bdp.system.bizdate}
    127. -- and tb = 'fmys_goods_ext'
    128. -- )
    129. -- )
    130. -- where row_number = 1 and event != 1
    131. -- ;
    132. -- 创建记录表视图(该记录表今天的分区 union all 今天以及之后的binlog的数据,准实时更新)
    133. drop view if exists yishou_data.dwd_fmys_goods_ext_record;
    134. create view if not exists yishou_data.dwd_fmys_goods_ext_record(
    135. goods_id
    136. , limit_day
    137. , auto_time
    138. , is_new
    139. , grade
    140. , season
    141. , goods_type
    142. , event
    143. , record_time
    144. , nano_time
    145. ) comment 'dwd_fmys_goods_ext_record在dwd层视图(根据业务库数据,分钟级别更新)'
    146. as
    147. select
    148. goods_id
    149. , limit_day
    150. , auto_time
    151. , is_new
    152. , grade
    153. , season
    154. , goods_type
    155. , event
    156. , record_time
    157. , nano_time
    158. from (
    159. select
    160. goods_id
    161. , limit_day
    162. , auto_time
    163. , is_new
    164. , grade
    165. , season
    166. , goods_type
    167. , event
    168. , record_time
    169. , nano_time
    170. from yishou_data.dwd_fmys_goods_ext_record_dt
    171. where dt = substr(${bdp.system.cyctime}, 1, 8)
    172. union all
    173. select
    174. get_json_object(columns ,'$.goods_id.value') as goods_id
    175. , get_json_object(columns ,'$.limit_day.value') as limit_day
    176. , get_json_object(columns ,'$.auto_time.value') as auto_time
    177. , get_json_object(columns ,'$.is_new.value') as is_new
    178. , get_json_object(columns ,'$.grade.value') as grade
    179. , get_json_object(columns ,'$.season.value') as season
    180. , get_json_object(columns ,'$.goods_type.value') as goods_type
    181. , event
    182. , create_time as record_time
    183. , nano_time as nano_time
    184. from yishou_data.ods_binlog_data
    185. where
    186. dt >= substr(${bdp.system.cyctime}, 1, 8)
    187. and tb = 'fmys_goods_ext'
    188. )
    189. ;

    6. 总结

    6.1. 历史数据和增量数据合并的注意事项

            该脚本为每天凌晨运行,直接从ods_fmys_goods_ext_dt表中获取昨天的数据,然后从ods_binlog_data表中也获取昨天并对应的业务表的数据。然后根据主键开窗,按照nano_time降序排序,并取第一条,并且要过滤event为1的(event为1的数据不要,这是删除的数据)数据,取出数据后将数据插入到ods_fmys_goods_ext_dt中今天的分区中(保留历史数据,降低误差,提高健壮性)。

            在ods_fmys_goods_ext_dt表中没有nano_time和event字段,需要设置nano_time为-987654321012345(这个值可以随意设置,只要够小就行,关于这样设置可以参考如下的nanoTime的含义),也需要设置event为0,表示这条数据为插入。

    6.2. Java的nanoTime()

            java有两个获取和时间相关的秒数方法,一个是广泛使用的 System.currentTimeMillis() , 返回的是从一个长整型结果,表示毫秒。另一个是 System.nanoTime(), 返回的是纳秒。

            “纳”这个单位 一般不是第一次见。前几年相当火爆的“纳米”和他是同一级别。纳表示的是10的-9次方。在真空中,光一纳秒也只能传播30厘米。比纳秒大一级别的是微秒,10的-6次方;然后是就是毫秒,10的-3次方。纳秒下面还有皮秒、飞秒等。既然纳秒比毫秒高10的6次方精度,那么他们的比值就应该是10的6次方。然而并非如此。

            大家可能都知道毫秒方法返回的是自1970年到现在的毫秒数。而Java的日期也是如此,所以他俩是等值的。但是因为纳秒数值精度太高,所以不能从指定1970年到现在纳秒数,这个输出在不同的机器上可能不一样。

    具体参考如下纳秒方法的注释:

    1. Returns the current value of the running Java Virtual Machine's high-resolution time source, in nanoseconds.
    2. This method can only be used to measure elapsed time and is not related to any other notion of system or wall-clock time. The value returned represents nanoseconds since some fixed but arbitrary origin time (perhaps in the future, so values may be negative). The same origin is used by all invocations of this method in an instance of a Java virtual machine; other virtual machine instances are likely to use a different origin.
    3. 返回当前JVM的高精度时间。该方法只能用来测量时段而和系统时间无关。
    4. 它的返回值是从某个固定但随意的时间点开始的(可能是未来的某个时间)。
    5. 不同的JVM使用的起点可能不同。

            这样有点恐怖的是我们相同的代码在不同机器运行导致结果可能不同。所以它很少用来计算。通常都是测量,并且还有可能是负数。所以如上脚本中,设置ods_fmys_goods_ext_dt表中nano_time的数值,才会设置一个负数,并且是一个够小的负数。

    6.3. 各表保存时间以及资源消耗

    • yishou_data.ods_binlog_data :binlog数据表,以日期和业务库表名为分区,只需要保存7天数据即可
    • yishou_data.ods_fmys_goods_ext_dt :ods层分区表,每个分区都是前一天的全量数据,只需要保存7天数据即可
    • yishou_data.dwd_fmys_goods_ext :dwd层视图,准实时更新,每天会新建视图
    • yishou_data.dwd_fmys_goods_ext_dt :dwd层历史快照,每天都是保存的业务表全量数据,跟ods层类似,但因为ods层正常来说不能提供dws层和分析师使用,所以新建dwd分区快照,可以选择保存90天数据即可
    • yishou_data.dwd_fmys_goods_ext_record :dwd层历史变更记录视图,按分钟级更新,每次查询都是最新的历史变更记录
    • yishou_data.dwd_fmys_goods_ext_record_dt :dwd层历史变更记录表,会将上线之后业务表的所有变更全部保存,因为每天是保存的全量数据,所以保存7天数据即可,而且之所以用分区表保存7天,是为了防止作业运行异常会导致数据异常提高容错率所设,这样前一个分区+binlog数据写入到新分区中

    之前每天通过CDM全量拉取数据,需要一台专有的CDM服务器(因为同时运行多作业需要资源较多)和业务库的大IO。而现在增量只需要2个Flink任务,并且能准实时获取数据,同时还能保存历史快照和历史变更记录。

    CDM集群 + 业务库映射服务器 + OBS数据存储 = 4000 + 2000 + 100 = 6100

    Flink作业 + OBS数据存储 = 600 + 9100 = 9700

    一个快照全量数据为1T数据左右,按上述数据保存,大概会保存100T左右数据(90分区 + 7分区 + 7分区),按照华为云OBS收费,一个月花费9100左右。

    6.4. 优化前后各方面对比

    指标优化前优化后备注
    资源1、需要一台CDM集群
    2、需要业务库高IO(添加映射服务器)
    1、需要2个Flink任务
    2、需要跑SQL的资源
    优化前为每天全量从业务库中拉取数据
    功能1、数仓中每张业务表只有1张全量表
    2、只能离线(1小时以上)更新,不能实时
    1、数仓中有准实时表
    2、数仓中有按天保存的快照数据
    3、数仓中有保存历史所有的变更记录数据
    既有准实时,又有按天保存快照,还有历史变更记录,数据存储会增多
    花费1、专用CDM集群(4000)
    2、业务库映射服务器(2000)
    3、OBS数据存储(100)
    1、Flink作业(2个作业一共600)
    2、OBS数据存储(9100)
    3、DLI中跑SQL可以忽略
    较原先,花费会提高百分之50,但效率、容错等会有本质提高

    6.5. 注意事项

    1. 进行如上操作后,数仓中的业务库的表就会按照分钟级别更新,同理,对流量数据,也可以参考如上操作,做到分钟级别更新
    2. 分钟级别更新会比较消耗资源,但白天没作业运行时,集群中的资源也是空闲状态,可以利用上述方法提供一些分钟级别的数据协助
    3. 上述操作主要是提供分钟级别数据协助,这样消耗的资源比Flink的秒级更新会少很多,具体使用可以考虑资源和效率的选择
    4. 上述操作均在华为云中进行,如使用其他云平台或者使用开源大数据平台,可以参照上述思想来具体实现

    注:其他相关文章链接由此进 ->    开发随笔文章汇总 


  • 相关阅读:
    USB母座引脚定义
    云流化:XR扩展现实应用发展的一个新方向!
    超神之路 数据结构 3 —— Stack栈实现及应用
    想要精通算法和SQL的成长之路 - 连续的子数组和
    论文解读:Large Language Models as Analogical Reasoners
    js红宝书学习笔记(一)引用类型
    Linux系统安装Nginx
    python制作小游戏之二2048第二部分
    【计算机视觉 | 图像分割】arxiv 计算机视觉关于图像分割的学术速递(8 月 29 日论文合集)
    main.jsError: error:0308010C:digital envelope routines::unsupported
  • 原文地址:https://blog.csdn.net/yang_shibiao/article/details/126370851