• SpringBoot+Kafka



    一、依赖

    
    <dependency>
        <groupId>org.springframework.kafkagroupId>
        <artifactId>spring-kafkaartifactId>
        <version>2.5.1.RELEASEversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    二、配置文件

    spring:
      kafka:
        # kafka地址,集群用逗号分隔(localhost:9092,localhost:9093)。缺省:localhost:9092
        bootstrap-servers: localhost:9092
        # 生产者
        #producer:
          # key的序列化方式,缺省:org.apache.kafka.common.serialization.StringSerializer
          #key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # value的序列化方式,缺省:org.apache.kafka.common.serialization.StringSerializer
          #value-serializer: org.apache.kafka.common.serialization.StringSerializer
        # 消费者
        consumer:
          # 消费者组
          group-id: testGroup
          # 自动偏移量
            # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
            # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
            # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
          auto-offset-reset: latest
          # key的序列化方式,缺省:org.apache.kafka.common.serialization.StringSerializer
          #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # value的序列化方式,缺省:org.apache.kafka.common.serialization.StringSerializer
          #value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        #listener:
          # SINGLE-单个消费;BATCH-批量消费。缺省SINGLE
          #type: BATCH
          # 消费者监听的主题不存在时,启动项目是否报错。缺省:false
          #missing-topics-fatal: false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    三、API

    1、生产者

    /**
     * 生产消息
     *
     * @author kimi
     * @date 2023/2/18
     */
    @Component
    public class ProducerMsg {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
    
        /**
         * 生产消息
         *
         * @param msg
         */
        public void send(String topic, String msg) {
            kafkaTemplate.send(topic, msg);
        }
    
        /**
         * 生产消息+回调
         *
         * @param topic
         * @param msg
         */
        public void sendCallback(String topic, String msg) {
            kafkaTemplate.send(topic, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    
                //成功的回调
                @Override
                public void onSuccess(SendResult<String, String> stringStringSendResult) {
                    RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();
                    //主题
                    final String topic = recordMetadata.topic();
                    //分区
                    final int partition = recordMetadata.partition();
                    //偏移量
                    final long offset = recordMetadata.offset();
                    System.err.println(String.format("生产消息成功:topic: %s,partition: %s,offset: %s", topic, partition, offset));
                }
    
                //失败的回调
                @Override
                public void onFailure(Throwable throwable) {
    
                }
            });
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    2、消费者

    /**
     * 消费者
     *
     * @author kimi
     * @date 2023/2/18
     */
    @Component
    public class ConsumeMsg {
    
        /**
         * 单个消费
         *
         * @param consumer
         */
        @KafkaListener(topics = {"USER", "LOG"})
        public void consumeSingle(ConsumerRecord<String, String> consumer) {
            System.err.println("监听到kafka消息: " + consumer);
            final String topic = consumer.topic();
            final String value = consumer.value();
        }
    
        /**
         * 批量消费
         * 需将配置文件中的listener.type设置成BATCH
         *
         * @param consumers
         */
        //@KafkaListener(topics = {"USER", "LOG"})
        public void consumeBatch(List<ConsumerRecord<String, String>> consumers) {
            consumers.forEach(consumer -> {
                final String topic = consumer.topic();
                final String value = consumer.value();
    
                System.err.println(String.format("topic: %s,value: %s", topic, value));
            });
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
  • 相关阅读:
    【MySQL系列】索引的学习及理解
    计算机组成原理【2022-10-24】
    37、在OAK摄像头上部署tensorflow deeplabv3+进行实例分割
    py 打开多个页面
    【基础建设】浅谈企业网络安全运营体系建设
    【PAT甲级】1021 Deepest Root
    JAVA艾灸减肥管理网站计算机毕业设计Mybatis+系统+数据库+调试部署
    微信小程序OA会议系统个人中心授权登入
    Linux知识
    windows2008+iis7+asp+php环境配置
  • 原文地址:https://blog.csdn.net/weixin_43476020/article/details/136179094