• springboot简单使用 kafka


    1. 引入kafka

    pom文件引入kafka

    项目用的是java8&java11 ,spring boot 2.X,对应kafka2.X
    如果项目是java17 ,spring boot3.X,对应kafka3.X

     <dependency>
         <groupId>org.springframework.kafkagroupId>
         <artifactId>spring-kafkaartifactId>
         <version>2.5.4.RELEASEversion>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意:在之前的版本中,需要引入apache下的kafka,现在spring已经将kafka整合入springframework中了,只引这一个包就可以
    在这里插入图片描述

    2. 配置文件

      kafka:
        bootstrap-servers: 192.168.2.91:9090,192.168.2.91:9091,192.168.2.91:9092 # 集群的地址
        producer:
          retries: 1   #设置大于0的值,则客户端会将发送失败的记录重新发送。
          batch-size: 16384 #16KB
          buffer-memory: 33554432  #32MB
          ack-s: all  #指定消息key和消息体的编码方式。 0,1,all
          key-serializer: org.apache.kafka.common.serialization.StringSerializer  #键的序列化
          value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化
        consumer:
          group-id: default
          enable-auto-commit: false  # 自动提交
          auto-commit-interval: 100 # 自动提交次数
          auto-offset-reset: earliest   #当默认的消费组启动的时候,会从默认的第一个消费组开始消费。
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          max-poll-records: 500  #一次最多拉500条消息。
        listener:
          #        RECORD, 每条记录被监听
          #        BATCH, 批量
          #        TIME,
          #        COUNT,
          #        COUNT_TIME,
          #        MANUAL, 手动通知
          #        MANUAL_IMMEDIATE; 立即手动通知
          ack-mode: RECORD
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    3. 创建KafkaTopicConfig文件

    在这里插入图片描述

    (1) spring 自动创建创建topic

    @Configuration
    public class KafkaTopicConfig {
    /**
         * 创建TopicName为topic.quick.initial的Topic并设置分区数为8以及副本数为1
         * 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
         * @return
         */
        @Bean
        public NewTopic newTopic() {
            return new NewTopic("topic.quick.initial",8, (short) 1);
        }
        @Bean
        public NewTopic newDefaultTopic() {
            return new NewTopic("topic.quick.default",2, (short) 1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    (2)手动创建topic(不常用)

    **
         * 手动创建topic
         * @return
         */
        @Bean
        public AdminClient adminClient() {
            return AdminClient.create(kafkaAdmin().getConfigurationProperties());
        }
    
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> props = new HashMap<>();
            //配置Kafka实例的连接地址
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.91:9090");
            KafkaAdmin admin = new KafkaAdmin(props);
            return admin;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    代码如下:
    在这里插入图片描述
    启动后项目后,打开offset
    就可以看见我们创建的topic了
    在这里插入图片描述

    4. 接收消息

    在这里插入图片描述

    @Component
    public class KafkaReceiver {
    	/**
    	 * id 该listener的id ,默认也是listener的组(groupId)
    	 * topics 该listener监听的topic(可以同时监听多个topic)
    	 */
    	 @KafkaListener(id = "rollback_default_test", topics = {"topic.quick.default"})
        public void receiveSk(ConsumerRecord<String, String> record) {
            System.out.println(record);
            System.out.println("我收到了普通消息");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    5. 发送消息

    kafka的发送消息很简单

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @RequestMapping("/userGets")
        public Object gets() {
        	// send 第一个参数为topic的名称,第二个参数为我们要发送的信息
            kafkaTemplate.send("topic.quick.default","1231235");
            return "发送成功";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    6. 用postmain测试

    在这里插入图片描述
    控制台打印如下消息
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    .NET8 AOT和JIT的性能,谁更高呢?
    C语言 | 类型的基本归类
    web期末网站设计大作业:基于HTML+CSS+JavaScript制作新能源汽车企业网站
    Linux系统下邮件服务器的搭建(Postfix+Dovecot+SSL)
    C语言自定义类型
    「广告」和「噱头」撑得起千亿床垫市场吗?
    变限积分的导数
    敏捷、DevOps和嵌入式系统测试
    Android高级编程之自定义ContentProvider
    多测师肖sir_高级金牌讲师__git讲解
  • 原文地址:https://blog.csdn.net/gd898989/article/details/125994959