我们的spark目标 是作为一个backup来使用 当tez有bug的时候启用备用的spark来完成此sql。所以呢表信息是只要一份就好。所以必须呢是使用之前的hive的schema,这就要求spark和hive必须集合起来用。
1 Spark on Hive :数据源是:hive Spark 获取hive中的数据,然后进行SparkSQL的操作 (hive只是作为一个spark的数据源)。 (我们是这种)
2 Hvie on Spark :(数据源是hive本身) Hvie 将自己的MapReduce计算引擎替换为Spark,当我们执行HiveSQL(HQL)时 (这种方式修改的地方比较多 比较麻烦)
Spark on Hive 两种sql方式
1:是spark-sql 提交sql到yarn里完成
2:thrift server 服务端口10016 接受sql 然后提交到yarn
这两种都可以采用两种方式来完成sql
1 Clint
2 yarn-client
Spark 运行的sql的时候需要两个东西 1driver 2 task
当选Clint 的时候呢 driver 和 task都在服务器本地完成。
当选择yarn-clint的时候
Driver 在服务器上运行 task在yarn上执行
spark-sql 和 thrift -server 他们之间sql解析会有不同
thrift -server 非常不好用 多几个连接会死 需要定时重启 还有bug
Driver 将数据从hdfs里获取 然后分配任务给task,总而言之就是管理task,和相关数据的程序
yarn-client 里 driver 会运行在本地服务器上 而且占用大量内存,这样导致yarn分配到的数据变少。Ambari里每台机器分到的内存设置必须是一样的。所以呢如果yarn-client运行在本地服务器上会造成大量内存浪费。
所以呢最好是使用spark-sql在cluster上运行
- spark-submit \
- --name test-of-spark-sql-cluter-mode \
- --class org.apache.spark.sql.hive.my.SparkSQLExecute \
- --master yarn \
- --deploy-mode cluster \
- --executor-cores 2 \
- --executor-memory 4G \
- --num-executors 8 \
- --conf spark.executor.memoryOverhead=1G \
- --conf spark.submit.deployMode=cluster \
- /home/xaadmin/my-spark-sql-cluster.jar 'prd' 'info' 'ods' 'select * from xxxx limit 10'
将下面scala打包成jar包
- package org.apache.spark.sql.hive.my
-
-
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.internal.Logging
- import org.apache.spark.sql.hive.HiveContext
- import org.apache.spark.{SparkConf, SparkContext}
-
-
- object SparkSQLExecute extends Logging {
-
- def main(args: Array[String]): Unit = {
-
- val startTime = System.currentTimeMillis
- log.info("=======================> 开始 <==========================")
-
- val app = s"${this.getClass.getSimpleName}".filter(!_.equals('$'))
-
- println("appName:" + app)
-
- // TODO: 初始化参数集合
- println("参数集合....")
-
- //参数校验
- if (args.length < 4) {
- logError("Usage: 未指定参数列表 " +
- "[env][loglevel] [hive_db] [SQL]")
- System.exit(1)
- }
-
- // TODO: 运行环境 (研发、测试、生产)
- val env = args(0)
- println("运行模式:" + env)
-
- // TODO: 日志级别设置
- val logLevel = args(1)
- println("日志级别:" + logLevel)
-
- logLevel match {
- case "debug" => Logger.getLogger("org").setLevel(Level.DEBUG)
- case "info" => Logger.getLogger("org").setLevel(Level.INFO)
- case "warn" => Logger.getLogger("org").setLevel(Level.WARN)
- case "error" => Logger.getLogger("org").setLevel(Level.ERROR)
- }
-
-
- // TODO: 验证参数
- println("验证参数....")
-
- val hive_db = args(2)
-
- val sqlText = args(3)
-
-
-
-
- // TODO:初始化环境
- println("初始化环境....")
-
-
- val sparkConf = new SparkConf()
- .setAppName(app)
- .set /*spark.sql.codegen 是否预编译sql成java字节码,长时间或频繁的sql有优化效果*/ ("spark.sql.codegen", "true")
- .set /*spark.sql.inMemoryColumnarStorage.batchSize 一次处理的row数量,小心oom*/ ("spark.sql.inMemoryColumnarStorage.batchSize", "20000")
- .set /*spark.sql.inMemoryColumnarStorage.compressed 设置内存中的列存储是否需要压缩*/ ("spark.sql.inMemoryColumnarStorage.compressed", "true")
- .set /*spark.sql.autoBroadcastJoinThreshold,解决数据倾斜*/ ("spark.sql.autoBroadcastJoinThreshold", "20971520")
- .set("hive.execution.engine", "spark")
- // .set("spark.sql.shuffle.partitions","200")
-
-
- if (env.equals("local")) {
- sparkConf.setMaster("local[*]")
- }
-
- val sc = new SparkContext(sparkConf)
-
- logLevel match {
- case "debug" => sc.setLogLevel(Level.DEBUG.toString)
- case "info" => sc.setLogLevel(Level.INFO.toString)
- case "warn" => sc.setLogLevel(Level.WARN.toString)
- case "error" => sc.setLogLevel(Level.ERROR.toString)
- }
-
- val hiveContext = new HiveContext(sc)
-
- if (env.equals("prd")) {
- hiveContext.sql("use " + hive_db)
- hiveContext.sql("set hive.exec.dynamic.partition = true")
- hiveContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
- }
-
- val arr = sqlText.split(";")
-
- for (i <- 0 until arr.length) {
- println(arr(i))
- hiveContext.sql(arr(i)).show(false)
- }
-
- // TODO:输出
- // resRDD.cache()
- // if(env=="local"){
- // resRDD.foreach(println)
- // }
- // val result_table = "LAR_DCS_RESULT_IND"
- //
- // println(s"$result_table 数据量:"+resRDD.count)
- //
- // if(env=="prd"){
- // resRDD.saveAsTextFile(paramConf.getString("out.hdfs.path")+"_"+System.currentTimeMillis())
- // }
-
-
- // hiveContext.sql(s"LOAD DATA LOCAL INPATH '${hdfsPath}' OVERWRITE INTO TABLE sx_core_safe.LAR_DCS_RESULT_IND")
-
-
- // val hdfs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration)
- // val path = new Path(hdfsPath)
- // if(hdfs.exists(path)){
- // //为防止误删,禁止递归删除
- // hdfs.delete(path,false)
- // }
-
- //TODO:结束
- sc.stop()
- val endTime = System.currentTimeMillis
- println("耗时:" + (endTime - startTime) / 1000 + " 秒")
- println("=======================> 结束<==========================")
- }
-
- }