• Rocketmq使用手册第一篇(springboot整合rocketmq)


    springboot整合rocketmq

    一,引入依赖

    4.7.0版本引入2.1.0版本,4.8.0引入2.2.0

    org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0 or 2.2.0

    二,配置项:

    生产者配置:

    #消息轨迹功能,如果开启了acl控制
    #在给生产者和消费者分配用户权限时需要额外分配-i PUB(defaultTopicPerm=PUB),否则默认的RMQ_SYS_TRACE_TOPIC无权限
    #如果需要指定自定义消息轨迹topic,需要提前申请创建对应的topic,默认自动创建被禁用
    rocketmq.producer.customized-trace-topic=RMQ_SYS_TRACE_TOPIC
    rocketmq.producer.enable-msg-trace=true

    #rocketmq连接地址
    rocketmq.name-server=127.0.0.1:9876
    #生产者组自定义即可
    rocketmq.producer.group=my-group

    #集群关闭自动创建topic的话topic无法自动创建,需要提前通过管理台创建
    demo.rocketmq.topic=test

    #配置acl用户名密码
    rocketmq.producer.access-key=rocketmq02
    rocketmq.producer.secret-key=12345678

    #其他的NameServer地址,对于需要操作多数据源的项目配置不同于rocketmq.name-server的值即可
    demo.rocketmq.extNameServer=127.0.0.1:9876

    性能优化

    ##生产者发送消息超时时间,默认3s
    rocketmq.producer.send-message-timeout=15000
    ##生产者消息压缩大小,默认达到4KB启用消息压缩
    rocketmq.producer.compress-message-body-threshold=4096

    ##生产者发送消息最大字节数,默认4MB
    rocketmq.producer.max-message-size=4194304
    ##生产者发送异步消息失败后重试次数,默认0次
    rocketmq.producer.retry-times-when-send-async-failed=0
    ##生产者消息失败容错策略,默认false不开启,生产环境建议开启
    rocketmq.producer.retry-next-server=true
    ##生产者发送同步消息失败后重试次数,默认2次
    rocketmq.producer.retry-times-when-send-failed=2

    消费者配置:

    #消费端配置
    #Push模式
    #demo.rocketmq.topic=test5
    demo.rocketmq.consumerGroup=test5_group
    rocketmq.consumer.access-key=rocketmq06
    rocketmq.consumer.secret-key=12345678

    #性能优化 Push模式
    #设置demo.rocketmq.selector-expression=order只消费tag=order的数据,消息过滤表达式
    demo.rocketmq.selector-expression=*
    demo.rocketmq.selector-type=TAG
    demo.rocketmq.message-model=MessageModel.CLUSTERING

    #pull消费模式配置,pull模式只支持2.2.0以上的版本
    #rocketmq.consumer.group=test1_group
    #rocketmq.consumer.topic=test1
    #rocketmq.consumer.access-key=rocketmq2
    #rocketmq.consumer.secret-key=12345678

    #性能优化 Pull模式下
    #rocketmq.consumer.selector-expression=*
    #rocketmq.consumer.pull-batch-size=10
    #rocketmq.consumer.message-model=CLUSTERING
    #TAG or SelectorType.SQL92
    #rocketmq.consumer.selector-type=TAG

    三,代码展示:

    生产者代码:

    @SpringBootTest
    class DemoApplicationTests {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Resource(name = "extRocketMQTemplate")
    private RocketMQTemplate extRocketMQTemplate;
    
    @Value("${demo.rocketmq.topic}")
    private String springTopic;
    
    
    /\*\*
     \* 发送单向消息
     \* @throws Exception
     \*/
    @Test
    void SimpleOnewayProducer() throws Exception {
        rocketMQTemplate.sendOneWay(springTopic, "Hello, World!");
    }
    
    /\*\*
     \* 发送同步消息
     \* @throws Exception
     \*/
    @Test
    void SimpleSyncProducer() throws Exception {
    
        SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
    
    }
    
    
    /\*\*
     \* 发送异步消息
     \* @throws Exception
     \*/
    @Test
    void SimpleAsyncProducer() throws Exception {
        rocketMQTemplate.asyncSend(springTopic, "Hello, World!", new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSucess SendResult=%s %n", var1);
            }
    
            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }
        });
    }
    
    /\*\*
     \* 发送延时消息
     \* 延时等级1到16分别表示 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,3表示延迟10s发送
     \* @throws Exception
     \*/
    @Test
    void ScheduledSyncProducer() throws Exception {
        String replyString  = rocketMQTemplate.syncSend(springTopic,MessageBuilder.withPayload("request string").build(),15000,3);
        System.out.printf("send %s and receive %s %n", "request string", replyString);
    }
    
    /\*\*
     \* 发送顺序消息
     \* @throws Exception
     \*/
    @Test
    void OrderProducer() throws Exception {
    
        rocketMQTemplate.setMessageQueueSelector((List mqs, Message msg, Object arg)->{
            /\*\*
             \* mqs:要发送消息的topic下的所有消息队列集合,集群的话默认是所有broker的队列
             \* msg:发送的消息
             \* arg:发送消息时传递的参数 通过该参数指定发送到哪个队列,对应 String.valueOf(i)
             \*/
            int queueNum = Integer.valueOf(String.valueOf(arg)) % mqs.size();
            System.out.println("队列id:"+queueNum+" 消息:"+new String(msg.getBody()));
            return mqs.get(queueNum);
        });
    
        for(int i=1;i<=100;i++){
            String msg="type:"+i%4+" value:"+i;
            rocketMQTemplate.syncSendOrderly(springTopic,msg, String.valueOf(i));
        }
    }
    
    
    /\*\*
     \* 批量发送消息
     \* @throws Exception
     \*/
    @Test
    void BatchProducer() throws Exception {
        List msgs = new ArrayList();
        for (int i = 0; i < 10; i++) {
            msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                    setHeader(RocketMQHeaders.KEYS, "KEY\_" + i).build());
        }
        SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
        System.out.printf("--- Batch messages send result :" + sr);
    }
    
    
    /\*\*
     \* 发送过滤消息
     \* @throws Exception
     \*/
    @Test
    void FilterProducer() throws Exception {
        //发送带有标签的消息
        rocketMQTemplate.convertAndSend(springTopic + ":tag0", "Hello, World!I'm from tag0");
    
    }
    
    /\*\*
     \* 发送事务消息
     \*  回传的事务状态:
     \*  TransactionStatus.CommitTransaction:   提交事务,它允许消费者消费此消息。
     \*  TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
     \*  TransactionStatus.Unknown:             中间状态,它代表需要检查消息队列来确定状态。
     \* @throws Exception
     \*/
    @Test
    void TransactionProducer() throws Exception {
        String\[\] tags = new String\[\] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
    
                org.springframework.messaging.Message msg = MessageBuilder.withPayload("rocketMQTemplate transactional message " + i).
                        setHeader(RocketMQHeaders.TRANSACTION\_ID, "KEY\_" + i).build();
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
                        springTopic + ":" + tags\[i % tags.length\], msg, null);
                System.out.printf("------rocketMQTemplate send Transactional msg body = %s , sendResult=%s %n",
                        msg.getPayload(), sendResult.getSendStatus());
    
                Thread.sleep(10);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    /\*\*
     \* 多数据源使用
     \* @throws Exception
     \*/
    @Test
    void ExtRocketMQTemplateProducer() throws Exception {
        SendResult sendResult1 = extRocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World!2222".getBytes()).build());
        System.out.printf("extRocketMQTemplate.syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult1);
    }
    /\*\*
     \* 事务消息监听实现
     \*/
    @RocketMQTransactionListener
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        private ConcurrentHashMap localTrans = new ConcurrentHashMap();
    
        //Broker预提交成功后回调Producer的executeLocalTransaction方法
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {
            String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION\_ID);
            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
                    transId);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(transId, status);
            if (status == 0) {
                // Return local transaction with success(commit), in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
                return RocketMQLocalTransactionState.COMMIT;
            }
    
            if (status == 1) {
                // Return local transaction with failure(rollback) , in this case,
                // this message will not be checked in checkLocalTransaction()
                System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
                return RocketMQLocalTransactionState.ROLLBACK;
            }
    
            System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182

    ");
    return RocketMQLocalTransactionState.UNKNOWN;
    }

        //Broker超时未接受到Producer的反馈,会定时重试调用Producer.checkLocalTransaction,Producer会根据自己的执行情况Ack给Broker
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message msg) {
            String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION\_ID);
            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
            Integer status = localTrans.get(transId);
            if (null != status) {
                switch (status) {
                    case 0:
                        retState = RocketMQLocalTransactionState.UNKNOWN;
                        break;
                    case 1:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    case 2:
                        retState = RocketMQLocalTransactionState.ROLLBACK;
                        break;
                }
            }
            System.out.printf("------ !!! checkLocalTransaction is executed once," +
                            " msgTransactionId=%s, TransactionState=%s status=%s %n",
                    transId, retState, status);
            return retState;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    }

    消费者代码:

    1,push 模式

    a,默认消费

    /**
    * StringConsumer
    */
    @Service
    @RocketMQMessageListener(topic = “ d e m o . r o c k e t m q . t o p i c " , c o n s u m e r G r o u p = " s t r i n g _ c o n s u m e r " , s e l e c t o r E x p r e s s i o n = " {demo.rocketmq.topic}", consumerGroup = "string\_consumer", selectorExpression = " demo.rocketmq.topic",consumerGroup="string_consumer",selectorExpression="{demo.rocketmq.tag}”)
    public class StringConsumer implements RocketMQListener {
    @Override
    public void onMessage(String message) {
    System.out.printf("------- StringConsumer received: %s
    ", message);
    }
    }

    b,配置消费端消费起始位点

    需要实现RocketMQPushConsumerLifecycleListener接口,在实现方法prepareStart里配置属性

    代码如下:

    /**
    * RocketMQ Push模式消费
    */
    @Service
    @RocketMQMessageListener(
    topic = “ d e m o . r o c k e t m q . t o p i c " , c o n s u m e r G r o u p = " {demo.rocketmq.topic}", consumerGroup = " demo.rocketmq.topic",consumerGroup="{demo.rocketmq.consumerGroup}”,
    selectorExpression = “${demo.rocketmq.selector-expression}”,
    selectorType = SelectorType.TAG,
    messageModel = MessageModel.CLUSTERING
    // accessKey = “rocketmq02”, // It will read by `rocketmq.consumer.access-key` key
    // secretKey = “12345678” // It will read by `rocketmq.consumer.secret-key` key
    )
    public class ACLStringPushConsumer implements RocketMQListener, RocketMQPushConsumerLifecycleListener {
    /**
    * 客户端收到的消息
    * @param message
    */
    @Override
    public void onMessage(String message) {
    System.out.printf("------- ACL StringConsumer received: %s
    ", message);
    }

    /\*\*
    
    • 1

    * 对消费者客户端的一些配置
    * @param consumer
    */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
    //设置从当前时间开始消费
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    System.out.println(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    //设置最大重试次数.默认16次
    consumer.setMaxReconsumeTimes(10);
    }
    }

    c,修改消费重试的时间间隔以及自定义是否进入死信队列配置

    rocketmq默认消息发送16次后数据会自动进入死信队列,每次重试的偏移量会在重试队列记录,修改重试的时间间隔以及自定义是否直接进入死信队列这些配置再springboot中暂时没有提供可用的接口,官方建议使用默认的方式,

    如果有此业务需求,可以通过支持原生Listener的使用方式自己控制ConsumeConcurrentlyStatus实现,具体实现代码如下

    /**
    * RocketMQ Push模式消费
    */
    @Service
    @RocketMQMessageListener(
    topic = “ d e m o . r o c k e t m q . t o p i c " , c o n s u m e r G r o u p = " {demo.rocketmq.topic}", consumerGroup = " demo.rocketmq.topic",consumerGroup="{demo.rocketmq.consumerGroup}”,
    selectorExpression = “${demo.rocketmq.selector-expression}”,
    selectorType = SelectorType.TAG,
    messageModel = MessageModel.CLUSTERING
    // accessKey = “znb00004”, // It will read by `rocketmq.consumer.access-key` key
    // secretKey = “12345678” // It will read by `rocketmq.consumer.secret-key` key
    )
    public class ACLStringPushConsumer implements RocketMQPushConsumerLifecycleListener {
    @Autowired
    private MyMessageListenerConcurrently myMyMessageListenerConcurrently;

    /**
    * 对消费者客户端的一些配置
    * @param consumer
    */
    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
    //设置从当前时间开始消费
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    System.out.println(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    //设置最大重试次数.默认16次
    consumer.setMaxReconsumeTimes(10);
    //配置重试消息逻辑,默认是context.setDelayLevelWhenNextConsume(0);
    consumer.setMessageListener(myMyMessageListenerConcurrently);
    }
    }

    自定义消息监听器MyMessageListenerConcurrently实现

    @Service
    public class MyMessageListenerConcurrently implements MessageListenerConcurrently {

    private static final Logger log = LoggerFactory.getLogger(MyMessageListenerConcurrently.class);
    
    • 1

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    // 0表示每次按照上面定义的时间依次递增,第一次为10s,第二次为30s…
    //-1表示直接发往死信队列,不经过重试队列.
    //>0表示每次重试的时间间隔,由我们用户自定义,1表示重试间隔为1s,2表示5s,3表示10秒,依次递增,重试次数由配置consumer.setMaxReconsumeTimes(10)决定
    //发送的默认重试队列topic名称为%RETRY%+消费者组名,发送的默认死信队列topic名称为%DLQ%+消费者组名
    context.setDelayLevelWhenNextConsume(1); //表示重试间隔为1s
    MessageExt msg = msgs.get(0);
    log.debug(“received msg: {}”, msg);
    try {
    System.out.printf(“%s Receive New Messages: %s %n”, Thread.currentThread().getName(), msgs);
    String msgBody = new String(msg.getBody(), “utf-8”);
    if (“message0”.equals(msgBody)) {
    System.out.println(“失败消息开始=”);
    System.out.println(“msg:” + msg);
    System.out.println(“msgBody:” + msgBody);
    System.out.println(“失败消息结束=”);
    int i = 1 / 0;
    System.out.println(i);
    }
    } catch (Exception e) {
    log.warn(“consume message failed. messageExt:{}”, msg, e);
    System.out.println(“------------------最大重试次数为:” + msgs.get(0).getReconsumeTimes() + “次!--------------------”);
    System.out.println(“-------延迟级别设置:” + context.getDelayLevelWhenNextConsume());
    long d = System.currentTimeMillis();
    SimpleDateFormat sdf = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”);
    System.out.println(“当前时间:” + sdf.format(d));
    if (msgs.get(0).getReconsumeTimes() > 3) {
    context.setDelayLevelWhenNextConsume(-1); //重试大于3次直接发往死信队列
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    }

    注:springboot批量消费consumeMessageBatchMaxSize配置了无效,默认每次都是单条串行消费的,如果对消费速率有需求可以使用pull模式或者原生rocketmq进行批量消费

    2,pull模式

    /**
    * RocketMQ Pull模式消费
    */
    @Service
    public class ACLStringPullConsumer implements CommandLineRunner {

    public static volatile boolean running = true;
    
    • 1

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void run(String… args) throws Exception {
    // List messages = rocketMQTemplate.receive(String.class);
    // System.out.printf(“receive from rocketMQTemplate, messages=%s %n”, messages);
    while (running) {
    List messages = rocketMQTemplate.receive(String.class);
    if(messages.size()>0){
    System.out.println(“messages.size:” + messages.size());
    for(int i=0;i System.out.println(“msgBody:” + messages.get(i));
    }
    }
    // Thread.sleep(1000); //1秒钟拉取一批数据,每批数据量由配置rocketmq.consumer.pull-batch-size决定
    }
    }
    }

    3,等待回复的消费

    String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, “request string”, String.class, 3000);

    生产者sendAndReceive对应的消费代码:

    /**
    * The consumer that replying String
    */
    @Service
    @RocketMQMessageListener(topic = “ d e m o . r o c k e t m q . s t r i n g R e q u e s t T o p i c " , c o n s u m e r G r o u p = " {demo.rocketmq.stringRequestTopic}", consumerGroup = " demo.rocketmq.stringRequestTopic",consumerGroup="{demo.rocketmq.stringRequestConsumer}”, selectorExpression = “${demo.rocketmq.tag}”)
    public class StringConsumerWithReplyString implements RocketMQReplyListener {

    @Override
    public String onMessage(String message) {
        System.out.printf("------- StringConsumerWithReplyString received: %s 
    
    • 1
    • 2
    • 3

    ", message);
    return “reply string”;
    }
    }

    4,其他数据源消费

    /**
    * MessageExtConsumer, consume listener impl class.
    */
    @Service
    @RocketMQMessageListener(topic = “ d e m o . r o c k e t m q . m s g E x t T o p i c " , s e l e c t o r E x p r e s s i o n = " t a g 0 ∣ ∣ t a g 1 " , c o n s u m e r G r o u p = " {demo.rocketmq.msgExtTopic}", selectorExpression = "tag0||tag1", consumerGroup = " demo.rocketmq.msgExtTopic",selectorExpression="tag0∣∣tag1",consumerGroup="{spring.application.name}-message-ext-consumer”)
    public class MessageExtConsumer implements RocketMQListener, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(MessageExt message) {
    System.out.printf("------- MessageExtConsumer received message, msgId: %s, body:%s
    ", message.getMsgId(), new String(message.getBody()));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // set consumer consume message from now
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME\_FROM\_TIMESTAMP);
        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    }

    如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?

    第一步: 定义非标的RocketMQTemplate使用你需要的属性,可以定义与标准的RocketMQTemplate不同的nameserver、groupname等。如果不定义,它们取全局的配置属性值或默认值。

    // 这个RocketMQTemplate的Spring Bean名是’extRocketMQTemplate’, 与所定义的类名相同(但首字母小写)
    @ExtRocketMQTemplateConfiguration(nameServer=“127.0.0.1:9876”
    , … // 定义其他属性,如果有必要。
    )
    public class ExtRocketMQTemplate extends RocketMQTemplate {
    //类里面不需要做任何修改
    }

    第二步: 使用这个非标RocketMQTemplate

    @Resource(name = “extRocketMQTemplate”) // 这里必须定义name属性来指向上述具体的Spring Bean.
    private RocketMQTemplate extRocketMQTemplate;

    接下来就可以正常使用这个extRocketMQTemplate了。

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    【网络篇】第十篇——线程池版的TCP网络程序
    数据挖掘--认识数据
    论文解读( N2N)《Node Representation Learning in Graph via Node-to-Neighbourhood Mutual Information Maximization》
    dpdk结合sriov测试vpp ipsec性能
    Node.js 做 Web 后端的优势在哪?为什么是明智的选择?
    关于elementui表单验证数字的问题
    GitOps 实践之渐进式发布
    为什么我抓不到baidu的数据包
    HTML静态网页成品作业(HTML+CSS+JS)—— 美食企业曹氏鸭脖介绍网页(4个页面)
    1.6 列表(Python)
  • 原文地址:https://blog.csdn.net/m0_54883970/article/details/126114019