SpringCloud Stream消息驱动
在上一篇文章中《SpringCloud集成RocketMQ》;我们介绍了如何在自己的SpringCloud项目中使用RocketMQ消息中间件来实现消息队列的生产和消费的过程;在文章中我们使用的集成方式;并不是直接使用的RocketMQ的API进行调用,都是通过SpringCloud Stream提供给我们的接口方式来实现的,今天我们这个文章给大家介绍的就是这个SpringCloud Stream消息驱动框架,以及其特点;
Spring Cloud Stream是SpringCloud成员中的一个框架组件;用于构建基于消息的微服务应用框架,在一般的消息中间件中都有一个 Broker Server(代理服务器)或者类似功能的部分,作为消息中转的角色,负责存储消息、转发消息。
Spring Cloud Stream 提供了消息中间件的统一抽象,通过binder和binding的抽象;对各种消息中间件产品中的publish-subscribe、consumer groups、partition 这些概念进行了统一;各种消息中间件产品实现支持SpringCloud Stream的Binder实现;在Spring Cloud Stream的消息流框架里通过inputs或者outputs来与SpringCloud Stream中的binder进行交互,SpringCloud Stream 的binder负责与中间件交互,所以我们只需要和Stream交互就可以很方便实现消息驱动。 如下图所示:
如上图所示;通过Spring Cloud Stream的框架;应用业务这块的实现代码,都是通过inputs,outputs在springCloud Stream体系之内和Binder进行交互; 而binder在具体去和对应的消息中间件的产品进行交互;通过这样的方式,在我们的实现的应用中没有任何具体的消息中间件的接口调用的代码侵入; 所有的实现代码都是通过Spring Cloud Stream框架提供的, 要切换不同的消息中间件产品的实现,我们只需要通过spring Cloud stream提供的配置就可以轻松的实现IOC;以后的业务如果发生消息中间件的任何变化,都不会影响到我们的业务代码变化; 这样就是实现了对具体实现的依赖。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可!
Binder
Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间交互的实现,目前SpringCloud Stream实现支持Kafka,RabbitMQ,RocketMQ等消息中间件的binder; 通过binder,可以实现中间件的连接,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchanges),都可以通过外部配置项来实现,而不需要修改一行代码
例如:在上一文中的代码样例里,出现的
- spring:
- cloud:
- # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
- stream:
- rocketmq:
- # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
- binder:
- name-server: 192.168.56.101:9876 # RocketMQ Namesrv 地址
Bindings
包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个接口和规范,开发者只需使用SpringCloud Stream里定义的接口和配置规范;就可以在咱们的业务应用程序里实现消息通道里的生产者Producer和消费者Consumer ,发送消息或者处理消息数据,开发者不需要考虑与底层消息中间件的接口调用。
例如:在上一文中的生产者的代码和消费者的代码
生产者
- @SpringBootApplication
- @EnableBinding(Source.class)
- public class ProducerApplication {
- public static void main(String[] args) {
- SpringApplication.run(ProducerApplication.class, args);
- }
- }
消费者
- @EnableBinding(Sink.class)
- public class ConsumerApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApplication.class, args);
- }
- }
生产消息
- @Autowired
- private Source source;
-
- @GetMapping("/produce")
- public boolean produce(String msg) {
- MyMessage message = new MyMessage(msg).setId(new Random().nextInt());
- Message
springMessage = MessageBuilder.withPayload(message) - .build();
-
- return source.output().send(springMessage);
- }
处理消息
- @Component
- public class MyConsumer {
-
- private Logger logger = LoggerFactory.getLogger(MyConsumer.class);
-
- @StreamListener(Sink.INPUT)
- public void onMessage(@Payload MyMessage message) {
- logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
- }
- }
结束语
本文主要介绍的Spring Cloud Stream;并且Spring Cloud Stream的特点以及使用的方法;在SpringCloud的项目中,如果有需要集成Kafka,RabbitMQ,RocketMQ等消息中间进行消息处理时;可以通过Spring Cloud Stream来进行实现; 大家还可以结合着上一篇文章《SpringCloud集成RocketMQ》来具体使用SpringCloud Stream来实现对RocketMQ的集成。
谢谢大家持续的关注