目录
5.3. ods_fmys_goods_ext_dt_CDM任务节点说明
5.4. ods_fmys_goods_ext_dt_DLI任务节点说明
5.5. dwd_fmys_goods_ext_视图创建节点说明
5.6. dwd_fmys_goods_ext_dt_dwd层分区表节点说明
5.7. dwd_fmys_goods_ext_record_dwd层历史记录表节点说明
注意:博主原有相关博文 FlinkSQL+HDFS+Hive+SparkSQL实现业务数据增量进入数据仓库 ,此方案是在原方案上优化而来,故其中术语名词等说明会跟原博文一致
在我们的数据仓库中,一般情况下都是通过Spoop和Datax等数据传输框架,将数据按天同步到数据仓库中。而且根据业务库的表的类型,可以选择全量同步(每天全量或者一次拉取,比如省份表等)、增量同步(按照更新时间字段拉取数据,并将数据和原表合并,比如订单表)、拉链表(按照更新时间拉取数据,并将数据和原表进行计算,比如用户表)等类型进行数据同步。
但是有时当我们需要实时数据的时候(一般是指小于1小时,但是又没有达到秒级),这按天同步就不能满足需求了,而如果每个任务都使用Flink的话,又会照成资源浪费。
这个时候就可以考虑将业务数据实时同步到数据仓库中,然后后端使用PerstoAPI等框架,每5分钟等计算一次,并将数据进行缓存,这样就可以进行准实时数据协助,并能有效节省资源。
注意:博主使用的是华为云产品,所以使用的是华为云DLI中的FlinkSQL和OBS,可以类比开源的FlinkSQL和HDFS
-
- -- kafka的 bigdata_mysql_binlog 主题中示例数据如下
-
- -- 注意:其中的nanoTime是使用Java中的System.nanoTime()函数生成的,也就是说,如果程序失败,需要将分区数据删除,然后在flink作业中设置拉取时间重跑
-
- -- 具体字段描述如下:
- -- db: 在MySQL中对应的数据库
- -- tb: 在MySQL中对应的表
- -- columns: 一个JSONArray,表示所有的字段,如下示例,一个JSONArray中存在该表中所有字段,存储的json的key为字段名,在该json中又有value这个属性,表示该字段对应的值
- -- event: event对应操作类型和数值: insert = 0 ; delete = 1 ; update = 2 ; alter = 3 ;
- -- sql: 触发该更改或删除等操作的sql语句
- -- primary_key: 该更改的主键
- -- create_time: 该数据从binlog中获取出来的时间(分区就用此时间)
- -- sendTime: 该数据进入到kafka的时间(该时间不用)
- -- nano_time: 纳秒级时间戳,用了区分数据的前后,使用此时间可以保证数据唯一
-
- -- {
- -- "columns": {
- -- "admin_note": {
- -- "key": false,
- -- "mysqlType": "varchar(255)",
- -- "name": "admin_note",
- -- "null_val": false,
- -- "update": false,
- -- "value": ""
- -- },
- -- "allot_num": {
- -- "key": false,
- -- "mysqlType": "smallint(5) unsigned",
- -- "name": "allot_num",
- -- "null_val": false,
- -- "update": false,
- -- "value": "0"
- -- }
- -- },
- -- "createTime": 1649213973044,
- -- "db": "xxx",
- -- "event": 2,
- -- "nanoTime": 23494049498709146,
- -- "primaryKey": "157128418",
- -- "sendTime": 1649213973045,
- -- "sql": "",
- -- "tb": "fmys_order_infos"
- -- }
-
-
- -- source端,对接kafka,从kafka中获取数据
- CREATE SOURCE STREAM bigdata_binlog_ingest_source_bigdata_mysql_binlog (
- db STRING,
- tb STRING,
- columns STRING,
- event INT,
- `sql` STRING,
- `primary_key` STRING,
- create_time bigint,
- send_time bigint,
- nano_time bigint
- ) WITH (
- type = "kafka",
- kafka_bootstrap_servers = "bigdata1:9092,bigdata2:9092,bigdata3:9092",
- kafka_group_id = "bigdata_mysql_binlog_dim_to_obs_test",
- kafka_topic = "bigdata_mysql_binlog_dim",
- encode = "json",
- json_config = "db=db;tb=tb;columns=columns;event=event;sql=sql;primary_key=primaryKey;create_time=createTime;send_time=sendTime;nano_time=nanoTime;"
- );
-
- -- sink端,对接obs,将数据写入到obs中
- CREATE SINK STREAM bigdata_binlog_ingest_sink_bigdata_mysql_binlog (
- db STRING,
- tb STRING,
- columns STRING,
- event int,
- `sql` STRING,
- `primary_key` STRING,
- create_time bigint,
- nano_time bigint,
- dt STRING
- ) PARTITIONED BY(dt,tb) WITH (
- type = "filesystem",
- file.path = "obs://xxx-bigdata/xxx/xxx/ods_binlog_data",
- encode = "parquet",
- ak = "akxxx",
- sk = "skxxx"
- );
-
-
- -- 从source表获取数据,写出到sink表中
- CREATE FUNCTION date_formatted AS 'com.xxx.cdc.udf.DateFormattedUDF';
- INSERT INTO
- bigdata_binlog_ingest_sink_bigdata_mysql_binlog
- SELECT
- db,
- tb,
- columns,
- event,
- `sql`,
- `primary_key`,
- create_time,
- nano_time,
- date_formatted(
- cast(coalesce(create_time, UNIX_TIMESTAMP_MS()) / 1000 as VARCHAR(10)),
- 'yyyyMMdd'
- ) as dt
- FROM
- bigdata_binlog_ingest_source_bigdata_mysql_binlog;
上述FlinkSQL主要通过source获取Kafka主题中的数据,然后什么都不要改变,将数据写入到HDFS中,写入的时候注意,以dt为一级分区,tb为二级分区,主要考虑如下几点:
外部表建表语句如下:
- CREATE TABLE yishou_data.ods_binlog_data (
- `db` STRING COMMENT '数据库名',
- `columns` STRING COMMENT '列数据',
- `event` BIGINT COMMENT '操作类型',
- `sql` STRING COMMENT '执行的SQL',
- `primary_key` STRING COMMENT '主键的值',
- `create_time` BIGINT COMMENT '从MySQL中获取binlog的13位时间戳',
- `send_time` BIGINT COMMENT '发送binlog到kafka的13位时间戳',
- `nano_time` BIGINT COMMENT '纳秒级更新时间戳',
- `dt` BIGINT COMMENT '日期分区',
- `tb` STRING COMMENT '表名'
- ) USING parquet
- PARTITIONED BY (dt, tb) COMMENT 'ods层binlog表(所有通过binlog进入数仓的数据均在此表中)'
- LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_binlog_data'
- ;
建表完后查询结果如下(注意该表是直接插入数据到文件系统中,所以需要根据实际情况来msck该表):
select * from yishou_data.ods_binlog_data_table where dt = 20220816 and tb = 'fmys_goods_ext';
架构图如下:
该作业目的:
作业大致说明:
该节点为SQL节点,具体代码如下:
- -- 在ods_binlog_data表中增加对应日期和表名的分区
- ALTER TABLE yishou_data.ods_binlog_data ADD if not exists PARTITION (dt=${gmtdate}, tb = 'fmys_goods_ext');
-
- -- msck此作业涉及的会删除历史数据的3张表
- MSCK REPAIR TABLE yishou_data.ods_fmys_goods_ext_dt;
- MSCK REPAIR TABLE yishou_data.dwd_fmys_goods_ext_dt;
- MSCK REPAIR TABLE yishou_data.dwd_fmys_goods_ext_record_dt;
主要目的是新增ods_binlog_data表对应日期和业务表的分区,并对其它表进行msck操作(因为其它表会删除历史数据)。
此节点为CMD(即Sqoop)作业,会将业务库中的全量数据拉取到数仓的ods表中(通过overwrite的方式,将数据写入到以今天的分区中),当手动调度时才执行此节点,主要目的是当感觉数据有误时,可以使用此作业来更新替换最新最全最正确的数据。
ods建表语句如下:
- CREATE EXTERNAL TABLE yishou_data.ods_fmys_goods_ext_dt(
- `goods_id` BIGINT COMMENT '*',
- `limit_day` BIGINT COMMENT '*',
- `auto_time` STRING COMMENT '*',
- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
- `grade` BIGINT COMMENT '商品档次',
- `season` BIGINT COMMENT '季节',
- `goods_type` BIGINT COMMENT '商品类型{1:特价,}'
- ) COMMENT '商品拓展信息表(ods层分区表,保留7天的数据)'
- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数时的日期,比如今天2022年8月15日0点30分跑数,那这分区就是20220815')
- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_fmys_goods_ext_dt'
- ;
CDM任务配置如下:
当正常调度时,会执行此节点,会使用ods昨天的分区然后和昨天以及之后的binlog数据合并,再将数据插入到ods的今天的分区中;因为是跑的SQL作业,不需要拉取数据,所以会执行的较快,正常调度时就使用此种方法运行。
- -- DLI sql
- -- ******************************************************************** --
- -- author: yangshibiao
- -- create time: 2022/08/15 10:15:46 GMT+08:00
- -- ******************************************************************** --
-
- -- fmys_goods_ext 表在数仓中ods的建表语句(ods层历史数据表,在对应分区内保存着当天0点之前的所有数据,如果0点30分跑数,也会将这30分的数据放入)
- -- CREATE EXTERNAL TABLE yishou_data.ods_fmys_goods_ext_dt(
- -- `goods_id` BIGINT COMMENT '*',
- -- `limit_day` BIGINT COMMENT '*',
- -- `auto_time` STRING COMMENT '*',
- -- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
- -- `grade` BIGINT COMMENT '商品档次',
- -- `season` BIGINT COMMENT '季节',
- -- `goods_type` BIGINT COMMENT '商品类型{1:特价,}'
- -- ) COMMENT '商品拓展信息表(ods层分区表,保留7天的数据)'
- -- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数时的日期,比如今天2022年8月15日0点30分跑数,那这分区就是20220815')
- -- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_fmys_goods_ext_dt'
- -- ;
-
-
- -- 从ods_fmys_goods_ext_dt表中获取出昨天分区的数据(即相当于昨天0点之前的所有数据)
- -- 从ods_binlog_data表中获取出昨天分区以及之后分区的数据(即昨天的增量更新的binlog数据)
- -- 然后2份数据对主键开窗,获取最新的,并且不是被删除的数据,然后将数据插入到ods_fmys_goods_ext今天的分区中(即今天0点之前的所有历史数据)
- insert overwrite table yishou_data.ods_fmys_goods_ext_dt partition(dt)
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , substr(${bdp.system.cyctime}, 1, 8) as dt
- from (
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , nano_time
- , event
- , row_number() over(partition by goods_id order by nano_time desc) as row_number
- from (
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , -987654321012345 as nano_time
- , 0 as event
- from yishou_data.ods_fmys_goods_ext_dt
- where dt = ${bdp.system.bizdate}
-
- union all
-
- select
- get_json_object(columns ,'$.goods_id.value') as goods_id
- , get_json_object(columns ,'$.limit_day.value') as limit_day
- , get_json_object(columns ,'$.auto_time.value') as auto_time
- , get_json_object(columns ,'$.is_new.value') as is_new
- , get_json_object(columns ,'$.grade.value') as grade
- , get_json_object(columns ,'$.season.value') as season
- , get_json_object(columns ,'$.goods_type.value') as goods_type
- , nano_time as nano_time
- , event as event
- from yishou_data.ods_binlog_data
- where
- dt >= ${bdp.system.bizdate}
- and tb = 'fmys_goods_ext'
- )
- )
- -- event为1就是删除的数据,当最后一次记录为删除数据的时候,就不用这个主键了,因为已经删除了
- where row_number = 1 and event != 1
- ;
此节点的目的是使用ods今天的分区和binlog今天以及之后的分区生成一张dwd的视图,因为binlog的数据是实时更新的,所以这个dwd的视图每次查询的都是最新的,跟业务库可以做到准实时。
- -- DLI sql
- -- ******************************************************************** --
- -- author: yangshibiao
- -- create time: 2022/08/15 10:25:56 GMT+08:00
- -- ******************************************************************** --
- drop view if exists yishou_data.dwd_fmys_goods_ext;
- create view if not exists yishou_data.dwd_fmys_goods_ext(
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- ) comment 'fmys_goods_ext在dwd层视图(根据业务库数据,分钟级别更新)'
- as
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- from (
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , nano_time
- , event
- , row_number() over(partition by goods_id order by nano_time desc) as row_number
- from (
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , -987654321012345 as nano_time
- , 0 as event
- from yishou_data.ods_fmys_goods_ext_dt
- where dt = substr(${bdp.system.cyctime}, 1, 8)
-
- union all
-
- select
- get_json_object(columns ,'$.goods_id.value') as goods_id
- , get_json_object(columns ,'$.limit_day.value') as limit_day
- , get_json_object(columns ,'$.auto_time.value') as auto_time
- , get_json_object(columns ,'$.is_new.value') as is_new
- , get_json_object(columns ,'$.grade.value') as grade
- , get_json_object(columns ,'$.season.value') as season
- , get_json_object(columns ,'$.goods_type.value') as goods_type
- , nano_time as nano_time
- , event as event
- from yishou_data.ods_binlog_data
- where
- dt >= substr(${bdp.system.cyctime}, 1, 8)
- and tb = 'fmys_goods_ext'
- )
- )
- where row_number = 1 and event != 1
- ;
此节点的主要目的是将业务库的数据完整保存到一个分区中,主要解决的问题是当业务库的数据变化时(比如订单的状态、快递配送的状态),下一层重跑时使用新状态的问题,当dwd层有分区之后,每次重跑都只会使用那一天的状态,但是要注意,下一层的脚本需要设置分区,并且该分区可以只保留少数分区即可。
- -- DLI sql
- -- ******************************************************************** --
- -- author: yangshibiao
- -- create time: 2022/08/15 10:43:41 GMT+08:00
- -- ******************************************************************** --
-
- -- DWD层分区表建表语句
- -- CREATE EXTERNAL TABLE `yishou_data`.`dwd_fmys_goods_ext_dt`(
- -- `goods_id` BIGINT COMMENT '*',
- -- `limit_day` BIGINT COMMENT '*',
- -- `auto_time` STRING COMMENT '*',
- -- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
- -- `grade` BIGINT COMMENT '商品档次',
- -- `season` BIGINT COMMENT '季节',
- -- `goods_type` BIGINT COMMENT '商品类型{1:特价,}'
- -- ) COMMENT '商品拓展信息表(此表为分区表,保存历史90天的数据)'
- -- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数分区,即当脚本调度时间为2022年8月15日0点30分,那分区为20220815')
- -- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/dwd_fmys_goods_ext_dt'
- -- ;
-
- -- 从dwd视图中查出数据,并插入到dwd的分区表中(注意:可以根据具体情况设置文件个数,一般每个文件大小为30M到120M即可)
- INSERT OVERWRITE TABLE `yishou_data`.`dwd_fmys_goods_ext_dt` partition(dt)
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , substr(${bdp.system.cyctime}, 1, 8) as dt
- from yishou_data.dwd_fmys_goods_ext
- DISTRIBUTE BY floor(rand()*20)
- ;
此节点主要目的是将业务库历史记录进行保存,该表因为保存的历史记录,所以使用昨天的分区和昨天的binlog数据聚合即可,注意该表需要初始化,详细说明在如下代码中有说明。
- -- DLI sql
- -- ******************************************************************** --
- -- author: yangshibiao
- -- create time: 2022/08/15 11:45:36 GMT+08:00
- -- ******************************************************************** --
-
- -- DWD层记录表建表语句
- -- CREATE EXTERNAL TABLE `yishou_data`.`dwd_fmys_goods_ext_record_dt`(
- -- `goods_id` BIGINT COMMENT '*',
- -- `limit_day` BIGINT COMMENT '*',
- -- `auto_time` STRING COMMENT '*',
- -- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
- -- `grade` BIGINT COMMENT '商品档次',
- -- `season` BIGINT COMMENT '季节',
- -- `goods_type` BIGINT COMMENT '商品类型{1:特价,}',
- -- `event` BIGINT comment '数据操作类型(0:插入;1:删除;2:修改)',
- -- `record_time` BIGINT comment '从MySQL中获取binlog的13位时间戳',
- -- `nano_time` BIGINT comment '纳秒级更新时间戳(当record_time相同时,使用此时间戳来判断先后)'
- -- ) COMMENT '商品拓展信息表历史更改记录表(此表为分区表,保存历史7天的数据)'
- -- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数分区,即当脚本调度时间为2022年8月15日0点30分,那分区为20220815')
- -- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/dwd_fmys_goods_ext_record_dt'
- -- ;
-
- -- 执行脚本(记录表昨天的数据和binlog昨天的数据 union all 即可)
- insert overwrite table `yishou_data`.`dwd_fmys_goods_ext_record_dt` partition(dt)
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , event
- , record_time
- , nano_time
- , substr(${bdp.system.cyctime}, 1, 8) as dt
- from (
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , event
- , record_time
- , nano_time
- from yishou_data.dwd_fmys_goods_ext_record_dt
- where dt = ${bdp.system.bizdate}
-
- union all
-
- select
- get_json_object(columns ,'$.goods_id.value') as goods_id
- , get_json_object(columns ,'$.limit_day.value') as limit_day
- , get_json_object(columns ,'$.auto_time.value') as auto_time
- , get_json_object(columns ,'$.is_new.value') as is_new
- , get_json_object(columns ,'$.grade.value') as grade
- , get_json_object(columns ,'$.season.value') as season
- , get_json_object(columns ,'$.goods_type.value') as goods_type
- , event
- , create_time as record_time
- , nano_time as nano_time
- from yishou_data.ods_binlog_data
- where
- dt = ${bdp.system.bizdate}
- and tb = 'fmys_goods_ext'
- )
- DISTRIBUTE BY floor(rand()*30)
- ;
-
-
-
- -- 初始化脚本(任务调度之前手动执行初始化脚本)(将昨天的历史所有数据和昨天的binlog汇总,根据主键去重,并设置event为0)
- -- 注意:执行初始化脚本时,需要注意昨天的历史数据是否有(即 yishou_data.ods_fmys_goods_ext_dt 表中昨天分区是否有数据 以及binlog昨天对应分区的数据是否有 )
- -- 如果没有,可以在功能上线正常运行的第二天,执行如下初始化脚本,再手动重跑该作业即可,这样会通过CDM重新拉取业务库中的数据(并且其中的分区表只会保存一定日期,所以可以忽略)
- -- insert overwrite table `yishou_data`.`dwd_fmys_goods_ext_record_dt` partition(dt)
- -- select
- -- goods_id
- -- , limit_day
- -- , auto_time
- -- , is_new
- -- , grade
- -- , season
- -- , goods_type
- -- , 0 as event
- -- , record_time
- -- , nano_time
- -- , substr(${bdp.system.cyctime}, 1, 8) as dt
- -- from (
- -- select
- -- goods_id
- -- , limit_day
- -- , auto_time
- -- , is_new
- -- , grade
- -- , season
- -- , goods_type
- -- , record_time
- -- , nano_time
- -- , event
- -- , row_number() over(partition by goods_id order by nano_time desc) as row_number
- -- from (
- -- select
- -- goods_id
- -- , limit_day
- -- , auto_time
- -- , is_new
- -- , grade
- -- , season
- -- , goods_type
- -- , 0 as record_time
- -- , -987654321012345 as nano_time
- -- , 0 as event
- -- from yishou_data.ods_fmys_goods_ext_dt
- -- where dt = ${bdp.system.bizdate}
-
- -- union all
-
- -- select
- -- get_json_object(columns ,'$.goods_id.value') as goods_id
- -- , get_json_object(columns ,'$.limit_day.value') as limit_day
- -- , get_json_object(columns ,'$.auto_time.value') as auto_time
- -- , get_json_object(columns ,'$.is_new.value') as is_new
- -- , get_json_object(columns ,'$.grade.value') as grade
- -- , get_json_object(columns ,'$.season.value') as season
- -- , get_json_object(columns ,'$.goods_type.value') as goods_type
- -- , create_time as record_time
- -- , nano_time as nano_time
- -- , event
- -- from yishou_data.ods_binlog_data
- -- where
- -- dt = ${bdp.system.bizdate}
- -- and tb = 'fmys_goods_ext'
- -- )
- -- )
- -- where row_number = 1 and event != 1
- -- ;
-
-
- -- 创建记录表视图(该记录表今天的分区 union all 今天以及之后的binlog的数据,准实时更新)
- drop view if exists yishou_data.dwd_fmys_goods_ext_record;
- create view if not exists yishou_data.dwd_fmys_goods_ext_record(
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , event
- , record_time
- , nano_time
- ) comment 'dwd_fmys_goods_ext_record在dwd层视图(根据业务库数据,分钟级别更新)'
- as
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , event
- , record_time
- , nano_time
- from (
- select
- goods_id
- , limit_day
- , auto_time
- , is_new
- , grade
- , season
- , goods_type
- , event
- , record_time
- , nano_time
- from yishou_data.dwd_fmys_goods_ext_record_dt
- where dt = substr(${bdp.system.cyctime}, 1, 8)
-
- union all
-
- select
- get_json_object(columns ,'$.goods_id.value') as goods_id
- , get_json_object(columns ,'$.limit_day.value') as limit_day
- , get_json_object(columns ,'$.auto_time.value') as auto_time
- , get_json_object(columns ,'$.is_new.value') as is_new
- , get_json_object(columns ,'$.grade.value') as grade
- , get_json_object(columns ,'$.season.value') as season
- , get_json_object(columns ,'$.goods_type.value') as goods_type
- , event
- , create_time as record_time
- , nano_time as nano_time
- from yishou_data.ods_binlog_data
- where
- dt >= substr(${bdp.system.cyctime}, 1, 8)
- and tb = 'fmys_goods_ext'
- )
- ;
该脚本为每天凌晨运行,直接从ods_fmys_goods_ext_dt表中获取昨天的数据,然后从ods_binlog_data表中也获取昨天并对应的业务表的数据。然后根据主键开窗,按照nano_time降序排序,并取第一条,并且要过滤event为1的(event为1的数据不要,这是删除的数据)数据,取出数据后将数据插入到ods_fmys_goods_ext_dt中今天的分区中(保留历史数据,降低误差,提高健壮性)。
在ods_fmys_goods_ext_dt表中没有nano_time和event字段,需要设置nano_time为-987654321012345(这个值可以随意设置,只要够小就行,关于这样设置可以参考如下的nanoTime的含义),也需要设置event为0,表示这条数据为插入。
java有两个获取和时间相关的秒数方法,一个是广泛使用的 System.currentTimeMillis() , 返回的是从一个长整型结果,表示毫秒。另一个是 System.nanoTime(), 返回的是纳秒。
“纳”这个单位 一般不是第一次见。前几年相当火爆的“纳米”和他是同一级别。纳表示的是10的-9次方。在真空中,光一纳秒也只能传播30厘米。比纳秒大一级别的是微秒,10的-6次方;然后是就是毫秒,10的-3次方。纳秒下面还有皮秒、飞秒等。既然纳秒比毫秒高10的6次方精度,那么他们的比值就应该是10的6次方。然而并非如此。
大家可能都知道毫秒方法返回的是自1970年到现在的毫秒数。而Java的日期也是如此,所以他俩是等值的。但是因为纳秒数值精度太高,所以不能从指定1970年到现在纳秒数,这个输出在不同的机器上可能不一样。
具体参考如下纳秒方法的注释:
- Returns the current value of the running Java Virtual Machine's high-resolution time source, in nanoseconds.
- This method can only be used to measure elapsed time and is not related to any other notion of system or wall-clock time. The value returned represents nanoseconds since some fixed but arbitrary origin time (perhaps in the future, so values may be negative). The same origin is used by all invocations of this method in an instance of a Java virtual machine; other virtual machine instances are likely to use a different origin.
-
- 返回当前JVM的高精度时间。该方法只能用来测量时段而和系统时间无关。
- 它的返回值是从某个固定但随意的时间点开始的(可能是未来的某个时间)。
- 不同的JVM使用的起点可能不同。
这样有点恐怖的是我们相同的代码在不同机器运行导致结果可能不同。所以它很少用来计算。通常都是测量,并且还有可能是负数。所以如上脚本中,设置ods_fmys_goods_ext_dt表中nano_time的数值,才会设置一个负数,并且是一个够小的负数。
之前每天通过CDM全量拉取数据,需要一台专有的CDM服务器(因为同时运行多作业需要资源较多)和业务库的大IO。而现在增量只需要2个Flink任务,并且能准实时获取数据,同时还能保存历史快照和历史变更记录。
CDM集群 + 业务库映射服务器 + OBS数据存储 = 4000 + 2000 + 100 = 6100
Flink作业 + OBS数据存储 = 600 + 9100 = 9700
一个快照全量数据为1T数据左右,按上述数据保存,大概会保存100T左右数据(90分区 + 7分区 + 7分区),按照华为云OBS收费,一个月花费9100左右。
指标 | 优化前 | 优化后 | 备注 |
资源 | 1、需要一台CDM集群 2、需要业务库高IO(添加映射服务器) | 1、需要2个Flink任务 2、需要跑SQL的资源 | 优化前为每天全量从业务库中拉取数据 |
功能 | 1、数仓中每张业务表只有1张全量表 2、只能离线(1小时以上)更新,不能实时 | 1、数仓中有准实时表 2、数仓中有按天保存的快照数据 3、数仓中有保存历史所有的变更记录数据 | 既有准实时,又有按天保存快照,还有历史变更记录,数据存储会增多 |
花费 | 1、专用CDM集群(4000) 2、业务库映射服务器(2000) 3、OBS数据存储(100) | 1、Flink作业(2个作业一共600) 2、OBS数据存储(9100) 3、DLI中跑SQL可以忽略 | 较原先,花费会提高百分之50,但效率、容错等会有本质提高 |
注:其他相关文章链接由此进 -> 开发随笔文章汇总