• SparkSQL部分的代码整理(具体的理解过程结合手册 IDEA版)


    对比于之前所学的MySQL和hive以及Spark,SparkSQL存在不可替代的高性能,SparkSQL在很多公司也是进行使用的,所以就是针对于代码进行一个整理的过程,留下一个熟悉代码的过程.Spark内核是真的难以理解,也就写写SQL比较简单.

    • 针对于Spark环境的理解(资料的2.6节)

    构建之前需要在xml文件之中添加依赖.(注意:这里的依赖的顺序是不可以颠倒的,否则会出现报错的情况,具体原因自己理解)

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.sparkgroupId>
    4. <artifactId>spark-sql_2.12artifactId>
    5. <version>3.0.0version>
    6. dependency>
    7. <dependency>
    8. <groupId>mysqlgroupId>
    9. <artifactId>mysql-connector-javaartifactId>
    10. <version>5.1.27version>
    11. dependency>
    12. <dependency>
    13. <groupId>org.apache.sparkgroupId>
    14. <artifactId>spark-hive_2.12artifactId>
    15. <version>3.0.0version>
    16. dependency>
    17. dependencies>

    写一段json格式的代码进行案例操作:(名称为user.json),实际的文件会出现标红的问题,并不是出现了错误,这里应当注意.

    1. {"id":1001,"age":20,"name":"qiaofeng"}
    2. {"id":1002,"age":19,"name":"xuzhu"}
    3. {"id":1003,"age":18,"name":"duanyu"}

    为什么json格式是上面的样式???

    SparkSQL读取文件采用的是Hadoop的方式,按行进行读取,需要的每一行数据都是符合json格式.

    测试代码段如下所示:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    3. object SparkSQL01 {
    4. def main(args: Array[String]): Unit = {
    5. /* 方式一:进行builder构建起进行设计者模式
    6. val sparkSession: SparkSession = SparkSession.builder()
    7. .master("local[2]")
    8. .appName("SparkSQL")
    9. .getOrCreate()
    10. */
    11. //方式二:使用普通模式进行相应的调用过程
    12. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    13. val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    14. //import sparkSession.implicits._
    15. //TODO: 1.准备json格式的文件
    16. //json文件要求:文件的内容是符合json文件的格式
    17. val dataFrame: DataFrame = sparkSession.read.json("user.json")
    18. //TODO: 2.将DataFrame转换为表进行SQL访问
    19. dataFrame.createOrReplaceTempView("user")
    20. //在程序之中写入sql是不方便的,因此是使用多个引号进行一个操作的过程
    21. sparkSession.sql(
    22. """
    23. |select
    24. |*
    25. |from
    26. |user
    27. """.stripMargin
    28. ).show
    29. sparkSession.stop()
    30. }
    31. }
    • DSL&SQL
    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    3. object SparkSQL01 {
    4. def main(args: Array[String]): Unit = {
    5. //方式二:使用普通模式进行相应的调用过程
    6. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    7. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    8. //import sparkSession.implicits._
    9. //TODO: 1.准备json格式的文件
    10. //json文件要求:文件的内容是符合json文件的格式
    11. val df: DataFrame = spark.read.json("user.json")
    12. //下面进行隐式转换之中spark不是包名,是SparkSession的对象名称
    13. // 使用的scala中的对象导入语法
    14. //这个对象必须是val类型的,var类型是不可以的
    15. import spark.implicits._//这里的引入使得在进行相应的操作的时候不进行标红[这一行的代码必须是在这里的]
    16. //TODO: 2.将DataFrame转换为表进行SQL访问
    17. //TODO 采用另外一种方式进行读取
    18. // df.select("id").show//注意这里必须是存在的一列,讲其中的一列进行读取
    19. df.select($"age"+1).show//上面的隐式转换操作进行注意的地方就是这里
    20. spark.stop()
    21. }
    22. }
    • 数据模型之间的转换(这个操作可以在linux端之间进行操作,下面是在IDEA端进行操作的过程)

    数据模型的转换表是如下所示:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    4. object SparkSQL01 {
    5. def main(args: Array[String]): Unit = {
    6. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    7. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    8. import spark.implicits._//不管是以后有没有用到都是要进行一个添加的过程
    9. //RDD => DataFrame
    10. val rdd = spark.sparkContext.makeRDD(List(
    11. (1,"zhangsan",30),
    12. (2,"lisi",40),
    13. (3,"wangwu",50),
    14. ))
    15. //将DF转换为DS的时候,设定类型时,类型应该是与该结构的字段想匹配的
    16. //讲DF转换为DS的时候,类型的属性可以比字段数少
    17. val df: DataFrame = rdd.toDF("id","name","age")
    18. //DataFrame => Dataset
    19. val ds: Dataset[User] = df.as[User]
    20. //Dataset => DataFrame
    21. val df1: DataFrame = ds.toDF()//toDF之中是可以传入相应的参数,只是将上面的列名进行一个转换的过程,动态转换
    22. //DataFrame => RDD
    23. val rdd1: RDD[Row] = df1.rdd
    24. //RDD => Dataset
    25. val ds2: Dataset[User] = rdd.map {
    26. case (id, name, age) => {
    27. User(id, name, age)
    28. }
    29. }.toDS()
    30. //Dataset => RDD
    31. val rdd2: RDD[User] = ds2.rdd
    32. spark.stop()
    33. }
    34. case class User(id:Int,name: String,age:Int)//样例类
    35. //这里的样例类的顺序如果要是进行改变的话,实际是不改变的 val df: DataFrame = rdd.toDF("id","name","age")已经定义好了
    36. //如果要是这里的定义跟前面是不一样的,将出现错误
    37. //将里面的定义删除掉一个也是可以出现正确的结果的,只是展示的时候是正确的
    38. }
    • DataFrame和DataSet的区别

     

    可以见到,DataFrame和DataSet的颜色是不一样的,但是DataFrame的颜色和String是一样的,点进源码之中可以看到DataFrame就是DataSet的一个别名.

    DataFrame就是DataSet的一个特例.

    • UDF函数(资料的2.7节)

    UDF函数的含义:在SQL之中使用自定义的函数,完成逻辑的实现

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    4. object SparkSQL01 {
    5. def main(args: Array[String]): Unit = {
    6. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    7. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    8. import spark.implicits._//不管是以后有没有用到都是要进行一个添加的过程
    9. val df: DataFrame = spark.read.json("user.json")
    10. df.createOrReplaceTempView("user")
    11. //TODO UDF:进行一个逻辑的自定义的过程
    12. spark.udf.register("prefixName",(name:String)=>{
    13. "Name:" + name
    14. })
    15. //注意这里sql之中的想加是进行数字的相加并不是进行一个字符串的加
    16. //mysql => concat(s1,s2,s3)
    17. //oracle => s1 // s2 //s3
    18. spark.sql(
    19. """
    20. |select
    21. |id,age,prefixName(name)
    22. |from
    23. |user
    24. """.stripMargin
    25. )
    26. spark.stop()
    27. }
    28. case class User(id:Int,name: String,age:Int)//样例类
    29. //这里的样例类的顺序如果要是进行改变的话,实际是不改变的 val df: DataFrame = rdd.toDF("id","name","age")已经定义好了
    30. //如果要是这里的定义跟前面是不一样的,将出现错误
    31. //将里面的定义删除掉一个也是可以出现正确的结果的,只是展示的时候是正确的
    32. }
    • UDAF函数

    这里的A是代表聚合的含义.

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    4. import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
    5. import org.apache.spark.sql._
    6. object SparkSQL01 {
    7. def main(args: Array[String]): Unit = {
    8. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    9. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    10. import spark.implicits._//不管是以后有没有用到都是要进行一个添加的过程
    11. val df: DataFrame = spark.read.json("user.json")
    12. df.createOrReplaceTempView("user")
    13. //TODO UDAF:使用一个聚合逻辑进行处理,这里相应的方法见后面的逻辑
    14. spark.udf.register("avgAge",new AvgAgeUDAF)
    15. //注意这里sql之中的想加是进行数字的相加并不是进行一个字符串的加
    16. //mysql => concat(s1,s2,s3)
    17. //oracle => s1 // s2 //s3
    18. spark.sql(
    19. """
    20. |select
    21. |avgAge(age)
    22. |from
    23. |user
    24. """.stripMargin
    25. )
    26. spark.stop()
    27. }
    28. //TODO 自定义聚合函数
    29. //1.继承类
    30. //2.重写方法
    31. class AvgAgeUDAF extends UserDefinedAggregateFunction{
    32. //TODO 输入的数据结构
    33. override def inputSchema: StructType = {//样例类
    34. StructType(
    35. Array(
    36. StructField("age",IntegerType)
    37. )
    38. )
    39. }
    40. //TODO 在上面的数据之中buffer之中的是缓冲区之中的东西 120 数量3
    41. //TODO 缓冲区之中的数据结构
    42. override def bufferSchema: StructType = {//样例类
    43. StructType(
    44. Array(
    45. StructField("total",IntegerType),
    46. StructField("count",IntegerType)
    47. )
    48. )
    49. }
    50. //TODO 输出的数据结构
    51. override def dataType: DataType = IntegerType
    52. //TODO 稳定性
    53. override def deterministic: Boolean = true
    54. //TODO 缓冲区的初始化
    55. override def initialize(buffer: MutableAggregationBuffer): Unit = {
    56. buffer.update(0,0)
    57. buffer.update(1,0)
    58. }
    59. //TODO 更新:使用输入的值进行更新缓冲区
    60. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    61. buffer.update(0,buffer.getInt(0)+input.getInt(0))//旧的值和新的值的共同作用
    62. buffer.update(0,buffer.getInt(1)+1)
    63. }
    64. //TODO 将多个缓冲区的书进行合并
    65. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    66. buffer1.update(0,buffer1.getInt(0) + buffer2.getInt(0))//旧的值和新的值的共同作用
    67. buffer1.update(0,buffer1.getInt(1) + buffer2.getInt(1))
    68. }
    69. //TODO 计算结果
    70. override def evaluate(buffer: Row): Any = {
    71. buffer.getInt(0) / buffer.getInt(1)
    72. }
    73. }
    74. }
    • UDAF函数的实现思路

    上述代码的RDD实现过程

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql._
    4. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    5. import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
    6. object SparkSQL01 {
    7. def main(args: Array[String]): Unit = {
    8. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    9. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() //不管是以后有没有用到都是要进行一个添加的过程
    10. val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List(
    11. (1, "张三", 30),
    12. (2, "lisi", 40),
    13. (3, "wangwu", 50)
    14. ))
    15. val rdd1: RDD[Int] = rdd.map(_._3)
    16. val avg = rdd1.sum / rdd1.count()
    17. println(avg)
    18. spark.stop()
    19. }
    20. }

    为何在实际的使用过程之中,我们不采用上述的代码,因为实际应用的时候,上面的rdd重复执行了,并且sum和count是行动算子,就会有两个job,效率很浪费.

    解决上述的方式:方式一

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql._
    4. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    5. import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
    6. object SparkSQL01 {
    7. def main(args: Array[String]): Unit = {
    8. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    9. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() //不管是以后有没有用到都是要进行一个添加的过程
    10. val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List(
    11. (1, "张三", 30),
    12. (2, "lisi", 40),
    13. (3, "wangwu", 50)
    14. ))
    15. //(30,1)
    16. //(40,2)
    17. //(50,1)
    18. val tuple: (Int, Int) = rdd.map {
    19. case (id, name, age) => {
    20. (age, 1)
    21. }
    22. }.reduce(
    23. (t1, t2) => {
    24. (t1._1 + t2._1, t1._2 + t2._2)
    25. }
    26. )
    27. val jieguo = tuple._1 / tuple._2
    28. println((Int)(jieguo))
    29. spark.stop()
    30. }
    31. }

    方式二:reudceByKey(独有的优势)

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql._
    4. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    5. import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
    6. object SparkSQL01 {
    7. def main(args: Array[String]): Unit = {
    8. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    9. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() //不管是以后有没有用到都是要进行一个添加的过程
    10. val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List(
    11. (1, "张三", 30),
    12. (2, "lisi", 40),
    13. (3, "wangwu", 50)
    14. ))
    15. //(30,1)
    16. //(40,2)
    17. //(50,1)
    18. val result: RDD[(Int, (Int, Int))] = rdd.map {
    19. case (id, name, age) => {
    20. (1, (age, 1))
    21. }
    22. }.reduceByKey(
    23. (t1, t2) => {
    24. (t1._1 + t2._1, t1._2 + t2._2)
    25. }
    26. )
    27. val avg = result.map{
    28. case(one,(total,count))=> {
    29. total / count
    30. }
    31. }.collect
    32. println(avg)
    33. spark.stop()
    34. }
    35. }

    通过上面的代码更好的理解缓冲区的概念.

    • UDAF函数-强类型操作
    UserDefinedAggregateFunction是弱类型的,
    UserDefinedAggregateFunction是强类型的,3.0.0之后的可以使用
    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
    4. import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
    5. import org.apache.spark.sql._
    6. object SparkSQL01 {
    7. def main(args: Array[String]): Unit = {
    8. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    9. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    10. import spark.implicits._//不管是以后有没有用到都是要进行一个添加的过程
    11. val df: DataFrame = spark.read.json("user.json")
    12. df.createOrReplaceTempView("user")
    13. //TODO UDAF:注意这个地方的写法就是与之前的写法是不一样的
    14. spark.udf.register("avgAge",functions.udaf(new AvgAgeUDAF))
    15. //注意这里sql之中的想加是进行数字的相加并不是进行一个字符串的加
    16. //mysql => concat(s1,s2,s3)
    17. //oracle => s1 // s2 //s3
    18. spark.sql(
    19. """
    20. |select
    21. |avgAge(age)
    22. |from
    23. |user
    24. """.stripMargin
    25. )
    26. spark.stop()
    27. }
    28. case class AvgBuffer(var total:Int,var count:Int)
    29. //TODO 自定义聚合函数(强类型)
    30. //1.继承类[-in,buf,out]
    31. //2.定义泛型
    32. //IN
    33. //BUF
    34. //OUT
    35. class AvgAgeUDAF extends Aggregator[Int,AvgBuffer,Int]{//[]之中是相应的类型
    36. //缓冲区的初始化
    37. override def zero: AvgBuffer = {
    38. AvgBuffer(0,0)
    39. }
    40. //将输入的值聚合到缓冲区
    41. override def reduce(b: AvgBuffer, a: Int): AvgBuffer =
    42. {
    43. b.total += a
    44. b.count += 1
    45. b
    46. }
    47. //缓冲区的合并
    48. override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    49. b1.total += b2.total
    50. b1.count += b2.count
    51. b1
    52. }
    53. //计算结果
    54. override def finish(b: AvgBuffer): Int = {
    55. b.total/b.count
    56. }
    57. //缓冲区的类型编码,固定写法(自定义的类型)
    58. override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product //当这里的类型是自己定义的时候,固定的写法就是这个样子
    59. override def outputEncoder: Encoder[Int] = Encoders.scalaInt
    60. }
    61. }

    强类型与弱类型的比较,其更加直观,顺序更好理解.

    • 旧版本的强类型的使用

    有的公司仍然是使用的旧版的Aggregete,是不支持SQL的,为了解决这个问题,代码是如下所示:(

    注意:在读取文件的时候,数字就不再是Int类型,而是相应的Long类型)

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
    4. import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
    5. import org.apache.spark.sql._
    6. object SparkSQL01 {
    7. def main(args: Array[String]): Unit = {
    8. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQl")
    9. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    10. import spark.implicits._//不管是以后有没有用到都是要进行一个添加的过程
    11. val df: DataFrame = spark.read.json("user.json")
    12. df.createOrReplaceTempView("user")
    13. //TODO UDAF:Aggregator类型在早起的版本之中是不能够应用SQL
    14. // 将一行的数据作为整体(对象)进行聚合操作
    15. //SQL是弱类型
    16. //DSL是强类型
    17. val ds: Dataset[User] = df.as[User]
    18. //将强类型的UDAF函数作为查询zaiDSL语句之中进行洗那个赢得执行
    19. val udaf = new AvgAgeUDAF
    20. val column: Any = udaf.toColumn
    21. ds.select().show()
    22. spark.stop()
    23. }
    24. case class AvgBuffer(var total:Long,var count:Long)
    25. case class User(id:Long,name:String,age:Long)
    26. //TODO 自定义聚合函数(强类型)
    27. //1.继承类[-in,buf,out]
    28. //2.定义泛型
    29. //IN:User
    30. //BUF
    31. //OUT
    32. class AvgAgeUDAF extends Aggregator[User,AvgBuffer,Long]{//[]之中是相应的类型
    33. //缓冲区的初始化
    34. override def zero: AvgBuffer = {
    35. AvgBuffer(0,0)
    36. }
    37. //将输入的值聚合到缓冲区
    38. override def reduce(b: AvgBuffer, user: User): AvgBuffer =
    39. {
    40. b.total += user.age
    41. b.count += 1
    42. b
    43. }
    44. //缓冲区的合并
    45. override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    46. b1.total += b2.total
    47. b1.count += b2.count
    48. b1
    49. }
    50. //计算结果
    51. override def finish(b: AvgBuffer): Long = {
    52. b.total/b.count
    53. }
    54. //缓冲区的类型编码,固定写法(自定义的类型)
    55. override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product //当这里的类型是自己定义的时候,固定的写法就是这个样子
    56. override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    57. }
    58. }

    数据读取与保存-通用方法

    读取

    首先是进入相应的spark模式下面

    <

    我们上面读取的是.json文件,发现是会出现报错的现象.因为spark默认读取的文件是parquet文件之中.

     读取一个相应的parquet文件,如下所示:

    保存数据

    如果要是想要读取的文件是json文件的格式,需要加入一个format

    另一种方式

    保存

    保存的过程和读取的过程是相同的,

    这里的文件保存的路径是/opt/module/spark-local

    操作JSON&CSV

    读取json格式的文件

     SaveMode模式,就是原始的文件是已经存在的,使用其他的方式进行改进.

    CSV文件的读取

    SparkSQL与MySQL联合操作

    首先配置依赖信息

    1. mysql
    2. mysql-connector-java
    3. 5.1.27

    写出相应的操作sql语句

    1. package com.atguigu.spark.sql
    2. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, functions}
    3. object SparkSQL04_Read {
    4. def main(args: Array[String]): Unit = {
    5. //创建SparkSession对象
    6. val spark: SparkSession = SparkSession.builder()
    7. .master("local[2]")
    8. .appName("SparkSQL")
    9. .getOrCreate()
    10. import spark.implicits._
    11. //读取数据
    12. val df =
    13. spark.read.
    14. format("jdbc")
    15. .option("url", "jdbc:mysql://linux1:3306/spark-sql")
    16. .option("driver", "com.mysql.jdbc.Driver")
    17. .option("user", "root")
    18. .option("password", "123123")
    19. .option("dbtable", "user")
    20. .load()
    21. //保存数据
    22. df.write.format("jdbc")
    23. .option("url", "jdbc:mysql://linux1:3306/spark-sql")
    24. .option("driver", "com.mysql.jdbc.Driver")
    25. .option("user", "root")
    26. .option("password", "123123")
    27. .option("dbtable", "user1") .mode (SaveMode.Append)
    28. //释放资源
    29. spark.stop()
    30. spark.close()
    31. }
    32. }

    操作内置hive

    创建临时视图

    创建长时视图,可以直接访问(首先创建好一个id.txt)

     >spark.sql("select * from atguigu").show

    操作外置hive

    步骤:①首先将hive之中的hive-site.xml文件复制到spark-local之中的conf/文件夹之中;②将MySQL驱动复制到conf之中;③:quit退出,重新启动

    代码操作外置Hive

    ①首先将配置pom环境

    1. org.apache.spark
    2. spark-hive_2.12
    3. 3.0.0
    4. org.apache.hive
    5. hive-exec
    6. 1.2.1
    7. mysql
    8. mysql-connector-java
    9. 5.1.27

    ②拷贝Hive-site.xml文件到classpath之下

    ③配置启用hive的支持

    ④增加依赖关系(MySQL)和第一步一样

    1. package com.atguigu.spark.sql
    2. import org.apache.spark.sql.{DataFrame, SparkSession}
    3. object SparkSQL06_Hive {
    4. def main(args: Array[String]): Unit = {
    5. val sparkSession: SparkSession = SparkSession.builder()
    6. .master("local[2]")
    7. .enableHiveSupport()//启用hive的支持
    8. .appName("SparkSQL")
    9. .getOrCreate()
    10. import sparkSession.implicits._
    11. //读取Hive数据
    12. sparkSession.sql("select * from test.city_info")
    13. sparkSession.close()
    14. }
    15. }

    beenline操作hive

    就是展示效果是比较好的,操作方式是操不多的.

  • 相关阅读:
    算法:滑动窗口
    从零到上亿用户,我是如何一步步优化MySQL数据库的?(建议收藏)
    集团公司管控的三种模式:财务管控、运营管控、战略管
    PostgreSQL创建表空间及用户授权
    Markdown格式表情包大全最新整理分享
    以爱情规律为例,浅谈三段式描述状态机
    Windows中Tomcat服务器搭建与配置
    LeetCode_23_困难_合并 K 个升序链表
    如何高效且优雅地使用Redis
    OAuth2.0协议安全学习
  • 原文地址:https://blog.csdn.net/m0_47489229/article/details/126486356