• spark


    目标

    我们的spark目标 是作为一个backup来使用 当tez有bug的时候启用备用的spark来完成此sql。所以呢表信息是只要一份就好。所以必须呢是使用之前的hive的schema,这就要求spark和hive必须集合起来用。

    一Spark  与 hive 结合方式

    1 Spark on Hive :数据源是:hive  Spark 获取hive中的数据,然后进行SparkSQL的操作  (hive只是作为一个spark的数据源)。  (我们是这种)

    2 Hvie on Spark :(数据源是hive本身)   Hvie 将自己的MapReduce计算引擎替换为Spark,当我们执行HiveSQL(HQL)时 (这种方式修改的地方比较多 比较麻烦)

    二选择Spark on Hive之后

     Spark on Hive 两种sql方式

    1:是spark-sql 提交sql到yarn里完成

    2:thrift  server 服务端口10016 接受sql 然后提交到yarn

    这两种都可以采用两种方式来完成sql

    1  Clint

    2  yarn-client

    Spark 运行的sql的时候需要两个东西 1driver 2 task

    当选Clint 的时候呢 driver 和 task都在服务器本地完成。

    当选择yarn-clint的时候

    Driver 在服务器上运行 task在yarn上执行

    spark-sql  和 thrift -server 他们之间sql解析会有不同

    thrift -server 非常不好用 多几个连接会死 需要定时重启 还有bug

    三 Driver 和 task 的关系

    Driver 将数据从hdfs里获取 然后分配任务给task,总而言之就是管理task,和相关数据的程序

    四Spark-sql on yarn-cluster

    yarn-client 里 driver 会运行在本地服务器上 而且占用大量内存,这样导致yarn分配到的数据变少。Ambari里每台机器分到的内存设置必须是一样的。所以呢如果yarn-client运行在本地服务器上会造成大量内存浪费。

    所以呢最好是使用spark-sql在cluster上运行

    1. spark-submit \
    2.   --name test-of-spark-sql-cluter-mode \
    3.   --class org.apache.spark.sql.hive.my.SparkSQLExecute \
    4.   --master yarn \
    5.   --deploy-mode cluster \
    6.   --executor-cores 2 \
    7.   --executor-memory 4G \
    8.   --num-executors 8 \
    9.   --conf spark.executor.memoryOverhead=1G \
    10.   --conf spark.submit.deployMode=cluster \
    11. /home/xaadmin/my-spark-sql-cluster.jar  'prd' 'info' 'ods' 'select * from xxxx limit 10'

    将下面scala打包成jar包

    1. package org.apache.spark.sql.hive.my
    2. import org.apache.log4j.{Level, Logger}
    3. import org.apache.spark.internal.Logging
    4. import org.apache.spark.sql.hive.HiveContext
    5. import org.apache.spark.{SparkConf, SparkContext}
    6. object SparkSQLExecute extends Logging {
    7. def main(args: Array[String]): Unit = {
    8. val startTime = System.currentTimeMillis
    9. log.info("=======================> 开始 <==========================")
    10. val app = s"${this.getClass.getSimpleName}".filter(!_.equals('$'))
    11. println("appName:" + app)
    12. // TODO: 初始化参数集合
    13. println("参数集合....")
    14. //参数校验
    15. if (args.length < 4) {
    16. logError("Usage: 未指定参数列表 " +
    17. "[env][loglevel] [hive_db] [SQL]")
    18. System.exit(1)
    19. }
    20. // TODO: 运行环境 (研发、测试、生产)
    21. val env = args(0)
    22. println("运行模式:" + env)
    23. // TODO: 日志级别设置
    24. val logLevel = args(1)
    25. println("日志级别:" + logLevel)
    26. logLevel match {
    27. case "debug" => Logger.getLogger("org").setLevel(Level.DEBUG)
    28. case "info" => Logger.getLogger("org").setLevel(Level.INFO)
    29. case "warn" => Logger.getLogger("org").setLevel(Level.WARN)
    30. case "error" => Logger.getLogger("org").setLevel(Level.ERROR)
    31. }
    32. // TODO: 验证参数
    33. println("验证参数....")
    34. val hive_db = args(2)
    35. val sqlText = args(3)
    36. // TODO:初始化环境
    37. println("初始化环境....")
    38. val sparkConf = new SparkConf()
    39. .setAppName(app)
    40. .set /*spark.sql.codegen 是否预编译sql成java字节码,长时间或频繁的sql有优化效果*/ ("spark.sql.codegen", "true")
    41. .set /*spark.sql.inMemoryColumnarStorage.batchSize 一次处理的row数量,小心oom*/ ("spark.sql.inMemoryColumnarStorage.batchSize", "20000")
    42. .set /*spark.sql.inMemoryColumnarStorage.compressed 设置内存中的列存储是否需要压缩*/ ("spark.sql.inMemoryColumnarStorage.compressed", "true")
    43. .set /*spark.sql.autoBroadcastJoinThreshold,解决数据倾斜*/ ("spark.sql.autoBroadcastJoinThreshold", "20971520")
    44. .set("hive.execution.engine", "spark")
    45. // .set("spark.sql.shuffle.partitions","200")
    46. if (env.equals("local")) {
    47. sparkConf.setMaster("local[*]")
    48. }
    49. val sc = new SparkContext(sparkConf)
    50. logLevel match {
    51. case "debug" => sc.setLogLevel(Level.DEBUG.toString)
    52. case "info" => sc.setLogLevel(Level.INFO.toString)
    53. case "warn" => sc.setLogLevel(Level.WARN.toString)
    54. case "error" => sc.setLogLevel(Level.ERROR.toString)
    55. }
    56. val hiveContext = new HiveContext(sc)
    57. if (env.equals("prd")) {
    58. hiveContext.sql("use " + hive_db)
    59. hiveContext.sql("set hive.exec.dynamic.partition = true")
    60. hiveContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    61. }
    62. val arr = sqlText.split(";")
    63. for (i <- 0 until arr.length) {
    64. println(arr(i))
    65. hiveContext.sql(arr(i)).show(false)
    66. }
    67. // TODO:输出
    68. // resRDD.cache()
    69. // if(env=="local"){
    70. // resRDD.foreach(println)
    71. // }
    72. // val result_table = "LAR_DCS_RESULT_IND"
    73. //
    74. // println(s"$result_table 数据量:"+resRDD.count)
    75. //
    76. // if(env=="prd"){
    77. // resRDD.saveAsTextFile(paramConf.getString("out.hdfs.path")+"_"+System.currentTimeMillis())
    78. // }
    79. // hiveContext.sql(s"LOAD DATA LOCAL INPATH '${hdfsPath}' OVERWRITE INTO TABLE sx_core_safe.LAR_DCS_RESULT_IND")
    80. // val hdfs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration)
    81. // val path = new Path(hdfsPath)
    82. // if(hdfs.exists(path)){
    83. // //为防止误删,禁止递归删除
    84. // hdfs.delete(path,false)
    85. // }
    86. //TODO:结束
    87. sc.stop()
    88. val endTime = System.currentTimeMillis
    89. println("耗时:" + (endTime - startTime) / 1000 + " 秒")
    90. println("=======================> 结束<==========================")
    91. }
    92. }

  • 相关阅读:
    基于armv8的kvm实现分析(一)虚拟化介绍
    ssm基于Html+css的音乐网站的设计与实现毕业设计源码181627
    手机相机系统介绍
    [NOI2018]情报中心
    排序2:直接选择排序、堆排序、直接插入排序、希尔排序
    tensorflow求解泊松方程
    Camera-ISP DP/DM/EE/NR/CR
    MySQL数据库 || 增删改查操作详解
    wxWidgets Here
    clone()方法使用时遇到的问题解决方法(JAVA)
  • 原文地址:https://blog.csdn.net/jy00885876/article/details/126181455