• 030-从零搭建微服务-消息队列(二)


    写在最前

    如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。

    源码地址(后端):mingyue-springcloud-learning: 🎉 基于 Spring Boot、Spring Cloud & Alibaba 的分布式微服务架构基础服务中心🎉 给出微服务的一些搭建建议

    源码地址(前端):mingyue-springcloud-ui: 🎉 基于 Vue3 + TS + Vite + Element plus 等技术,适配 MingYue 后台微服务

    文档地址:Wiki - Gitee.com

    mingyue-common-mq

    添加依赖

    根据需要在 mingyue-common-mq 模块中添加所需的 MQ 中间件,例如:RocketMQ、Kafka。

    
        
        
            com.alibaba.cloud
            spring-cloud-starter-stream-rocketmq
        
    ​
        
        
            org.springframework.cloud
            spring-cloud-starter-stream-kafka
        
    

    集成 RocketMQ

    引入依赖

    
    
        com.csp.mingyue
        mingyue-common-mq
    

    Nacos 配置

    spring:
      cloud:
        stream:
          function:
            # 重点配置 与 binding 名与消费者对应
            definition: rocketmqDemo
          rocketmq:
            binder:
                # rocketmq 地址
                name-server: 192.168.21.32:9876
            bindings:
                rocketmqDemo-out-0:
                    producer:
                        # 必须得写
                        group: default
          bindings:
            rocketmqDemo-out-0:
                content-type: application/json
                destination: stream-rocketmq-demo-topic
                group: demo-group
                binder: rocketmq
            rocketmqDemo-in-0:
                content-type: application/json
                destination: stream-rocketmq-demo-topic
                group: demo-group
                binder: rocketmq

    RocketMQ 生产者

    @Component
    public class RocketMqProducer {
    ​
        @Resource
        private StreamBridge streamBridge;
    ​
        public void rocketMqDemoMsg(String msg) {
            // 构建消息对象
            MqMessageDto messageDto = new MqMessageDto()
                    .setMsgId(IdUtil.fastSimpleUUID())
                    .setMsgText(msg);
    ​
            streamBridge.send("rocketmqDemo-out-0", MessageBuilder.withPayload(messageDto).build());
        }
    ​
    }

    RocketMQ 消费者

    @Slf4j
    @Component
    public class RocketMqConsumer {
    ​
        @Bean
        Consumer rocketmqDemo() {
            log.info("Rocket MQ 初始化订阅");
            return msg -> {
                log.info("通过 Rocket MQ 消费到消息 => {}", msg.toString());
            };
        }
    ​
    }

    推送消息到 RocketMQ

    @GetMapping("/sendRocketMq")
    @Operation(summary = "发送消息到RocketMQ", parameters = { @Parameter(name = "msg", description = "推送的消息体", required = true) })
    public R sendRocketMq(String msg) {
        rocketMqProducer.rocketMqDemoMsg(msg);
        return R.ok();
    }

    集成 Kafka

    引入依赖

    
    
        com.csp.mingyue
        mingyue-common-mq
    

    Nacos 配置

    spring:
      cloud:
        stream:
          function:
            # 重点配置 与 binding 名与消费者对应
            definition: kafkaDemo
          kafka:
            binder:
                brokers: 192.168.21.32:9092
          bindings:
            kafkaDemo-out-0:
                destination: stream-kafka-demo-topic
                contentType: application/json
                group: demo-group
                binder: kafka
            kafkaDemo-in-0:
                destination: stream-kafka-demo-topic
                contentType: application/json
                group: demo-group
                binder: kafka

    Kafka 生产者

    @Component
    public class KafkaProducer {
    ​
        @Resource
        private StreamBridge streamBridge;
    ​
        public void kafkaDemoMsg(String msg) {
            // 构建消息对象
            MqMessageDto messageDto = new MqMessageDto()
                    .setMsgId(IdUtil.fastSimpleUUID())
                    .setMsgText(msg);
    ​
            streamBridge.send("kafkaDemo-out-0", MessageBuilder.withPayload(messageDto).build());
        }
    ​
    }

    Kafka 消费者

    @Slf4j
    @Component
    public class KafkaConsumer {
    ​
        @Bean
        Consumer kafkaDemo() {
            log.info("Kafka 初始化订阅");
            return msg -> {
                log.info("通过 Kafka 消费到消息 => {}", msg.toString());
            };
        }
    ​
    }

    推送消息到 Kafka

    @GetMapping("/sendKafka")
    @Operation(summary = "发送消息到Kafka", parameters = { @Parameter(name = "msg", description = "推送的消息体", required = true) })
    public R sendKafka(String msg) {
        kafkaProducer.kafkaDemoMsg(msg);
        return R.ok();
    }

    拓展 RabbitMQ

    mingyue-common-mq 添加依赖

    
        org.springframework.cloud
        spring-cloud-starter-stream-rabbit
    

    Nacos 配置

    --- # rabbitmq 配置
    spring:
      rabbitmq:
        host: rabbitmqIp
        port: 5672
        username: root
        password: root
      cloud:
        stream:
          function:
              # 重点配置 与 binding 名与消费者对应
              definition: rabbitmqDemo
          rabbit:
            bindings:
              rabbitmqDemo-in-0:
                consumer:
                  delayedExchange: true
              rabbitmqDemo-out-0:
                producer:
                  delayedExchange: true
          bindings:
            rabbitmqDemo-in-0:
              destination: delay.exchange.demo
              content-type: application/json
              group: delay-group
              binder: rabbit
            rabbitmqDemo-out-0:
              destination: delay.exchange.demo
              content-type: application/json
              group: delay-group
              binder: rabbit

    小结

    MQ 基础搭建已经完成,后续会编写一些实际开发中使用到队列的场景,如:

    1. 订单处理:

      • 电子商务平台可以使用消息队列来处理订单,确保订单的创建、支付、发货和通知等各个步骤都能按顺序和可靠地执行。

    2. 通知和提醒:

      • 网站或应用程序可以使用消息队列来发送通知和提醒,如电子邮件通知、短信通知、推送通知等,以便与用户互动。

    3. 用户注册和身份验证:

      • 当用户注册或请求密码重置时,消息队列可以用于生成和发送验证链接或令牌,确保用户身份验证的安全性和可扩展性。

    4. 数据同步:

      • 在多个系统之间同步数据,以确保数据的一致性,例如将用户配置信息从一个微服务同步到另一个微服务。

    5. 事件日志和审计:

      • 记录应用程序事件、用户活动和系统操作,以进行审计、监视和故障排除。

    6. 批量处理:

      • 处理大量数据导入、数据清洗、ETL(提取、转换、加载)操作等批处理任务,以提高性能和可维护性。

    7. 异步任务处理:

      • 处理后台任务,如图像处理、视频编码、生成报告等,以减少响应时间和提高系统的吞吐量。

    8. 队列服务:

      • 提供队列服务以支持其他应用程序或团队的异步通信需求,例如云服务提供商的消息队列服务。

    9. 数据分发:

      • 将数据从生产者分发给多个消费者,以实现发布-订阅模式,例如新闻订阅、市场报价和天气预报。

    10. 错误处理和重试:

      • 处理意外错误和故障,将失败的操作或任务放入队列,以便进行重试或错误处理。

    这些业务使用场景只是消息队列的一些示例。消息队列有助于提高系统的可扩展性、弹性和可靠性,允许异步处理和解耦合组件,从而改善了应用程序的整体性能和用户体验。不同的业务需求可能需要不同类型的消息队列系统和配置。

  • 相关阅读:
    No165.精选前端面试题,享受每天的挑战和学习
    学习MySQL-第二章
    Stream学习2
    程序员网上接单盛行,到底该怎样选择一个好用不坑的接单平台?
    STM32Cube高效开发教程<基础篇>(八)----通用定时器-输入捕获、输出比较、PWM输出/互补输出等
    算法---矩阵中战斗力最弱的 K 行(Kotlin)
    C#面:简述一下面向对象的三大特性?
    JavaSE之反射
    目标检测算法——遥感影像数据集资源汇总(附下载链接)
    【软件工程期末复习】知识点+大题详解(E-R图、数据流图、N-S盒图、状态图、活动图、用例图....)
  • 原文地址:https://blog.csdn.net/csp732171109/article/details/133362384