• Kafka - 13 Java 客户端实现消费者消费消息


    1. 独立消费者案例(订阅主题)

    需求:创建一个独立消费者,消费主题中数据:

    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 3 --replication-factor 3 --topic hh
    
    [root@hadoop101 kafka_2.12-2.2.1]# bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic hh
    Topic:hh    PartitionCount:3    ReplicationFactor:3     Configs:segment.bytes=1073741824
    Topic: hh       Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,1,0
    Topic: hh       Partition: 1    Leader: 2       Replicas: 2,0,3 Isr: 3,0,2
    Topic: hh       Partition: 2    Leader: 3       Replicas: 3,2,1 Isr: 3,1,2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id

    public class CustomConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 创建消费者组,组名任意起名都可以
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
    
            // 创建消费者
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            // 订阅主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("hh");
            consumer.subscribe(topics);
    
            // 消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    Springboot 自定义日志配置关闭Kafka消费者debug日志打印:在resource目录下添加文件 logback.xml 即可。

    
    <configuration debug="false">
        
        <property name="LOG_HOME" value="logs/">property>
    
        
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%npattern>
            encoder>
        appender>
    
        
        <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
                
                <FileNamePattern>${LOG_HOME}/%d{yyyy-MM-dd}.%i.logFileNamePattern>
                <maxFileSize>50MBmaxFileSize>
                
                <maxHistory>30maxHistory>
            rollingPolicy>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%npattern>
            encoder>
        appender>
    
        
        <root level="INFO">
            <appender-ref ref="STDOUT"/>
            <appender-ref ref="FILE"/>
        root>
    
        
        <logger name="org.apache.kafka" level="info" additivity="false"/>
        
        <logger name="org.apache.kafka.clients" level="info"  additivity="false"/>
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    测试生产者发送消息:

    在这里插入图片描述

    2. 独立消费者案例(订阅分区)

    需求:创建一个独立消费者,消费主题 0 号分区的数据。

    在这里插入图片描述

    ① kafka 消费者消费主题0号分区的数据:

    public class CustomConsumerPartition {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 创建消费者组,组名任意起名都可以
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
    
            // 创建消费者
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            // 订阅主题对应的分区
            ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
            topicPartitions.add(new TopicPartition("hh",0));
            consumer.assign(topicPartitions);
    
            // 消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    ② kafka 生产者向主题的0号分区发送数据:

    public class CustomProducerCallbackPartitions {
        public static void main(String[] args) throws InterruptedException {
            // kafka生产者属性配置
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                           StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                           StringSerializer.class.getName());
    
            // kafka生产者
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            for(int i=0;i<5;i++){
                kafkaProducer.send(new ProducerRecord<>("hh" ,0,"","hello,kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                        if(exception==null){
                            // 消息发送成功
                            System.out.println("主题"+recordMetadata.topic()+"->"+"分区:"+recordMetadata.partition());
                        }else{
                            // 消息发送失败
                            exception.printStackTrace();
                        }
                    }
                });
                Thread.sleep(2);
            }
            // 关闭资源
            kafkaProducer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    ③ 测试:先启动消费者程序,再启动生产者程序

    在这里插入图片描述

    3. 消费者组案例

    需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

    在这里插入图片描述

    ① 创建3个消费者:复制2份基础消费者的代码,在 IDEA 中同时启动,即可启动同一个消费者组中的3个消费者。

    public class CustomConsumer {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 创建消费者组,组名任意起名都可以
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
    
            // 创建消费者
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
            // 订阅主题
            ArrayList<String> topics = new ArrayList<>();
            topics.add("hh");
            consumer.subscribe(topics);
    
            // 消费数据
            while (true){
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    ② 生产者发送消息:

    public class CustomProducerCallbackPartitions {
        public static void main(String[] args) throws InterruptedException {
            // kafka生产者属性配置
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.23:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            // 添加自定义分区器
            // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hh.producer.MyPartitioner");
    
            // kafka生产者
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            for(int i=0;i<50;i++){
                kafkaProducer.send(new ProducerRecord<>("hh" ,"hello,kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                        if(exception==null){
                            // 消息发送成功
                            System.out.println("主题"+recordMetadata.topic()+",发往的分区:"+recordMetadata.partition());
                        }else{
                            // 消息发送失败
                            exception.printStackTrace();
                        }
                    }
                });
                Thread.sleep(2);
            }
            // 关闭资源
            kafkaProducer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    ③ 测试:先启动3个消费者程序,再启动生产者程序

    在这里插入图片描述

    可以看到发送的50条消息分别被消费者组中的不同消费者消费,他们消费的是不同分区的数据。

  • 相关阅读:
    精彩回顾|云原生 Meetup 广州站
    怎样才能在网上快速赚到钱?
    延迟任务多种实现姿势--上
    java毕业设计——基于java+Java Swing+jsp的企业快信系统设计与实现(毕业论文+程序源码)——企业快信系统
    网易Airtest全新推出:小型便携式集群解决方案!
    解决高并发问题
    元胞自动机( Cellular Automata)研究 (Python代码实现)
    Windows Linux常见编译器 msvc gcc clang
    设计模式 17 迭代器模式
    迁移学习——ResNet152
  • 原文地址:https://blog.csdn.net/qq_42764468/article/details/128157329