• Spark生产环境高效读写My SQL(df,rdd)


    读优化

    多分区并行读取1

    此种方式对于抽取数据量较大的表有很好的性能提升,但仅限于有连续数值型主键(比如自增id)或者比较散列的数值型字段的数据表。
    这样可以有效的防止数据倾斜的问题。

    val df1: DataFrame = session.read.format("jdbc")
          .option("url", "jdbc:mysql://node119:3306/hrdb_new?characterEncoding=utf8&serverTimezone=UTC&useSSL=false")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "all_info")
          .option("user", "root")
          .option("password", "xxx")
          .option("fetchsize", "10000")
          // 以下4个配置项必须同时使用
          // 分区数量,可以理解为读取并行度、线程数
          .option("numPartitions", 200)
          // 分区字段,必须为数字、日期、时间戳字段
          .option("partitionColumn", "id")
          // lowerBound 和 upperBound 仅用于计算每个分区的取数步长,不用于数据过滤
          // 分区字段的最小值
          .option("lowerBound", lowerBound) //select min(id) from table
          // 分区字段的最大值
          .option("upperBound", upperBound) //select max(id) from table
          .load()
        println(df1.rdd.getNumPartitions) //200
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    多分区并行读取2

    此种方式对于第一种方式的扩展,比第一种更加的灵活。
    此方式可以指定除主键外的其他字段分区规则,更加灵活

    //借助jdbc函数,我们可以自定义predicates规则,predicates其实就是传入多个where条件,组装成数据,
    // 分区数就是predicates.size
    def jdbc(
          url: String,
          table: String,
          predicates: Array[String],
          connectionProperties: Properties): DataFrame = {}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    predicates 示例

    /**
    * 将9月16-12月15三个月的数据取出,按时间分为6个partition
    * 为了减少事例代码,这里的时间都是写死的
    * modified_time 为时间字段
    */
    
    
    val predicates =
        Array(
          "2015-09-16" -> "2015-09-30",
          "2015-10-01" -> "2015-10-15",
          "2015-10-16" -> "2015-10-31",
          "2015-11-01" -> "2015-11-14",
          "2015-11-15" -> "2015-11-30",
          "2015-12-01" -> "2015-12-15"
        ).map {
          case (start, end) =>
            s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
        }
    
    
    /**
       * 将近90天的数据进行分区读取
       * 每一天作为一个分区,例如
       * Array(
       * "2015-09-17" -> "2015-09-18",
       * "2015-09-18" -> "2015-09-19",
       * ...)
       **/
       def getPredicates = {
        
        val cal = Calendar.getInstance()
        cal.add(Calendar.DATE, -90)
        val array = ArrayBuffer[(String,String)]()
        for (i <- 0 until 90) {
          val start = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())
          cal.add(Calendar.DATE, +1)
          val end = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())
          array += start -> end
        }
        val predicates = array.map {
          case (start, end) => s"gmt_create >= '$start' AND gmt_create < '$end'"
        }
        
        predicates.toArray
        }
    
    val url = "jdbc:mysql://node119:3306/hrdb_new?characterEncoding=utf8&serverTimezone=UTC&useSSL=false"
    val tableName1 = "all_info"
    val prop = new java.util.Properties
    prop.setProperty("user","root")
    prop.setProperty("password","xxx")
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    
    val df1: DataFrame = session.read.jdbc(url, tableName1,getPredicates, prop)
    
    println(df1.rdd.getNumPartitions) // 90
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    建议在使用此方式进行分片时,需要评估好 predicates.size 的个数,防止单片数据过大;同时需要自定义where语句的查询效率,防止查询语句出现慢SQL影响取数效率

    写优化

    更新写(UPSERT/INSERT OR UPDATE)

    更新接入比较复杂一些,一般结合 foreachPartition 使用。需要注意分区大小,数据量过大且分区数过小时容易出现OOM.
    同时需要目标表创建 UNIQUE KEY,因为需要基于UNIQUE KEY来实现UPSERT。

    df.foreachPartition(iter => {
      val conn = ds.getConnection
      val sql =
          """
            |INSERT INTO test_table (uid,a,b,c,d,e)
            |VALUES (?,?,?,?,?,?)
            |ON DUPLICATE KEY
            |UPDATE c = ?, d = ?
            |""".stripMargin
      val ps = conn.prepareStatement(sql)
      iter.foreach(row => {
        val uid = row.getAs[Long]("pid")
        val a = row.getAs[Long]("a")
        val b = row.getAs[String]("b")
        val c = row.getAs[java.math.BigDecimal]("c")
        val d = row.getAs[java.math.BigDecimal]("d")
        val e = row.getAs[Byte]("e")
        ps.setLong(1, uid)
        ps.setLong(2, a)
        ps.setString(3, b)
        ps.setBigDecimal(4, c)
        ps.setBigDecimal(5, d)
        ps.setByte(6, e)
        ps.setBigDecimal(7, c)
        ps.setBigDecimal(8, d)
        ps.executeUpdate()
      })
      DbUtil.close(conn)
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    覆盖写

    df.write
      .format("jdbc")
      .mode(SaveMode.Overwrite)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://192.168.x.xxx:3306/hrdb_test?rewriteBatchedStatements=true")
      .option("dbtable", table)
      .option("user", jdbcUser)
      .option("password", jdbcPass)
      // JDBC批大小,默认 1000,灵活调整该值可以提高写入性能
      .option("batchsize", 10000)
      // 事务级别,默认为 READ_UNCOMMITTED,无事务要求可以填 NONE 以提高性能
      .option("isolationLevel", "NONE")
      // 需要注意该配置项,Overwrite 模式下,不设置为 true 会删表重建
      .option("truncate", "true")
      .save()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    option("truncate", "false") :MySQL会先delete目标表,在create目标表
    .
    option("truncate", "true"):MySQL会直接truncate 目标表,避免重建表

    关于【rewriteBatchedStatements=true】

    此参数是加在 jdbcUrl里面的,下面讲下为什么要加这个参数?
    先说结论:

    源码中的实现使用的确实是 PreparedStatement 的addBatch()方法和executeBatch()方法,但是看了executeBatch()方法的实现后发现,它并不是每次执行一批插入,而是循环的去执行每条insert插入语句,这就造成只插入一条数据,而不是一批数据,导致每条数据到要开启和关闭一次事务,大量的事务操作很浪费时间。

    df-jdbc源码:

    1.addBatch()

    方法JdbcUtils.scala类下的savePartition方法下(注释很详细)

    在这里插入图片描述

    2.executeBatch()

    方法的具体实现是在StatementImpl.java 下的protected long[] executeBatchInternal() throws SQLException 方法
    主要代码如下:
    在这里插入图片描述
    在这里插入图片描述

    3. 通俗解释

    说句人话,df-jdbc的批量提交方法是这样:

    NSERT INTO table1 (name, age) VALUES (“xiaoming”, 10);
    INSERT INTO table1 (name, age) VALUES (“xiaoli”, 5);

    加上 rewriteBatchedStatements=true 就是这样:

    INSERT INTO table1 (name, age) VALUES (“xiaoming”, 10), (“xiaoli”, 5), …;

    4. 效率对比
    • 没加之前
      在这里插入图片描述
    • 加之后
      在这里插入图片描述

    还是很慢怎么办?

    有可能是你的服务器的磁盘IO达到了瓶颈了,下面给个我遇到的情况说明下

    场景:往MySQL同步数据,写了很久还没写完,spark-ui可以看到
    在这里插入图片描述

    原本以为是代码效率问题,但是我去检查了下MySQL所在机器的磁盘IO情况:
    在这里插入图片描述

    1. 可以看到 此时IO已经快打满了,说明有大量的磁盘操作。
      遇到这种情况,就要避开业务繁忙时期,写个定时,选择在无人的深夜进行数据导入,或者说服你的领导花钱上固态。
    2. 当然影响读写的因素还有网络带宽,如果你不是IO的原因,建议检查一下带宽
  • 相关阅读:
    CAA的VS Studio安装
    Vue.js核心技术解析与uni-app跨平台实战开发学习笔记 第12章 Vue3.X新特性解析 12.7 watch监听的使用
    单例设计模式
    Linux串口编程进阶
    STM32cubeMX配置FreeRTOS-51、USB-U盘读写
    了解Oracle中的Dual系统表
    spark基本原理&UI界面解读
    该段代码的程序流程是什么
    Jmeter快速入门
    第九天 Python爬虫之Scrapy(框架工作原理 )
  • 原文地址:https://blog.csdn.net/Lzx116/article/details/127687237