• (项目实战)RocketMQ5.0延迟消息在聚合支付系统中的应用


    在这里插入图片描述

    1 基于业务场景掌握RocketMQ5.0

    本篇文章主要结合聚合支付系统中的业务场景来落地RocketMQ中间件的应用聚合支付系统主要在支付系统超时订单和商户支付结果异步通知场景中会使用到RocketMQ消息中间件。本文使用到了RocketMQ中的延迟消息知识点,RocketMQ延迟消息投递等级一共为18个等级,具体投递等级和延迟时间如下

    投递等级(delay level)延迟时间投递等级(delay level)延迟时间
    11s106min
    25s117min
    310s128min
    430s139min
    51min1410min
    62min1520min
    73min1630min
    84min171h
    95min182h

    RocketMQ消息类型主要有普通消息、顺序消息、延迟消息、批量消息、事务消息
    在这里插入图片描述
    接下来我们一起来分析延迟消息在具体业务场景中的应用吧

    1.1 超时订单(下游商户未支付)

    在聚合支付系统和下游商户系统业务场景中,下游商户在调用聚合支付系统支付接口后,用户实际并没有对该笔订单进行支付。所以这时我们需要对未支付的订单进行订单关闭操作,那么这时我们使用消息队列中的延迟消息实现该业务功能 ,以下为具体的业务流程
    在这里插入图片描述

    1.2 下游商户支付结果通知

    下游商户调用聚合支付系统统一支付功能后,用户完成支付,这时支付渠道需要把用户支付结果返回给聚合支付系统,聚合支付系统再通过RocketMQ延迟消息通知给下游商户系统。该业务功能同样是基于消息队列的延迟消息进行技术实现,系统第一次通知采用RocketMQ普通实时消息进行支付结果通知,在没有收到商户系统支付通知响应结果时聚合支付系统会采用延迟消息每隔10s通知,循环通知5次。
    在这里插入图片描述

    2 RocketMQ延迟消息核心代码实现

    本技术文档采用SpringCloud2021.x和RocketMQ5.0进行代码实现

    Spring Cloud Alibaba VersionSpring Cloud VersionSpring Boot Version
    2021.xSpring Cloud 2021.x2.7.18

    2.1 在SpringCloud中集成RocketMQ流程

    2.1.1 引入RocketMQ Stream Starter
    <dependency>
        <groupId>com.alibaba.cloudgroupId>
        <artifactId>spring-cloud-starter-stream-rocketmqartifactId>
    dependency>
    
    2.1.2 修改application.properties配置文件
    server.port=1000
    spring.application.name=rocketmq-delay-consume-pay
    spring.cloud.stream.function.definition=consumer;
    spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
    spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.group=output_1
    spring.cloud.stream.bindings.producer-out-0.destination=delay
    spring.cloud.stream.bindings.consumer-in-0.destination=delay
    spring.cloud.stream.bindings.consumer-in-0.group=delay-group
    logging.level.org.springframework.context.support=debug
    
    
    2.1.3 代码实现

    1 生产者

    package cn.itbeien.mq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageConst;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.GenericMessage;
    import org.springframework.stereotype.Service;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;
    
    /**
     * @author beien
     * @date 2024-06-19 23:00
     * Copyright© 2024 beien
     */
    @Service
    @Slf4j
    public class ProducerService {
        @Autowired
        private StreamBridge streamBridge;
    
        /**
         * 生产者
         * @return
         */
        @Bean
        public void producerDelay() {
                    String key = "KEY01";
                    Map<String, Object> headers = new HashMap<>();
                    headers.put(MessageConst.PROPERTY_KEYS, key);
                    headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, "1001");
                    headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3);//10秒
                    SimpleMsg simpleMsg =  new SimpleMsg();
                    simpleMsg.setOrderId("10001");
                    Message<SimpleMsg> msg = new GenericMessage(simpleMsg, headers);
                    streamBridge.send("producer-out-0", msg);
    
        }
    }
    

    2 消费者

    package cn.itbeien.mq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageConst;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.GenericMessage;
    import org.springframework.stereotype.Service;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;
    
    /**
     * @author beien
     * @date 2024-06-19 23:05
     * Copyright© 2024 beien
     */
    @Service
    @Slf4j
    public class ConsumerService {
    
        /**
         * 消费者
         */
        @Bean
        public Consumer<Message<SimpleMsg>> consumer() {
            return msg -> {
                log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getOrderId());
            };
        }
    }
    

    3 关注我

    欢迎关注我的视频号和公众号,视频号有相关技术和业务视频可学习支付业务/文旅行业数字化。探讨技术(系统架构、微服务、容器化、云原生、云原生),支付系统实战。
    在这里插入图片描述

  • 相关阅读:
    No169.精选前端面试题,享受每天的挑战和学习
    数据收集-数据收集软件-数据收集工具免费
    LCR 034.验证外星语词典
    快速上手Linux核心命令(三):文件和目录操作命令
    Aspose.Words利用Word模板导出Word文档
    如何确保面试流程标准化操作,避免人为因素影响**
    ES6 对象面试题
    Chisel-Strike:一款功能强大的.NET异或XOR加密CobaltStrike Aggressor实现
    canvas之时钟
    [补题记录] Atcoder Beginner Contest 325(E、F)
  • 原文地址:https://blog.csdn.net/BenMicro/article/details/139868926