• 【实战】Spring Cloud Stream 3.1+整合Kafka


    前言

    之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

    在这里插入图片描述

    新版版本优势

    新版提倡用函数式进行发送和消费信息

    定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
    input - + -in- +
    output - + -out- +

    在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。

    比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:

    spring:
      kafka:
        bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092
      cloud:
        stream:
          kafka:
            binder:
              brokers: ${spring.kafka.bootstrap-servers}
          binders:
            kafkahub:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka: ${spring.cloud.stream.kafka.binder}
          default-binder: kafkahub 
          function:
              definition: inputChannel,outputChannel
          bindings:
            inputChannel-in-0:
              binder: kafkahub
              destination: test-kafka-topic
              group: test-kafka-group
              content-type: text/plain
            outputChannel-out-0:
              binder: kafkahub
              destination: test-kafka-topic
              content-type: text/plain
              producer:
                partition-count: 3 #分区数目
      
    

    此时消息生产者为:

    @Resource
    private StreamBridge streamBridge;
    
    @GetMapping("/send")
    public Boolean sendMessageToKafka(String msg){
        boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());
        return send;
    }
    

    此时消息消费者为:

    @Configuration
    public class KafkaChannel {
    
        @Resource
        private StreamBridge streamBridge;
        /**
         * inputChannel 消费者
         * @author senfel
         * @date 2024/6/18 15:26
         * @return java.util.function.Consumer<java.lang.String>
         */
        @Bean
        public Consumer<Message<String>> inputChannel(){
            return message -> {
                System.out.println("接收到消息Payloa:" + message.getPayload());
                System.out.println("接收到消息Header:" + message.getHeaders());
            };
        }
    

    }

    实战演示

    我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
    主要演示功能:

    正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
    异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费

    本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。

    增加maven依赖

     <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>cce-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>seata-demo-order</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>8</java.version>
        <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
    </properties>
    <dependencies>
           <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.2.4</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    

    增加applicaiton.yaml配置

    spring:
      #kafka
      kafka:
        bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092
      cloud:
        stream:
          kafka:  # kafka配置
            binder:
              brokers: ${spring.kafka.bootstrap-servers}
              auto-add-partitions: true #自动分区
              auto-create-topics: true #自动创建主题
              replication-factor: 3 #副本
              min-partition-count: 3 #最小分区
            bindings:
              outputChannel-out-0:
                producer:
                  # 无限制重发不产生消息丢失
                  retries: Integer.MAX_VALUE
                  #acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低
                  #acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中
                  #acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长
                  #可以设置的值为:all, -1, 0, 1
                  acks: all
                  min:
                    insync:
                      replicas: 3 #感知副本数
              inputChannel-in-0:
                consumer:
                  concurrency: 1 #消费者数量
                  max-concurrency: 5 #最大消费者数量
                  recovery-interval: 3000  #3s 重连
                  auto-rebalance-enabled: true  #主题分区消费者组成员自动平衡
                  auto-commit-offset: false   #手动提交偏移量
                  enable-dlq: true  # 开启 dlq队列
                  dlq-name: test-kafka-topic.dlq
                  deserializationExceptionHandler: sendToDlq #异常加入死信
          binders: # 与外部mq组件绑定
            kafkahub:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka: ${spring.cloud.stream.kafka.binder}
    
          default-binder: kafkahub #默认绑定
          function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)
            definition: inputChannel;outputChannel;dlqChannel
          bindings: # 通道绑定
            inputChannel-in-0:
              binder: kafkahub
              destination: test-kafka-topic
              group: test-kafka-group
              content-type: text/plain
              consumer:
                maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
                backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
                backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
                backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
            outputChannel-out-0:
              binder: kafkahub
              destination: test-kafka-topic
              content-type: text/plain
              producer:
                partition-count: 3 #分区数目
            dlqChannel-in-0:
              binder: kafkahub
              destination: test-kafka-topic.dlq
              group: test-kafka-group
              content-type: text/plain
              consumer:
                maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
                backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
                backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
                backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
            dlqChannel-out-0:
              binder: kafkahub
              destination: test-kafka-topic.dlq
              content-type: text/plain
              producer:
                partition-count: 3 #分区数目
    

    新增Kafka通道消费者

    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import javax.annotation.Resource;
    import java.util.function.Consumer;
    
    /**
     * KafkaCustomer
     * @author senfel
     * @version 1.0
     * @date 2024/6/18 15:22
     */
    @Configuration
    public class KafkaChannel {
    
        @Resource
        private StreamBridge streamBridge;
        /**
         * inputChannel 消费者
         * @author senfel
         * @date 2024/6/18 15:26
         * @return java.util.function.Consumer<java.lang.String>
         */
        @Bean
        public Consumer<Message<String>> inputChannel(){
            return message -> {
                System.out.println("接收到消息:" + message.getPayload());
                System.out.println("接收到消息:" + message.getHeaders());
                if(message.getPayload().contains("9")){
                    boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());
                    System.err.println("向dlqChannel发送消息:"+send);
                }
            };
        }
    
        /**
         * dlqChannel 死信消费者
         * @author senfel
         * @date 2024/6/18 15:26
         * @return java.util.function.Consumer<java.lang.String>
         */
        @Bean
        public Consumer<Message<String>> dlqChannel(){
            return message -> {
                System.out.println("死信dlqChannel接收到消息:" + message.getPayload());
                System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());
            };
        }
    }
    

    新增发送消息的接口

    @Resource
    private StreamBridge streamBridge;
    
    @GetMapping("/send")
    public Boolean sendMessageToKafka(String msg){
        boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());
        return send;
    }
    

    实战测试

    postman发送一个正常的消息

    在这里插入图片描述

    postman发送异常消息

  • 相关阅读:
    【2023.10.25练习】数据库-函数2
    【JUC】一文弄懂@Async的使用与原理
    C语言每日一题(20)最大公因数等于 K 的子数组数目
    Spring Security:身份验证入口AuthenticationEntryPoint介绍与Debug分析
    47.(前端)用户列表的数据填充
    [含毕业设计论文+PPT+源码等]ssm培训机构管理系统+Java后台管理系统|前后分离VUE
    【强化学习篇】on-policy 和 off-policy 的区别
    【TVM源码学习笔记】3.1.2. Codegen低级化relay ir前的内存分配
    总结一下使用paramiko遇到的问题
    SpringBoot自动配置原理
  • 原文地址:https://blog.csdn.net/weixin_39970883/article/details/139805324