I. Build the Spark image
1. Upload the Spark tarball to /opt/software and extract it
[root@k8s101 ~]# cd /opt/software/
[root@k8s101 software]# tar -zxvf spark-3.0.0-bin-hadoop2.7.tgz
2. Edit the Spark configuration
[k8s@k8s101 software]$ cd spark-3.0.0-bin-hadoop2.7
[k8s@k8s101 spark-3.0.0-bin-hadoop2.7]$ cd conf/
[k8s@k8s101 conf]$ mv spark-env.sh.template spark-env.sh
[k8s@k8s101 conf]$ vim spark-env.sh
HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native
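You can also write the same two settings to spark-env.sh with a small script instead of editing the file in vim. This is a minimal sketch. write_spark_env is a hypothetical helper name, and the Hadoop path defaults to the one used in this guide.

```shell
#!/bin/sh
# Hypothetical helper: create conf/spark-env.sh with the two settings above.
# $1 = Spark conf directory; $2 = Hadoop conf dir (defaults to this guide's path).
write_spark_env() {
  local conf_dir="$1"
  local hadoop_conf="${2:-/opt/module/hadoop-3.1.3/etc/hadoop}"
  # Start from the shipped template if spark-env.sh does not exist yet.
  if [ ! -f "$conf_dir/spark-env.sh" ] && [ -f "$conf_dir/spark-env.sh.template" ]; then
    mv "$conf_dir/spark-env.sh.template" "$conf_dir/spark-env.sh"
  fi
  # HADOOP_CONF_DIR is expanded now; $HADOOP_HOME stays literal (single quotes)
  # so it is resolved when Spark sources the file.
  printf 'HADOOP_CONF_DIR=%s\n' "$hadoop_conf" >> "$conf_dir/spark-env.sh"
  printf '%s\n' 'export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native' >> "$conf_dir/spark-env.sh"
}
```

For example: write_spark_env /opt/software/spark-3.0.0-bin-hadoop2.7/conf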
3. Create a repository in Alibaba Cloud Container Registry



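4. Build the image with the docker-image-tool.sh script that ships with Spark. The -r value is the Alibaba Cloud registry and namespace created in the previous step. The script names the resulting image <repo>/spark:<tag>.
[root@k8s101 spark-3.0.0-bin-hadoop2.7]# bin/docker-image-tool.sh -r registry.cn-zhangjiakou.aliyuncs.com/lizunting_test -t my_spark3.0 build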
5. List the Docker images
[root@k8s101 spark-3.0.0-bin-hadoop2.7]# docker images

6. Tag the Spark 3 image (by its image ID from the listing above) and push it to Alibaba Cloud
[root@k8s101 spark-3.0.0-bin-hadoop2.7]# docker tag c54e8c28ed7b registry.cn-zhangjiakou.aliyuncs.com/lizunting_test/my_spark3.0:1.0
[root@k8s101 spark-3.0.0-bin-hadoop2.7]# docker push registry.cn-zhangjiakou.aliyuncs.com/lizunting_test/my_spark3.0:1.0
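Copying the image ID from docker images by hand is error-prone. The sketch below derives the source image name instead. It assumes docker-image-tool.sh used its default naming of <repo>/spark:<tag>; check docker images if yours differs. The helper names image_ref and retag_and_push are hypothetical.

```shell
#!/bin/sh
# Values from this guide; adjust to your registry.
REPO="registry.cn-zhangjiakou.aliyuncs.com/lizunting_test"
BUILD_TAG="my_spark3.0"            # the -t value passed to docker-image-tool.sh
TARGET="${REPO}/my_spark3.0:1.0"   # the image name used later in spark-submit

# Print an image reference of the form <repo>/<name>:<tag>.
image_ref() {
  printf '%s/%s:%s\n' "$1" "$2" "$3"
}

# Retag the built image by name (not by ID) and push it.
retag_and_push() {
  local src
  src="$(image_ref "$REPO" spark "$BUILD_TAG")"
  docker tag "$src" "$TARGET" && docker push "$TARGET"
}
```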

II. Create the Spark PVC
Create the PersistentVolumeClaim used by Spark:
[k8s@k8s101 ~]$ mkdir spark
[k8s@k8s101 ~]$ cd spark/
[k8s@k8s101 spark]$ vim spark-pvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: spark-pvc
  annotations:
    volume.beta.kubernetes.io/storage-class: "managed-nfs-storage"
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 2G
[k8s@k8s101 spark]$ kubectl create -f spark-pvc.yaml
III. Create the ConfigMaps Spark needs
[k8s@k8s101 software]$ kubectl create configmap spark-conf-volume --from-file=/opt/software/spark-3.0.0-bin-hadoop2.7/conf/
[k8s@k8s101 software]$ kubectl create configmap hadoop-properties --from-file=/opt/module/hadoop-3.1.3/etc/hadoop/
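This guide recreates the spark-conf-volume ConfigMap several times with kubectl delete followed by kubectl create. A common alternative is to render the ConfigMap locally and pipe it to kubectl apply, which creates or updates it in one step. This is a sketch only. refresh_configmap is a hypothetical name, and --dry-run=client assumes kubectl 1.18 or newer; older clients use plain --dry-run.

```shell
#!/bin/sh
# Hypothetical helper: create or update a ConfigMap from a directory.
# $1 = ConfigMap name (e.g. spark-conf-volume); $2 = source directory.
refresh_configmap() {
  local name="$1"
  local dir="$2"
  # Render the ConfigMap as YAML without contacting the API server,
  # then let "apply" create it or update it in place.
  kubectl create configmap "$name" --from-file="$dir" --dry-run=client -o yaml \
    | kubectl apply -f -
}
```

For example: refresh_configmap spark-conf-volume /opt/software/spark-3.0.0-bin-hadoop2.7/conf/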
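IV. Create Endpoints and Services
Pods need to reach the external Hadoop cluster. To allow this, create an Endpoints object pointing at each node, plus a Service of the same name (without a selector) that exposes the port.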
[k8s@k8s101 spark]$ vim node-endpoints.yaml
apiVersion: v1
kind: Endpoints
metadata:
  name: k8s101
  namespace: default
subsets:
  - addresses:
      - ip: 172.26.64.126
    ports:
      - port: 8020
---
apiVersion: v1
kind: Endpoints
metadata:
  name: k8s102
  namespace: default
subsets:
  - addresses:
      - ip: 172.26.64.124
    ports:
      - port: 8020
---
apiVersion: v1
kind: Endpoints
metadata:
  name: k8s103
  namespace: default
subsets:
  - addresses:
      - ip: 172.26.64.125
    ports:
      - port: 8020
[k8s@k8s101 spark]$ kubectl create -f node-endpoints.yaml
[k8s@k8s101 spark]$ vim node-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: k8s101
spec:
  ports:
    - port: 8020
---
apiVersion: v1
kind: Service
metadata:
  name: k8s102
spec:
  ports:
    - port: 8020
---
apiVersion: v1
kind: Service
metadata:
  name: k8s103
spec:
  ports:
    - port: 8020
[k8s@k8s101 spark]$ kubectl create -f node-service.yaml
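The Endpoints and Service for each node differ only in name, IP and ports, and the two objects must agree: they share a name, and once there is more than one port, the port names must match. Below is a minimal sketch of a generator. It assumes the default namespace, as above. gen_node_manifest is a hypothetical helper that takes name:port pairs.

```shell
#!/bin/sh
# Hypothetical helper: print an Endpoints object and a selector-less Service of
# the same name for one external node (namespace "default", as in this guide).
# Usage: gen_node_manifest NAME IP PORTNAME:PORT [PORTNAME:PORT ...]
gen_node_manifest() {
  local name="$1"
  local ip="$2"
  local p
  shift 2
  printf 'apiVersion: v1\nkind: Endpoints\nmetadata:\n  name: %s\n  namespace: default\n' "$name"
  printf 'subsets:\n  - addresses:\n      - ip: %s\n    ports:\n' "$ip"
  for p in "$@"; do
    # ${p%%:*} is the text before the colon (port name), ${p##*:} the port number.
    printf '      - name: %s\n        port: %s\n' "${p%%:*}" "${p##*:}"
  done
  # YAML document separator, then the Service whose name ties it to the Endpoints.
  printf '%s\n' '---'
  printf 'apiVersion: v1\nkind: Service\nmetadata:\n  name: %s\n  namespace: default\nspec:\n  ports:\n' "$name"
  for p in "$@"; do
    printf '    - name: %s\n      port: %s\n' "${p%%:*}" "${p##*:}"
  done
}
```

For example, gen_node_manifest k8s101 172.26.64.126 rpc:8020 | kubectl apply -f - creates the k8s101 pair from this step, with the port named rpc. Later steps only need more name:port arguments.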
V. Create the Spark service account and grant permissions
[k8s@k8s101 ~]$ kubectl create serviceaccount spark
[k8s@k8s101 ~]$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
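To confirm the binding took effect, kubectl can check permissions on behalf of the service account through impersonation. The sketch checks pod creation, which the driver needs to launch executors. It assumes your own kubectl user is allowed to impersonate, as cluster admins are. check_spark_sa is a hypothetical name.

```shell
#!/bin/sh
# Hypothetical helper: report whether the "spark" service account may create pods.
# $1 = namespace (defaults to "default", as in this guide).
check_spark_sa() {
  local ns="${1:-default}"
  # "kubectl auth can-i" prints "yes" or "no" for the impersonated identity.
  if [ "$(kubectl auth can-i create pods --as="system:serviceaccount:${ns}:spark" -n "$ns")" = "yes" ]; then
    echo "spark service account can create pods in $ns"
  else
    echo "spark service account CANNOT create pods in $ns" >&2
    return 1
  fi
}
```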
VI. Run a test example
1. Look up the Kubernetes master address
[root@k8s101 spark-3.0.0-bin-hadoop2.7]# su k8s
[k8s@k8s101 spark-3.0.0-bin-hadoop2.7]$ kubectl cluster-info
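Rather than reading the address from kubectl cluster-info, you can take the --master value from the current kubeconfig. This is a minimal sketch, and spark_master_url is a hypothetical helper. The result uses whatever server address your kubeconfig contains, so an IP may appear instead of k8s101.

```shell
#!/bin/sh
# Hypothetical helper: print the spark-submit --master value for the current context.
spark_master_url() {
  local server
  # --minify restricts the output to the cluster of the current context.
  server="$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')"
  printf 'k8s://%s\n' "$server"
}
```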

2. Submit the Spark job
bin/spark-submit \
--master k8s://https://k8s101:6443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=registry.cn-zhangjiakou.aliyuncs.com/lizunting_test/my_spark3.0:1.0 \
--conf spark.kubernetes.file.upload.path=hdfs://mycluster/tmp \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName=spark-pvc \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path=/data/nfs/ \
/opt/software/spark-3.0.0-bin-hadoop2.7/examples/jars/spark-examples_2.12-3.0.0.jar
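The spark-submit commands in this guide share every Kubernetes setting and differ only in the main class, executor count and jar. The sketch below is a hypothetical wrapper that captures the shared part. It assumes it is run from the Spark directory. You can override SPARK_SUBMIT (for example with echo) to print the command instead of running it.

```shell
#!/bin/sh
# Shared settings from this guide.
IMAGE="registry.cn-zhangjiakou.aliyuncs.com/lizunting_test/my_spark3.0:1.0"
MASTER="k8s://https://k8s101:6443"

# Hypothetical wrapper. Usage: submit_k8s MAIN_CLASS EXECUTOR_COUNT JAR
submit_k8s() {
  local main_class="$1"
  local executors="$2"
  local jar="$3"
  "${SPARK_SUBMIT:-bin/spark-submit}" \
    --master "$MASTER" \
    --deploy-mode cluster \
    --name spark-pi \
    --class "$main_class" \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf "spark.executor.instances=$executors" \
    --conf "spark.kubernetes.container.image=$IMAGE" \
    --conf spark.kubernetes.file.upload.path=hdfs://mycluster/tmp \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName=spark-pvc \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path=/data/nfs/ \
    "$jar"
}
```

For example: submit_k8s org.apache.spark.examples.SparkPi 2 /opt/software/spark-3.0.0-bin-hadoop2.7/examples/jars/spark-examples_2.12-3.0.0.jar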
3. Check the driver log

[k8s@k8s101 spark-3.0.0-bin-hadoop2.7]$ kubectl logs spark-pi-ee19b477342871ff-driver

The job succeeded. The driver log should contain a line beginning with "Pi is roughly".
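The driver pod name contains a generated ID, so it changes on every run. Spark labels driver pods with spark-role=driver, so a script can pick out the newest one. This is a sketch, and latest_driver_pod is a hypothetical name.

```shell
#!/bin/sh
# Hypothetical helper: print the name of the most recently created driver pod.
latest_driver_pod() {
  # Sort oldest to newest, print one name per line, keep the last.
  kubectl get pods -l spark-role=driver \
    --sort-by=.metadata.creationTimestamp \
    -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' | tail -n 1
}
```

For example, kubectl logs "$(latest_driver_pod)" shows the log of the most recent run.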
VII. Test a Spark SQL job
1. Upload the test log files to /opt/software, then copy them into HDFS
[root@k8s101 software]# hdfs dfs -mkdir -p /user/atguigu/ods/
[root@k8s101 software]# hdfs dfs -put *.log /user/atguigu/ods/
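2. In Hive, create the three databases dwd, dws and ads. Then create the tables as described in the reference file “用户注册模块.txt” (user registration module).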
3. For Spark on Hive, copy hive-site.xml into Spark's conf directory. Then recreate the spark-conf-volume ConfigMap so it includes the file.
[k8s@k8s101 software]$ cp /opt/module/apache-hive-3.1.2-bin/conf/hive-site.xml /opt/software/spark-3.0.0-bin-hadoop2.7/conf/
[k8s@k8s101 software]$ kubectl delete configmap spark-conf-volume
[k8s@k8s101 software]$ kubectl create configmap spark-conf-volume --from-file=/opt/software/spark-3.0.0-bin-hadoop2.7/conf/
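4. Recreate the Endpoints and Service so they also expose port 9083, the Hive metastore service. Hive is installed on k8s101, so only k8s101's configuration changes. The entries below also name each port and add port 9866 (DataNode data transfer).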
[k8s@k8s101 software]$ cd ~
[k8s@k8s101 ~]$ cd spark/
[k8s@k8s101 spark]$ kubectl delete -f node-endpoints.yaml -f node-service.yaml
[k8s@k8s101 spark]$ vim node-endpoints.yaml
apiVersion: v1
kind: Endpoints
metadata:
  name: k8s101
  namespace: default
subsets:
  - addresses:
      - ip: 172.26.64.126
    ports:
      - port: 8020
        name: rpc
      - port: 9866
        name: dnport
      - port: 9083
        name: hivemetastore
[k8s@k8s101 spark]$ vim node-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: k8s101
spec:
  ports:
    - port: 8020
      name: rpc
    - port: 9866
      name: dnport
    - port: 9083
      name: hivemetastore
[k8s@k8s101 spark]$ kubectl create -f node-endpoints.yaml -f node-service.yaml
5. Package your code as a jar and upload it for testing. When packaging, put the cluster's hive-site.xml, hdfs-site.xml and core-site.xml in the resources source folder; otherwise the job cannot find Hive.
[k8s@k8s101 software]$ cd spark-3.0.0-bin-hadoop2.7
Then run:
bin/spark-submit \
--master k8s://https://k8s101:6443 \
--deploy-mode cluster \
--name spark-pi \
--class com.atguigu.member.controller.DwdMemberController \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=3 \
--conf spark.kubernetes.container.image=registry.cn-zhangjiakou.aliyuncs.com/lizunting_test/my_spark3.0:1.0 \
--conf spark.kubernetes.file.upload.path=hdfs://mycluster/tmp \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName=spark-pvc \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path=/data/nfs/ \
/opt/software/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar
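The job finds Hive only if the three XML files end up in the jar, so it can be worth checking the jar before submitting. Maven packages files from the resources folder at the jar root, so the sketch looks for them there. It reads a listing from unzip -Z1 or jar tf on stdin. check_jar_confs is a hypothetical name.

```shell
#!/bin/sh
# Hypothetical helper: read a jar listing on stdin and fail if any of the
# three cluster config files is missing from the jar root.
check_jar_confs() {
  local listing f missing=0
  listing="$(cat)"
  for f in hive-site.xml hdfs-site.xml core-site.xml; do
    # -F: literal match; -x: whole line, i.e. the file sits at the jar root.
    if ! printf '%s\n' "$listing" | grep -Fqx "$f"; then
      echo "missing from jar: $f" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example: unzip -Z1 /opt/software/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar | check_jar_confs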
6. When the job finishes, list the pods
[k8s@k8s101 software]$ kubectl get pods

7. The pod status shows the job has completed. Run kubectl logs with the pod name to view its log. If the job failed, the error message is in this log.
[k8s@k8s101 software]$ kubectl logs spark-pi-ee19b477342871ff-driver
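Instead of rerunning kubectl get pods until the status changes, a script can poll the driver pod's phase. Kubernetes reports Succeeded (shown as Completed) or Failed once the driver exits. This is a minimal sketch; wait_for_driver and its polling interval are assumptions, not part of the original setup.

```shell
#!/bin/sh
# Hypothetical helper: poll a driver pod until it finishes.
# $1 = pod name; $2 = seconds between checks (default 10).
wait_for_driver() {
  local pod="$1"
  local interval="${2:-10}"
  local phase
  while :; do
    phase="$(kubectl get pod "$pod" -o jsonpath='{.status.phase}')"
    # An empty phase means kubectl could not find the pod.
    [ -n "$phase" ] || { echo "pod $pod not found" >&2; return 2; }
    case "$phase" in
      Succeeded) echo "$pod succeeded"; return 0 ;;
      Failed)    echo "$pod failed; see: kubectl logs $pod" >&2; return 1 ;;
      *)         sleep "$interval" ;;   # Pending or Running: check again later
    esac
  done
}
```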
8. The log shows no errors, so check that Hive now contains data by querying any table, for example:
[k8s@k8s101 software]$ hive
hive (default)> use dwd;
hive (dwd)> select * from dwd_base_ad;

VIII. Test a Spark Streaming job
1. Start Kafka and create the test topic
[root@k8s101 spark-3.0.0-bin-hadoop2.7]# cd /opt/module/kafka_2.11-2.4.0/
[k8s@k8s101 kafka_2.11-2.4.0]$ bin/kafka-topics.sh --zookeeper k8s101:2181/kafka_2.4 --create --replication-factor 2 --partitions 10 --topic register_topic
2. Kafka's port 9092 must be exposed the same way. Update the Endpoints and Service to open it.
[k8s@k8s101 ~]$ cd spark/
[k8s@k8s101 spark]$ kubectl delete -f node-endpoints.yaml -f node-service.yaml
[k8s@k8s101 spark]$ vim node-endpoints.yaml
apiVersion: v1
kind: Endpoints
metadata:
  name: k8s101
  namespace: default
subsets:
  - addresses:
      - ip: 172.26.64.126
    ports:
      - port: 8020
        name: rpc
      - port: 9866
        name: dnport
      - port: 9083
        name: hivemetastore
      - port: 9092
        name: kafka
---
apiVersion: v1
kind: Endpoints
metadata:
  name: k8s102
  namespace: default
subsets:
  - addresses:
      - ip: 172.26.64.124
    ports:
      - port: 8020
        name: rpc
      - port: 9866
        name: dnport
      - port: 9092
        name: kafka
---
apiVersion: v1
kind: Endpoints
metadata:
  name: k8s103
  namespace: default
subsets:
  - addresses:
      - ip: 172.26.64.125
    ports:
      - port: 8020
        name: rpc
      - port: 9866
        name: dnport
      - port: 9092
        name: kafka
[k8s@k8s101 spark]$ vim node-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: k8s101
spec:
  ports:
    - port: 8020
      name: rpc
    - port: 9866
      name: dnport
    - port: 9083
      name: hivemetastore
    - port: 9092
      name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: k8s102
spec:
  ports:
    - port: 8020
      name: rpc
    - port: 9866
      name: dnport
    - port: 9092
      name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: k8s103
spec:
  ports:
    - port: 8020
      name: rpc
    - port: 9866
      name: dnport
    - port: 9092
      name: kafka
[k8s@k8s101 spark]$ kubectl create -f node-endpoints.yaml -f node-service.yaml
3. Load test data into the topic using the prepared data file register.log and the producer code registerProducer.scala.
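registerProducer.scala is not reproduced here. As a stand-in, Kafka's own console producer can send register.log to the topic, one line per message. This sketch assumes the Kafka install path used above. The 2.4 console producer takes --broker-list; newer releases also accept --bootstrap-server. send_register_log is a hypothetical name.

```shell
#!/bin/sh
# Hypothetical helper: send a file to register_topic, one line per message.
# $1 = data file (default register.log); KAFKA_HOME defaults to this guide's path.
send_register_log() {
  local file="${1:-register.log}"
  "${KAFKA_HOME:-/opt/module/kafka_2.11-2.4.0}/bin/kafka-console-producer.sh" \
    --broker-list k8s101:9092,k8s102:9092,k8s103:9092 \
    --topic register_topic < "$file"
}
```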
4. Once Kafka has data, you can test the Spark Streaming job:
package com.atguigu.qzpoint.streaming

import java.lang

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Counts registrations per app type ("PC", "APP", "Other") over a sliding window.
object RegisterStreaming2 {
  private val groupid = "register_group_test"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
      // Read at most 300 records per second from each Kafka partition.
      .set("spark.streaming.kafka.maxRatePerPartition", "300")
    // 3-second micro-batches.
    val ssc = new StreamingContext(conf, Seconds(3))
    val sparkContext = ssc.sparkContext
    // NOTE: this guide's HDFS nameservice is "mycluster"; adjust these values to match your cluster.
    sparkContext.hadoopConfiguration.set("fs.defaultFS", "hdfs://nameservice1")
    sparkContext.hadoopConfiguration.set("dfs.nameservices", "nameservice1")
    val topics = Array("register_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "k8s101:9092,k8s102:9092,k8s103:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      // Offsets are not committed automatically.
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    // Keep well-formed lines (3 tab-separated fields) and map the second field to an app name.
    val resultDStream = stream.filter(item => item.value().split("\t").length == 3)
      .mapPartitions(partitions => {
        partitions.map(item => {
          val line = item.value()
          val arr = line.split("\t")
          val app_name = arr(1) match {
            case "1" => "PC"
            case "2" => "APP"
            case _ => "Other"
          }
          (app_name, 1)
        })
      })
    // Every 6 seconds, print the counts for the last 60 seconds.
    resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
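To check the numbers the job prints, you can run the same logic once over a whole file with awk. It keeps lines with exactly three tab-separated fields, maps the second field (1 to PC, 2 to APP, anything else to Other) and counts. The count is not windowed, so it gives totals rather than per-window counts. One small difference: awk counts trailing empty fields, while Java's String.split drops them. count_by_app is a hypothetical name.

```shell
#!/bin/sh
# Hypothetical helper: offline version of the job's (app_name, count) logic.
# Reads the given files, or stdin if none; prints tuples like "(PC,2)", sorted.
count_by_app() {
  awk -F '\t' 'NF == 3 {
      app = ($2 == "1") ? "PC" : (($2 == "2") ? "APP" : "Other")
      n[app]++
    }
    END { for (a in n) printf "(%s,%d)\n", a, n[a] }' "$@" | LC_ALL=C sort
}
```

For example: count_by_app register.log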
5. Package the code as a jar, upload it to the cluster and submit it.
[k8s@k8s101 software]$ cd spark-3.0.0-bin-hadoop2.7
bin/spark-submit \
--master k8s://https://k8s101:6443 \
--deploy-mode cluster \
--name spark-pi \
--class com.atguigu.qzpoint.streaming.RegisterStreaming2 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=3 \
--conf spark.kubernetes.container.image=registry.cn-zhangjiakou.aliyuncs.com/lizunting_test/my_spark3.0:1.0 \
--conf spark.kubernetes.file.upload.path=hdfs://mycluster/tmp \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName=spark-pvc \
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path=/data/nfs/ \
/opt/software/com_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
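6. After submitting, list the pods. Two executor pods are Pending. Running kubectl describe on them shows insufficient CPU, so they never started. This does not affect processing.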
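You can inspect all pending executors at once. Spark labels executor pods with spark-role=executor, and the scheduler's reason (such as insufficient CPU) appears in the Events section of kubectl describe. This is a sketch, and show_pending_executors is a hypothetical name.

```shell
#!/bin/sh
# Hypothetical helper: for each Pending executor pod, print its Events section.
show_pending_executors() {
  local pod
  # "-o name" prints entries like "pod/<name>", which kubectl describe accepts.
  for pod in $(kubectl get pods -l spark-role=executor \
      --field-selector=status.phase=Pending -o name); do
    echo "== $pod"
    # Print from the "Events:" line to the end of the describe output.
    kubectl describe "$pod" | sed -n '/^Events:/,$p'
  done
}
```

If executors stay Pending, you can lower spark.executor.instances or the per-executor CPU request (spark.kubernetes.executor.request.cores). See the Spark on Kubernetes configuration documentation.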

7. Check the output in the driver log. The -f flag keeps following the log.
[k8s@k8s101 root]$ kubectl logs -f spark-pi-a27e40773a316496-driver

For resource-related settings, see the official documentation:
http://spark.apache.org/docs/latest/running-on-kubernetes.html#configuration

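IX. Spark UI
1. The Spark job runs inside a container, so port 4040 cannot be reached directly; it has to be forwarded out of the pod. Run the following command, where spark-pi-1f85e07747273183-driver is the driver pod name:
[k8s@k8s101 root]$ kubectl port-forward --address 0.0.0.0 spark-pi-1f85e07747273183-driver 4040:4040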
2. With the port forward running, open port 4040 on k8s101 in a browser to see the UI.

3. To kill the job, run the following command, where default is the namespace the pod runs in.
[k8s@k8s101 spark-3.0.0-bin-hadoop2.7]$ bin/spark-submit --kill default:spark-pi-1f85e07747273183-driver --master k8s://https://k8s101:6443
X. History server
1. Edit spark-defaults.conf
[k8s@k8s101 ~]$ cd /opt/software/spark-3.0.0-bin-hadoop2.7/conf
[k8s@k8s101 conf]$ mv spark-defaults.conf.template spark-defaults.conf
[k8s@k8s101 conf]$ vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://mycluster/sparklogs
spark.history.fs.logDirectory hdfs://mycluster/sparklogs
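You can also append the three settings without an editor. This is a minimal sketch. enable_event_log is a hypothetical name, and the log directory defaults to the one used in this guide. It copies the template rather than moving it, and it only appends, so running it twice duplicates the lines.

```shell
#!/bin/sh
# Hypothetical helper: add event-log settings to spark-defaults.conf.
# $1 = Spark conf dir; $2 = event log directory (default: this guide's HDFS path).
enable_event_log() {
  local conf="$1/spark-defaults.conf"
  local dir="${2:-hdfs://mycluster/sparklogs}"
  # Create the file from the template if needed, or as an empty file.
  [ -f "$conf" ] || cp "$1/spark-defaults.conf.template" "$conf" 2>/dev/null || : > "$conf"
  printf 'spark.eventLog.enabled true\nspark.eventLog.dir %s\nspark.history.fs.logDirectory %s\n' \
    "$dir" "$dir" >> "$conf"
}
```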
2. Create the log directory in HDFS
[root@k8s101 conf]# hdfs dfs -mkdir /sparklogs
3. Start the Spark history server
[root@k8s101 spark-3.0.0-bin-hadoop2.7]# sbin/start-history-server.sh

4. Open port 18080 in a browser

5. Recreate the spark-conf-volume ConfigMap
[k8s@k8s101 spark-3.0.0-bin-hadoop2.7]$ kubectl delete configmap spark-conf-volume
[k8s@k8s101 spark-3.0.0-bin-hadoop2.7]$ kubectl create configmap spark-conf-volume --from-file=/opt/software/spark-3.0.0-bin-hadoop2.7/conf/
6. To test it, run a Spark on Kubernetes job again and reload the page. The finished job appears in the history list.

7. Click an application to see its details.
