• springboot集成kafka


    创建工程

    • 父工程pom

    父工程做了子工程管理和包管理

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.7.15</version>
        </parent>
    
        <groupId>com.wzw</groupId>
        <artifactId>kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>11</maven.compiler.source>
            <maven.compiler.target>11</maven.compiler.target>
        </properties>
    
        <packaging>pom</packaging>
    
        <modules>
            <module>producer</module>
        </modules>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-web</artifactId>
                    <version>2.4.13</version>
                </dependency>
                <dependency>
                    <groupId>org.springframework.kafka</groupId>
                    <artifactId>spring-kafka</artifactId>
                    <version>2.8.1</version>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
    
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    发布者

    • pom
      引入包,继承父工程
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>com.wzw</groupId>
            <artifactId>kafka</artifactId>
            <version>1.0-SNAPSHOT</version>
        </parent>
    
        <groupId>com.wzw</groupId>
        <artifactId>producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>producer</name>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
        </dependencies>
    
    
    </project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • yml
    server:
      port: 7070
    spring:
      kafka:
        bootstrap-servers: 192.168.3.32:19092
        producer: # 生产者
          retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
          batch-size: 16384
          buffer-memory: 33554432
          acks: 1
          # 指定消息key和消息体的编解码方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: default-group
          enable-auto-commit: false
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 500
        listener:
          # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
          # RECORD
          # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
          # BATCH
          # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
          # TIME
          # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
          # COUNT
          # TIME | COUNT 有一个条件满足时提交
          # COUNT_TIME
          # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
          # MANUAL
          # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
          # MANUAL_IMMEDIATE
          ack-mode: MANUAL_IMMEDIATE
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • config配置类
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class KafkaProducerConfig {
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 请求测试接口
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class KafkaController {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @RequestMapping("/send")
        public void sendMessage(String message) {
            kafkaTemplate.send("my-topic", message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    订阅者

    • pom
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.wzw</groupId>
            <artifactId>kafka</artifactId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <groupId>com.example</groupId>
        <artifactId>consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>consumer</name>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
        </dependencies>
    
    </project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • yml
    server:
      port: 7071
    spring:
      kafka:
        bootstrap-servers: 192.168.3.32:19092
        producer: # 生产者
          retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
          batch-size: 16384
          buffer-memory: 33554432
          acks: 1
          # 指定消息key和消息体的编解码方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: default-group
          enable-auto-commit: false
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 500
        listener:
          # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
          # RECORD
          # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
          # BATCH
          # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
          # TIME
          # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
          # COUNT
          # TIME | COUNT 有一个条件满足时提交
          # COUNT_TIME
          # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
          # MANUAL
          # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
          # MANUAL_IMMEDIATE
          ack-mode: MANUAL_IMMEDIATE
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 配置类
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 监听类
      有消息进来就输出
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    
    @Service
    public class KafkaConsumer {
    
        @KafkaListener(topics = "my-topic", groupId = "default-group")
        public void consume(String message) {
            System.out.println("Received message: " + message);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    测试

    在这里插入图片描述

    注意事项

    kafka的配置文件kafka_2.13-2.8.1/config/server.properties,里面有三个配置需要注意,不然的话服务器测试正常,但是java或者其它地址的客户端报错
    在这里插入图片描述
    我的环境是docker搭建的,

    • listeners
      kafka容器的网络地址是172.18.0.5,kafka的端口是9092
      listeners=PLAINTEXT://172.18.0.5:9092
      我的虚拟机ip是192.168.3.32,然后启动kafka容器的时候,映射的地址是-p 19092:9092
      不配置正确的话,会连接不到,可能报错Connection to node 1 (kafka1/172.18.0.2:9092) could not be established. Broker may not be available.
    • advertised.listeners
      advertised.listeners=PLAINTEXT://192.168.3.32:39092
      不配置正确的话,会连接不到,可能报错kafka disconnecting from node xxx due to request timeout.
    • advertised.host.name
      最底下有个属性advertised.host.name
      配置成kafka容器的网络地址172.18.0.5
      advertised.host.name=172.18.0.5
      如果这里不配置advertised.host.name为ip,而是使用名称,会报错,
      配置为:advertised.host.name=kafka
      报错如下
      Error connecting to node kafka:19092 (id: 1001 rack: null)
      
      • 1
      • 2
      • 3
      解决方法:访问的客户端需要修改hosts,太麻烦,所以直接修改成ip
  • 相关阅读:
    电影票小程序插件 电影票CPS插件 电影票微信小程序插件
    校区多,客情管理难?看中进教育使用明道云的新解法
    Sentinel简介
    为什么企业需要生产运营管理系统
    计算机毕业设计 高校普法系统的设计与实现 Java实战项目 附源码+文档+视频讲解
    堆技巧 数组反向越界泄露地址
    六、流量监管、流量整形
    几道简单的Linux驱动相关面试题,你看你会几题?
    MySQL笔记
    神经网络-最大池化的使用
  • 原文地址:https://blog.csdn.net/a3562323/article/details/132887720