多个微服务

消息中间件都有自己的Binder实现;比如Kafka 的实现KafkaMessageChannelBinder,RabbitMQ的实现RabbitMessageChannelBinder以及RocketMQ 的实现 RocketMQMessageChannelBinder;
包括 Input Binding 和 Output Binding;
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触;
| 组成 | 说明 |
| Binder 绑定器(各个消息中间件的绑定器不同) | Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现; |
| @Input 注解,表明是一个输入管道 | 该注解标识输入通道,通过该输入通道接收消息进入应用程序 |
| @Output 注解,表明是一个输出管道 | 该注解标识输出通道,发布的消息将通过该通道离开应用程序 |
| @StreamListener 注解 | 监听队列,用于消费者的队列的消息接收 |
| @EnableBinding 注解,开启(Enable)绑定功能 | 将信道channel和exchange、topic绑定在一起 |


org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration
RocketMQ的自动装配:@AutoConfigureAfter({RocketMQAutoConfiguration.class})
RocketMQ属性的配置:@EnableConfigurationProperties({RocketMQProperties.class})
# 客户端接入点,必填
spring.cloud.stream.rocketmq.binder.name-server=192.168.172.128:9876
对应的源码,如下图所示

<dependency>
<groupId>com.alibaba.cloudgroupId>
<artifactId>spring-cloud-starter-stream-rocketmqartifactId>
dependency>
第2步:依赖父项目spring cloud alibaba,解决了spring cloud stream的起步依赖没有版本号从而不能引入的问题:
<spring-cloud-alibaba.version>2.2.1.RELEASEspring-cloud-alibaba.version>
注意:兼容性问题:
注意版本需要使用springboot2.2.5
<spring-boot.version>2.2.5.RELEASEspring-boot.version>
<spring-cloud-alibaba.version>2.2.1.RELEASEspring-cloud-alibaba.version>
第3步:依赖spring-cloud-starter-web,有
第二步:application.properties配置文件:组一样竞争关系
########## RocketMQ 通用配置
# 配置绑定器binder连接某个消息中间件(由ip和port指定),高度抽象,用于屏蔽不同消息中间件之间的差异(相当于ORM框架中的hibernate和mybatis)
spring.cloud.stream.rocketmq.binder.name-server=192.168.172.128:9876
# 日志级别
logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO
########## Consumer Config### bindings是微服务与具体消息中间件沟通(发消息,收消息)的桥梁 #######
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group
########## Produce Config
# output 的配置如下:
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.group=test-group
第三步:编码
第1步:SendService.java
发送消息:
@EnableBinding(Source.class)
@Service
public class SenderService {
//Source是spring cloud stream封装的
@Autowired
private Source source;
//发送消息的方法
public void send(String msg) throws Exception {
//获取订阅通(信)道:source.output()==MessageChannel
//发送消息MessageChannel.send()的参数是:Message.java
// 返回值:布尔类型
boolean flag = source.output().send(MessageBuilder.withPayload(msg).build());
System.out.println("消息发送:" + flag);
}
}
第2步:Application.java发送消息的方式(或使用XxxController.java)
ReceiveService.java接收消
方案1:手动调用方法去接收(while true死循环):Sink.input获取订阅信道
//获取消息通(信)道:source.input()==SubscribableChannel
//发送消息SubscribableChannel.subscrible()的参数是:Message.java
方案2:自动监听方式@EnableBinding(Sink.class) + @StreamListener("input")
@EnableBinding(Sink.class)
public class ReceiveService {
@StreamListener("input")
public void receiveInput1(String receiveMsg) {
System.out.println("input 接收到的消息: " + receiveMsg);
}
}
第四步:验证
MQ管控台
目的地有消息:spring.cloud.stream.bindings.input.destination=test-topic
消息被消费
控制台打印
在前面的案例中,我们已经实现了一个基础的 Spring Cloud Stream 消息传递处理操作,但在操作之中使用的是系统提供的 Source (output)、Sink(input),接下来我们来看一下自定义信道名称;
########## 自定义
# input 的配置:
spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
# output 的配置:
spring.cloud.stream.bindings.output1.destination=test-topic1
spring.cloud.stream.bindings.output1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
第三步:编码
第1步:自定义Source接口
public interface MySource {
String OUTPUT1 = "output1";
@Output(MySource.OUTPUT1)
MessageChannel output1();
}
第2步:发送消息业务层,SendService.java
第3步:自定义Sink接口
public interface MySink {
String INPUT1 = "input1";
@Input(MySink.INPUT1)
SubscribableChannel input1();
}
第4步:接收消息业务层,ReceiveService.java
第5步:applicaton.java 或 XxxController.java中发送、消费消息
发送消息
接收消息
传统方式:手动调用receive方法(while死循环)
自动监听方式:@EnableBinding(Sink.class) + @StreamListener("input")
第四步:验证
第五步:扩展
第1步:多个发送

第2步:多个消费


#-------------------------- 事务消息 begin --------------------------------
#生产的配置
spring.cloud.stream.bindings.outputTX.destination=TransactionTopic
spring.cloud.stream.bindings.outputTX.content-type=application/json
spring.cloud.stream.rocketmq.bindings.outputTX.producer.group=myTxProducerGroup
#是否为事务消息,默认为false表示不是事务消息,true表示是事务消息
spring.cloud.stream.rocketmq.bindings.outputTX.producer.transactional=true
#消费的配置:
spring.cloud.stream.bindings.inputTX.destination=TransactionTopic
spring.cloud.stream.bindings.inputTX.content-type=text/plain
spring.cloud.stream.bindings.inputTX.group=transaction-group
spring.cloud.stream.rocketmq.bindings.inputTX.consumer.broadcasting=false
1#-------------------------- 事务消息 end --------------------------------
public interface MySource {
String OUTPUTTx= "outpuTx";
@Output(MySource.OUTPUTTX)
MessageChannel outputTX();
}
- @Autowired
- private MySource mySource;
-
- public
void sendTransactionalMsg(T msg, int num) throws Exception { - // 需要构建的消息对象MessageBuilder
- MessageBuilder builder = MessageBuilder.withPayload(msg)
- .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
- builder.setHeader("test", String.valueOf(num));
- Message message = builder.build();
- boolean flag = mySource.outputTX().send(message);
- System.out.println("inputTX 事务消息发送:" + flag);
- }
- // 执行本地事务和本地事务检查:
- // 监听器
- @RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
- public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
-
- @Override
- public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- Object num = msg.getHeaders().get("test");
- if ("1".equals(num)) {
- System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
- // 未知,需要二次检查
- return RocketMQLocalTransactionState.UNKNOWN;
- } else if ("2".equals(num)) {
- System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
- return RocketMQLocalTransactionState.ROLLBACK;
- }
- System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
- return RocketMQLocalTransactionState.COMMIT;
- }
-
- // 回调检查
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
- System.out.println("check: " + new String((byte[]) msg.getPayload()));
- // 假设回调检查没有异常,则返回commit
- return RocketMQLocalTransactionState.COMMIT;
- }
- }
d.程序入口类application.java
第2步:消费者
a.MySink.java
b.ReceiveService.java
监听器自动接收消费
.output 或 .input手动调用消息
第3步:程序入口类
第五步:测试验证
生产者
返回commit,事务正常提交,消费者可见、也可消费消息
返回rallback,事务回滚,消息删除
超时,运行二次检查方法
消费者
成功接收到所有的消息