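Download rocketmq-all-4.7.0-bin-release.zip and unzip it.
Set the environment variable ROCKETMQ_HOME to E:\rocketmq-all-4.7.0-bin-release
Go to the directory E:\rocketmq-all-4.7.0-bin-release\bin and run start mqnamesrv.cmd to start the name server.
Then start the broker: start mqbroker.cmd -n localhost:9876 -c …/conf/broker.conf autoCreateTopicEnable=true
This may fail with the error "Could not find or load main class". If it does, open bin\runbroker.cmd, change %CLASSPATH% to "%CLASSPATH%" (wrap it in double quotes), save the file, and run the command again. When the output shows "boot success", the broker has started.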
Maven dependency:
groupId: com.alibaba.cloud
artifactId: spring-cloud-starter-stream-rocketmq
spring.cloud.stream.rocketmq.binder.name-server
The name server of the RocketMQ server (older versions use the namesrv-addr configuration item).
Default: 127.0.0.1:9876.
spring.cloud.stream.rocketmq.binder.access-key
The AccessKey of Alibaba Cloud Account.
Default: null.
spring.cloud.stream.rocketmq.binder.secret-key
The SecretKey of Alibaba Cloud Account.
Default: null.
spring.cloud.stream.rocketmq.binder.enable-msg-trace
Enable Message Trace feature for all producers and consumers.
Default: true.
spring.cloud.stream.rocketmq.binder.customized-trace-topic
The trace topic for message trace.
Default: RMQ_SYS_TRACE_TOPIC.
The following properties are available for RocketMQ consumers only and must be prefixed with
spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.
enable
Enable Consumer Binding.
Default: true.
tags
Consumer subscription tag expression; separate multiple tags with ||.
Default: empty.
sql
Consumer subscription sql expression.
Default: empty.
broadcasting
Controls the message mode. Use broadcasting if every subscriber should receive every message.
Default: false.
orderly
Whether to receive messages orderly (true) or concurrently (false).
Default: false.
delayLevelWhenNextConsume
Retry strategy for concurrent consumption:
-1: no retry, put the message into the DLQ directly
0: the broker controls the retry frequency
>0: the client controls the retry frequency
Default: 0.
suspendCurrentQueueTimeMillis
Retry interval, in milliseconds, for orderly consumption.
Default: 1000.
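The three cases of delayLevelWhenNextConsume listed above can be sketched as a small lookup. This is only an illustration of the documented meaning, not the binder's or the RocketMQ client's actual code; the class and method names are hypothetical, and rejecting values below -1 is an assumption of this sketch.

```java
// Illustrative only: mirrors the three documented cases of delayLevelWhenNextConsume.
final class RetryStrategySketch {

    /** Describes who decides the retry frequency for the given setting. */
    static String describe(int delayLevelWhenNextConsume) {
        if (delayLevelWhenNextConsume == -1) {
            return "no retry, send to DLQ";
        }
        if (delayLevelWhenNextConsume == 0) {
            return "broker controls retry frequency";
        }
        if (delayLevelWhenNextConsume > 0) {
            return "client controls retry frequency";
        }
        // Assumption of this sketch: values below -1 are not documented, so reject them.
        throw new IllegalArgumentException("unsupported value: " + delayLevelWhenNextConsume);
    }

    public static void main(String[] args) {
        for (int level : new int[] {-1, 0, 3}) {
            System.out.println(level + " -> " + describe(level));
        }
    }
}
```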
The following properties are available for RocketMQ producers only and must be prefixed with
spring.cloud.stream.rocketmq.bindings.<channelName>.producer.
enable
Enable Producer Binding.
Default: true.
group
Producer group name.
Default: empty.
maxMessageSize
Maximum allowed message size in bytes.
Default: 8249344.
transactional
Send Transactional Message.
Default: false.
sync
Send message in synchronous mode.
Default: false.
vipChannelEnabled
Send message with vip channel.
Default: true.
sendMessageTimeout
Timeout for sending a message, in milliseconds.
Default: 3000.
compressMessageBodyThreshold
Threshold for compressing the message body; by default, a message body larger than 4 KB is compressed.
Default: 4096.
retryTimesWhenSendFailed
Maximum number of internal retries before a send is reported as failed in synchronous mode.
Default: 2.
retryTimesWhenSendAsyncFailed
Maximum number of internal retries before a send is reported as failed in asynchronous mode.
Default: 2.
retryNextServer
Indicate whether to retry another broker on sending failure internally.
Default: false.
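To make compressMessageBodyThreshold concrete, here is a minimal sketch of threshold-based compression using the JDK's Deflater. It is an assumption-laden illustration, not the RocketMQ client's implementation: the class name, the method name, and the choice of default Deflater settings are all hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Illustrative only: compress a body when it is larger than the threshold.
final class CompressThresholdSketch {

    /** Returns the body unchanged when it is at most threshold bytes, otherwise a deflated copy. */
    static byte[] maybeCompress(byte[] body, int threshold) {
        if (body.length <= threshold) {
            return body; // small bodies are sent as-is
        }
        Deflater deflater = new Deflater();
        deflater.setInput(body);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(body.length);
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int written = deflater.deflate(buffer);
            out.write(buffer, 0, written);
        }
        deflater.end(); // release native resources
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] small = new byte[100];
        byte[] large = new byte[8192];
        System.out.println("small body length after call: " + maybeCompress(small, 4096).length);
        System.out.println("large body shrank: " + (maybeCompress(large, 4096).length < large.length));
    }
}
```

With the default of 4096, a body of exactly 4096 bytes is left alone in this sketch, matching the "larger than 4 KB" wording above.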
## rocketmq-console
Producer: set a custom header on the outgoing message.
public void sendObject(T msg, String tag) {
    // The custom header "a" = "b" is what the consumer-side condition checks.
    Message<T> message = MessageBuilder.withPayload(msg)
            .setHeader("a", "b")
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .build();
    this.mqTrainTopic.operationLogOutput().send(message);
}
Consumer: the listener is only invoked for messages whose header a equals b.
@StreamListener(value = MQTopic.OPERATE_LOG_INPUT, condition = "headers['a']=='b'")
public void operateLogInputReceive(@Payload OperationVo vo) {
    // Persist the operation log, then its detail rows keyed by the log id.
    this.IOperationLogMapper.insertOperationLog(vo.getOperationLog());
    this.IOperationLogDetailMapper.insertOperationLogDetail(vo.getDetails(), vo.getOperationLog().getId());
}
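The condition on the listener above is a SpEL expression that Spring evaluates in the application against each delivered message. As a rough plain-Java stand-in (no Spring involved; the class, field, and method names here are hypothetical), the same check looks like this:

```java
import java.util.Map;
import java.util.function.Predicate;

// Illustrative only: a plain-Java equivalent of the condition headers['a']=='b'.
final class HeaderConditionSketch {

    /** True when the headers contain the entry a = b. */
    static final Predicate<Map<String, Object>> A_EQUALS_B =
            headers -> "b".equals(headers.get("a"));

    /** Runs the handler only when the condition holds; returns whether it ran. */
    static boolean dispatch(Map<String, Object> headers, Runnable handler) {
        if (A_EQUALS_B.test(headers)) {
            handler.run();
            return true;
        }
        return false; // message is skipped by this listener
    }

    public static void main(String[] args) {
        System.out.println(dispatch(Map.of("a", "b"), () -> System.out.println("handled")));
        System.out.println(dispatch(Map.of("a", "c"), () -> System.out.println("handled")));
    }
}
```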
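Producer: set the tag through the RocketMQHeaders.TAGS header.
public void sendObject(T msg, String tag) {
    // The tag is carried in the TAGS header and matched against consumer.tags.
    Message<T> message = MessageBuilder.withPayload(msg)
            .setHeader(RocketMQHeaders.TAGS, tag)
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .build();
    this.mqTrainTopic.operationLogOutput().send(message);
}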
Consumer
Interface
public interface MQTopic {
    String OPERATE_LOG_INPUT = "operate_log_input";

    /**
     * Operation log.
     * @return the input channel bound to operate_log_input
     */
    @Input(value = OPERATE_LOG_INPUT)
    SubscribableChannel operateLogInput();
}
Annotation
@EnableBinding(value = {MQTopic.class})
Configuration
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          operate_log_input: {consumer.orderly: true, consumer.tags: o_log}
      bindings:
        operate_log_input: {destination: operate_log, content-type: application/plain, group: gorup1, consumer.maxAttempts: 1}
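The consumer.tags value is a tag expression in which several tags are separated by ||. The sketch below shows one way such an expression could be matched against a message tag. It is not RocketMQ's filter code: the class and method names are hypothetical, and treating an empty expression or * as "match everything" is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative only: matching a message tag against an expression such as "o_log || audit".
final class TagExpressionSketch {

    /** Splits the expression on "||" and trims each tag. */
    static Set<String> parse(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Assumption: an empty expression or "*" subscribes to every tag. */
    static boolean matches(String expression, String messageTag) {
        Set<String> tags = parse(expression);
        return tags.isEmpty() || tags.contains("*") || tags.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("o_log", "o_log"));
        System.out.println(matches("o_log || audit", "audit"));
        System.out.println(matches("o_log", "other"));
    }
}
```

With the configuration above (consumer.tags: o_log), only messages sent with the tag o_log would pass under this reading.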
3. sql92 (if you use sql, do not use tags as well)
Enable sql92 in RocketMQ:
add the following setting to conf/broker.conf
enablePropertyFilter = true
Start the broker:
start bin/mqbroker -n localhost:9876 -c ./conf/broker.conf
Producer
public void sendObject(T msg, String tag) {
    // The "index" header is the property the consumer's sql expression filters on.
    Message<T> message = MessageBuilder.withPayload(msg)
            .setHeader("index", "1000")
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            .build();
    this.mqTrainTopic.operationLogOutput().send(message);
}
Consumer
Interface
public interface MQTopic {
    String OPERATE_LOG_INPUT = "operate_log_input";

    /**
     * Operation log.
     * @return the input channel bound to operate_log_input
     */
    @Input(value = OPERATE_LOG_INPUT)
    SubscribableChannel operateLogInput();
}
Configuration
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          operate_log_input: {consumer.sql: 'index < 1000'}
      bindings:
        operate_log_input: {destination: operate_log, content-type: application/plain, group: gorup1, consumer.maxAttempts: 1}
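To see what the expression index < 1000 means for the producer above, here is a small sketch that evaluates just this one comparison against string-valued message properties. It is a stand-in for the broker-side sql92 filter, not its implementation. The class and method names are hypothetical, and so is the assumed handling of missing or non-numeric values.

```java
import java.util.Map;

// Illustrative only: evaluates the single condition "index < limit".
final class SqlFilterSketch {

    /** Returns true when the "index" property parses as a number smaller than limit. */
    static boolean indexLessThan(Map<String, String> properties, long limit) {
        String raw = properties.get("index");
        if (raw == null) {
            return false; // assumption: a missing property never matches
        }
        try {
            return Long.parseLong(raw) < limit;
        } catch (NumberFormatException e) {
            return false; // assumption: non-numeric values never match
        }
    }

    public static void main(String[] args) {
        System.out.println(indexLessThan(Map.of("index", "999"), 1000));
        System.out.println(indexLessThan(Map.of("index", "1000"), 1000));
    }
}
```

Note that under this reading a message sent with index set to "1000" does not satisfy index < 1000, so the consumer would not receive it; a smaller value such as "999" would pass.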