• 大数据Hadoop之——Kafka API介绍与实战操作


    一、Kafka API介绍

    Kafka包括五个核心api:

    • Producer API 允许应用程序将数据流发送到 Kafka 集群中的主题。【生产者】
    • Consumer API 允许应用程序从 Kafka 集群中的主题中读取数据流。【消费者】
    • Streams API 允许将数据流从输入主题转换为输出主题。【计算引擎】
    • Connect API 允许实现连接器,这些连接器不断地从某个源系统或应用程序拉入 Kafka,或从 Kafka 推送到某个接收器系统或应用程序。【source与sink】
    • Admin API 允许管理和检查主题、代理和其他 Kafka 对象。

    Java 客户端接口文档:https://kafka.apache.org/32/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
    非 Java 客户端接口文档:https://cwiki.apache.org/confluence/display/KAFKA/Clients

    使用java kafka需引入依赖:

     <!-- Kafka 客户端库。包含内置的序列化器/反序列化器 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.2.0</version>
    </dependency>
    <!-- Kafka Streams 的基础库 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.2.0</version>
    </dependency>
    <!-- 用于 Scala 库的 Kafka Streams DSL,用于编写 Scala Kafka Streams 应用程序。不使用 SBT 时,您需要在工件 ID 后缀上您的应用程序使用的正确版本的 Scala ( _2.12, _2.13) -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-scala_2.13</artifactId>
        <version>3.2.0</version>
    </dependency>
    
    <!-- slf4j 依赖包 -->
    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    slf4j-api、slf4j-log4j12、log4j三者之间的关系如下图所示:
    在这里插入图片描述

    二、实战操作

    1)zookeeper与kafka无鉴权

    关于配置可以参考我这篇文章:大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)

    1、启动服务

    $ cd $KAFKA_HOME
    # 启动zookeeper
    $ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
    # 启动zookeeper客户端验证
    $ ./bin/zookeeper-shell.sh hadoop-node1:12181
    
    # 启动kafka
    $ ./bin/kafka-server-start.sh -daemon ./config/server.properties
    # 查看kafka topic列表
    $ ./bin/kafka-topics.sh --bootstrap-server hadoop-node1:19092 --list
    
    # 停止服务
    $ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2、示例(java版本)

    package bigdata.kafka.com;
    
    import java.time.Duration;
    import java.util.*;
    
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.concurrent.ExecutionException;
    
    public class KafkaTest001 {
    
        public static final String TOPIC = "plaintext";
    
        static Properties getBaseConfig() {
            Properties props = new Properties();
            // Kafka 服务端的主机名和端口号
            props.put("bootstrap.servers", "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
    		// 也可以使用下面方式定义配置
            // props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
            
            return props;
        }
    
        /**
         * 创建topic
         */
        public static void createTopic(){
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
            //创建需要新建的topic
            //1.topic名,2.分区数:1, 3.副本数:1
            NewTopic newTopic = new NewTopic(TOPIC, 3,(short)3);
            // CreateTopicsResult res = adminClient.createTopics(Arrays.asList("topic1", "topic3","topic3"));
            CreateTopicsResult res = adminClient.createTopics(Collections.singletonList(newTopic));
            //等待创建,成功不会有任何报错,如果创建失败和超时会报错。
            try {
                res.all().get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * topic list
         */
        public static void topicList() throws ExecutionException, InterruptedException {
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
    
            // 查看所有topic列表
            Set<String> topiclist = adminClient.listTopics().names().get();
            System.out.println(topiclist);
            //遍历数组中的元素
            for (String topicname:topiclist){
                System.out.println(topicname);
            }
    
            //查看Topic详情
            DescribeTopicsResult describeTopics = adminClient.describeTopics(topiclist);
            Map<String, TopicDescription> tdm = describeTopics.allTopicNames().get();
            for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {
                System.out.println(entry.getKey()+"\t"+entry.getValue());
            }
        }
    
        /**
         * 删除topic
         * @param topicName
         */
        public static void deleteTopic(String topicName){
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
            DeleteTopicsResult res = adminClient.deleteTopics(Collections.singleton(topicName));
            try {
                System.out.println(res.all().get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
    
        }
    
        /**
         * 生产者
         */
        public static void MyProducer() throws InterruptedException {
            /**
             * 1.创建链接参数
             */
            final Properties props = getBaseConfig();
            // ack:-1 需要 ISR 中所有的 Replica(副本)给予接收确认,速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 时并不能一定避免数据丢失。
            props.put("acks", "-1");
            // 消息重试次数
            props.put("retries", 3);
            // 设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。超过 1MB 将会报错。这里设置1M
            props.put("max.request.size",1048576);
            // 每一批次的消息大小,16kb
            props.put("batch.size", 16384);
            // 消息延迟发送时间,默认0,只有满足一批次才会发送。这里设置为1s,一个批次的消息不满足 16kb,也会发送
            props.put("linger.ms", 1);
            // 缓冲池大小,默认32M
            props.put("buffer.memory", 33554432);
    
            // key的序列化方式
            props.put("key.serializer", StringSerializer.class);
            // value的序列化方式
            props.put("value.serializer", StringSerializer.class);
    
            /**
             * 2.创建生产者
             */
            KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
            /**
             *  3.调用 send 方法,发送消息(普通异步发送)
             */
            /*for (int i = 0; i < 50; i++) {
                // 异步发送 默认
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key" + i, "value" + i);
                producer.send(record);
    
                // 同步发送
                record.send(record).get();
            }*/
    
            /**
             * 3. 带回调函数的异步发送
             */
            for (int i = 0; i < 5; i++) {
                // 添加回调
                producer.send(new ProducerRecord<>(TOPIC,
                        "key " + i,  "value" + i), new Callback() {
    
                    // 该方法在 Producer 收到 ack 时调用,为异步调用
                    @Override
                    public void onCompletion(RecordMetadata metadata,
                                             Exception exception) {
                        if (exception == null) {
                            // 没有异常,输出信息到控制台
                            System.out.println(" 主题: " +
                                    metadata.topic() + "->" + "分区:" + metadata.partition());
                        } else {
                            // 出现异常打印
                            exception.printStackTrace();
                        }
                    }
                });
                // 延迟一会会看到数据发往不同分区
                Thread.sleep(2);
            }
    
            /**
             * 4.关闭生产者
             */
            producer.close();
        }
    
        /**
         * 消费者
         */
        public static void MyConsumer(){
            // 1.创建链接参数
            final Properties props = getBaseConfig();
            // 制定consumer group
            props.put("group.id", "test");
            // 是否自动确认offset
            props.put("enable.auto.commit", "false");
            // 自动确认offset的时间间隔
            // props.put("auto.commit.interval.ms", "1000");
    
            // 使用消费者组管理时,调用poll()之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间量设置了一个上限。
            // 如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给其他成员
            // 默认值 300s
            props.put("max.poll.interval.ms",300000);
    
            // offset 重置,默认:latest,从最新的开始消费,这里设置为 earliest,重最早的提交的 offset 开始消费
            props.put("auto.offset.reset","earliest");
            // 拉取的消息的最大条数,默认 500
            props.put("max.poll.records",500);
    
            //  这个参数就是为消息的 key 做反序列化的 ;consumer 代码从 broker 端获取的任何消息都是宇节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式 。
            props.put("key.deserializer", StringDeserializer.class);
            props.put("value.deserializer", StringDeserializer.class);
    
            // 定义consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 消费者订阅的topic, 可同时订阅多个
            // consumer.subscribe(Arrays.asList("topic1", "topic3","topic3"));
            consumer.subscribe(Collections.singletonList(TOPIC));
            while (true) {
                // 读取数据,读取超时时间为 10ms
                Duration d = Duration.ofSeconds(10);
                ConsumerRecords<String, String> records = consumer.poll(d);
                
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // 同步提交 offset
                // consumer.commitSync();
                // 异步提交 offset
                consumer.commitAsync();
            }
        }
    
        /**
         * 主函数入口
         * @param args
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // deleteTopic(TOPIC);
            // createTopic();
            // topicList();
            MyProducer();
            MyConsumer();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225

    在这里插入图片描述

    2)zookeeper无鉴权与kafka kerberos鉴权

    关于配置可以参考我这篇文章:大数据Hadoop之——Kafka鉴权认证(Kafka kerberos认证+kafka账号密码认证+CDH Kerberos认证)

    1、启动服务

    $ cd $KAFKA_HOME
    # 启动zookeeper
    $ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
    # 启动zookeeper客户端验证
    $ ./bin/zookeeper-shell.sh hadoop-node1:12181
    
    # 启动kafka
    $ ./bin/kafka-server-start-sasl.sh -daemon config/server-sasl.properties
    # 查看kafka topic列表
    $ ./bin/kafka-topics-sasl.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/kerberos/client.properties
    
    # 停止服务
    $ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2、示例(java版本)

    package bigdata.kafka.com;
    
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaTest002 {
    
        public static final String TOPIC = "KafkaTest002";
    
        static Properties getBaseConfig() {
    
            Properties properties = System.getProperties();
            String krb5_config = (String) properties.get("java.security.krb5.conf");
            String login_config = (String) properties.get("java.security.auth.login.config");
            System.setProperty("java.security.krb5.conf", krb5_config);
            System.setProperty("java.security.auth.login.config", login_config);
    
            Properties props = new Properties();
            // Kafka 服务端的主机名和端口号
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
    
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    
            // 默认就是GSSAPI,kerberos
            props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    
            props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka-server");
    
            return props;
        }
    
        /**
         * 创建topic
         */
        public static void createTopic(){
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
            //创建需要新建的topic
            //1.topic名,2.分区数:1, 3.副本数:1
            NewTopic newTopic = new NewTopic(TOPIC, 3,(short)3);
            // CreateTopicsResult res = adminClient.createTopics(Arrays.asList("topic1", "topic3","topic3"));
            CreateTopicsResult res = adminClient.createTopics(Collections.singletonList(newTopic));
            //等待创建,成功不会有任何报错,如果创建失败和超时会报错。
            try {
                res.all().get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * topic list
         */
        public static void topicList() throws ExecutionException, InterruptedException {
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
    
            // 查看所有topic列表
            Set<String> topiclist = adminClient.listTopics().names().get();
            System.out.println(topiclist);
            //遍历数组中的元素
            for (String topicname:topiclist){
                System.out.println(topicname);
            }
    
            //查看Topic详情
            DescribeTopicsResult describeTopics = adminClient.describeTopics(topiclist);
            Map<String, TopicDescription> tdm = describeTopics.allTopicNames().get();
            for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {
                System.out.println(entry.getKey()+"\t"+entry.getValue());
            }
        }
    
        /**
         * 删除topic
         * @param topicName
         */
        public static void deleteTopic(String topicName){
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
            DeleteTopicsResult res = adminClient.deleteTopics(Collections.singleton(topicName));
            try {
                System.out.println(res.all().get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
    
        }
    
        /**
         * 生产者
         */
        public static void MyProducer(String[] args) throws InterruptedException {
            /**
             * 1.创建链接参数
             */
            final Properties props = getBaseConfig();
            // ack:-1 需要 ISR 中所有的 Replica(副本)给予接收确认,速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 时并不能一定避免数据丢失。
            props.put("acks", "-1");
            // 消息重试次数
            props.put("retries", 3);
            // 设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。超过 1MB 将会报错。这里设置1M
            props.put("max.request.size",1048576);
            // 每一批次的消息大小,16kb
            props.put("batch.size", 16384);
            // 消息延迟发送时间,默认0,只有满足一批次才会发送。这里设置为1s,一个批次的消息不满足 16kb,也会发送
            props.put("linger.ms", 1);
            // 缓冲池大小,默认32M
            props.put("buffer.memory", 33554432);
    
            // key的序列化方式
            props.put("key.serializer", StringSerializer.class);
            // value的序列化方式
            props.put("value.serializer", StringSerializer.class);
    
            /**
             * 2.创建生产者
             */
            KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
            /**
             *  3.调用 send 方法,发送消息(普通异步发送)
             */
            /*for (int i = 0; i < 50; i++) {
                // 异步发送 默认
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key" + i, "value" + i);
                producer.send(record);
    
                // 同步发送
                record.send(record).get();
            }*/
    
            /**
             * 3. 带回调函数的异步发送
             */
            for (int i = 0; i < 5; i++) {
                // 添加回调
                producer.send(new ProducerRecord<>(TOPIC,
                        "key " + i,  "value" + i), new Callback() {
    
                    // 该方法在 Producer 收到 ack 时调用,为异步调用
                    @Override
                    public void onCompletion(RecordMetadata metadata,
                                             Exception exception) {
                        if (exception == null) {
                            // 没有异常,输出信息到控制台
                            System.out.println(" 主题: " +
                                    metadata.topic() + "->" + "分区:" + metadata.partition());
                        } else {
                            // 出现异常打印
                            exception.printStackTrace();
                        }
                    }
                });
                // 延迟一会会看到数据发往不同分区
                Thread.sleep(2);
            }
    
            /**
             * 4.关闭生产者
             */
            producer.close();
        }
    
        /**
         * 消费者
         */
        public static void MyConsumer(String[] args){
            // 1.创建链接参数
            final Properties props = getBaseConfig();
            // 制定consumer group
            props.put("group.id", "test");
            // 是否自动确认offset
            props.put("enable.auto.commit", "false");
            // 自动确认offset的时间间隔
            // props.put("auto.commit.interval.ms", "1000");
    
            // 使用消费者组管理时,调用poll()之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间量设置了一个上限。
            // 如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给其他成员
            // 默认值 300s
            props.put("max.poll.interval.ms",300000);
    
            // offset 重置,默认:latest,从最新的开始消费,这里设置为 earliest,重最早的提交的 offset 开始消费
            props.put("auto.offset.reset","earliest");
            // 拉取的消息的最大条数,默认 500
            props.put("max.poll.records",500);
    
            //  这个参数就是为消息的 key 做反序列化的 ;consumer 代码从 broker 端获取的任何消息都是宇节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式 。
            props.put("key.deserializer", StringDeserializer.class);
            props.put("value.deserializer", StringDeserializer.class);
    
            // 定义consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 消费者订阅的topic, 可同时订阅多个
            // consumer.subscribe(Arrays.asList("topic1", "topic3","topic3"));
            consumer.subscribe(Collections.singletonList(TOPIC));
            while (true) {
                // 读取数据,读取超时时间为 10ms
                Duration d = Duration.ofSeconds(10);
                ConsumerRecords<String, String> records = consumer.poll(d);
    
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // 同步提交 offset
                // consumer.commitSync();
                // 异步提交 offset
                consumer.commitAsync();
            }
        }
    
        /**
         * 主函数入口
         * @param args
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            try {
                Properties properties = System.getProperties();
                String action = (String) properties.get("action");
                System.out.println(action);
    
                for (String arg : args) {
                    System.out.println("main方法参数为:" + arg);
                }
    
                if ( action.equals("deleteTopic") ) {
                    deleteTopic(TOPIC);
                }
    
                if ( action.equals("createTopic") ) {
                    createTopic();
                }
    
                if ( action.equals("topicList") ) {
                    topicList();
                }
    
                if ( action.equals("deleteTopic") ) {
                    deleteTopic(TOPIC);
                }
    
                if ( action.equals("Producer_Consumer") ) {
                    MyProducer(args);
                    MyConsumer(args);
                }
            }catch (Exception e) {
                e.printStackTrace();
                System.out.println("please input args:[-Daction=deleteTopic,-Daction=createTopic,-Daction=topicList,,-Daction=Producer_Consumer]");
            }
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272

    Kerberos认证比较特殊,最好在Linux机器上测试,因为krb5.conf引用的文件计较多。这里我也是先打包成jar,放在Linux机器测试。IDEA打包成jar包如下所示:
    在这里插入图片描述

    【温馨提示】Directory for META-INF/MANIFEST.MF选择resources路径,否则运行jar包时会抛no main manifest attribute,in xxx.jar

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    执行

    $ java -jar -Daction=topicList -Djava.security.krb5.conf=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/kerberos/kafka-client-jaas.conf  kafka.jar bigdata.kafka.com.KafkaTest002
    
    • 1

    【温馨提示】参数必须放在-jar后面

    在这里插入图片描述

    3)zookeeper无鉴权与kafka 账号密码鉴权

    1、启动服务

    $ cd $KAFKA_HOME
    # 启动zookeeper
    $ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
    # 启动zookeeper客户端验证
    $ ./bin/zookeeper-shell.sh hadoop-node1:12181
    
    # 启动kafka
    $ ./bin/kafka-server-start-pwd.sh -daemon config/server-pwd.properties
    # 查看kafka topic列表
    $ ./bin/kafka-topics-pwd.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/userpwd/client.properties
    
    # 停止服务
    $ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2、示例(java版本)

    package bigdata.kafka.com;
    
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.common.config.SaslConfigs;
    
    import java.time.Duration;
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    
    public class KafkaTest003 {
    
        public static final String TOPIC = "KafkaTest003";
    
        static Properties getBaseConfig() {
            Properties props = new Properties();
            // Kafka 服务端的主机名和端口号
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
    
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    
            props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka-server");
    
            String saslJaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"%s\" \npassword=\"%s\";", "kafka", "123456");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
    
            return props;
        }
    
        /**
         * 创建topic
         */
        public static void createTopic(){
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
            //创建需要新建的topic
            //1.topic名,2.分区数:1, 3.副本数:1
            NewTopic newTopic = new NewTopic(TOPIC, 3,(short)3);
            // CreateTopicsResult res = adminClient.createTopics(Arrays.asList("topic1", "topic3","topic3"));
            CreateTopicsResult res = adminClient.createTopics(Collections.singletonList(newTopic));
            //等待创建,成功不会有任何报错,如果创建失败和超时会报错。
            try {
                res.all().get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * topic list
         */
        public static void topicList() throws ExecutionException, InterruptedException {
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
    
            // 查看所有topic列表
            Set<String> topiclist = adminClient.listTopics().names().get();
            System.out.println(topiclist);
            //遍历数组中的元素
            for (String topicname:topiclist){
                System.out.println(topicname);
            }
    
            //查看Topic详情
            DescribeTopicsResult describeTopics = adminClient.describeTopics(topiclist);
            Map<String, TopicDescription> tdm = describeTopics.allTopicNames().get();
            for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {
                System.out.println(entry.getKey()+"\t"+entry.getValue());
            }
        }
    
        /**
         * 删除topic
         * @param topicName
         */
        public static void deleteTopic(String topicName){
            final Properties props = getBaseConfig();
            KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
            DeleteTopicsResult res = adminClient.deleteTopics(Collections.singleton(topicName));
            try {
                System.out.println(res.all().get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
    
        }
    
        /**
         * 生产者
         */
        public static void MyProducer() throws InterruptedException {
            /**
             * 1.创建链接参数
             */
            final Properties props = getBaseConfig();
            // ack:-1 需要 ISR 中所有的 Replica(副本)给予接收确认,速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 时并不能一定避免数据丢失。
            props.put("acks", "-1");
            // 消息重试次数
            props.put("retries", 3);
            // 设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。超过 1MB 将会报错。这里设置1M
            props.put("max.request.size",1048576);
            // 每一批次的消息大小,16kb
            props.put("batch.size", 16384);
            // 消息延迟发送时间,默认0,只有满足一批次才会发送。这里设置为1s,一个批次的消息不满足 16kb,也会发送
            props.put("linger.ms", 1);
            // 缓冲池大小,默认32M
            props.put("buffer.memory", 33554432);
    
            // key的序列化方式
            props.put("key.serializer", StringSerializer.class);
            // value的序列化方式
            props.put("value.serializer", StringSerializer.class);
    
            /**
             * 2.创建生产者
             */
            KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
            /**
             *  3.调用 send 方法,发送消息(普通异步发送)
             */
            /*for (int i = 0; i < 50; i++) {
                // 异步发送 默认
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key" + i, "value" + i);
                producer.send(record);
    
                // 同步发送
                record.send(record).get();
            }*/
    
            /**
             * 3. 带回调函数的异步发送
             */
            for (int i = 0; i < 5; i++) {
                // 添加回调
                producer.send(new ProducerRecord<>(TOPIC,
                        "key " + i,  "value" + i), new Callback() {
    
                    // 该方法在 Producer 收到 ack 时调用,为异步调用
                    @Override
                    public void onCompletion(RecordMetadata metadata,
                                             Exception exception) {
                        if (exception == null) {
                            // 没有异常,输出信息到控制台
                            System.out.println(" 主题: " +
                                    metadata.topic() + "->" + "分区:" + metadata.partition());
                        } else {
                            // 出现异常打印
                            exception.printStackTrace();
                        }
                    }
                });
                // 延迟一会会看到数据发往不同分区
                Thread.sleep(2);
            }
    
            /**
             * 4.关闭生产者
             */
            producer.close();
        }
    
        /**
         * 消费者
         */
        public static void MyConsumer(){
            // 1.创建链接参数
            final Properties props = getBaseConfig();
            // 制定consumer group
            props.put("group.id", "test");
            // 是否自动确认offset
            props.put("enable.auto.commit", "false");
            // 自动确认offset的时间间隔
            // props.put("auto.commit.interval.ms", "1000");
    
            // 使用消费者组管理时,调用poll()之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间量设置了一个上限。
            // 如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给其他成员
            // 默认值 300s
            props.put("max.poll.interval.ms",300000);
    
            // offset 重置,默认:latest,从最新的开始消费,这里设置为 earliest,重最早的提交的 offset 开始消费
            props.put("auto.offset.reset","earliest");
            // 拉取的消息的最大条数,默认 500
            props.put("max.poll.records",500);
    
            //  这个参数就是为消息的 key 做反序列化的 ;consumer 代码从 broker 端获取的任何消息都是宇节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式 。
            props.put("key.deserializer", StringDeserializer.class);
            props.put("value.deserializer", StringDeserializer.class);
    
            // 定义consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 消费者订阅的topic, 可同时订阅多个
            // consumer.subscribe(Arrays.asList("topic1", "topic3","topic3"));
            consumer.subscribe(Collections.singletonList(TOPIC));
            while (true) {
                // 读取数据,读取超时时间为 10ms
                Duration d = Duration.ofSeconds(10);
                ConsumerRecords<String, String> records = consumer.poll(d);
    
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                // 同步提交 offset
                // consumer.commitSync();
                // 异步提交 offset
                consumer.commitAsync();
            }
        }
    
        /**
         * 主函数入口
         * @param args
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // deleteTopic(TOPIC);
            // createTopic();
            // topicList();
            MyProducer();
            MyConsumer();
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235

    在这里插入图片描述

    4)zookeeper与kafka同时开启kerberos鉴权

    关于配置可以参考我这篇文章:大数据Hadoop之——Zookeeper鉴权认证(Kerberos认证+账号密码认证)

    1、启动服务

    $ cd $KAFKA_HOME
    # 启动zookeeper
    $ ./bin/zookeeper-server-start-kerberos.sh -daemon ./config/zookeeper-kerberos.properties
    # 启动zookeeper客户端验证
    $ ./bin/zookeeper-shell-kerberos.sh hadoop-node1:12181
    
    # 启动kafka
    $ ./bin/kafka-server-start-zkcli-kerberos.sh -daemon ./config/server-zkcli-kerberos.properties
    # 查看kafka topic列表
    $ ./bin/kafka-topics-sasl.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/kerberos/client.properties
    
    # 停止服务
    $ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    5)zookeeper与kafka同时开启账号密码鉴权

    1、启动服务

    $ cd $KAFKA_HOME
    # 启动zookeeper
    $ ./bin/zookeeper-server-start-userpwd.sh -daemon ./config/zookeeper-userpwd.properties
    # 启动zookeeper客户端验证
    $ ./bin/zookeeper-shell-userpwd.sh hadoop-node1:12181
    
    # 启动kafka
    $ ./bin/kafka-server-start-zkcli-userpwd.sh -daemon ./config/server-zkcli-userpwd.properties
    # 查看kafka topic列表
    $ ./bin/kafka-topics-pwd.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/userpwd/client.properties
    
    # 停止服务
    $ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    同时开启认证的方式用的不多,其实代码类似,小伙伴感兴趣的话,可以根据我之前配置的把我上面的代码改一下应该就可以了,难度不大。有疑问的小伙伴欢迎给我留言哦!!!!

  • 相关阅读:
    pytorch学习(一)——图像多分类实例
    鸭绒和鹅绒的区别RDS人道羽绒标准
    SD-MTSP:萤火虫算法(FA)求解单仓库多旅行商问题MATLAB(可更改数据集,旅行商的数量和起点)
    训练正常&异常的GAN损失函数loss变化应该是怎么样的
    【userfaultfd+msg_msg+pipe_buffer】CISCN2022-cactus
    java-net-php-python-2020ssm考研题目管理系统计算机毕业设计程序
    ICPC2023深圳部分题解(A,D,E,F,G,K,L)
    业务-(课程-章节-小节)+课程发布一些业务思路
    为什么阿里人能够快速成长?看完他们 Java 架构进化笔记,我秒懂!
    【slowfast 损失函数改进】深度学习网络通用改进方案:slowfast的损失函数(使用focal loss解决不平衡数据)改进
  • 原文地址:https://blog.csdn.net/qq_35745940/article/details/125349537