• Spring Cloud Alibaba RocketMQ Binder


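    Download RocketMQ

    rocketmq-all-4.7.0-bin-release.zip

    Configure the environment variable

    ROCKETMQ_HOME = E:\rocketmq-all-4.7.0-bin-release

    Install RocketMQ

    1. Start the NameServer

    Go to the directory E:\rocketmq-all-4.7.0-bin-release\bin and run start mqnamesrv.cmd

    2. Start the Broker

    Run start mqbroker.cmd -n localhost:9876 -c …/conf/broker.conf autoCreateTopicEnable=true
    This command may fail with the error "Could not find or load main class". If it does, open bin\runbroker.cmd, change %CLASSPATH% to "%CLASSPATH%" (wrap it in double quotes), save the file, and run the command again. When the broker starts correctly it prints "boot success".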

    Add the dependency

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>

    Reference: http://www.itmuch.com/spring-cloud-alibaba/spring-cloud-stream-rocketmq-filter-consume/

    Configuration Options

    1. RocketMQ Binder Properties
    spring.cloud.stream.rocketmq.binder.name-server
    The name server address of the RocketMQ server (older versions use the namesrv-addr configuration item).
    Default: 127.0.0.1:9876.
    
    spring.cloud.stream.rocketmq.binder.access-key
    The AccessKey of Alibaba Cloud Account.
    Default: null.
    
    spring.cloud.stream.rocketmq.binder.secret-key
    The SecretKey of Alibaba Cloud Account.
    Default: null.
    
    spring.cloud.stream.rocketmq.binder.enable-msg-trace
    Enable Message Trace feature for all producers and consumers.
    Default: true.
    
    spring.cloud.stream.rocketmq.binder.customized-trace-topic
    The trace topic for message trace.
    Default: RMQ_SYS_TRACE_TOPIC.
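
As a rough illustration of how the binder defaults listed above combine with user settings, the sketch below overlays overrides on the documented defaults. It is plain Java rather than Spring code; the class name `BinderDefaults` and the address `192.168.1.10:9876` are hypothetical and not part of the binder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BinderDefaults {
    static final String PREFIX = "spring.cloud.stream.rocketmq.binder.";

    // Defaults copied from the list above. access-key and secret-key
    // default to null, so they are simply left out of the map.
    static Map<String, String> defaults() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put(PREFIX + "name-server", "127.0.0.1:9876");
        m.put(PREFIX + "enable-msg-trace", "true");
        m.put(PREFIX + "customized-trace-topic", "RMQ_SYS_TRACE_TOPIC");
        return m;
    }

    // Overlay user-supplied values on top of the documented defaults.
    static Map<String, String> resolve(Map<String, String> overrides) {
        Map<String, String> m = defaults();
        m.putAll(overrides);
        return m;
    }

    public static void main(String[] args) {
        // Hypothetical override: point the binder at a remote name server.
        Map<String, String> effective =
                resolve(Map.of(PREFIX + "name-server", "192.168.1.10:9876"));
        effective.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Only `name-server` changes here; the trace settings keep their default values.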
    
    2. RocketMQ Consumer Properties
    The following properties are available for RocketMQ consumers only and must be prefixed with 
    spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.
    
    enable
    Enable Consumer Binding.
    Default: true.
    
    tags
    Tag expression for the consumer subscription; separate multiple tags with ||.
    Default: empty.
    
    sql
    SQL expression for the consumer subscription.
    Default: empty.
    
    broadcasting
    Controls the message mode. Set it to true if every subscriber should receive every message.
    Default: false.
    
    orderly
    Whether messages are received in order (true) or concurrently (false).
    Default: false.
    
    delayLevelWhenNextConsume
    Retry strategy for concurrent consumption:
    -1: no retry, put the message into the DLQ directly
    0: the broker controls the retry frequency
    >0: the client controls the retry frequency
    Default: 0.
    
    suspendCurrentQueueTimeMillis
    Interval between retries for orderly consumption, in milliseconds.
    Default: 1000.
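
The three cases of delayLevelWhenNextConsume can be read as a small decision table. The sketch below restates them in plain Java; the class and enum names are hypothetical, and rejecting values below -1 is an assumption made here, since the list above does not say how they are treated.

```java
class ConsumeRetryPolicy {
    enum Strategy { SEND_TO_DLQ, BROKER_CONTROLLED, CLIENT_CONTROLLED }

    // Maps a delayLevelWhenNextConsume value to the strategy described above.
    static Strategy forDelayLevel(int delayLevelWhenNextConsume) {
        if (delayLevelWhenNextConsume == -1) {
            return Strategy.SEND_TO_DLQ;        // no retry, straight to the DLQ
        }
        if (delayLevelWhenNextConsume == 0) {
            return Strategy.BROKER_CONTROLLED;  // default: broker decides the frequency
        }
        if (delayLevelWhenNextConsume > 0) {
            return Strategy.CLIENT_CONTROLLED;  // client decides the frequency
        }
        // Assumption: values below -1 are not documented, so reject them.
        throw new IllegalArgumentException(
                "undocumented delay level: " + delayLevelWhenNextConsume);
    }

    public static void main(String[] args) {
        for (int level : new int[] {-1, 0, 3}) {
            System.out.println(level + " -> " + forDelayLevel(level));
        }
    }
}
```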
    
    3. RocketMQ Producer Properties
    The following properties are available for RocketMQ producers only and must be prefixed with 
    spring.cloud.stream.rocketmq.bindings.<channelName>.producer.
    
    enable
    Enable Producer Binding.
    Default: true.
    
    group
    Producer group name.
    Default: empty.
    
    maxMessageSize
    Maximum allowed message size in bytes.
    Default: 8249344.
    
    transactional
    Send transactional messages.
    Default: false.
    
    sync
    Send messages in synchronous mode.
    Default: false.
    
    vipChannelEnabled
    Send messages over the VIP channel.
    Default: true.
    
    sendMessageTimeout
    Timeout for sending a message, in milliseconds.
    Default: 3000.
    
    compressMessageBodyThreshold
    Threshold for compressing the message body; by default, bodies larger than 4 KB are compressed.
    Default: 4096.
    
    retryTimesWhenSendFailed
    Maximum number of internal retries before a send is reported as failed in synchronous mode.
    Default: 2.
    
    retryTimesWhenSendAsyncFailed
    Maximum number of internal retries before a send is reported as failed in asynchronous mode.
    Default: 2.
    
    retryNextServer
    Whether to retry on another broker internally when a send fails.
    Default: false.
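
The per-binding properties above are always addressed through the binding's channel name. As a minimal sketch of how the full keys are formed, the helper below concatenates the prefix, the channel name, and the property name. The class name `BindingPropertyKey` and the channel `operate_log_output` are hypothetical; `operate_log_input` is the channel used later in this article.

```java
class BindingPropertyKey {
    static final String ROOT = "spring.cloud.stream.rocketmq.bindings.";

    // Full key for a consumer property of the given channel.
    static String consumer(String channelName, String property) {
        return ROOT + channelName + ".consumer." + property;
    }

    // Full key for a producer property of the given channel.
    static String producer(String channelName, String property) {
        return ROOT + channelName + ".producer." + property;
    }

    public static void main(String[] args) {
        System.out.println(consumer("operate_log_input", "tags"));
        // Hypothetical output channel name, used only for illustration.
        System.out.println(producer("operate_log_output", "sync"));
    }
}
```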
    

    ##rocketmq-console

    Implementing message filtering

    1. condition

    Producer: set a header on the message

     public <T> void sendObject(T msg, String tag) {
        // The custom header "a" is what the consumer's condition matches on.
        Message<T> message = MessageBuilder.withPayload(msg)
                .setHeader("a", "b")
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        this.mqTrainTopic.operationLogOutput().send(message);
    }
    

    Consumer

    @StreamListener(value = MQTopic.OPERATE_LOG_INPUT, condition = "headers['a']=='b'")
    public void operateLogInpoutReceive(@Payload OperationVo vo){
        this.IOperationLogMapper.insertOperationLog(vo.getOperationLog());
        this.IOperationLogDetailMapper.insertOperationLogDetail(vo.getDetails(),vo.getOperationLog().getId());
    }
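
To make the effect of the condition more concrete, here is a plain-Java model of header-based dispatch. It is only a sketch: the real check is a SpEL expression evaluated by Spring Cloud Stream, and, as far as I understand it, that evaluation happens in the consuming application after the message has been delivered. The class `ConditionalDispatch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

class ConditionalDispatch {
    // Stand-in for the SpEL condition headers['a']=='b'.
    static final Predicate<Map<String, Object>> CONDITION =
            headers -> "b".equals(headers.get("a"));

    // Invokes the handler only when the headers satisfy the condition.
    static boolean dispatch(Map<String, Object> headers, String payload,
                            Consumer<String> handler) {
        if (!CONDITION.test(headers)) {
            return false; // message is skipped by this listener
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        dispatch(Map.of("a", "b"), "kept", handled::add);
        dispatch(Map.of("a", "x"), "dropped", handled::add);
        System.out.println(handled); // prints [kept]
    }
}
```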
    
    2. TAGS

    Producer

     public <T> void sendObject(T msg, String tag) {
        // RocketMQHeaders.TAGS carries the tag that consumers filter on.
        Message<T> message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        this.mqTrainTopic.operationLogOutput().send(message);
    }
    

    Consumer

    Interface
    public interface MQTopic {
    
        String OPERATE_LOG_INPUT = "operate_log_input";
    
        /**
         * Input channel for operation logs.
         */
        @Input(value = OPERATE_LOG_INPUT)
        SubscribableChannel operateLogInput();
    }
    
    Annotation
    @EnableBinding(value = {MQTopic.class})
    
    Configuration
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
            bindings:
              operate_log_input: {consumer.orderly: true, consumer.tags: o_log}
          bindings:
            operate_log_input: {destination: operate_log, content-type: application/plain, group: gorup1, consumer.maxAttempts: 1}
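
The consumer.tags value is a tag expression in which several tags are joined with ||. The sketch below models how such an expression could be matched against a message tag in plain Java. It is not the broker's implementation; treating an empty expression or `*` as "match everything" is an assumption, and the tag `audit` is hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class TagExpression {
    // Splits an expression such as "o_log || audit" into individual tags.
    static Set<String> parse(String expression) {
        Set<String> tags = new LinkedHashSet<>();
        if (expression == null) {
            return tags;
        }
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // True when the message tag is accepted by the expression.
    static boolean matches(String expression, String messageTag) {
        Set<String> tags = parse(expression);
        // Assumption: an empty expression or "*" subscribes to every tag.
        if (tags.isEmpty() || tags.contains("*")) {
            return true;
        }
        return messageTag != null && tags.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("o_log", "o_log"));          // prints true
        System.out.println(matches("o_log || audit", "audit")); // prints true
        System.out.println(matches("o_log", "other"));          // prints false
    }
}
```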
    

    3. SQL92 (if you filter with SQL, do not use tags as well)

    Enable SQL92 in RocketMQ

    Add the following setting to conf/broker.conf
    enablePropertyFilter = true
    
    
    Start the broker
    start bin/mqbroker -n localhost:9876 -c ./conf/broker.conf 
    

    Producer

     public <T> void sendObject(T msg, String tag) {
            // The "index" header is the property the SQL expression filters on.
            Message<T> message = MessageBuilder.withPayload(msg)
                    .setHeader("index", "1000")
                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                    .build();
            this.mqTrainTopic.operationLogOutput().send(message);
        }
    

    Consumer

    Interface
    public interface MQTopic {
    
        String OPERATE_LOG_INPUT = "operate_log_input";
    
        /**
         * Input channel for operation logs.
         */
        @Input(value = OPERATE_LOG_INPUT)
        SubscribableChannel operateLogInput();
    }
    
    Configuration
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
            bindings:
              operate_log_input: {consumer.sql: index < 1000}
          bindings:
            operate_log_input: {destination: operate_log, content-type: application/plain, group: gorup1, consumer.maxAttempts: 1}
    
    Code
     public <T> void sendObject(T msg, String tag) {
        Message<T> message = MessageBuilder.withPayload(msg)
                .setHeader("index", "1000")
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        this.mqTrainTopic.operationLogOutput().send(message);
    }
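
It can help to check by hand which messages the expression index < 1000 would let through. The sketch below models that comparison in plain Java, under two assumptions that are not confirmed by the text above: that the string header value is compared numerically, and that a message without the property does not match. The class `IndexFilter` is hypothetical and does not reproduce the broker's SQL92 evaluator.

```java
import java.util.Map;

class IndexFilter {
    // Models the subscription expression "index < 1000".
    static boolean matches(Map<String, String> userProperties) {
        String raw = userProperties.get("index");
        if (raw == null) {
            return false; // assumption: a missing property never matches
        }
        try {
            // Assumption: the string value is compared as a number.
            return Long.parseLong(raw.trim()) < 1000L;
        } catch (NumberFormatException e) {
            return false; // non-numeric values cannot satisfy the comparison
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(Map.of("index", "999")));  // prints true
        System.out.println(matches(Map.of("index", "1000"))); // prints false
    }
}
```

Under these assumptions, a message sent with index set to "1000", as in the producer sample, would not pass index < 1000; a smaller value is needed to see the consumer receive it.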
    
  • Original article: https://blog.csdn.net/mysuppper/article/details/127915436