下面是一个详细的 Spark 单机和集群环境部署教程,以及部署过程中的注意事项和一个使用 Java 和 Python 实现的 Spark 应用案例。
在 Ubuntu 中:
sudo apt update
sudo apt install openjdk-11-jdk
在 CentOS 中:
sudo yum install java-11-openjdk
验证 Java 安装:
java -version
访问 Spark 官网 下载最新版本的 Spark。
wget https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
tar -xzvf spark-3.3.1-bin-hadoop3.tgz
mv spark-3.3.1-bin-hadoop3 /usr/local/spark
编辑 ~/.bashrc 文件,添加以下内容:
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
应用更改:
source ~/.bashrc
Spark 提供了交互式的 Spark Shell,可以使用 Scala 和 Python 进行交互式开发:
# 使用 Scala 启动 Spark Shell
spark-shell
# 使用 Python 启动 PySpark
pyspark
start-master.sh
start-worker.sh spark://<master-hostname>:7077
在浏览器中访问 Spark Web 界面:
通过 Spark Shell 运行简单的 Spark 应用进行验证:
// 使用 Scala 创建 RDD
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
// 求和
val sum = distData.reduce((a, b) => a + b)
println("Sum: " + sum)
# 使用 Python 创建 RDD
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
# 求和
sum = distData.reduce(lambda a, b: a + b)
print("Sum:", sum)
JAVA_HOME。在 Master 节点上生成 SSH 密钥:
ssh-keygen -t rsa
将公钥复制到所有 Worker 节点:
ssh-copy-id user@worker1
ssh-copy-id user@worker2
在所有节点上安装 Spark,步骤与单机安装相同。
在 Spark 的配置目录中编辑 spark-env.sh 文件(如果不存在,则创建),添加以下内容:
# Master 节点的配置
export SPARK_MASTER_HOST=<master-hostname>
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export SPARK_WORKER_MEMORY=2g # Worker 节点内存配置
export SPARK_WORKER_CORES=2 # Worker 节点 CPU 核心数
在所有节点上配置相同的 spark-env.sh 文件。
在 Master 节点上,编辑 $SPARK_HOME/conf/slaves 文件,添加所有 Worker 节点的主机名:
worker1
worker2
在 Master 节点上执行:
start-all.sh
访问 Spark Master 的 Web 界面,确保所有节点正常运行:
ntpd 或 chrony 确保所有节点的时钟同步。下面是一个使用 Java 和 Python 实现的 Spark 应用程序示例,完成 Word Count 任务。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
import java.util.Arrays;
public class JavaWordCount {
public static void main(String[] args) {
// 创建 Spark 配置和上下文
SparkConf conf = new SparkConf().setAppName("Java Word Count");
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取输入文件
JavaRDD<String> input = sc.textFile(args[0]);
// 分割单词
JavaRDD<String> words = input.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
// 转换为键值对
JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
// 计数
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
// 输出结果
counts.saveAsTextFile(args[1]);
// 关闭 Spark 上下文
sc.close();
}
}
确保你已经安装了 Maven,并在 pom.xml 文件中配置了 Spark 依赖:
<dependencies>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-core_2.12artifactId>
<version>3.3.1version>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-sql_2.12artifactId>
<version>3.3.1version>
dependency>
dependencies>
编译并打包:
mvn clean package
将输入文件上传到 HDFS:
hdfs dfs -put localfile.txt /user/hadoop/input
运行 Spark 应用:
spark-submit \
--class JavaWordCount \
--master spark://<master-hostname>:7077 \
target/java-word-count-1.0-SNAPSHOT.jar \
hdfs://<hadoop-cluster>/user/hadoop/input \
hdfs://<hadoop-cluster>/user/hadoop/output
hdfs dfs -cat /user/hadoop/output/part-00000
from pyspark import SparkConf, SparkContext
# 创建 Spark 配置和上下文
conf = SparkConf().setAppName("Python Word Count")
sc = SparkContext(conf=conf)
# 读取输入文件
input = sc.textFile("hdfs:///user/hadoop/input" )
# 分割单词
words = input.flatMap(lambda line: line.split(" "))
# 转换为键值对
pairs = words.map(lambda word: (
word, 1))
# 计数
counts = pairs.reduceByKey(lambda a, b: a + b)
# 输出结果
counts.saveAsTextFile("hdfs:///user/hadoop/output" )
# 关闭 Spark 上下文
sc.stop()
将输入文件上传到 HDFS:
hdfs dfs -put localfile.txt /user/hadoop/input
运行 Spark 应用:
spark-submit \
--master spark://<master-hostname>:7077 \
wordcount.py
hdfs dfs -cat /user/hadoop/output/part-00000
通过以上步骤,我们成功部署了 Spark 单机和集群环境,并实现了一个简单的 Word Count 应用。Spark 提供了强大的分布式计算能力,可以处理大规模数据,并支持多种编程语言。
ntpd 或 chrony 确保所有节点的时钟同步,防止因时间不一致导致的任务调度问题。通过合理配置和优化,Spark 可以在多种场景下提供高效的大数据处理能力,支持实时流处理和批处理等多种任务类型。