• Handling long-period data updates


    Solution 1

    This solution mixes the update logic together with a large amount of join processing in a single job.

    import org.apache.spark.sql.SparkSession
    
    
    object dwd_order_info_abi {
    
      def main(args: Array[String]): Unit = {
    
        var dt = "2023-07-01"
        var dt1 = "2023-07-17"
    
        System.setProperty("HADOOP_USER_NAME", "root")
        val builder = SparkUtils.getBuilder
    
        if (System.getProperties.getProperty("os.name").contains("Windows")) {
          builder.master("local[*]")
        } else {
    
          dt = args(0)
          dt1 = args(1)
    
        }
        val spark = builder.appName("dwd_order_info_abi").getOrCreate()
    
        HiveUtil.openDynamicPartition(spark)
        spark.sql("set spark.sql.shuffle.partitions=1")
    
        new IDate {
          override def onDate(dt: String): Unit = {
    
            process(spark, dt)
    
          }
        }.invoke(dt, dt1)
    
        spark.stop()
    
      }
    
    
      /**
       * insert overwrite table ${hive_db}.dwd_order_info_abi partition(dt)
       */
      private def process(spark: SparkSession, do_date: String) = {
    
        val hive_db = "paascloud"
    
        spark.sql(s"use $hive_db")
    
        val frame = spark.sql(
          s"""
             |
             |
             |with t_org_flat as(select * from ods_t_org_flat),
             |
             |detail as (select * from ${hive_db}.tbworkorder_config_detail)
             |
             |insert overwrite table ${hive_db}.dwd_order_info_abi partition(dt)
             |
             |select
             |
             |if(new.orderid is null,old.orderid,new.orderid) as orderid
             |,if(new.ordercreatetime is null,old.ordercreatetime,new.ordercreatetime) as ordercreatetime
             |,if(new.create_by is null,old.create_by,new.create_by) as create_by
             |,if(new.ct_name is null,old.ct_name,new.ct_name) as ct_name
             |,if(new.ct_phone is null,old.ct_phone,new.ct_phone) as ct_phone
             |,if(new.splicing_name is null,old.splicing_name,new.splicing_name) as splicing_name
             |
             |,if(new.operatelifecycle is null,old.operatelifecycle,new.operatelifecycle) as operatelifecycle
             |
             |,if(new.stars is null,old.stars,new.stars) as stars
             |,if(new.isfinished is null,old.isfinished,new.isfinished) as isfinished
             |
             |,if(new.AUDIT_ORDER_star is null,old.AUDIT_ORDER_star,new.AUDIT_ORDER_star) as AUDIT_ORDER_star
             |,if(new.EVALUATE_ORDER_star is null,old.EVALUATE_ORDER_star,new.EVALUATE_ORDER_star) as EVALUATE_ORDER_star
             |,if(new.RETURN_VISIT_star is null,old.RETURN_VISIT_star,new.RETURN_VISIT_star) as RETURN_VISIT_star
             |,if(new.overall_score is null,old.overall_score,new.overall_score) as overall_score
             |,if(new.overall_score_level is null,old.overall_score_level,new.overall_score_level) as overall_score_level
             |
             |
             |,if(new.operateUsers is null,old.operateUsers,new.operateUsers) as operateUsers
             |,if(new.evaluate_operator is null,old.evaluate_operator,new.evaluate_operator) as evaluate_operator
             |,if(new.revisit_operator is null,old.revisit_operator,new.revisit_operator) as revisit_operator
             |
             |,if(new.corcode_f3 is null,old.corcode_f3,new.corcode_f3) as corcode_f3
             |,if(new.name_f3 is null,old.name_f3,new.name_f3) as name_f3
             |,if(new.sort_f3 is null,old.sort_f3,new.sort_f3) as sort_f3
             |,if(new.corcode_f2 is null,old.corcode_f2,new.corcode_f2) as corcode_f2
             |,if(new.name_f2 is null,old.name_f2,new.name_f2) as name_f2
             |,if(new.sort_f2 is null,old.sort_f2,new.sort_f2) as sort_f2
             |,if(new.corcode_f1 is null,old.corcode_f1,new.corcode_f1) as corcode_f1
             |,if(new.name_f1 is null,old.name_f1,new.name_f1) as name_f1
             |,if(new.sort_f1 is null,old.sort_f1,new.sort_f1) as sort_f1
             |,if(new.project_star is null,old.project_star,new.project_star) as project_star
             |,if(new.project_nature is null,old.project_nature,new.project_nature) as project_nature
             |
             |,if(new.companyId is null,old.companyId,new.companyId) as companyId
             |,if(new.areasId is null,old.areasId,new.areasId) as areasId
             |,if(new.institutionId is null,old.institutionId,new.institutionId) as institutionId
             |,if(new.platfromFiledCode is null,old.platfromFiledCode,new.platfromFiledCode) as platfromFiledCode
             |,if(new.orderLargerType is null,old.orderLargerType,new.orderLargerType) as orderLargerType
             |,if(new.orderSecondType is null,old.orderSecondType,new.orderSecondType) as orderSecondType
             |,if(new.orderThirdlyType is null,old.orderThirdlyType,new.orderThirdlyType) as orderThirdlyType
             |,if(new.serviceFlowAlias is null,old.serviceFlowAlias,new.serviceFlowAlias) as serviceFlowAlias
             |,if(new.orderSource is null,old.orderSource,new.orderSource) as orderSource
             |,if(new.orderSourceName is null,old.orderSourceName,new.orderSourceName) as orderSourceName
             |,if(new.orderStatus is null,old.orderStatus,new.orderStatus) as orderStatus
             |,if(new.orderStatusName is null,old.orderStatusName,new.orderStatusName) as orderStatusName
             |
             |,if(new.actualhour is null,old.actualhour,new.actualhour) as actualhour
             |,if(new.second_hour is null,old.second_hour,new.second_hour) as second_hour
             |,if(new.third_hour is null,old.third_hour,new.third_hour) as third_hour
             |,if(new.man_hour is null,old.man_hour,new.man_hour) as man_hour
             |,if(new.istimeout is null,old.istimeout,new.istimeout) as istimeout
             |
             |,if(new.isurgent is null,old.isurgent,new.isurgent) as isurgent
             |,if(new.isimportance is null,old.isimportance,new.isimportance) as isimportance
             |,if(new.issupervise is null,old.issupervise,new.issupervise) as issupervise
             |,if(new.reworknum is null,old.reworknum,new.reworknum) as reworknum
             |,if(new.isrework is null,old.isrework,new.isrework) as isrework
             |,if(new.isexception is null,old.isexception,new.isexception) as isexception
             |
             |,if(new.deal_user_ids is null,old.deal_user_ids,new.deal_user_ids) as deal_user_ids
             |,if(new.dealUserOrgIds is null,old.dealUserOrgIds,new.dealUserOrgIds) as dealUserOrgIds
             |
             |,if(new.comefrom is null,old.comefrom,new.comefrom) as comefrom
             |,if(new.address_id is null,old.address_id,new.address_id) as address_id
             |,if(new.flow_subsystem_code is null,old.flow_subsystem_code,new.flow_subsystem_code) as flow_subsystem_code
             |,if(new.floor_id is null,old.floor_id,new.floor_id) as floor_id
             |,if(new.is_over_time is null,old.is_over_time,new.is_over_time) as is_over_time
             |,if(new.out_source is null,old.out_source,new.out_source) as out_source
             |,if(new.send_user_ids is null,old.send_user_ids,new.send_user_ids) as send_user_ids
             |
             |,date_format(if(new.ordercreatetime is null,old.ordercreatetime,new.ordercreatetime),'yyyy-MM-dd') as dt
             |from
             |
             |(
             |
             |select
             |
             |	*
             |from
             |	${hive_db}.dwd_order_info_abi
             |where
             |	dt in (
             |	select
             |		date_format(ordercreatetime, 'yyyy-MM-dd')
             |	from
             |		${hive_db}.ods_order
             |	where
             |		dt = '$do_date'
             |  group by date_format(ordercreatetime, 'yyyy-MM-dd')
             |  )
             |
             |) old
             |
             |full outer join
             |(
             |
             |select
             |	info.orderid,
             |	info.ordercreatetime,
             |  info.create_by,
             |	user.user_name as ct_name,
             |	user.telephone as ct_phone,
             |	build.splicing_name,
             |	trace.operatelifecycle,
             |	trace.stars,
             |
             |	if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0') as isfinished,
             |
             |	if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['AUDIT_ORDER'],'0'),'-1') as AUDIT_ORDER_star,-- -1 marks an invalid score: the order is not finished yet; if finished but unrated, default to 0
             |	if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['RETURN_VISIT'],'4'),'-1') as RETURN_VISIT_star,-- -1 marks an invalid score: the order is not finished yet; if finished but unrated, default to 4
             |	if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['EVALUATE_ORDER'],'4'),'-1') as EVALUATE_ORDER_star,-- -1 marks an invalid score: the order is not finished yet; if finished but unrated, default to 4
             |
             |	round((if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['AUDIT_ORDER'],'0'),'-1')*0.2+
             |		if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['RETURN_VISIT'],'4'),'-1')*0.1+
             |		if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['EVALUATE_ORDER'],'4'),'-1')*0.7),1) as overall_score,
             |
             |	round((if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['AUDIT_ORDER'],'0'),'-1')*0.2+
             |                if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['RETURN_VISIT'],'4'),'-1')*0.1+
             |                if(if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='1',nvl(trace.stars['EVALUATE_ORDER'],'4'),'-1')*0.7),0) as overall_score_level,
             |
             |	trace.operateUsers,
             |	nvl(trace.operateUsers['RETURN_VISIT'],'-1') as revisit_operator,
             |	nvl(trace.operateUsers['EVALUATE_ORDER'],'-1') as evaluate_operator,
             |
             |	t_org_flat.corcode_f3,
             |	t_org_flat.name_f3,t_org_flat.sort_f3,
             |	t_org_flat.corcode_f2,
             |	t_org_flat.name_f2,t_org_flat.sort_f2,
             |	t_org_flat.corcode_f1,
             |	t_org_flat.name_f1,t_org_flat.sort_f1,
             | 	t_org_flat.project_star,
             | 	t_org_flat.project_nature,
             |
             |	info.companyId,
             |	info.areasId,
             |	info.institutionId ,
             |	info.platfromFiledCode ,
             |	info.orderLargerType,
             |	info.orderSecondType,
             |	info.orderThirdlyType,
             |	info.serviceFlowAlias ,
             |	info.orderSource ,
             |	info.orderSourceName ,
             |	info.orderStatus ,
             |	info.orderStatusName ,
             |
             |	info.actualhour ,
             |	detail.man_hour as second_hour ,
             |	detail2.man_hour as third_hour ,
             |	case
             |		when orderThirdlyType is not null then detail2.man_hour
             |		when orderThirdlyType is null and orderSecondType is not null then detail.man_hour
             |		when orderThirdlyType is null and orderSecondType is null then '9999'
             |	end as man_hour ,
             |	-- the defined man-hours for this work order's service type
             | 	case
             |		when if(info.orderstatus in('ORDER_YWC','ORDER_DHYWC','ORDER_YQX','ORDER_YCGB'),'1','0')='0' then '-1'
             |		when orderThirdlyType is not null then if(info.actualHour>detail2.man_hour, 1, 0)
             |		when orderThirdlyType is null and orderSecondType is not null then if(info.actualHour>detail.man_hour, 1, 0)
             |		when orderThirdlyType is null and orderSecondType is null then '0' -- neither third- nor second-level type is set, so default to on-time
             |	end as istimeout,
             |
             |	if(info.urgent=1 or info.urgent=2,1,0) as isurgent,
             |	info.importance as isimportance,
             |	if(info.superviseNum>0,1,0) as issupervise,
             |	info.reworknum as reworknum,-- number of times this work order was reworked
             |	if(info.reworkNum>0,1,0) as isrework,
             |
             |	if(info.orderStatus = 'ORDER_YQX' or info.orderstatus = 'ORDER_YCGB'
             |	or info.superviseNum>0
             |	or info.reworkNum>0
             |	or nvl(trace.stars['AUDIT_ORDER'],'100')<3
             |	or nvl(trace.stars['RETURN_VISIT'],'100')<3
             |	or nvl(trace.stars['EVALUATE_ORDER'],'100')<3 , '1', '0') as isexception,
             |
             |	info.dealUserIds as deal_user_ids,
             |	info.dealUserOrgIds as dealUserOrgIds,
             |
             |	info.comefrom as comefrom,
             |  if(info.home_address is null,info.floor_id,info.home_address) as address_id
             |  ,info.flow_subsystem_code as flow_subsystem_code
             |  ,info.floor_id as floor_id
             |  ,info.is_over_time as is_over_time
             |  ,info.out_source as out_source
             |  ,info.send_user_ids as send_user_ids
             |from
             |	(
             |	select
             |		*
             |	from
             |		${hive_db}.ods_order
             |	where
             |		dt = '$do_date' ) info
             |left join (
             |	select
             |		workOrderId
             |		,str_to_map(concat_ws(',', collect_set(concat(operateType, '=>', createTime))), ',', '=>') operatelifecycle
             |		,str_to_map(concat_ws(',', collect_set(concat(operateType, '=>', if(evaluateLevel is not null,evaluateLevel,'-1')))), ',', '=>') stars
             |		,str_to_map(concat_ws(',', collect_set(concat(operateType, '=>', if(operateUser is not null,operateUser,'-1')))), ',', '=>') operateUsers
             |	from
             |		${hive_db}.ods_order_track
             |		where dt in(
             |  	select
             |		date_format(ordercreatetime, 'yyyy-MM-dd')
             |	from
             |		${hive_db}.ods_order
             |	where
             |		dt = '$do_date'
             |  group by date_format(ordercreatetime, 'yyyy-MM-dd')
             |
             |  )
             |
             |		group by workOrderId ) trace on
             |	info.orderid = trace.workOrderId
             |
             |	left join
             |		 detail on
             |		info.orderSecondType=detail.id
             |	left join
             |		 detail detail2 on
             |		info.orderThirdlyType=detail2.id
             |
             |        left join  t_org_flat on
             |                info.institutionId=t_org_flat.corcode_f1
             |                or info.institutionid=t_org_flat.corcode_f2
             |
             |        left join(
             |
             |
             |		select * from ${hive_db}.t_uac_user
             |	) user
             |		on info.create_by=user.user_id
             |
             |	left join(
             |
             |		select * from ${hive_db}.t_uac_building_room
             |	) build
             |		on info.home_address=build.id
             |
             |	) new
             |on old.orderid=new.orderid
             |
             |
             |""".stripMargin)
        frame
      }
    }
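
    For reference, a launch sketch for this job. The jar name and spark-submit options are assumptions and not part of the original code; only the two positional arguments (start and end date, read as args(0) and args(1)) come from the code above.

    # hypothetical launch command; jar name and options are placeholders
    spark-submit \
      --class dwd_order_info_abi \
      --master yarn --deploy-mode client \
      dwd_order_info_abi.jar 2023-07-01 2023-07-17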
    
    

    Solution 2

    This solution separates the update handling from any join processing: the newly loaded partition is unioned with the affected historical partitions, deduplicated per record_detailed_id (keeping the row from the latest dt), and the result is written back partitioned by the business create_datetime.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.storage.StorageLevel
    
    object ods_record_detailed_update {
    
      val hive_db = "tcm"
    
      def main(args: Array[String]): Unit = {
    
    
        var dt = "2023-06-30"
        var dt1 = "2023-06-30"
    
    
        System.setProperty("HADOOP_USER_NAME", "root")
        val builder = SparkUtils.getBuilder
    
        if (System.getProperties.getProperty("os.name").contains("Windows")) {
          builder.master("local[*]")
    
        } else {
    
          dt = args(0)
          dt1 = args(1)
        }
    
        val spark = builder
          .appName(this.getClass.getSimpleName).getOrCreate()
        HiveUtil.openDynamicPartition(spark)
        spark.sql("set spark.sql.shuffle.partitions=1")
    
        spark.sql(s"use $hive_db")
    
    
        new IDate {
          override def onDate(dt: String): Unit = {
    
    
            /**
             * the loaded dt partition may contain rows whose business create dates span several days (dt1, dt2, dt3, ...)
             */
            val f_new = spark.sql(
              s"""
                 |
                 |select * from ods_record_detailed where dt='${dt}'
                 |
                 |""".stripMargin)
              .select(
                "record_id"
                , "record_detailed_id"
                , "record_content"
                , "create_datetime"
                , "update_datetime"
                , "record_detailed_state"
                , "dt"
              )
    
    
            val f_old = spark.sql(
              s"""
                 |
                 |select * from ods_record_detailed_update2 where dt in (
                 |
                 |
                 |SELECT date_format(create_datetime,'yyyy-MM-dd') from ods_record_detailed
                 |WHERE dt='$dt' GROUP BY   date_format(create_datetime, 'yyyy-MM-dd')
                 |
                 |
                 |) and dt !='$dt'
                 |
                 |""".stripMargin)
              .select(
                "record_id"
                , "record_detailed_id"
                , "record_content"
                , "create_datetime"
                , "update_datetime"
                , "record_detailed_state"
                , "dt"
              )
    
    
            val f_all = f_new.unionByName(f_old)
    
            /**
             * A record_detailed_id that has been updated shows up multiple times; keep the row with the largest dt.
             * Note: the update time cannot be used, because after a reload the rows in different dt partitions carry identical update_datetime values.
             */
            val windowSpec = Window.partitionBy("record_detailed_id").orderBy(col("dt").desc)
    
            val f_rank = f_all
              .withColumn("rank", row_number.over(windowSpec))
              .persist(StorageLevel.MEMORY_ONLY)
    
    
            /**
             * Reset dt based on create_datetime, i.e. re-partition by the business creation time here.
             */
            val r = f_rank
              .where("rank<=1")
              .withColumn("dt", to_date(col("create_datetime"), "yyyy-MM-dd").cast(StringType))
              .select(
                "record_id"
                , "record_detailed_id"
                , "record_content"
                , "create_datetime"
                , "update_datetime"
                , "record_detailed_state"
                , "rank"
                , "dt"
              )
            println("r show===>")
            r.show(30, false)
    
    
            val r_ck = SparkUtils.persistDataFrame(spark, r)
    
            r_ck._1.createOrReplaceTempView("r_ck")
    
            /**
             * the target must be an existing Hive table (created via Hive DDL), not a Spark-managed table
             *
             *
             * create table tcm.ods_record_detailed_update2(
             *
             * record_id string,
             * record_detailed_id string,
             *
             * record_content string,
             * create_datetime string,
             * update_datetime string,
             * record_detailed_state string
             *
             * )
             * partitioned by (dt string)
             * stored as parquet
             * location '/user/hive/warehouse/tcm.db/ods_record_detailed_update2'
             */
            spark.sql(
              """
                |
                |insert overwrite table tcm.ods_record_detailed_update2 partition(dt)
                |
                |select
                |
                |record_id,
                |record_detailed_id,
                |record_content,
                |create_datetime,
                |update_datetime,
                |record_detailed_state,
                |
                |dt as dt
                |from r_ck
                |
                |
                |""".stripMargin)
    
            SparkUtils.unPersistDataFrame(spark, r_ck._2)
          }
        }.invoke(dt, dt1)
    
        spark.stop()
    
      }
    
    }
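
    As the comment inside the job notes, the target must be an existing Hive table, so it has to be created once before the first run. A sketch, assuming the DDL from the comment is saved to a file; the file and jar names are placeholders:

    # one-time: create the Hive target table (DDL taken from the comment above; file name is hypothetical)
    hive -f create_ods_record_detailed_update2.sql

    # hypothetical launch; the two arguments are the dt range handed to IDate.invoke
    spark-submit --class ods_record_detailed_update --master yarn \
      ods_record_detailed_update.jar 2023-06-30 2023-06-30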
    

    Script for obtaining the changed dates

    get_changed_dt.sh

    
    #!/bin/bash
    db=paascloud
    hive=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/bin/hive
    if [ $1 ];
    then
    dt=$1
    else
     
    dt=`date -d "-1 day" +%F`
     
    fi
     
    if [ $2 ];
    then
    dt1=$2
    else
    dt1=`date -d "-1 day" +%F`
    fi
     
     
     
    flag=""
     
    f(){
     
     
    do_date=$1
    #echo "=== get_changed_dt.f() executing for date $do_date ==="
     
     
    sql="
    use $db;
    set hive.cli.print.header=false;
    set HIVE_SKIP_SPARK_ASSEMBLY=true;
    select 
    ct
    from (
            select
                    date_format(ordercreatetime, 'yyyy-MM-dd') as ct
            from
                    ods_order
            where
                   dt = '$do_date' 
    ) t group by ct
    order by ct desc
    ;
     
    "
     
     
     
    WORK_DIR=$(cd "$(dirname "$0")";pwd)
     
    LOG_PATH="$WORK_DIR/log/$do_date"
    mkdir -p $LOG_PATH
    FILE_NAME="get_changed_dt"
    FILE_NAME1="get_changed_dt_result"
     
     
    flag=`$hive -e "$sql" 2>&1 | tee $LOG_PATH/${FILE_NAME}.log | grep -v "WARN"`
     
     
    while read -r line
    do
     # echo "$line"
      error="FAILED"
      if [[ $line == *$error* ]]
      then
        #echo "HIVE JOB EXECUTION FAILED AND DATE IS 【${do_date}】......"
        #exit 1
    	sleep 1m
        f ${do_date}
      fi 
    done < ${LOG_PATH}/${FILE_NAME}.log
     
     
     
     
    #sink
     
    arr=() 
    arr+=($do_date)
    for i in $flag
    do
        if [[ $i == "20"* ]] && [[ ${#i} == 10 ]] && [[ $i < $do_date ]]
        then
            arr+=($i)
        fi
    done
     
     
    echo ${arr[*]} 2>&1 | tee  $LOG_PATH/${FILE_NAME1}
     
    }
     
     
    start_day=$dt
    end_day=$dt1
    dt=$start_day
    while [[ $dt < `date -d "+1 day $end_day" +%Y-%m-%d` ]]
    do
    	#echo "while process:"$dt
            f $dt
       dt=`date -d "+1 day $dt" +%Y-%m-%d`
    done
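
    A usage sketch: invoked with a single load date, the script prints that date followed by every earlier business-creation date found in that day's ods_order partition, space-separated on one line, so callers can iterate over its stdout. The dates below are made-up examples:

    [root@mz-hadoop-01 tcm]# ./get_changed_dt.sh 2023-07-17
    2023-07-17 2023-07-03 2023-06-28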
    

    Unified logic for long-period batch processing and standalone change-driven updates

    [root@mz-hadoop-01 tcm]# cat hive_to_olap_4_tcm_parse.sh 
    
    
    #!/bin/bash
    
    
    source /root/bin/common_config/db_config.properties
    
    
    
    hive_table=$1
    target_table=$2
    
    
    
    if [ $3 ];
    then
    dt=$3
    else
     
    dt=`date -d "-1 day" +%F`
     
    fi
     
    if [ $4 ];
    then
    dt1=$4
    else
    dt1=`date -d "-1 day" +%F`
    fi
    
    
    
    
    echo "Start date: $dt"
    echo "End date: $dt1"
    
    
    f(){
    
    do_date=$1
    echo "=== function executing for date $do_date ==="
    
    
    	/root/bin/hive_to_mysql.sh "$olap_host" tcm "$hive_table" "$olap_db" "$target_table" "$olap_user" "$olap_pwd" $1 $1
    
    
    }
    
    
      if [[ $dt == $dt1 ]]
      then
        echo "dt = dt1......"
    
    
    for i in `/mnt/db_file/tcm/get_changed_dt.sh $dt`
    do
    
    echo "Syncing changed date ======================> $i"
    
    
    	f $i
    done
    
    else
    
    echo "batch process..."
    
    
    start_day=$dt
    end_day=$dt1
    dt=$start_day
    while [[ $dt < `date -d "+1 day $end_day" +%Y-%m-%d` ]]
    do
            echo "Batch processing ===> $dt"
            f $dt
       dt=`date -d "+1 day $dt" +%Y-%m-%d`
    done
    
      fi 
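
    Usage sketch: when dt equals dt1 (including the no-argument case, where both default to yesterday), the script only re-syncs the dates that get_changed_dt.sh reports as changed; when a real date range is given it falls back to a plain day-by-day batch reload. The table names below are placeholders:

    # change-driven sync of a single load date (dt == dt1 branch)
    ./hive_to_olap_4_tcm_parse.sh some_hive_table some_olap_table 2023-07-17 2023-07-17

    # day-by-day batch reload of a date range (else branch)
    ./hive_to_olap_4_tcm_parse.sh some_hive_table some_olap_table 2023-07-01 2023-07-17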
    
  • Original article: https://blog.csdn.net/u013727054/article/details/134502414