• SparkStreaming消费kafka存储到Elasticsearch


    目录

    1.软件环境:

    2.启动haddop yarn hdfs 

    3.启动spark

    4.启动kafka

    5.部署elasticsearch和kibana

    6、spark运行idea打包测scala程序

    7、运行效果


    1.软件环境:

    elk  hadoop-3.3.3  kafka_2.12-3.2.1  spark-3.3.0-bin-hadoop3  spark-demo.jar

    部署目录/home

    2.启动haddop yarn hdfs 

    /home/hadoop-3.3.3/sbin/start-all.sh
    /home/hadoop-3.3.3/sbin/stop-all.sh

    3.启动spark


    /home/spark-3.3.0-bin-hadoop3/sbin/start-all.sh

    /home/spark-3.3.0-bin-hadoop3/sbin/stop-all.sh

    4.启动kafka

    /home/kafka_2.12-3.2.1/bin/zookeeper-server-start.sh /home/kafka_2.12-3.2.1/config/zookeeper.properties
    /home/kafka_2.12-3.2.1/bin/kafka-server-start.sh /home/kafka_2.12-3.2.1/config/server.properties

    5.部署elasticsearch和kibana

    使用docker部署最新版本/home/elk目录挂载配置文件和正数信息。

    elasticsearch8.3.3连接使用了https的密码认证

    6、spark运行idea打包测scala程序

    /home/spark-3.3.0-bin-hadoop3/bin/spark-submit --class org.example.KafkaSparkEsDemo --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --executor-cores 1 spark-demo.jar

    开启了远程调试
    /home/spark-3.3.0-bin-hadoop3/bin/spark-submit --class org.example.KafkaSparkEsDemo --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --executor-cores 1 --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8899" spark-demo.jar

    7、运行效果

     

    8.问题记录

     Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
    增加了三个属性

        conf.set("es.internal.es.cluster.name","docker-cluster")
        conf.set("es.internal.es.cluster.uuid","i033q5N4QuqsPAJs9-nCKQ")
        conf.set("es.internal.es.version","8.3.3")

    后来spark stream写入elasticsearch继续报错,获取Es版本信息时候,服务ip:9200响应为空。

    原因是elasticsearch8.3.3版本使用了https访问,需要增加响应的配置。

    临时解决办法,这里部署的elasticsearch使用了docker,所以进入docker容器。

    修改配置文件elasticsearch.yml

    配置文件中两个属性改为false

     官网参考地址:
    https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

    问题2、

    sun.security.validator.ValidatorException: PKIX path validation failed: java.security.cert.CertPathValidatorException: Path does not chain with any of the trust anchors); no other nodes left - aborting...

    文件路径没有指定对,后缀为p12的文件有http.p12和transport.p12

    es.net.ssl.keystore.location属性配置了transport.p12出现上面的错误,文件的描述信息,可查看官网。
    

    正确的应该是http.p12,而且前面是有前缀的file://

    es.net.ssl.truststore.pass对应的密码怎么设置,如果你也使用docker部署。

    可进入容器,执行命令获取,官网截图

     

    SparkConf的配置

    1. conf.set("es.net.ssl", "true")
    2. conf.set("es.net.ssl.keystore.location", "file:///home/elk/elasticsearch/config/certs/http.p12")
    3. conf.set("es.net.ssl.keystore.pass", "O2_d33WTSGGCq4T2eB28HA")
    4. conf.set("es.net.ssl.keystore.type", "PKCS12")
    5. conf.set("es.net.ssl.truststore.location", "file:///home/elk/elasticsearch/config/certs/http.p12")
    6. conf.set("es.net.ssl.truststore.pass", "O2_d33WTSGGCq4T2eB28HA")
    7. conf.set("es.net.ssl.cert.allow.self.signed", "true")
    8. conf.set("es.index.auto.create", "true")
    9. conf.set("es.scroll.size", "200")
    10. conf.set("es.read.metadata", "true")
    11. conf.set("es.nodes.wan.only", "true")
    12. conf.set("es.nodes", "10.10.10.99")
    13. conf.set("es.port", "9200")
    14. conf.set("es.index.read.missing.as.empty", "true")
    15. conf.set("es.net.http.auth.user", "elastic")
    16. conf.set("es.net.http.auth.pass", "T0e*QUGWRt05*F-2PLFP")
    17. conf.set("es.internal.es.cluster.name", "docker-cluster")
    18. conf.set("es.internal.es.cluster.uuid", "i033q5N4QuqsPAJs9-nCKQ")
    19. conf.set("es.internal.es.version", "8.3.3")
    20. conf.set("es.nodes.client.only", "false")
    21. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    22. conf.set("spark.executor.memoryOverhead", "1024")
    23. conf.set("spark.streaming.backpressure.enabled", "true")
    24. conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")

  • 相关阅读:
    app逆向(8)|app的加固+脱壳和frida+rpc介绍
    前端、后端面试集锦
    sql更新语句的执行流程
    使用WebDriver采样器将JMeter与Selenium集成
    torchvision.models中模型编辑的requires_grad
    7.22 SpringBoot项目实战【收藏 和 取消收藏】
    博客之QQ登录功能(二)
    朴素贝叶斯(基于概率论)
    computed计算方法不被调用的原因;只有在使用时才会被调用
    对话芯动科技 | 助力云游戏 4K级服务器显卡的探索与创新
  • 原文地址:https://blog.csdn.net/TT1024167802/article/details/126236653