• 拉链表-spark版本


    采用spark实现的拉链表

    拉链表初始化

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.lit
    
    /**
     * 拉链表初始化
     */
    object table_zip_initial {
    
      val lastDay = "9999-12-31"
    
      def main(args: Array[String]): Unit = {
    
        var table_base = "t_uac_organization" //基表
        var table_zip = "ods_uac_org_zip" //拉链表
        /**
         * 基于该天的t_uac_organization
         */
        var dt = "2023-01-31"
    
    
        System.setProperty("HADOOP_USER_NAME", "root")
        val builder = SparkUtils.getBuilder
    
        if (System.getProperties.getProperty("os.name").contains("Windows")) {
          builder.master("local[*]")
    
        } else {
    
          table_base = args(0)
          table_zip = args(1)
          dt = args(2)
        }
    
        val spark = builder
          .appName(this.getClass.getName).getOrCreate()
    
        val hive_db = "common"
    
        spark.sql(s"use $hive_db")
    
    
        /**
         * 初始化,一次
         */
        if (!TableUtils.tableExists(spark, hive_db, table_zip)) {
    
          println(s"$table_zip not exists,初始化")
          init(dt, spark, hive_db, table_base, table_zip)
    
        } else {
    
          val t_zip = spark.sql(
            s"""
               |
               |select * from $table_zip where dt='$lastDay'
               |
               |""".stripMargin)
    
          if (t_zip.isEmpty) {
            //init
    
            println(s"$table_zip isEmpty 初始化")
            init(dt, spark, hive_db, table_base, table_zip)
    
          } else {
    
            println(s"$table_zip exist and not empty,无需初始化!!!")
          }
    
        }
    
        spark.stop()
    
      }
    
      private def init(dt: String, spark: SparkSession, hive_db: String, table_base: String, table_zip: String): Unit = {
        val t_base = spark.sql(
          s"""
             |
             |select * from $table_base where dt='${dt}'
             |""".stripMargin)
    
    
        println(s"$table_base show")
        t_base.show(false)
    
        val ods_zip = t_base
          .drop("dt")
          .withColumn("t_start", lit(dt))
          .withColumn("t_end", lit(lastDay))
          .withColumn("dt", lit(lastDay))
    
        if (!ods_zip.isEmpty) {
    
          println(s"$table_zip show")
          ods_zip.show(false)
    
          println(s"$table_zip 初始化...")
    
          SinkUtil.sink_to_hive(lastDay, spark, ods_zip, hive_db, hive_table = s"$table_zip", "parquet", MySaveMode.OverWriteByDt)
        } else {
    
          println(s"$table_zip is empty,初始化失败...")
    
        }
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108

    拉链表每日滚动计算

    import org.apache.spark.sql.functions.{count, lit}
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    import org.apache.spark.storage.StorageLevel
    
    /**
     * 明智优点组织架构拉链表
     * 记录其状态变化过程
     * 拉链表只能从装载首日起,一天一天滚动计算
     */
    object ods_uac_org_zip {
    
      val lastDay = "9999-12-31"
    
      def main(args: Array[String]): Unit = {
    
    
        var dt = "2023-02-01"
        var dt1 = "2023-02-02"
    
        System.setProperty("HADOOP_USER_NAME", "root")
        val builder = SparkUtils.getBuilder
    
        if (System.getProperties.getProperty("os.name").contains("Windows")) {
          builder.master("local[*]")
    
        } else {
    
          dt = args(0)
          dt1 = args(1)
        }
    
    
        val spark = builder
          .appName(this.getClass.getName).getOrCreate()
    
        val hive_db = "common"
    
        spark.sql(s"use $hive_db")
    
    
        new IDate {
          override def onDate(dt: String): Unit = {
    
            processByDt(spark, dt, hive_db)
    
          }
        }.invoke(dt, dt1)
    
    
        spark.stop()
    
      }
    
      /**
       * 滚动计算每个dt的对应的过期数据
       */
      def processByDt(spark: SparkSession, dt: String, hive_db: String): Unit = {
    
    
        val theDayBeforeDt = DateUtil.back1Day(dt + " 00:00:00").split(" ")(0)
    
        /**
         * 一定需要先缓存
         * 否则重算则fileNotFoundException
         * 但是缓存也有坑,不一定所有的数据都会缓存
         * 因此需要借助临时表处理或者设置ck
         */
        //TODO
        var ods_uac_org_zip = spark.sql(
          s"""
             |
             |select * from ods_uac_org_zip where dt='$lastDay'
             |""".stripMargin)
          .persist(StorageLevel.MEMORY_ONLY_SER_2)
    
        /**
         * 持久化为临时表
         */
        ods_uac_org_zip
          .repartition(3)
          .write
          .format("parquet")
          .mode(SaveMode.Overwrite)
          .saveAsTable(s"${hive_db}.ods_uac_org_zip_tmp")
    
        /**
         * 已经指向临时表
         * 后续方便对源表(ods_uac_org_zip)进行更新
         */
        ods_uac_org_zip = spark.sql(
          s"""
             |
             |select * from ods_uac_org_zip_tmp
             |""".stripMargin)
    
    
        /**
         * old,已经存在的拉链表的最新全量
         */
        val f_old_9999 = ods_uac_org_zip
          .drop("dt")
    
    
        println("f_old_9999 show")
        f_old_9999.show(false)
    
        /**
         * dt该天的新增和变化
         */
        val f_new = spark.sql(
          s"""
             |
             |select * from new_change_t_uac_organization where dt='${dt}'
             |""".stripMargin)
          .drop("dt")
    
          .withColumnRenamed("id", "id2")
          .withColumnRenamed("org_name", "org_name2")
          .withColumnRenamed("parent_id", "parent_id2")
          .withColumnRenamed("sort", "sort2")
          .withColumnRenamed("org_type", "org_type2")
          .withColumnRenamed("org_level", "org_level2")
          .withColumnRenamed("is_auth_scope", "is_auth_scope2")
          .withColumnRenamed("parent_auth_scope_id", "parent_auth_scope_id2")
          .withColumnRenamed("status", "status2")
          .withColumnRenamed("icon_class", "icon_class2")
          .withColumnRenamed("create_id", "create_id2")
          .withColumnRenamed("create_time", "create_time2")
          .withColumnRenamed("update_id", "update_id2")
          .withColumnRenamed("update_time", "update_time2")
          .withColumnRenamed("version", "version2")
          .withColumn("t_start2", lit(dt))
          .withColumn("t_end2", lit(lastDay))
    
    
        println("f_new show")
        f_new.show(false)
    
        val f1 = f_old_9999.join(f_new, f_old_9999.col("id") === f_new.col("id2"), "full_outer")
    
        f1.createOrReplaceTempView("v1")
    
        println("v1 temp show")
        f1.show(false)
    
        f1.filter(s"id='1008'").show(false)
    
    
        /**
         * 这是所有dt=9999的
         */
        val f_9999: DataFrame = spark.sql(
          """
            |
            |select
            |nvl(id2,id) as id
            |,nvl(org_name2,org_name) as org_name
            |,nvl(parent_id2,parent_id) as parent_id
            |,nvl(sort2,sort) as sort
            |,nvl(org_type2,org_type) as org_type
            |,nvl(org_level2,org_level) as org_level
            |,nvl(is_auth_scope2,is_auth_scope) as is_auth_scope
            |,nvl(parent_auth_scope_id2,parent_auth_scope_id) as parent_auth_scope_id
            |,nvl(status2,status) as status
            |,nvl(icon_class2,icon_class) as icon_class
            |,nvl(create_id2,create_id) as create_id
            |,nvl(create_time2,create_time) as create_time
            |,nvl(update_id2,update_id) as update_id
            |,nvl(update_time2,update_time) as update_time
            |,nvl(version2,version) as version
            |,nvl(t_start2,t_start) as t_start
            |,nvl(t_end2,t_end) as t_end
            |,nvl(t_end2,t_end) as dt
            |
            |from v1
            |
            |
            |""".stripMargin)
    
    
        /**
         * +----+--------------------------------------------+---------+----+--------+---------+-------------+--------------------+------+-------------------------+---------+-------------------+---------+-------------------+-------+----------+----------+----------+
         * |id  |org_name                                    |parent_id|sort|org_type|org_level|is_auth_scope|parent_auth_scope_id|status|icon_class               |create_id|create_time        |update_id|update_time        |version|t_start   |t_end     |dt        |
         * +----+--------------------------------------------+---------+----+--------+---------+-------------+--------------------+------+-------------------------+---------+-------------------+---------+-------------------+-------+----------+----------+----------+
         * |1   |运营系统                                    |0        |0   |4       |1        |N            |null                |1     |iconfont icon-xitong     |655      |2019-05-20 17:58:11|null     |null               |null   |2023-01-31|9999-12-31|9999-12-31|
         */
        println("f_9999 show")
        f_9999.show(false)
    
    
        println(s"在${dt}的发生状态变化的,新的有效区间[$dt,$lastDay]...")
        f_9999.filter(s"t_start='$dt'").show()
    
    
        f_9999.groupBy("dt")
          .agg(count("id"))
          .show()
    
    
        /**
         * 过期的数据
         * 需要闭合t_end
         * dt天发现有变化,那么则在dt-1天过期
         * 过期的数据:上一次的起始时间,必然小于dt(这个条件很重要,否则幂等计算会有问题,会把计算过的历史分区的起始时间给覆盖掉)
         */
        val f_expire = spark.sql(
          s"""
             |
             |select
             |id,
             |org_name,
             |parent_id,
             |sort,
             |org_type,
             |org_level,
             |is_auth_scope,
             |parent_auth_scope_id,
             |status,
             |icon_class,
             |create_id,
             |create_time,
             |update_id,
             |update_time,
             |version,
             |t_start,
             |cast(date_add('${dt}',-1) as string) as t_end,
             |cast(date_add('${dt}',-1) as string) as dt
             |
             |from v1
             |where id2 is not null and id is not null and t_start<'$dt'
             |
             |""".stripMargin)
    
        println("f_expire show")
        f_expire.show(false)
    
    
        /**
         * 没有动态分区,那就分别各自持久化
         */
        if (!f_9999.isEmpty) {
    
          SinkUtil.sink_to_hive(lastDay, spark, f_9999, hive_db, hive_table = "ods_uac_org_zip", "parquet", MySaveMode.OverWriteByDt)
    
        }
    
        if (!f_expire.isEmpty) {
    
          SinkUtil.sink_to_hive(theDayBeforeDt, spark, f_expire, hive_db, hive_table = "ods_uac_org_zip", "parquet", MySaveMode.OverWriteByDt)
    
        }
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
  • 相关阅读:
    【List-Watch】
    【SpringBoot】微服务中异步调用数据提交数据库的问题
    android开发布局知识
    SpringBoot实现登录拦截器
    Simple RPC - 02 通用高性能序列化和反序列化设计与实现
    Spring简介、IOC容器
    数据建设实践之大数据平台(二)安装zookeeper
    ES6 解构赋值--一般用法
    如何进行Java项目的构建和部署?
    基于微信小程序的商城设计
  • 原文地址:https://blog.csdn.net/u013727054/article/details/134502942