• 【Spark】spark对mysql的操作


    目录

    一、前言

    二、使用技巧

    1、读取mysql满足条件的行记录

    2、整体写入mysql的操作

    3、更新mysql的某行记录


    一、前言

            使用spark技术和mysql交互的时候往往会遇到以下几种情况,需要编写不同的api方式来满足开发过程中的不同需求,这里使用的语言为scala变成语言;

    • 读取mysql满足条件的行记录
    • 整体写入mysql的操作
    • 更新mysql的某行记录

    二、使用技巧

    1、读取mysql满足条件的行记录

    • 首先需要初始化SparkSession对象,这里比较常用通过连接hive的api获取、同理其他方式获取也可以;

            连接hive获取:

    1. //conHive方法在DBConUtil类中;
    2. def conHive(appName:String):SparkSession={
    3. SparkSession.builder()
    4. //.master("local[2]")
    5. .appName(appName)
    6. .config("spark.sql.broadcastTimeout","36000")
    7. // .config("spark.default.parallelism",1000)
    8. .config("hive.exec.dynamici.partition", true)
    9. .config("hive.exec.dynamic.partition.mode", "nonstrict")
    10. .enableHiveSupport()
    11. .getOrCreate()
    12. }
    13. val spark: SparkSession = DBConUtil.conHive("test")

            其他方式获取:

    1. val spark: SparkSession = SparkSession
    2. .builder()
    3. .appName("test")
    4. .master("local[*]")
    5. .getOrCreate()
    • 然后使用初始化好的SparkSession对象进行mysql数据库数据的读取操作;
    1. val properties = new Properties()
    2. properties.setProperty("user","mysqldb")
    3. properties.setProperty("password","pwd")
    4. val url="jdbc:mysql://ip:3306/test?characterEncoding=utf8&useSSL=true"
    5. var df= spark.read.jdbc(url,"table1",properties)
    6. .select("name","age","sex")
    7. .where("age>20")

    2、整体写入mysql的操作

            这里的整体写入mysql的操作的含义是将条件筛选之后的DataFram或着DataSet直接写入mysql,调用的是spark官方提供的api。所以首先要创建出来一个DataFram或者DataSet数据集,接下来就是直接写入;

    1. val properties = new Properties()
    2. properties.setProperty("user", mysqlUser)
    3. properties.setProperty("password", mysqlPwd)
    4. df.repartition(80).write.mode(SaveMode.Append).option("driver","com.mysql.jdbc.Driver")
    5. .jdbc(mysqlUrl, mysqlRetTable, properties)

    存储模式主要包含如下几种:

    1. SaveMode.ErrorIfExists【默认】模式,该模式下,如果数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库;
    2. SaveMode.Append 如果表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;
    3. SaveMode.Overwrite 重写模式,其实质是先将已有的表及其数据全都删除,再重新创建该表,最后插入新的数据;
    4. SaveMode.Ignore 若表不存在,则创建表,并存入数据;在表存在的情况下,直接跳过数据的存储,不会报错。

    3、更新mysql的某行记录

            有时候在写spark程序的时候需要对mysql中的单行或者多行的某些字段进行更新操作,spark api并没有提供这些操作,这里需要自己写原生的JDBC操作更新或者批量更新mysql记录;

    1. val connection: Connection = JdbcTemplateUtil.
    2. getConnection("jdbc:mysql://ip:3306/url_analyse?characterEncoding=utf8&useSSL=false",
    3. "mysqldb", "pwd")
    4. JdbcTemplateUtil.executeSql(connection,"insert into test01(id,test) values(?,?)",Array("117","aa"))
    5. //批量插入
    6. // var arrayBuffer = new ArrayBuffer[Array[String]]()
    7. // arrayBuffer += Array("220","bb")
    8. // arrayBuffer += Array("330","cc")
    9. // arrayBuffer += Array("440","dd")
    10. // JdbcTemplateUtil.executeBatchSql(connection,"insert into test01(id,test) values(?,?)",arrayBuffer)
    1. import com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
    2. import java.sql.{Connection, DriverManager}
    3. import scala.collection.mutable.ArrayBuffer
    4. object JdbcTemplateUtil {
    5. /**
    6. * 单条操作
    7. * @param sql
    8. * @param params
    9. */
    10. def executeSql(conn: Connection, sql: String, params: Array[String]): Unit = {
    11. try {
    12. val ps = conn.prepareStatement(sql)
    13. if (params != null) {
    14. for (i <- params.indices)
    15. ps.setString(i + 1, params(i))
    16. }
    17. val update = ps.executeUpdate()
    18. ps.close()
    19. } catch {
    20. case e: Exception => println(">>>Execute Sql Exception..." + e)
    21. }
    22. }
    23. /**
    24. * 批量操作
    25. * @param sql
    26. * @param paramList
    27. */
    28. def executeBatchSql(conn: Connection, sql: String, paramList: ArrayBuffer[Array[String]]): Unit = {
    29. try {
    30. val ps = conn.prepareStatement(sql)
    31. conn.setAutoCommit(false)
    32. for (params: Array[String] <- paramList) {
    33. if (params != null) {
    34. for (i <- params.indices) ps.setString(i + 1, params(i))
    35. ps.addBatch()
    36. }
    37. }
    38. ps.executeBatch()
    39. conn.commit()
    40. ps.close()
    41. conn.close()
    42. } catch {
    43. case e: Exception => println(">>>Execute Batch Sql Exception..." + e)
    44. }
    45. }
    46. /**
    47. * 获取mysql连接
    48. * @param url
    49. * @param user
    50. * @param pwd
    51. * @return
    52. */
    53. def getConnection(url:String,user:String,pwd:String):Connection={
    54. //classOf[com.mysql.cj.jdbc.Driver]
    55. Class.forName("com.mysql.jdbc.Driver")
    56. DriverManager.getConnection(url,user,pwd)
    57. }
    58. }

  • 相关阅读:
    笔记 记录
    开发笔记 —— Centos7 在急救模式下修改密码
    Ble Mesh的Generic Model ID&Opcode
    AD20绘制电路板的外形
    【性能优化】虚拟懒加载(下拉滚动加载长列表)element-puls+el-table
    【Solidity】智能合约案例——②供应链金融合约
    【Kingbase FlySync】命令模式:部署双轨并行,并实现切换同步
    串行并行数据转换
    【React】classnames 库(可添加多个 className 类名)
    概括23种设计模式
  • 原文地址:https://blog.csdn.net/hyj_king/article/details/126852447