• kafka配置SASL/PLAIN 安全认证


    1 zookeeper配置启动

    1.1 zookeeper添加SASL支持

    为zookeeper添加SASL支持,在配置文件zoo.cfg添加

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
    
    • 1
    • 2
    • 3

    1.2 zk_server_jaas.conf文件

    新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/opt/zookeeper/conf/home路径下。zk_server_jaas.conf文件的内容如下
    Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username=“cluster”
    password=“clusterpasswd”
    user_kafka=“kafkapasswd”;
    };`
    username和paasword是zk集群之间的认证密码。
    user_kafka=“kafkapasswd"定义了一个用户"kafka”,密码是"kafkapasswd",本次测试用户是kafka broker。

    1.3 引入jar包

    由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
    在home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样

    -rw-r--r-- 1 root root 1893564 Aug 29 10:53 kafka-clients-2.0.0.jar
    -rw-r--r-- 1 root root  489884 Aug 29 11:14 log4j-1.2.17.jar
    -rw-r--r-- 1 root root  370137 Aug 29 10:53 lz4-java-1.4.1.jar
    -rw-r--r-- 1 root root   41203 Aug 29 10:53 slf4j-api-1.7.25.jar
    -rw-r--r-- 1 root root   12244 Aug 29 10:53 slf4j-log4j12-1.7.25.jar
    -rw-r--r-- 1 root root 2019013 Aug 29 10:53 snappy-java-1.1.7.1.jar
    [root@sm_qf-bj_hydgpt_192-168-151-168 home]# 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1.4.修改zkEnv.sh

    在zkEnv.sh添加

    
    for i in /opt/zookeeper/conf/home/zk_sasl_dependency/*.jar
    do
       CLASSPATH="$i:$CLASSPATH"
    done
    SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper/conf/home/zk_server_jaas.conf "
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    1.5.启动zk服务端

    执行./zkServer.sh restart重新启动zk。如果启动异常查看日志排查问题
    
    • 1

    2 kafka配置和启动

    2.1.新建kafka_server_jaas.conf,为kafka添加认证信息

    KafkaServer {
     org.apache.kafka.common.security.plain.PlainLoginModule required
     username="cluster"
     password="cluster"
     user_cluster=“clusterpasswd”
     user_kafka="kafkapasswd" ;
    };
    Client{
     org.apache.kafka.common.security.plain.PlainLoginModule required  
     username="kafka"  
     password="kafkapasswd";  
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
    user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
    网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。

    2. 2.在kafka的配置文件开启SASL认证

    listeners=SASL_PLAINTEXT://(IP):9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN 
    sasl.enabled.mechanisms=PLAIN
    allow.everyone.if.no.acl.found=true
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.3 .在server启动脚本JVM参数

    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    改为
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"

    2.4 启动

    ./kafka-server-start.sh ../config/server.properties

    2.5

    kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功

    Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)
    
    
    • 1
    • 2

    3 kafka的SASL认证功能认证和使用

    1.使用kafka脚本认证

    我们使用kafka自带的脚本进行认证。

    1.新建kafka_client_jaas.conf,为客户端添加认证信息

    在/home下新建kafka_client_jaas.conf,添加以下信息

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="kafka"
      password="kafkapasswd";
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.修改客户端配置信息

    修改producer.properties和consumer.properties,添加认证机制

    security.protocol=SASL_PLAINTEXT 
    sasl.mechanism=PLAIN 
    
    • 1
    • 2

    3.修改客户端启动脚本

    修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将

    export KAFKA_HEAP_OPTS=“-Xmx512M”

    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"
    
    • 1

    kafka-console-consumer.sh的修改类似。

    4.客户端启动并认证

    启动consumer

    ./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties
    
    • 1

    启动producer

    ./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties
    
    • 1

    producer端发送消息,consumer端成功接收到消息。

    4.Java客户端认证

    package com.zte.sdn.oscp.jms.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.junit.Test;
    
    import java.util.Collections;
    import java.util.Properties;
    
    public class KafkaTest {
    
        @Test
        public void testProduct() throws Exception {
            System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "IP:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            while (true){
    			long startTime = System.currentTimeMillis();
    			for (int i = 0; i < 100; i++) {
    				producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));
    			}
    			System.out.println(System.currentTimeMillis()-startTime);
    			Thread.sleep(5000);
    		}
        }
    
        @Test
        public void testConsumer() throws Exception {
            System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "(IP):9092");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("group.id", "kafka_test_group");
            props.put("session.timeout.ms", "6000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("kafkatest"));
            while (true) {
                long startTime = System.currentTimeMillis();
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.println(System.currentTimeMillis() - startTime);
                System.out.println("recieve message number is " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
                            record.offset(),
                            record.key(),
                            record.value(),
                            record.partition());
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
  • 相关阅读:
    跨平台的桌面应用开发,技术框架选择
    ubuntu18/20 下如何生成core文件
    Android入门第2天-Android Studio中新建项目
    重磅发布|腾讯云容器安全服务网络隔离功能已上线
    6. 【图的存储结构】邻接矩阵、邻接表 、十字链表 、邻接多重表
    CLIP扩展
    SQL注入之 无列名注入 原理详解
    基于Hadoop的MapReduce网站日志大数据分析(含预处理MapReduce程序、hdfs、flume、sqoop、hive、mysql、hbase组件、echarts)
    MMDetection 使用示例:从入门到出门
    C++入门
  • 原文地址:https://blog.csdn.net/weixin_44280356/article/details/132557702