conf = SparkConf().setMaster("local[*]").setAppName("WordCountHelloWorld")
sc = SparkContext(conf=conf)
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster("local[*]").setAppName("WordCountHelloWorld")
# 通过SparkConf对象构建SparkContext对象
sc = SparkContext(conf=conf)
#读取HDFS上的文件
file_rdd = sc.textFile("hdfs://node1:8020/Test/WordCount.txt")
#将单词切割,得到一个存储全部单词的集合对象
words_rdd = file_rdd.flatMap(lambda line : line.split(" "))
#将单词转换为元组对象,Key是单词,Value是数字
words_with_one_rdd = words_rdd.map(lambda x: (x, 1))
#将元组的value按照key来分组,对所有的value进行聚合操作(相加)
result_rdd = words_with_one_rdd.reduceByKey(lambda a, b : a + b)
#通过collect方法手机数据打印结果
print(result_rdd.collect())
#在node1上创建一个py程序
vim wordcount.py
#将代码粘贴到py文件中
#执行spark-submit
/export/server/spark-3.2.0/bin/spark-submit --master local[*] wordcount.py
# 提交到Yarn执行
/export/server/spark-3.2.0/bin/spark-submit --master yarn wordcount.py
Python On Spark 执行原理架构图:
Spark本身执行是通过JVM Driver和JVM Executor去跑Spark程序的。而且Python是如何跑在Spark中,如上图白色部分为python的代码部分,需要通过一个socket网络通道去连接到JVM Driver中,然后通过Py4j来讲python程序翻译成JVM代码,变成JVMDriver来运行。而在Executor中,有很多python RDD算子是不兼容的,这个时候JVM Executor会开启pyspark.daemon的守护进程。然后通过守护进程来来让pyspark正常工作。总体来说Executor上,还是由python进程来工作的。
Python → JVM代码 → JVM Driver → RPC → 调用JVM Executor → PySpark中转 → Python Executor进程。