• springboot简单使用 kafka


    1. 引入kafka

    pom文件引入kafka

    项目用的是java8&java11 ,spring boot 2.X,对应kafka2.X
    如果项目是java17 ,spring boot3.X,对应kafka3.X

     <dependency>
         <groupId>org.springframework.kafkagroupId>
         <artifactId>spring-kafkaartifactId>
         <version>2.5.4.RELEASEversion>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意:在之前的版本中,需要引入apache下的kafka,现在spring已经将kafka整合入springframework中了,只引这一个包就可以
    在这里插入图片描述

    2. 配置文件

      kafka:
        bootstrap-servers: 192.168.2.91:9090,192.168.2.91:9091,192.168.2.91:9092 # 集群的地址
        producer:
          retries: 1   #设置大于0的值,则客户端会将发送失败的记录重新发送。
          batch-size: 16384 #16KB
          buffer-memory: 33554432  #32MB
          ack-s: all  #指定消息key和消息体的编码方式。 0,1,all
          key-serializer: org.apache.kafka.common.serialization.StringSerializer  #键的序列化
          value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化
        consumer:
          group-id: default
          enable-auto-commit: false  # 自动提交
          auto-commit-interval: 100 # 自动提交次数
          auto-offset-reset: earliest   #当默认的消费组启动的时候,会从默认的第一个消费组开始消费。
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 500  #一次最多拉500条消息。
        listener:
          #        RECORD, 每条记录被监听
          #        BATCH, 批量
          #        TIME,
          #        COUNT,
          #        COUNT_TIME,
          #        MANUAL, 手动通知
          #        MANUAL_IMMEDIATE; 立即手动通知
          ack-mode: RECORD
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    3. 创建KafkaTopicConfig文件

    在这里插入图片描述

    (1) spring 自动创建创建topic

    @Configuration
    public class KafkaTopicConfig {
    /**
         * 创建TopicName为topic.quick.initial的Topic并设置分区数为8以及副本数为1
         * 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
         * @return
         */
        @Bean
        public NewTopic newTopic() {
            return new NewTopic("topic.quick.initial",8, (short) 1);
        }
        @Bean
        public NewTopic newDefaultTopic() {
            return new NewTopic("topic.quick.default",2, (short) 1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    (2)手动创建topic(不常用)

    **
         * 手动创建topic
         * @return
         */
        @Bean
        public AdminClient adminClient() {
            return AdminClient.create(kafkaAdmin().getConfigurationProperties());
        }
    
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> props = new HashMap<>();
            //配置Kafka实例的连接地址
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.91:9090");
            KafkaAdmin admin = new KafkaAdmin(props);
            return admin;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    代码如下:
    在这里插入图片描述
    启动后项目后,打开offset
    就可以看见我们创建的topic了
    在这里插入图片描述

    4. 接收消息

    在这里插入图片描述

    @Component
    public class KafkaReceiver {
    	/**
    	 * id 该listener的id ,默认也是listener的组(groupId)
    	 * topics 该listener监听的topic(可以同时监听多个topic)
    	 */
    	 @KafkaListener(id = "rollback_default_test", topics = {"topic.quick.default"})
        public void receiveSk(ConsumerRecord<String, String> record) {
            System.out.println(record);
            System.out.println("我收到了普通消息");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    5. 发送消息

    kafka的发送消息很简单

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @RequestMapping("/userGets")
        public Object gets() {
        	// send 第一个参数为topic的名称,第二个参数为我们要发送的信息
            kafkaTemplate.send("topic.quick.default","1231235");
            return "发送成功";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    6. 用postmain测试

    在这里插入图片描述
    控制台打印如下消息
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    逐点流行正则化方法及其在半监督学习上的应用
    Ubuntu系统安装MySQL主从模式集群(成功!)
    Flutter DataGrid教程之表格图标日历Excel完整App源码(教程含源码)
    Golang 程序启动原理详解
    是否应该升级到ChatGPT 4.0?深度对比ChatGPT 3.5与4.0的差异
    A股风格因子看板 (2023.09 第04期)
    分布式事务
    VUE2 组件间传值
    Ps:利用 AI 技术创建人像皮肤图层蒙版
    中台框架模块开发实践-用 Admin.Core 代码生成器生成通用代码生成器的模块代码
  • 原文地址:https://blog.csdn.net/gd898989/article/details/125994959