• Spring cloud day(8) stream


    一、消息驱动

    1.1 当前问题

    消息中间件太多了,学习不同消息中间件浪费大量时间,需要一个东西可以再不同场景中切换消息中间件。

    1.2 Spring cloud Stream

    在这里插入图片描述

    -只支持rabbitmp和kfaka

    1.3 input和output

    在这里插入图片描述

    1.4 binder

    在这里插入图片描述

    1.5 stream流程

    在这里插入图片描述

    1.6 常用注解

    在这里插入图片描述

    1.7 项目模块

    在这里插入图片描述

    二、实操

    2.1 provider消息驱动生产者

    2.1.1 作用

    生产消息发送到消息中间件

    • pom
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.atguigu.springcloud</groupId>
                <artifactId>cloud-api-commons</artifactId>
                <version>${project.version}</version>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
            <!--        <dependency>-->
            <!--            <groupId>org.springframework.boot</groupId>-->
            <!--            <artifactId>spring-boot-devtools</artifactId>-->
            <!--            <scope>runtime</scope>-->
            <!--            <optional>true</optional>-->
            <!--        </dependency>-->
    
            <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • yml
    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            output: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka
    #  instance:
    #    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    #    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    #    instance-id: send-8801.com  # 在信息列表时显示主机名称
    #    prefer-ip-address: true     # 访问的路径变为IP地址
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    注意这里:binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
    type: rabbit # 消息组件类型

    • controller
    package com.pack.mymodel.controller;
    
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    @RestController
    public class sendMeesgaController {
        @Resource
        private IMessageProviderService messageProviderService;
    
        @GetMapping(value = "/sendMessage")
        public String sendMessage() {
            return messageProviderService.send();
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • services

    接口

    public interface IMessageProviderService {
        /**
         * 定义消息的推送管道
         *
         * @return
         */
        String send();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    实现类

    package com.pack.mymodel.services.Imp;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    
    import javax.annotation.Resource;
    import java.util.UUID;
    
    
    /**
     * @author lixiaolong
     * @EnableBinding(Source.class) 定义消息的推送管道 将Channel和Exchanges绑定在一起
     * @date 2020/12/31 13:35
     */
    /*
    
     */
    @EnableBinding(Source.class)
    public class MessageProviderImp implements IMessageProviderService {
        /**
         * 消息发送管道/信道
         */
        @Resource
        private MessageChannel output;
    
        @Override
        public String send() {
            String serial = UUID.randomUUID().toString();
            //MessageBuilder.withPayload(serial).build();
            Message<String> stringMessage = MessageBuilder.withPayload(serial).build();
            output.send(stringMessage);
    
            System.out.println("*****serial: " + serial);
            return serial;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • controller类
    package com.pack.mymodel.controller;
    
    import com.pack.mymodel.services.Imp.IMessageProviderService;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    @RestController
    public class sendMeesgaController {
        @Resource
        private IMessageProviderService messageProviderService;
    
        @GetMapping(value = "/sendMessage")
        public String sendMessage() {
            return messageProviderService.send();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2.1.2 测试消息是否到消息中间件

    • 启动eureka
    • 启动rabbitmq
    • 启动8001
    • 访问http://localhost:8801/sendMessage
      在这里插入图片描述

    2.2 消息驱动消费者

    • yml
    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: # 在此处配置要绑定的rabbitmq的服务信息;
            defaultRabbit: # 表示定义的名称,用于于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 这个名字是一个通道的名称
              destination: studyExchange # 表示要使用的Exchange名称定义
              content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
              binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
    
    
    eureka:
      client: # 客户端进行Eureka注册的配置
        service-url:
          defaultZone: http://localhost:7001/eureka
    #  instance:
    #    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    #    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    #    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    #    prefer-ip-address: true     # 访问的路径变为IP地址
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • controller
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;
    
    /**
     * 8802 接收消息
     *
     * @author lixiaolong
     * @date 2020/12/31 14:07
     */
    @Component
    @EnableBinding(Sink.class)
    public class mycontroller {
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(Sink.INPUT)
        //从消息队列中取出消息
        public void input(Message<String> message) {
            System.out.println("port:" + serverPort + "\t接受:" + message.getPayload());
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 主启动类
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * @author lixiaolong
     * @date 2020/12/18 16:05
     * @description 支付服务
     */
    @SpringBootApplication
    public class bootMAin {
        public static void main(String[] args) {
            SpringApplication.run(bootMAin.class, args);
            System.out.println("启动成功");
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • pom
    <dependencies>
    
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
            <!--        <dependency>-->
            <!--            <groupId>org.springframework.boot</groupId>-->
            <!--            <artifactId>spring-boot-devtools</artifactId>-->
            <!--            <scope>runtime</scope>-->
            <!--            <optional>true</optional>-->
            <!--        </dependency>-->
    
            <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
        </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    2.3 消息重复消费问题

    2.3.1 多个消费者

    当多个消费者存在的时候,每个消费者都会消费消息,造成消息的重复消费。

    • 原因
      不同的微服务默认在不同的组,组流水号不一样,所以只需要分到同一个组即可

    在这里插入图片描述

    2.3.2 加入分组配置

    在这里插入图片描述

    • 8802,8803设置成同一个组

    2.4 消息持久化

    • 消费者不配置group,就不会对消息队列的消息进行消费,后台就不会输出消费记录。
    • 配置了group的消费者,重启会消费未被消费的消息。
  • 相关阅读:
    C# Linq中的Join使用
    Objective-C中的Block(基础)
    MyBatis完成品牌数据的查询操作
    同花顺_代码解析_交易系统_J09_18
    MySQL进阶实战5,为什么查询速度会慢
    java计算机毕业设计springboot+vue气象观测数据样本构建与分析系统-天气预报网站
    excel表格乱码怎么解决呢?
    多协议转换网关支持PLC协议转OPC UA
    rpmbuild 包名 version 操作系统信息部分来源 /etc/rpm/macros.dist
    访问者模式
  • 原文地址:https://blog.csdn.net/qq_42306803/article/details/125433668