Definition: a unified analytics engine for large-scale data processing.

Run modes
Architecture roles

Download Spark: https://archive.apache.org/dist/spark/
Download Anaconda: https://repo.anaconda.com/archive/

Install Anaconda
sh ./Anaconda3-2021.04-Linux-x86_64.sh
Configure the conda mirror
vim ~/.condarc
Tsinghua mirror (.condarc contents):
channels:
  - defaults
show_channel_urls: true
channel_alias: https://mirrors.tuna.tsinghua.edu.cn/anaconda
default_channels:
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/pro
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
custom_channels:
  conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
Log out and log back in for the Anaconda environment changes to take effect.
Create a virtual environment
conda create -n pyspark python=3.8
Activate the virtual environment
conda activate pyspark
Extract Spark
tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz
Configure the Spark environment variables
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_HOME=/export/server/hadoop-3.3.0
export SPARK_HOME=/export/soft/spark-3.2.0-bin-hadoop3.2
export PYSPARK_PYTHON=/export/soft/anaconda3/envs/pyspark/bin/python3.8
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
vim /root/.bashrc
export JAVA_HOME=/usr/lib/jvm/java
export PYSPARK_PYTHON=/export/soft/anaconda3/envs/pyspark/bin/python3.8
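After editing the two files, the variables can also be applied to the current shell without logging out again (a minimal step, assuming a bash login shell):
source /etc/profile
source /root/.bashrc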
Local mode: a single JVM process starts multiple threads to execute Tasks.
cd /export/soft/spark-3.2.0-bin-hadoop3.2/bin
./pyspark

Monitoring WebUI: http://hadoop100:4040
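To control how many threads local mode uses, you can pass an explicit master (a hedged example; local[*] uses all available cores):
./pyspark --master local[4]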
Standalone architecture
Spark's built-in cluster mode: at runtime the Driver role lives inside the Master process and the Executors run inside the Worker processes.

On the master node, edit the workers file (rename the template first) under /export/soft/spark-3.2.0-bin-hadoop3.2/conf and add the hadoop100, hadoop101, hadoop102 nodes.
Edit spark-env.sh (rename the template first)
JAVA_HOME=/usr/lib/jvm/java
HADOOP_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
export SPARK_MASTER_IP=hadoop100
export SPARK_MASTER_PORT=7077
export SPARK_WEBUI_PORT=8080
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1G
export SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
export SPARK_HISTORY_OPTS="
-Dspark.history.fs.logDirectory=hdfs://hadoop100:8020/sparklog/
-Dspark.history.fs.cleaner.enabled=true"
Create the Spark history log directory on HDFS
hadoop fs -mkdir /sparklog
hadoop fs -chmod 777 /sparklog
Edit spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop100:8020/sparklog/
spark.eventLog.compress true
Edit log4j.properties and change the log level to WARN
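A minimal sketch of the relevant line, assuming the log4j.properties template shipped with Spark 3.2 (rename the .template file first):
log4j.rootCategory=WARN, console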
Start the history server
/export/soft/spark-3.2.0-bin-hadoop3.2/sbin/start-history-server.sh
Start the cluster roles (Master and Workers)
/export/soft/spark-3.2.0-bin-hadoop3.2/sbin/start-all.sh
WebUI address: http://192.168.132.100:8080/
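To verify that the roles are up, you can list the Java processes on each node (a hedged check; expect a Master process on hadoop100 and a Worker process on every worker node):
jps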

Start a client shell against the cluster
cd /export/soft/spark-3.2.0-bin-hadoop3.2/bin
./pyspark --master spark://hadoop100:7077
Run some test code
sc.parallelize([1,2,3,4,5]).map(lambda x:x*10).collect()

Spark program execution hierarchy
Differences between job, stage, and task: an action triggers a job, a job is split into stages at shuffle boundaries, and each stage runs one task per partition.
Differences between the various ports
Spark on YARN: there is no need to deploy a Spark cluster; just pick one server as the Spark client and submit Spark jobs to the YARN cluster.
The Master role is taken by YARN's ResourceManager and the Worker role by YARN's NodeManager;
the Driver runs either inside a YARN container or in the client process that submits the job, and Executors run inside YARN containers.

Note that spark-env.sh must contain the following settings:
HADOOP_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
Run the command
bin/pyspark --master yarn


Spark on YARN has two deploy modes, cluster and client; the difference between them is where the Driver runs.
cluster: the Driver runs inside a YARN container, in the same container as the ApplicationMaster (viewing the output requires going into the container, which is inconvenient).

client (default): the Driver runs in the client process (communication cost is higher, but viewing output and testing are convenient).


Comparison of the two modes
| | cluster | client |
|---|---|---|
| Driver location | inside a YARN container | in the client process |
| Communication efficiency | high | low |
| Log viewing | logs stay inside the container, inconvenient to view | logs go to the client's standard output, easy to view |
| Production environment | recommended | not recommended |
| Stability | stable | affected by the client process |
client mode command
bin/spark-submit --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 3 --total-executor-cores 3 /export/soft/spark-3.2.0-bin-hadoop3.2/examples/src/main/python/pi.py 100
Console output: Pi is roughly 3.152320
cluster mode command
bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --num-executors 3 --total-executor-cores 3 /export/soft/spark-3.2.0-bin-hadoop3.2/examples/src/main/python/pi.py 100
Nothing is printed to the console; the output can be read from the YARN logs (see the command below).
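A hedged example of pulling the driver output via the YARN CLI (the application ID here is hypothetical; use the real one from the submit output or the YARN web UI):
yarn logs -applicationId application_1234567890123_0001 | grep "Pi is roughly"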

Client-mode submission flow:

Cluster-mode submission flow:

The program entry point is the SparkContext. Construction steps: 1. create a SparkConf object; 2. create the SparkContext from the SparkConf.
WordCount test
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("WordCount")
    sc = SparkContext(conf=conf)
    file_rdd = sc.textFile("hdfs://hadoop100:8020/input/words.txt")
    rdd_flat_map = file_rdd.flatMap(lambda x: x.split(" "))
    withOne = rdd_flat_map.map(lambda x: (x, 1))
    result = withOne.reduceByKey(lambda a, b: a + b)
    print(result.collect())
WordCount principle

Python on Spark execution principle


RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark: an immutable, partitionable collection whose elements can be computed in parallel.
RDD characteristics
RDDs have partitions
A partition is the smallest unit of RDD data storage; an RDD's data is essentially split into multiple partitions (a partition is a physical concept).
An RDD's methods act on all of its partitions.
RDDs have dependencies on one another.
Key-value RDDs can have a partitioner (optional).
RDD partition planning tries to stay close to the servers where the data lives,
preferring local reads and avoiding reads over the network. A small sketch of inspecting these properties follows.
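A minimal sketch, assuming a local SparkContext; getNumPartitions shows the partition count and toDebugString prints the dependency (lineage) chain between RDDs:
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("rdd_props")
    sc = SparkContext(conf=conf)
    # build a small chain: parallelize -> map -> reduceByKey
    rdd = sc.parallelize(range(10), 4).map(lambda x: (x % 2, x)).reduceByKey(lambda a, b: a + b)
    print(rdd.getNumPartitions())   # partition count
    print(rdd.toDebugString())      # lineage / dependencies (PySpark may return this as bytes)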

1. Create by parallelizing a collection (local object to distributed RDD)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
    print("default partition count equals the number of CPU cores", rdd.getNumPartitions())
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
    print("specified partition count", rdd.getNumPartitions())
    # collect gathers the distributed data back into a local object; partition data is sent to the Driver
    print("rdd contents", rdd.collect())
2. Create by reading an external data source (reading a file)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    rdd = sc.textFile("/tmp/pycharm_project_337/data/input/words.txt")
    print("default partition count", rdd.getNumPartitions())
    print("rdd contents", rdd.collect())
For textFile, the default partition count is unrelated to the number of CPU cores (it depends on the file size and its splits).
wholeTextFiles is intended for reading many small files; see the sketch below.
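A minimal sketch of wholeTextFiles, assuming a hypothetical directory of small text files; each record is a (file_path, file_content) pair:
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("whole_text")
    sc = SparkContext(conf=conf)
    # hypothetical directory containing many small text files
    rdd = sc.wholeTextFiles("/tmp/pycharm_project_337/data/input/tiny_files")
    # print just the file paths; kv[1] would be the whole file content
    print(rdd.map(lambda kv: kv[0]).collect())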
The APIs of this distributed collection object are called operators.
Categories:
transformation operators
Operators whose return value is still an RDD; they are lazy, so without an action operator the transformations never actually run (see the sketch after this list).
action operators
Operators whose return value is not an RDD.
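A minimal sketch of that laziness, using a simple timing check for illustration: map returns immediately, and the work only happens when collect (an action) is called.
import time
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("lazy_demo")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize(range(100000))
    start = time.time()
    mapped = rdd.map(lambda x: x * 10)          # transformation: only records the plan
    print("after map:", time.time() - start)     # close to zero, nothing has run yet
    result = mapped.collect()                    # action: triggers the actual job
    print("after collect:", time.time() - start)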

transformation operators
map operator
Processes the RDD data one record at a time.
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

    # style 1: pass a named function
    def add(data):
        return data * 10

    rdd_map = rdd.map(add)
    print(rdd_map.collect())
    # style 2: pass a lambda
    print(rdd.map(lambda x: x * 5).collect())
flatMap operator
Performs a map over the RDD and then flattens (un-nests) the result.
list = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# after flattening
list = [1, 2, 3, 4, 5, 6, 7, 8, 9]
Using map yields a nested result
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "flink spark hadoop"])
    print(rdd.map(lambda x: x.split(" ")).collect())
[['hadoop', 'spark', 'hadoop'], ['spark', 'hadoop', 'hadoop'], ['flink', 'spark', 'hadoop']]
Using flatMap removes the nesting; no extra arguments are needed beyond the mapping function
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "flink spark hadoop"])
    print(rdd.flatMap(lambda x: x.split(" ")).collect())
['hadoop', 'spark', 'hadoop', 'spark', 'hadoop', 'hadoop', 'flink', 'spark', 'hadoop']
reduceByKey
For key-value RDDs: groups by key, then aggregates the values within each group.
rdd.reduceByKey(func)
# func: (V, V) -> V
# takes two arguments of the same type and returns one value of that same type
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 2), ('c', 1)])
    # the function folds values that share a key: a is the accumulated result, b is the next value
    print(rdd.reduceByKey(lambda a, b: a + b).collect())
mapValues
rdd.mapValues(func)
# func: (V) -> U
# the argument is the value of each two-element tuple; only the value is transformed
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 2), ('c', 1)])
    # mapValues transforms only the values and leaves the keys untouched
    print(rdd.mapValues(lambda x: x * 10).collect())
groupBy
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 2), ('c', 1)])
    # groupBy argument: a function that returns the grouping key
    rdd_group_by = rdd.groupBy(lambda x: x[0])
    print(rdd_group_by.map(lambda x: (x[0], list(x[1]))).collect())
filter
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    # filter argument: a predicate returning bool; True keeps the element, False drops it
    print(rdd.filter(lambda x: x % 2 == 0).collect())
distinct
Deduplicates via map and reduceByKey; this is the PySpark source:
def distinct(self, numPartitions=None):
    """
    Return a new RDD containing the distinct elements in this RDD.

    Examples
    --------
    >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
    [1, 2, 3]
    """
    return self.map(lambda x: (x, None)) \
               .reduceByKey(lambda x, _: x, numPartitions) \
               .map(lambda x: x[0])
Test
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 4, 5, 2, 3, 4, 5])
    # distinct removes duplicate elements
    print(rdd.distinct().collect())
union
Merges two RDDs without deduplicating (the element types may differ)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 4, 5, 'a', 'c'])
    print(rdd.union(sc.parallelize([13, 5, 7, "2"])).collect())
join
Only works on RDDs of two-element tuples (key-value RDDs)
rdd.join(other_rdd)            # inner join
rdd.leftOuterJoin(other_rdd)   # left outer join
rdd.rightOuterJoin(other_rdd)  # right outer join
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd1 = sc.parallelize([('a', 1), ('d', 2), ('b', 2), ('c', 1)])
    rdd2 = sc.parallelize([('a', 'aa'), ('d', 'dd')])
    # joins associate records by key only, never by value
    print(rdd1.join(rdd2).collect())
    print(rdd1.leftOuterJoin(rdd2).collect())
intersection
Returns the intersection of two RDDs
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 4, 5, 'a', 'c'])
    print(rdd.intersection(sc.parallelize([13, 5, 7, "2"])).collect())
glom
Nests the data by partition: each partition becomes one list
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 4, 5, 'a', 'c'])
    print(rdd.glom().collect())
groupByKey
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 2), ('c', 1)])
    # groupByKey groups the values by key; each group is an iterable of values
    rdd_group_by = rdd.groupByKey()
    print(rdd_group_by.map(lambda x: (x[0], list(x[1]))).collect())
sortBy
Sorts the RDD; order is only guaranteed within each partition, so set the partition count to 1 for a globally sorted result
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([('a', 1), ('d', 2), ('b', 2), ('c', 1), ('e', 3), ('f', 5)])
    # sort by key
    print(rdd.sortBy(lambda x: x[0], True, 3).collect())
    # sort by value
    print(rdd.sortBy(lambda x: x[1], True, 3).collect())
sortByKey
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([('a', 1), ('d', 2), ('b', 2), ('c', 1), ('e', 3), ('f', 5)])
    # sort by key (ascending by default)
    print(rdd.sortByKey().collect())
action operators
countByKey
Counts the number of records for each key
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    file_rdd = sc.textFile("../../data/input/words.txt")
    rdd = file_rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
    print(rdd.countByKey().items())
collect
Pulls the data of every partition back to the Driver as a list; this consumes Driver memory, so the dataset must not be too large.
reduce
Aggregates the RDD data with custom logic
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    print(rdd.reduce(lambda a, b: a + b))
fold
Same as reduce, except the aggregation carries an initial (zero) value
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    print(rdd.fold(10, lambda a, b: a + b))
The initial value 10 is applied inside each partition and again when merging partitions: the elements sum to 21, each of the 3 partitions adds 10, and the final merge adds another 10, so the result is 61.
first、take、top、count
first returns the first element of the RDD
take(n) returns the first n elements of the RDD
top(n) returns the top n elements in descending order (descending only)
count returns the number of elements in the RDD; a short combined example follows
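A minimal sketch exercising all four (the sample values are arbitrary):
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("first_take_top_count")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([3, 1, 5, 2, 4])
    print(rdd.first())   # 3, the first element
    print(rdd.take(3))   # [3, 1, 5], the first three elements
    print(rdd.top(2))    # [5, 4], the largest two in descending order
    print(rdd.count())   # 5, the number of elements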
takeSample
Argument 1: whether sampled elements may repeat (sampling with replacement); argument 2: how many elements to take; argument 3: random seed (normally omitted)
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
    print(rdd.takeSample(True, 3))
takeOrdered
Sorts the RDD and takes the first n elements; an optional key function can change the ordering
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 4, 8, 2, 7, 8, 2, 5])
    print(rdd.takeOrdered(3))                # smallest three, ascending
    print(rdd.takeOrdered(3, lambda x: -x))  # negate the key to take the largest three
foreach
Runs the given logic on each element of the RDD; unlike map it has no return value. It is executed directly by the Executors without going through the Driver, so it is efficient
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 4, 8, 2, 7, 8, 2, 5], 1)
    rdd.foreach(lambda x: print(x * 10))
saveAsTextFile
Writes the RDD data out to files; both the local filesystem and HDFS are supported, and each partition writes its own part-file. Executed directly by the Executors without going through the Driver, so it is efficient. A short sketch follows.
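A minimal sketch, assuming a hypothetical output directory that does not exist yet (saveAsTextFile fails if it does):
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("save_demo")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 4, 8, 2, 7], 3)
    # hypothetical output directory; use an hdfs:// path to write to HDFS instead
    rdd.saveAsTextFile("/tmp/pycharm_project_337/data/output/save_demo")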
Partition-level operators
mapPartitions

mapPartitions passes an entire partition at a time, as an iterator object; it processes a whole partition per call, whereas map processes one record per call
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 4, 8, 2, 7, 8, 2, 5], 3)

    def process(iter):
        # receives an iterator over one whole partition and returns an iterable
        result = list()
        for i in iter:
            result.append(i * 10)
        return result

    print(rdd.mapPartitions(process).collect())
foreachPartition
Like mapPartitions, but it is an action with no return value
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 4, 8, 2, 7, 8, 2, 5], 3)

    def process(iter):
        # handle one whole partition at a time, for example print it
        result = list()
        for i in iter:
            result.append(i * 10)
        print(result)

    rdd.foreachPartition(process)
partitionBy
Applies a custom partitioning to a key-value RDD
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([('a', 1), ('d', 2), ('b', 2), ('c', 1), ('e', 3), ('f', 5)])

    def process(k):
        # custom partition function: maps a key to a partition number
        if 'a' == k or 'b' == k:
            return 0
        if 'c' == k:
            return 1
        return 2

    print(rdd.partitionBy(3, process).glom().collect())
repartition (use with caution)
Re-partitions the RDD. Changing the partition count affects parallel computation, and increasing the partition count is very likely to cause a shuffle; prefer reducing partitions over increasing them
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("test")
    sc = SparkContext(conf=conf)
    # turn a local object into a distributed RDD
    rdd = sc.parallelize([1, 4, 8, 2, 7, 8, 2, 5], 3)
    print(rdd.repartition(1).getNumPartitions())
    print(rdd.repartition(5).getNumPartitions())
    # coalesce is recommended: without shuffle=True it can only reduce the partition count
    print(rdd.coalesce(1).getNumPartitions())
    print(rdd.coalesce(5).getNumPartitions())
    print(rdd.coalesce(5, True).getNumPartitions())