课程资源:(林子雨)Spark编程基础(Python版)_哔哩哔哩_bilibili
Spark生态系统:

RDD编程:对RDD进行一次又一次的转换操作
两种方式:
(1)从文件系统中加载数据:Spark的SparkContext通过 sc.textFile() 读取数据,生成内存中的RDD
Driver节点为指挥所;SparkContext对象为指挥官

- # 从本地文件系统中加载数据创建RDD
- # sc即driver节点里的SparkContext对象
- lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") # 本地文件是///
- lines.foreach(print) # 遍历RDD每个元素并输出
SparkContext在独立应用程序(即代码文件)里需要编写代码生成;但在pyspark交互式执行环境里,系统会默认创建sc,不需再人为创建
(2)从分布式文件系统HDFS中加载数据:
- # 三条语句完全等价,可以使用其中任一种
- lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") # hdfs://,localhost为主机名,9000为端口号(系统默认去当前登录Ubuntu系统的用户在HDFS中所对应的用户主目录去找)
- lines = sc.textFile("/user/hadoop/word.txt") # 用户主目录,Linux系统默认/home/用户名,简写为/~
- lines = sc.textFile("word.txt") # txt文件放在当前用户主目录下
(3)通过并行集合(数组/列表)创建RDD:SparkContext 的 sc.parallelize() 方法,可以对array并行化,生成内存中的RDD
- array = [1,2,3,4,5]
- rdd = sc.parallelize(array)
- rdd.foreach(print) # 遍历取出打印
- # 结果:
- # 1
- # 2
- # 3
- # 4
- # 5

RDD操作:

RDD转换 <=> 业务逻辑完成一次又一次转换(形成DAG有向无环图,Spark即解析DAG,生成很多个Stage,每个阶段的子任务提交到不同工作节点的线程去运行)。很多简单的转换组合后可实现复杂的业务逻辑

- lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") # 从底层文本文件加载生成内存中RDD
- linesWithSpark = lines.filter(lambda line:"Spark" in line) # 匿名函数/lambda表达式
- linesWithSpark.foreach(print) # 输出包含Spark的行

- data = [1,2,3,4,5]
- rdd1 = sc.parallelize(data) # 得到一个RDD
- rdd2 = rdd1.map(lambda x:x+10)
- rdd2.foreach(print)

- lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
- words = lines.map(lambda line:line.split(" ")) # 一行语句被拆分后得到的是list
- words.foreach(print) # 包含3个元素,每个元素都是list

- lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
- words = lines.flatMap(lambda line:line.split(" "))
- words.foreach(print) # 包含9个元素(英文单词)

词频统计就是用 flatMap 将一行行语句打散成一个个单词
应用于key-value键值对数据集,返回(key, iterable),即把key相同的值封装成一个可迭代对象
- # 并行化方式生成列表封装的数据集,列表里的元素为键值对
- words = sc.parallelize([("Hadoop",1), ("is",1), ("good",1), ("Spark",1), ("is",1)])
- words1 = words.groupByKey() # 相同key的值会归并起来,pyspark.resultiterable.ResultIterable object封装
- words1.foreach(print)

在groupByKey基础上对value list按照func进行计算
- words = sc.parallelize([("Hadoop",1), ("is",1), ("good",1), ("Spark",1), ("is",1)])
- words1 = words.reduceByKey(lambda a,b:a+b)
- words1.foreach(print) # 如('is',2) 即 is出现2次

如若("is",(1,1,1)),第一个1赋给a,第二个1赋给b,求和得到2赋给a,第三个1赋给b,求和得到3,返回最终结果

转换类型操作为惰性机制,并不真正发生计算,只记录轨迹;当遇到第一个行动类型操作时执行真正的计算(从底层磁盘加载数据、生成数据、转换、得到结果)

rdd.reduce(lambda a,b:a+b):

惰性机制:
- lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") # 转换操作,只是记录轨迹,并未真正加载
- lineLengths = lines.map(lambda s:len(s)) # 转换操作,只是记录轨迹,并未真正加载
- totalLength = lineLengths.reduce(lambda a,b:a+b) # 行动操作,真正执行从头到尾计算
- print(totalLength) # 每行长度相加,得到总长度
多次反复访问同样一组值,不做持久化的话,每次访问都需要重新生成,非常耗时(对于迭代计算而言代价很大,经常需要多次重复使用同一组数据)。持久化将其保存到内存中,下次使用时不需要从头计算

可以通过持久化(缓存)机制避免这种重复计算的开销。持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复利用
可以使用 persist() 方法对一个RDD标记为持久化(之所以说标记为持久化,是因为出现 persist() 语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化)
参数:

.unpersist() 方法手动把持久化的RDD从缓存中移除
RDD就是弹性分布式数据集,可以在计算过程中不断动态调整分区个数
(1)好处:增加并行度(可以在多个节点上同时发生计算);减少通信开销

连接两表,让j1负责连接0-100w的用户id,以此类推

分区后只涉及events表多次的数据分发
(2)分区原则:分区个数 = 集群中CPU核心数目

对于Spark不同部署模式(Local模式、Standalone模式、YARN模式、Mesos模式)而言,通过设置具体参数值(spark.default.parallelism)指定默认的分区数目
(3)指定分区个数:在调用 textFile() 和 parallelize() 方法时指定分区个数
- sc.textFile(path, partitionNum)
- # path指定要加载的文件的地址
- # partitionNum用于指定分区个数
-
- list = [1,2,3,4,5]
- rdd = sc.parallelize(list,2) # 设置两个分区
(4)使用repartition()方法重新设置分区个数:通过转换操作得到新RDD时,直接调用repartition方法即可

(5)自定义分区方法:

- from pyspark import SparkConf, SparkContext
-
- def MyPatitioner(key):
- print("MyPatitioner is running")
- print("The key is %d" %key)
- return key%10 # 作为分区编号返回
-
- def main():
- print("The main function is running")
- conf = SparkConf().setMaster("local").setAppName("MyApp")
- sc = SparkContext(conf=conf)
- data = sc.parallelize(range(10),5) # 分成5个分区
- data.map(lambda x:(x,1)) \ # 键值对
- .partitionBy(10, Mypartitioner) \ # 根据key分区。.partitionBy只接受键值对类型
- .map(lambda x:x[0]) \ # 再从键值对转回原格式
- .saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner") # 写入10个分区,每个分区各1个文件
-
- if __name__ == '__main__':
- main()
.partitionBy只接受键值对类型
使用如下命令运行 TestPartitioner.py:
- cd /usr/local/spark/mycode/rdd
- python3 TestPartitioner.py
或者,使用如下命令运行 TestPartitioner.py:
- cd /usr/local/spark/mycode/rdd
- /usr/local/spark/bin/spark-submit TestPartitioner.py
再次强调:
本地文件是三个/,即file:///
hdfs文件是两个/,即hdfs://



在一个集群中同时部署Hadoop和Spark,把集群中某个节点既作为HDFS的存储节点,也作为Spark的WorkerNode,即Spark的计算组件和HDFS的存储组件放在同一台机器上

分布式词频统计:

键值对RDD:RDD的每个元素都是一个键值对 (key, value)
(1)从文件中加载

(2)通过并行集合(列表)创建

总结:groupByKey是对每个key进行操作,但只生成一个sequence,本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作;reduceByKey用于对每个key对应的多个value进行merge操作,能在本地先进行merge操作,且merge操作可以通过函数自定义
(1)reduceByKey(func):先把key相同的值归并起来,再对值列表用func进行聚合计算

(2)groupBykey()


对比 reduceByKey 和 groupByKey:

一个 reduceByKey 等价于 一个 groupByKey + map
(3)keys()

(4)values()

(5)sortByKey()


(6)mapValues(func):key不变,value用func(lambda表达式)进行计算

给定一组键值对,key为图书名称,value为某天图书销量。计算每种图书的每天平均销量

reduceByKey():

mapValues():

(1)本地文件系统数据读写:

把RDD写入到文本文件中 .saveAsTextFile(给出的是目录,而不是具体文件,因为存在分区的概念):

再次把数据加载到RDD中,也要写目录,而不是文件:

(2)分布式文件系统HDFS数据读写:

把RDD中的数据保存到HDFS文件中(路径是目录,而不是具体文件):

再次强调,本地文件是file:///开头,而分布式文件是hdfs://开头
HBase(分布式数据库)是Google BigTable的开源实现,也是Hadoop的成员组件,构建在Hadoop分布式文件系统HDFS基础上。即HBase的数据是保存在底层HDFS中的

特性:
概念:
HBase - 稀疏、多维度、排序、映射表


四维坐标定位:行键-列族-列限定符-版本时间戳

HBase概念视图:列族contents、列限定符html

HBase物理视图:底层为行键+列族+时间戳

保存机制:水平分区+切分成很多列族(分布式存储)
启动HBase(底层存储基础为HDFS,故要先启动Hadoop):
- # 启动Hadoop
- cd /usr/local/hadoop # 进入Hadoop安装目录
- ./sbin/start-all.sh # 启动Hadoop
- # 或start-dfs.sh(启动hdfs)
-
- # 启动HBase
- cd /usr/local/hbase # 进入HBase安装目录
- ./bin/start-hbase.sh # 启动HBase
- ./bin/hbase shell # 启动HBase Shell
创建student表(确保数据库里不存在student表):

- disable 'student'
- drop 'student'
-
- create 'student', 'info' # 表、列族
-
- # 录入student表第一个学生记录
- put 'student', '1', 'info:name', 'Xueqian'
- put 'student', '1', 'info:gender', 'F'
- put 'student', '1', 'info:age', '23'
-
- # 录入student表第二个学生记录
- put 'student', '2', 'info:name', 'Weiliang'
- put 'student', '2', 'info:gender', 'M'
- put 'student', '2', 'info:age', '24'
把程序运行过程中所需jar包(lib目录下)拷贝到Spark安装目录下(jars目录),需要拷贝:
- cd /usr/local/spark/jars
- mkdir hbase
- cd hbase
- cp /usr/local/hbase/lib/hbase*.jar ./
- cp /usr/local/hbase/lib/guava-12.0.1.jar ./
- cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
- cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
此外,在Spark2.0以上版本中,缺少把HBase数据转换成Python可读取数据的jar包,需要另行下载。可以访问下面地址下载spark-examples_2.11-1.6.0-typesafe-001.jar。下载以后保存到 /usr/local/spark/jars/hbase/ 目录中https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001
使用vim编辑器打开spark-env.sh文件,设置Spark的spark-env.sh文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件,命令如下:
- cd /usr/local/spark/conf
- vim spark-env.sh
打开spark-env.sh文件后,可以在文件最前面增加下面一行内容,这样后面编译和运行过程才不会出错:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
Spark读取HBase:用SparkContext提供的 newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中
SparkOperateHBase.py(从HBase读取数据、生成字符串格式并打印到屏幕上):
- #!/usr/bin/env python3
-
- from pyspark import SparkConf, SparkContext
-
- conf = SparkConf().setMaster('local').setAppName("ReadHBase") # 设置连接方式为本地模式,应用名称为ReadHBase
- sc = SparkContext(conf = conf) # 生成SparkContext对象
- host = 'localhost' # ZooKeeper服务器地址(分布式协调一致性作用)
- table = 'student' # 表名
- conf = {"hbase.zookeeper.quorum": host,"hbase.mapreduce.inputtable": table} # 配置ZooKeeper服务器地址、当前读取的输入表
- keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" # 键转换器,把key从HBase格式转换成字符串格式
- valueConv="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
- hbase_rdd=sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf) # 读取表的格式、从HBase读取的key的类型、从HBase读取的value的类型、指定key的转换类、指定value的转换类、配置信息
- count=hbase_rdd.count() # 有多少个行键(键值对)
- hbase_rdd.cache() # 缓存
- output=hbase_rdd.collect() # 封装在一个列表中返回
- for (k,v) in output:
- print(k,v)
执行该代码文件:
- cd /usr/local/spark/mycode/rdd
- /usr/local/spark/bin/spark-submit SparkOperateHBase.py
执行结果:

把表中的两个学生信息插入到HBase的student表中:

SparkWriteHBase.py:
- #!/usr/bin/env python3
-
- from pyspark import SparkConf,SparkContext
-
- conf = SparkConf().setMaster('local').setAppName("WriteHBase")
- sc = SparkContext(conf = conf)
- host = 'localhost'
- table = 'student'
- keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" # key转换器,String类型转换成内部格式
- valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" # value转换器,字符串列表转换成Put单元格
- conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class":"org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class":"org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class":"org.apache.hadoop.io.Writable"}
-
- rawData=['3,info,name,Rongcheng','3,info,gender,M','3,info,age,26','4,info,name,Guanhua','4,info,gender,M','4,info,age,27']
- # 首先将6个字符串加载到内存生成RDD,再写入HBase
- sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv) # x[0]为行键,map后6个字符串变为6个键值对(key,value),key为行键,value为字符串列表(即单元格值)
执行:
- cd /usr/local/spark/mycode/rdd
- /usr/local/spark/bin/spark-submit SparkWriteHBase.py
去HBase Shell查看写入结果:scan 'student' 
对一个目录下的所有文件的某字段排序,取top5(topN.py)

- from pyspark import SparkConf, SparkContext
- conf = SparkConf().setMaster("local").setAppName("ReadHBase")
- sc = SparkContext(conf = conf)
- lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")
- # line.strip() 去掉字符串后面的空格(去掉空行)
- # split后==4,即去掉缺失字段的行
- result1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(",")) == 4))
- result2 = result1.map(lambda x:x.split(",")[2]) # 取出payment
- result3 = result2.map(lambda x:(int(x),"")) # 转换为(key, value),为了后面排序
- result4 = result3.repartition(1) # 为了保证全局有序,否则可能分区有序,但全局无序
- result5 = result4.sortByKey(False) # sortByKey()必须根据key来排,即输入是(key,value)。False为降序
- result6 = result5.map(lambda x:x[0]) # 去掉value的""
- result7 = result6.take(5) # 取出前五名
- for a in result7:
- print(a)
过程解析:




读取文件中所有整数并进行排序(FileSort.py)

- #!/usr/bin/env python3
- from pyspark import SparkContext, SparkConf
-
- index = 0
- def getindex(): # 获取全局排序,依次递增
- global index
- index += 1
- return index
-
- def main():
- conf = SparkConf().setMaster('local[1]').setAppName('FileSort')
- sc = SparkContext(conf=conf)
- lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt") # 把某个目录下所有文件加载进来生成RDD
- result1 = lines.filter(lambda line:(len(line.strip())>0)) # 消除空行
- result2 = result1.map(lambda x:(int(x.strip()),"") # 生成(key,value)
- result3 = result2.repatition(1) # 1个分区确保全局有序
- result4 = result3.sortByKey(True)
- result5 = result4.map(lambda x:x[0])
- result6 result5.map(lambda x:(getindex(),x))
- result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")
sortByKey() 的输入必须是 (key, value)
过程解析:




先根据第一列降序排序,第一列值相等再根据第二列降序排序(SecondarySortKey.py)


若就把字符串构建(key,value) 输入sortBykey(),是根据字母的升序排序。故本题必须生成一个可比较的key,即下图中的 SecondarySortKey(5,3)

实现思路:
- from operator import gt
- from pyspark import SparkContext, SparkConf
-
- class SecondarySortKey():
- def __init__(self, k): # __init__为构造函数,k为传入参数,格式为(key,value)
- self.column1 = k[0]
- self.column2 = k[1]
-
- def __gt__(self, other): # 重写比较函数
- if other.column1 == self.column1:
- return gt(self.column2,other.column2) # 若第1列相等,比较第2列
- else:
- return gt(self.column1, other.column1)
-
- def main():
- conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
- sc = SparkContext(conf=conf)
-
- file="file:///usr/local/spark/mycode/rdd/secondarysort/file4.txt"
- rdd1 = sc.textFile(file)
- rdd2=rdd1.filter(lambda x:(len(x.strip()) > 0)) # 去除空行
- rdd3=rdd2.map(lambda x:((int(x.split(" ")[0]),int(x.split(" ")[1])),x)) # x为字符串
- rdd4=rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1]))
- rdd5=rdd4.sortByKey(False)
- rdd6=rdd5.map(lambda x:x[1]) # 去掉左边的可排序key,只保留右边的字符串
- rdd6.foreach(print)
-
- if __name__ == '__main__':
- main()
过程解析:




Hive:SQL-on-Hadoop(Hadoop平台上提供了SQL查询的能力,在Hadoop平台上构建数据仓库,把SQL语句转换成底层MapReduce程序,对底层HDFS数据进行查询分析)

Shark即Hive on Spark。为了实现和Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作

Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据模块,其他模块全部重新开发。也就是说,从HQL被解析成抽象语法树AST开始,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责

Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句。数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据

Why Spark SQL?
Spark SQL填补了这个鸿沟
DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从Mysql到DataFrame的转化,并且支持SQL查询
RDD是Spark Core核心组件的数据抽象,Spark SQL的数据抽象是DataFrame

从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession也提供了HiveQL以及其他依赖于Hive的功能的支持
- from pyspark import SparkContext, SparkConf
- from pyspark.sql import SparkSession
- spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
spark.read() 操作或 spark.read.format().load() 操作
- spark.read.text("xxx.txt")
- spark.read.json("xxx.json")
- spark.read.parquet("xxx.parquet")
-
- spark.read.format("text").load("xxx.txt")
- spark.read.format("json").load("xxx.json")
- spark.read.format("parquet").load("xxx.parquet")
调用 .show() 可以查看数据
使用 spark.write 操作保存DataFrame
- df.write.txt("xxx.txt")
- df.write.json("xxx.json")
- df.write.parquet("xxx.parquet")
-
- df.write.format("text").save("xxx.txt")
- df.write.format("json").save("xxx.json")
- df.write.format("parquet").save("xxx.parquet")

目录名称读取即可加载(注意不是文件名称)
printSchema():打印模式信息
select():选取列显示

filter():过滤
groupBy():分组

sort():排序

/usr/local/spark/examples/src/main/resources/ 目录下:

- from pyspark.sql import Row # 生成row对象封装一行数据
- # spark为SparkSession对象
- people = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line:line.split(",")).map(lambda p:Row(name=p[0],age=int(p[1]))) # people为RDD
- schemapeople = spark.createDataFrame(people)
-
- # 必须注册为临时表才能供下面的查询使用
- schemapeople.createOrReplaceTempView("people") # people为临时表名称
- personDF = spark.sql("select name,age from people where age>20")
- # 查询得到的结果会被封装在DataFrame中
- personDF.show()
-
- # DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值
- personRDD = personDF.rdd.map(lambda p:"Name:"+p.name+","+"Age:"+str(p.age))
- personRDD.foreach(print)

当无法提前获知数据结构时,采用编程方式定义RDD模式

- from pyspark.sql.types import *
- from pyspark.sql import Row
-
- # 生成表头
- schemaString = "name age" # 包含两个字段 name 和 age
- fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split(" ")] # 列表里每个元素都是一个StructField对象(用来描述字段)
- schema = StructType(fields)
-
- # 生成表中的记录
- lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
- parts = lines.map(lambda x:x.split(","))
- people = parts.map(lambda p:Row(p[0],p[1].strip())) # p为列表
-
- # 表头和内容拼接
- schemapeoples = spark.createDataFrame(people,schema) # (表中记录,表头)
-
- # 注册临时表才能查询
- schemapeoples.createOrReplaceTempView("people")
- results = spark.sql("select name,age from people")
- results.show()


查询的结果被封装在DataFrame里
在Linux系统中安装MySQL数据库:Ubuntu安装MySQL及常用操作_厦大数据库实验室博客
- # 在Linux中启动MySQL数据库
- service mysql start
- mysql -u root -p # 屏幕会提示你输入密码(MySQL root用户密码)
-
- # 完成数据库和表的创建
- create database spark
- use spark
- create table student(id int(4), name char(20), gender char(4), age int(4))
- insert into student values (1, "Xueqian", 'F', 23)
- insert into student values (2, "Weiliang", 'M', 24)
- select * from student
Spark SQL通过jdbc连接关系型数据库MySQL,需要安装MySQL的jdbc驱动程序:Linux 下搭建 Hive 环境_mysql-connector-java-5.1.40.tar 对应的mysql版本_GreyZeng的博客-CSDN博客

下载后放入 /usr/local/spark/jars
- # 启动pyspark
- cd /usr/local/spark
- ./bin/pyspark
- >>>jdbcDF = spark.read.format("jdbc") \
- .option("url","jdbc:mysql://localhost:3306/spark") \ # 访问数据库地址及数据库(spark数据库)
- .option("driver","com.mysql.jdbc.Driver") \ # 指定驱动程序
- .option("dbtable", "student") \ # 访问student表
- .option("user", "root") \
- .option("password", "mysql密码").load()
-
- # .option()增加连接参数

- use spark
- select * from student
往 spark.student 中插入两条记录:
- from pyspark.sql.types import Row
- from pyspark.sql.types import *
- from pyspark import SparkContext, SparkConf
- from pyspark.sql import SparkSession
-
- # 生成SparkSession对象(Spark SQL指挥官)
- spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
-
- # 下面要设置模式信息
- >>> schema = StructType([StructField("id", IntegerType(), True), \ # True说明可以为空
- StructField("name", StringType(), True), \
- StructField("gender", StringType(), True), \
- StructField("age",IntegerType(), True)])
-
- # 设置两条数据,表示两个学生信息(封装Row对象)
- studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda x:x.split(" "))
-
- # 创建Row对象,每个Row对象都是rowRDD的一行
- rowRDD = studentRDD.map(lambda p:Row(int(p[1].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
-
- # 建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
- studentDF = spark.createDataFrame(rowRDD, schema)
-
- # 把DataFrame写入数据库
- prop = {}
- prop['user'] = 'root'
- prop['password'] = '填写mysql密码'
- prop['driver'] = "com.mysql.jdbc.Driver" # 驱动程序名称
- # 库名, 表名, 追加, prop为属性集合
- studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

结果如下:
