• Spring Cloud Stream notes


    1. Add the Spring Cloud Stream and Kafka binder dependencies

      <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-stream</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.cloud</groupId>
          <artifactId>spring-cloud-stream-binder-kafka</artifactId>
      </dependency>
      
    2. Add the configuration to application.yml

      --- #stream config
      spring:
        cloud:
          stream:
            binders:
              myKafka1:
                type: kafka
                environment:
                  spring:
                    kafka:
                      bootstrap-servers: 127.0.0.1:9092
            bindings:
              helloFunc-in-0:
                destination: hello-topic
                group: hello-local-test-10
                binder: myKafka1
                consumer:
                  batch-mode: true
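              # Output binding; only relevant if helloFunc produces output (e.g. a Function).
              # group and consumer.* apply to input bindings only, so they are omitted here.
              helloFunc-out-0:
                destination: hello-topic
                binder: myKafka1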
          # Note: the function node is a sibling of stream, not a child of it
          function:
            definition: helloFunc
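
The binding names in this configuration follow Spring Cloud Stream's functional naming convention: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs, where the function name is the bean name listed under `definition`. The sketch below only spells out that convention; `BindingNames` and its methods are hypothetical helpers for illustration, not part of any Spring API.

```java
public class BindingNames {
    // Hypothetical helper (not a Spring API): mirrors the
    // <functionName>-in-<index> naming convention for input bindings.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Hypothetical helper (not a Spring API): mirrors the
    // <functionName>-out-<index> naming convention for output bindings.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("helloFunc", 0));  // prints helloFunc-in-0
        System.out.println(output("helloFunc", 0)); // prints helloFunc-out-0
    }
}
```

If the bean were renamed, the keys under `bindings` would have to change to match, otherwise the properties would silently apply to a binding that does not exist.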
      
    3. Write the consumer:

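      import java.util.List;
      import java.util.function.Consumer;

      import lombok.RequiredArgsConstructor;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.context.annotation.Bean;
      import org.springframework.kafka.support.Acknowledgment;
      import org.springframework.kafka.support.KafkaHeaders;
      import org.springframework.messaging.Message;
      import org.springframework.stereotype.Component;
      import org.springframework.util.CollectionUtils;

      @Slf4j
      @Component
      @RequiredArgsConstructor
      public class HelloConsumer {
          // The bean name must match spring.cloud.function.definition (helloFunc).
          // With batch-mode enabled, the payload is a list of records.
          @Bean
          public Consumer<Message<List<String>>> helloFunc() {
              return message -> {
                  log.info("---------------------> ");
                  List<String> list = message.getPayload();
                  boolean result = this.handle(list);
                  if (result) {
                      // The acknowledgment header may be absent, hence the null check.
                      Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                      if (acknowledgment != null) {
                          acknowledgment.acknowledge();
                      }
                  } else {
                      throw new RuntimeException("Failed to consume data!");
                  }
              };
          }

          // Logs the batch size and the first record; always reports success.
          private boolean handle(List<String> list) {
              log.info("list size : {}", list.size());
              if (!CollectionUtils.isEmpty(list)) {
                  log.info("group first message : {}", list.get(0));
              }
              return true;
          }
      }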
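
The control flow of the consumer above (acknowledge the batch only when handling succeeds, otherwise throw so the batch is not acknowledged) can be sketched without any Spring dependencies. Everything here is a stand-in for illustration: `Ack` and `Batch` are hypothetical types that play the roles of Spring Kafka's `Acknowledgment` and the `Message<List<String>>` wrapper, and `ackOnSuccess` is not a library method. As far as I know, the real acknowledgment header is only populated when the Kafka binder is configured for manual acknowledgment (for example via the binder's consumer `ackMode` property; check the binder documentation for your version), which would explain the null check in the original.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class AckOnSuccessSketch {
    /** Hypothetical stand-in for Spring Kafka's Acknowledgment. */
    interface Ack {
        void acknowledge();
    }

    /** Hypothetical stand-in for a message carrying a batch payload and an optional ack handle. */
    static final class Batch {
        final List<String> payload;
        final Ack ack; // may be null, like a missing acknowledgment header

        Batch(List<String> payload, Ack ack) {
            this.payload = payload;
            this.ack = ack;
        }
    }

    /** Wraps a handler so the batch is acknowledged only when the handler returns true. */
    static Consumer<Batch> ackOnSuccess(Predicate<List<String>> handler) {
        return batch -> {
            if (!handler.test(batch.payload)) {
                // Throwing leaves the batch unacknowledged.
                throw new IllegalStateException("Failed to consume batch");
            }
            if (batch.ack != null) {
                batch.ack.acknowledge();
            }
        };
    }

    public static void main(String[] args) {
        int[] acks = {0};
        // Assumed handler for the demo: a batch "succeeds" when it is non-empty.
        Consumer<Batch> consumer = ackOnSuccess(list -> !list.isEmpty());

        consumer.accept(new Batch(List.of("a", "b"), () -> acks[0]++));
        System.out.println("acks after success: " + acks[0]); // prints 1

        try {
            consumer.accept(new Batch(List.of(), () -> acks[0]++));
        } catch (IllegalStateException e) {
            System.out.println("acks after failure: " + acks[0]); // still 1
        }
    }
}
```

Keeping the acknowledgment after the handler call means a failed batch is never marked as consumed; what happens to it next (retry, dead-letter topic, and so on) depends on the binder's error-handling configuration, which the original notes do not cover.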
      
  • Original article: https://blog.csdn.net/yichengjie_c/article/details/136631100