• 消息驱动 —— SpringCloud Stream


    Stream 简介

    Spring Cloud Stream 是用于构建消息驱动的微服务应用程序的框架,提供了多种中间件的合理配置

    Spring Cloud Stream 包含以下核心概念:

    • Destination Binders:目标绑定器,目标指的是 Kafka 或者 RabbitMQ,绑定器就是封装了目标中间件的包,如果操作的是 Kafka,就使用 Kafka Binder,如果操作的是 RabbitMQ,就使用 RabbitMO Binder
    • Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
    • Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信

    应用程序通过 inputs 或者 outpus 与 Spring Cloud Stream 的 Binder 交互,Binder 层负责和中间件的通信,通过配置来 binding。通过定义 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离,应用程序不需要再考虑各种不同的消息中间件实现。当需要升级消息中间件或是更换其他消息中间件产品时,只需要更换对应的 Binder 绑定器

    在这里插入图片描述


    Stream 整合 kafka

    以 Kafka 为例,确保安装 Kafka 并启动

    分别创建生产者和消费者项目,分别添加依赖

    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-stream-kafkaartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    1. 创建生产者

    开发 MqSource 接口

    public interface MqSource {
    
        @Output("test-topic")
        MessageChannel testTopic();
    
        @Output("test-topic-2")
        MessageChannel testTopic2();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    通过 @Output@Input 注解定义消息输入和输出通道的名称定义,输出通道需要返回 MessageChannel 接口对象,它定义了向消息通道发送消息的方法。默认情况下,通道的名称就是注解的方法的名称,也能自己定义通道名称,只需要给 @Input@Output 注解传入 String 类型参数通道名称即可,这里指定两个通道分别为 test-topictest-topic-2

    开发 MsgProducer 类

    @Slf4j
    @EnableBinding(MqSource.class)
    public class MsgProducer {
    
        @Autowired
        private MqSource mqSource;
    
        public void sendTestTopicMessage(String msg) {
            try {
                mqSource.testTopic().send(MessageBuilder.withPayload(msg).build());
            } catch (Exception e) {
                log.error("sendTestTopicMessage error", e);
            }
        }
    
        public void sendTestTopic2Message(String msg) {
            try {
                mqSource.testTopic2().send(MessageBuilder.withPayload(msg).build());
            } catch (Exception e) {
                log.error("sendTestTopic2Message error", e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    使用 @EnableBinding 创建和绑定通道,绑定通道是指将通道和 Binder 进行绑定,比如 Kafka、RabbiMQ 等。如果类路径下只有一种 Binder,那么 Spring Cloud Stream 会找到并绑定它,不需要进行配置。如果有多个就需要明确配置

    调用 MqSource 接口方法获取输出通道对象,接着调用 send 方法发送数据。send 方法接收一个 Message 对象,这个对象不能直接新建,需要使用 MessageBuilder 获取

    2. 创建消费者
    public interface MqSink {
    
        @Input("test-topic")
        MessageChannel testTopic();
    
        @Input("test-topic-2")
        MessageChannel testTopic2();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    与生产者的 MqSource 同理

    开发 MsgReceiver 类,@StreamLisiener 接收的参数是要处理的通道名,所注解的方法就是处理从通道获取数据的方法,方法的参数就是获取到的数据

    @Slf4j
    @EnableBinding(MqSink.class)
    public class MsgReceiver {
    
        @StreamListener("test-topic")
        public void testTopicMessageListen(String msg) {
            log.info("testTopicMessageListen: {}", msg);
        }
    
        @StreamListener("test-topic-2")
        public void testTopic2MessageListen(String msg) {
            log.info("testTopic2MessageListen: {}", msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

  • 相关阅读:
    【JavaScript】HTML文件插入JavaScript函数
    C语言数组全面解析:从初学到精通
    Maven环境变量配置
    Java项目:洗浴中心管理系统(java+SSM+JSP+jQuery+javascript+Mysql)
    后端程序员生产力工具合集
    Vue3 + Ts实现NPM插件 - 定制loading
    多线程、多进程同时操作MMAP,会怎么样?
    Linux C语言开发-D7D8运算符
    Elasticsearch:使用不同的 CA 更新安全证书 (一)
    第2-4-5章 规则引擎Drools高级语法-业务规则管理系统-组件化-中台
  • 原文地址:https://blog.csdn.net/CSDN_handsome/article/details/133499801