• 基于Headless构建高可用spark+pyspark集群


    1、创建Headless Service服务

    Headless 服务类型并不分配容器云虚拟 IP,而是直接暴露所属 Pod 的 DNS 记录。没有默认负载均衡器,可直接访问 Pod IP 地址。因此,当我们需要与集群内真实的 Pod IP 地址进行直接交互时,Headless 服务就很有用。
    其中Service的关键配置如下:clusterIP: None,不让其获取clusterIP , DNS解析的时候直接走pod。

    ---
    kind: Service
    apiVersion: v1
    metadata:
      name: ecc-spark-service
      namespace: ecc-spark-cluster
    spec:
      clusterIP: None
      ports:
        - port: 7077
          protocol: TCP
          targetPort: 7077
          name: spark
        - port: 10000
          protocol: TCP
          targetPort: 10000
          name: thrift-server-tcp
        - port: 8080
          targetPort: 8080
          name: http
        - port: 45970
          protocol: TCP
          targetPort: 45970
          name: thrift-server-driver-tcp  
        - port: 45980
          protocol: TCP
          targetPort: 45980
          name: thrift-server-blockmanager-tcp    
        - port: 4040
          protocol: TCP
          targetPort: 4040
          name: thrift-server-tasks-tcp              
      selector:
        app: ecc-spark-service
    
    EOF
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    Service的完全域名: ecc-spark-service.ecc-spark-cluster.svc.cluster.local
    headless service的完全域名: headless-service.ecc-spark-cluster.svc.cluster.local
    在容器里面ping 完全域名, service解析出的地址是clusterIP,headless service 解析出来的地址是 pod IP。

    2、构建spark集群

    2.1 、创建spark master

    spark master分为两个部分,一个是类型为ReplicationController的主体,命名为ecc-spark-master.yaml,另一部分为一个service,暴露master的7077端口给slave使用。

    #如下是把thriftserver部署在master节点,则需要暴露thriftserver端口、driver端口、
    #blockmanager端口服务,以提供worker节点executor与driver交互.
    cat >ecc-spark-master.yaml <<EOF
    kind: Deployment
    apiVersion: apps/v1
    metadata:
      name: ecc-spark-master
      namespace: ecc-spark-cluster
      labels:
        app: ecc-spark-master
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: ecc-spark-master
      template:
        metadata:
          labels:
            app: ecc-spark-master
        spec:
          serviceAccountName: spark-cdp
          securityContext: {}
          dnsPolicy: ClusterFirst
          hostname: ecc-spark-master
          containers:
            - name: ecc-spark-master
              image: spark:3.4.1
              imagePullPolicy: IfNotPresent
              command: ["/bin/sh"]
              args: ["-c","sh /opt/spark/sbin/start-master.sh && tail -f /opt/spark/logs/spark--org.apache.spark.deploy.master.Master-1-*"]
              ports:
                - containerPort: 7077
                - containerPort: 8080
              volumeMounts:
                - mountPath: /opt/usrjars/
                  name: ecc-spark-pvc
              livenessProbe:
                failureThreshold: 9
                initialDelaySeconds: 2
                periodSeconds: 15
                successThreshold: 1
                tcpSocket:
                  port: 8080
                timeoutSeconds: 10
              resources:
                requests:
                  cpu: "2"
                  memory: "6Gi"
                limits:
                  cpu: "2"
                  memory: "6Gi"
             - env:
                - SPARK_LOCAL_DIRS
                  value: "/odsdata/sparkdirs/"             
          volumes:
            - name: ecc-spark-pvc
              persistentVolumeClaim:
                claimName: ecc-spark-pvc-static
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    2.2、创建spark worker

    在启动spark worker脚本中需要传入master的地址,在容器云kubernetes dns且设置了service的缘故,可以通过ecc-spark-master.ecc-spark-cluster.svc.cluster.local:7077访问。

    cat >ecc-spark-worker.yaml <<EOF
    kind: Deployment
    apiVersion: apps/v1
    metadata:
      name: ecc-spark-worker
      namespace: ecc-spark-cluster
      labels:
        app: ecc-spark-worker
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: ecc-spark-worker
      template:
        metadata:
          labels:
            app: ecc-spark-worker
        spec:
          serviceAccountName: spark-cdp
          securityContext: {}
          dnsPolicy: ClusterFirst
          hostname: ecc-spark-worker
          containers:
            - name: ecc-spark-worker
              image: spark:3.4.1
              imagePullPolicy: IfNotPresent
              command: ["/bin/sh"]
              args: ["-c","sh /opt/spark/sbin/start-worker.sh spark://ecc-spark-master.ecc-spark-cluster.svc.cluster.local:7077;tail -f /opt/spark/logs/spark--org.apache.spark.deploy.worker.Worker*"]
              ports:
                - containerPort: 8081
              volumeMounts:
                - mountPath: /opt/usrjars/
                  name: ecc-spark-pvc
              resources:
                requests:
                  cpu: "2"
                  memory: "2Gi"
                limits:
                  cpu: "2"
                  memory: "4Gi"
            - env:
                - SPARK_LOCAL_DIRS
                  value: "/odsdata/sparkdirs/"              
          volumes:
            - name: ecc-spark-pvc
              persistentVolumeClaim:
                claimName: ecc-spark-pvc-static
    
    EOF
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    2.3 构建pyspark提交环境

    import json
    import flask
    from flask import Flask
    from concurrent.futures import ThreadPoolExecutor
    
    app = Flask(__name__)
    pool = ThreadPoolExecutor(max_workers=8)
    
    @app.route('/')
    def hello_world():  # put application's code here
        return 'Hello World!'
    
    @app.route('/downloadCode', methods=['post'])
    def download_file():
        model_id = flask.request.json.get('modelId')
        print(model_id)
        """
        异步提交任务:pool.submit()
        """
        return json.dumps(0, ensure_ascii=False)
    
    @app.route('/modelRun', methods=['post'])
    def model_run():
        """
        异步提交任务:pool.submit()
        """
        return json.dumps(0, ensure_ascii=False)
    
    if __name__ == '__main__':
        app.run()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    spark@c67e6477b2f1:/opt/spark$ python3
    Python 3.8.10 (default, May 26 2023, 14:05:08) 
    [GCC 9.4.0] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>> 
    >>> 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    将python的调用整合到:start-master.sh 文件末尾启动调用,便可以通过k8s暴露spark-master的F5端口实现http调用。

    3、使用spark-operator安装spark集群方式

    可以参考阿里云文章:搭建Spark应用

  • 相关阅读:
    每日一题:Spring MVC 的执行流程是什么❓
    QT人脸识别知识
    递归:理解和应用
    【GD32F427开发板试用】+DHT11温湿度监测
    如何去掉照片中多余路人?一分钟帮你搞定
    基于SSM的工资管理系统
    使用 Zokrates 在 BSV 上创建您的第一个 zkSNARK 证明
    jar包中取消乱码的配置
    JVM之【执行引擎】
    行业追踪,2023-09-27
  • 原文地址:https://blog.csdn.net/software444/article/details/132438355