在大数据开发过程中,遇到很多Spark写入mysql得场景,目前由于Spark仅支持以下几种
SaveMode: Append、Overwirte、ErrorIfExists、Ignore、ReplaceInto
由于在写入mysql时,需要数据根据主键进行更新,而不覆盖或追加,次业务场景也比较多。
看了Spark源码后,故此有两种方式进行数据更新
- public enum SaveMode {
- /**
- * Append mode means that when saving a DataFrame to a data source, if data/table already exists,
- * contents of the DataFrame are expected to be appended to existing data.
- *
- * @since 1.3.0
- */
- Append,
- /**
- * Overwrite mode means that when saving a DataFrame to a data source,
- * if data/table already exists, existing data is expected to be overwritten by the contents of
- * the DataFrame.
- *
- * @since 1.3.0
- */
- Overwrite,
- /**
- * ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
- * an exception is expected to be thrown.
- *
- * @since 1.3.0
- */
- ErrorIfExists,
- /**
- * Ignore mode means that when saving a DataFrame to a data source, if data already exists,
- * the save operation is expected to not save the contents of the DataFrame and to not
- * change the existing data.
- *
- * @since 1.3.0
- */
- Ignore,
- /**
- * REPLACE INTO用于实时覆盖写入数据。写入数据时,会先根据主键判断待写入的数据是否已经存在于表中,并根据判断结果选择不同的方式写入数据:
- * 如果待写入数据已经存在,则先删除该行数据,然后插入新的数据。
- * 如果待写入数据不存在,则直接插入新数据。--by Jyong
- *
- * @since 1.3.0
- */
- ReplaceInto,
-
- /**
- * 在MySQL数据库中,如果在insert语句后面带上ON DUPLICATE KEY UPDATE 子句,而要插入的行与表中现有记录的惟一索引或主键中产生重复值,
- * 那么就会发生旧行的更新;如果插入的行数据与现有表中记录的唯一索引或者主键不重复,则执行新纪录插入操作
- * 说通俗点就是数据库中存在某个记录时,执行这个语句会更新,而不存在这条记录时,就会插入
- */
- UpdateByDuk
- }
REPLACE INTO用于实时覆盖写入数据。写入数据时,会先根据主键/唯一键判断待写入的数据是否已经存在于表中,并根据判断结果选择不同的方式写入数据:
from mysql: MySQL :: MySQL 5.6 Reference Manual :: 13.2.8 REPLACE Statement
MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE):
Try to insert the new row into the table
While the insertion fails because a duplicate-key error occurs for a primary key or unique index:
Delete from the table the conflicting row that has the duplicate key value
Try again to insert the new row into the table
此种方式代码开发量少
以Spark2.4.8为例
修改点:
新增一种枚举写入方式:
2.org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider#createRelation
3.org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils#getInsertStatement
在该方法中首先从上游传递SaveMode:mode参数
再更加写入的mode判断返回Insert INTO /Replace into sql

在MySQL数据库中,如果在insert语句后面带上ON DUPLICATE KEY UPDATE 子句,而要插入的行与表中现有记录的惟一索引或主键中产生重复值, 那么就会发生旧行的更新;如果插入的行数据与现有表中记录的唯一索引或者主键不重复,则执行新纪录插入操作说通俗点就是数据库中存在某个记录时,执行这个语句会更新,而不存在这条记录时,就会插入
from mysql:MySQL :: MySQL 5.6 Reference Manual :: 13.2.5.2 INSERT ... ON DUPLICATE KEY UPDATE Statement
If you specify an ON DUPLICATE KEY UPDATE clause and a row to be inserted would cause a duplicate value in a UNIQUE index or PRIMARY KEY, an UPDATE of the old row occurs. For example, if column a is declared as UNIQUE and contains the value 1, the following two statements have similar effect:
这种方式代码量较大
后续补上自己完善的代码
此种方式另外一位博主有源码:https://www.jb51.net/article/237582.htm
缺点:
更新的内容中unique key或者primary key最好保证一个,不然不能保证语句执行正确(有任意一个unique key重复就会走更新,当然如果更新的语句中在表中也有重复校验的字段,那么也不会更新成功而导致报错,只有当该条语句没有任何一个unique key重复才会插入新记录);尽量不对存在多个唯一键的table使用该语句,避免可能导致数据错乱。
在有可能有并发事务执行的insert 语句情况下不使用该语句,可能导致产生death lock。
如果数据表id是自动递增的不建议使用该语句;id不连续,如果前面更新的比较多,新增的下一条会相应跳跃的更大