• Spark writing to a Kafka cluster with SASL authentication


    Background

    Writing to a Kafka cluster from Spark jobs is a routine operation and has always been stable. Recently, operations provided an ACL-enabled Kafka cluster with SASL authentication, and Spark jobs started failing when writing to it.

    Exception log

    22/08/11 16:02:15 INFO TransactionManager: [Producer clientId=producer-2] Transiting to fatal error state due to org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
    22/08/11 16:02:15 INFO TransactionManager: [Producer clientId=producer-1] Transiting to fatal error state due to org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
    22/08/11 16:02:16 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
            at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
            at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
            at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
           …………  
    Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
    22/08/11 16:02:16 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
    org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
            at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
            at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
            at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
            at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)

            …………
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
    22/08/11 16:02:16 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job

    Analysis

    Judging from the exception log, the ACL-enabled Kafka cluster presumably does not grant the producer's user the IdempotentWrite permission on the cluster resource, while Spark's Kafka producer enables idempotence by default. An idempotent producer must issue an InitProducerId request, which is authorized against that cluster-level permission, so the write fails with the ClusterAuthorizationException above.
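    If idempotent writes are actually wanted, the alternative fix is on the broker side: grant the producer's user the IdempotentWrite operation on the cluster resource. A sketch using Kafka's standard ACL tool (the broker address, admin config file, and username are placeholders):

    ```shell
    # Grant IdempotentWrite on the cluster resource to the producing user
    kafka-acls.sh --bootstrap-server broker1:9092 \
      --command-config admin.properties \
      --add --allow-principal User:produce-user \
      --operation IdempotentWrite --cluster
    ```

    Note that on newer brokers (Kafka 2.8+, KIP-679), a principal with WRITE permission on a topic can produce idempotently without this cluster-level ACL, so this grant is mainly relevant for older clusters.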

    Solution

    Disable idempotence on the producer by setting enable.idempotence=false, and supply the SASL settings. A minimal example:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "xxx");
    // Disable idempotence so the producer no longer needs the IdempotentWrite permission
    props.put("enable.idempotence", "false");
    props.put("acks", "all");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    // SASL/SCRAM authentication settings
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");

    String topic = "xxx";
    String mykey = "mykey";
    String myvalue = "myvalue";
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>(topic, mykey, myvalue));
    producer.close();
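    When the write goes through Spark's Kafka connector (spark-sql-kafka) rather than a hand-built producer, the same settings are passed as options with the kafka. prefix. A sketch, assuming df is a DataFrame with a value column; addresses, credentials, and the topic name are placeholders:

    ```java
    // Producer-level configs take the "kafka." prefix in the Spark Kafka connector
    df.write()
      .format("kafka")
      .option("kafka.bootstrap.servers", "xxx")
      .option("kafka.enable.idempotence", "false")
      .option("kafka.security.protocol", "SASL_PLAINTEXT")
      .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
      .option("kafka.sasl.jaas.config",
          "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";")
      .option("topic", "xxx")
      .save();
    ```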

    References

    Apache Kafka documentation

    Kafka producer property enable.idempotence=true is causing error - Stack Overflow

  • Original article: https://blog.csdn.net/L13763338360/article/details/126288319