此种方式对于抽取数据量较大的表有很好的性能提升,但仅限于有
连续数值型主键
(比如自增id)或者比较散列
的数值型字段的数据表。
这样可以有效的防止数据倾斜的问题。
val df1: DataFrame = session.read.format("jdbc")
.option("url", "jdbc:mysql://node119:3306/hrdb_new?characterEncoding=utf8&serverTimezone=UTC&useSSL=false")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "all_info")
.option("user", "root")
.option("password", "xxx")
.option("fetchsize", "10000")
// 以下4个配置项必须同时使用
// 分区数量,可以理解为读取并行度、线程数
.option("numPartitions", 200)
// 分区字段,必须为数字、日期、时间戳字段
.option("partitionColumn", "id")
// lowerBound 和 upperBound 仅用于计算每个分区的取数步长,不用于数据过滤
// 分区字段的最小值
.option("lowerBound", lowerBound) //select min(id) from table
// 分区字段的最大值
.option("upperBound", upperBound) //select max(id) from table
.load()
println(df1.rdd.getNumPartitions) //200
此种方式对于第一种方式的扩展,比第一种更加的灵活。
此方式可以指定除主键外的其他字段分区规则,更加灵活
//借助jdbc函数,我们可以自定义predicates规则,predicates其实就是传入多个where条件,组装成数据,
// 分区数就是predicates.size
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame = {}
/**
* 将9月16-12月15三个月的数据取出,按时间分为6个partition
* 为了减少事例代码,这里的时间都是写死的
* modified_time 为时间字段
*/
val predicates =
Array(
"2015-09-16" -> "2015-09-30",
"2015-10-01" -> "2015-10-15",
"2015-10-16" -> "2015-10-31",
"2015-11-01" -> "2015-11-14",
"2015-11-15" -> "2015-11-30",
"2015-12-01" -> "2015-12-15"
).map {
case (start, end) =>
s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
}
/**
* 将近90天的数据进行分区读取
* 每一天作为一个分区,例如
* Array(
* "2015-09-17" -> "2015-09-18",
* "2015-09-18" -> "2015-09-19",
* ...)
**/
def getPredicates = {
val cal = Calendar.getInstance()
cal.add(Calendar.DATE, -90)
val array = ArrayBuffer[(String,String)]()
for (i <- 0 until 90) {
val start = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())
cal.add(Calendar.DATE, +1)
val end = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())
array += start -> end
}
val predicates = array.map {
case (start, end) => s"gmt_create >= '$start' AND gmt_create < '$end'"
}
predicates.toArray
}
val url = "jdbc:mysql://node119:3306/hrdb_new?characterEncoding=utf8&serverTimezone=UTC&useSSL=false"
val tableName1 = "all_info"
val prop = new java.util.Properties
prop.setProperty("user","root")
prop.setProperty("password","xxx")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
val df1: DataFrame = session.read.jdbc(url, tableName1,getPredicates, prop)
println(df1.rdd.getNumPartitions) // 90
建议在使用此方式进行分片时,需要评估好
predicates.size
的个数,防止单片数据过大
;同时需要自定义where语句的查询效率,防止查询语句出现慢SQL影响取数效率
。
更新接入比较复杂一些,一般结合
foreachPartition
使用。需要注意分区大小,数据量过大且分区数过小时容易出现OOM
.
同时需要目标表创建 UNIQUE KEY,因为需要基于UNIQUE KEY来实现UPSERT。
df.foreachPartition(iter => {
val conn = ds.getConnection
val sql =
"""
|INSERT INTO test_table (uid,a,b,c,d,e)
|VALUES (?,?,?,?,?,?)
|ON DUPLICATE KEY
|UPDATE c = ?, d = ?
|""".stripMargin
val ps = conn.prepareStatement(sql)
iter.foreach(row => {
val uid = row.getAs[Long]("pid")
val a = row.getAs[Long]("a")
val b = row.getAs[String]("b")
val c = row.getAs[java.math.BigDecimal]("c")
val d = row.getAs[java.math.BigDecimal]("d")
val e = row.getAs[Byte]("e")
ps.setLong(1, uid)
ps.setLong(2, a)
ps.setString(3, b)
ps.setBigDecimal(4, c)
ps.setBigDecimal(5, d)
ps.setByte(6, e)
ps.setBigDecimal(7, c)
ps.setBigDecimal(8, d)
ps.executeUpdate()
})
DbUtil.close(conn)
})
df.write
.format("jdbc")
.mode(SaveMode.Overwrite)
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://192.168.x.xxx:3306/hrdb_test?rewriteBatchedStatements=true")
.option("dbtable", table)
.option("user", jdbcUser)
.option("password", jdbcPass)
// JDBC批大小,默认 1000,灵活调整该值可以提高写入性能
.option("batchsize", 10000)
// 事务级别,默认为 READ_UNCOMMITTED,无事务要求可以填 NONE 以提高性能
.option("isolationLevel", "NONE")
// 需要注意该配置项,Overwrite 模式下,不设置为 true 会删表重建
.option("truncate", "true")
.save()
option("truncate", "false")
:MySQL会先delete目标表,在create目标表
.
option("truncate", "true")
:MySQL会直接truncate 目标表,避免重建表
此参数是加在 jdbcUrl里面的,下面讲下为什么要加这个参数?
先说结论:
源码中的实现使用的确实是 PreparedStatement 的addBatch()方法和executeBatch()方法,但是看了executeBatch()方法的实现后发现,它并不是每次执行一批插入,而是循环的去执行每条insert插入语句
,这就造成只插入一条数据,而不是一批数据,导致每条数据到要开启和关闭一次事务
,大量的事务操作很浪费时间。
方法JdbcUtils.scala
类下的savePartition
方法下(注释很详细)
方法的具体实现是在StatementImpl.java
下的protected long[] executeBatchInternal() throws SQLException
方法
主要代码如下:
说句人话,df-jdbc的批量提交方法是这样:
NSERT INTO table1 (name, age) VALUES (“xiaoming”, 10);
INSERT INTO table1 (name, age) VALUES (“xiaoli”, 5);
…
加上 rewriteBatchedStatements=true
就是这样:
INSERT INTO table1 (name, age) VALUES (“xiaoming”, 10), (“xiaoli”, 5), …;
有可能是你的服务器的
磁盘IO
达到了瓶颈了,下面给个我遇到的情况说明下
场景:往MySQL同步数据,写了很久还没写完,spark-ui可以看到
原本以为是代码效率问题,但是我去检查了下MySQL所在机器的磁盘IO情况: