• 微服务项目使用RabbitMQ


    微服务项目使用RabbitMQ

    很久未用RabbitMQ了,根据网上的Demo,大多数是一个服务包含所有的生产者和消费者和配置,当自己去搭建服务的时候,还需要一些思考各种包的划分.无法无脑CV大法,所以,下文,我根据实际项目抽离出一个比较完整的小Demo演示微服务项目使用RabbitMQ.注意:这个小Demo并没有做消息的可靠性相关操作!

    公共服务

    一般微服务有一个公共服务,用于存放一些配置类或者常量类等,这里我以service_util举例,这个服务下,应该会有以下四个包,如下:

    ├─config  -- 配置类的包
    │      RabbitMQConfig.java
    │
    ├─MQConstant  -- 常量的包
    │      RabbitMQConstant.java
    │
    ├─service  -- 封装的服务的包
    │      RabbitService.java
    │
    └─vo  -- Vo的包 
            OrderVo.java
    

    RabbitMQConfig

    这个类下,我们可以配置基本的RabbitMQ的消息的序列化,在消息的传输过程中,对象和Json的互相转化,详细代码如下:

    package com.leixin.mq.config;
    
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @Classname RabbitMQConfig
     * @Description TODO
     * @Date 2023-08-07 21:25
     * @Created by LeiXin
     */
    /*
    这个配置类的作用是将一个消息转换器添加到RabbitMQ中,
    这样在消息发送和接收时,
    就能够自动地将Java对象转换为JSON格式的消息,
    以及将JSON格式的消息转换回Java对象。
     */
    @Configuration
    public class RabbitMQConfig {
    
        // 声明一个@Bean注解,表示这是一个Spring Bean,会由Spring容器进行管理和实例化
        @Bean
        public MessageConverter getMessageConverter() {
            // 创建并返回一个Jackson2JsonMessageConverter实例作为消息转换器
            // 这个转换器用于在消息发送和接收之间进行JSON格式的转换
            return new Jackson2JsonMessageConverter();
        }
    }
    

    RabbitMQConstant

    这个包用于指定消息队列的常量

    package com.leixin.mq.MQConstant;
    
    /**
     * @Classname RabbitMQConstant
     * @Description TODO
     * @Date 2023-08-07 23:23
     * @Created by LeiXin
     */
    public class RabbitMQConstant {
    
        /**
         * 预约下单
         */
        public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
        public static final String ROUTING_ORDER = "order";
        //队列
        public static final String QUEUE_ORDER  = "queue.order";
    
        /**
         * 短信
         */
        public static final String EXCHANGE_DIRECT_SMS = "exchange.direct.msm";
        public static final String ROUTING_SMS_ITEM = "msm.item";
        //队列
        public static final String QUEUE_MSM_SMS  = "queue.msm.item";
    }
    
    

    RabbitService

    这个包用于封装消息的发送方式,之后发送消息只要使用RabbitService.

    package com.leixin.mq.service;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * @Classname RabbitService
     * @Description TODO
     * @Date 2023-08-07 23:07
     * @Created by LeiXin
     */
    @Service
    public class RabbitService {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送消息
         * @param exchange
         * @param routingKey
         * @param message
         * @return
         */
        public boolean sendMessage(String exchange,String routingKey,Object message){
            rabbitTemplate.convertAndSend(exchange,routingKey,message);
            return true;
        }
    }
    
    

    OrderVo

    这里注意Vo包都要放在公共包中,因为生产者和消费者的服务都使用这个对象进行消息的封装传输,这个OrderVo类,只是简单举个例子.

    package com.leixin.mq.vo;
    
    /**
     * @Classname OrderVo
     * @Description TODO
     * @Date 2023-08-08 0:04
     * @Created by LeiXin
     */
    public class OrderVo {
        String id;
        String orderName;
    
        Integer count;
    
        public Integer getCount() {
            return count;
        }
    
        public void setCount(Integer count) {
            this.count = count;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getOrderName() {
            return orderName;
        }
    
        public void setOrderName(String orderName) {
            this.orderName = orderName;
        }
    }
    
    

    依赖

      <parent>
            <groupId>com.leixingroupId>
            <artifactId>SpringBoot-RabbitMqartifactId>
            <version>1.0-SNAPSHOTversion>
        parent>
    
        <artifactId>service_utilartifactId>
    
        <properties>
            <maven.compiler.source>17maven.compiler.source>
            <maven.compiler.target>17maven.compiler.target>
            <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
        properties>
        <dependencies>
    
    
    
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
    
            
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <optional>trueoptional>
            dependency>
    
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
    
        dependencies>
    

    生产者

    生产者用于发送消息,简单举例,就直接在Controller层发送消息

    └─producer
        │  XxxApplication.java
        │
        └─controller
                XxxController.java
    

    XxxApplication

    SpringBoot的启动类,注意,这个CompontScan主要是为了扫描到公共包的一些配置或者组件

    @SpringBootApplication
    @ComponentScan("com.leixin")
    public class XxxApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(XxxApplication.class, args);
        }
    }
    

    XxxController

    生产者的服务发送至消息队列

    @RestController
    @RequestMapping("/test")
    public class XxxController {
        @Autowired
        private RabbitService rabbitService;
    
        @GetMapping
        public void testSendMessage(){
            OrderVo orderVo = new OrderVo();
            orderVo.setCount(100);
            rabbitService.sendMessage(RabbitMQConstant.EXCHANGE_DIRECT_ORDER
            ,RabbitMQConstant.ROUTING_ORDER,
                    orderVo);
        }
    }
    
    

    配置类

    application.yml

    # Spring配置部分
    spring:
      rabbitmq:
        # RabbitMQ主机地址
        host: localhost
        # RabbitMQ端口
        port: 5672
        # 连接RabbitMQ的用户名
        username: guest
        # 连接RabbitMQ的密码
        password: guest
        # 虚拟主机,默认为根虚拟主机"/"
        virtual-host: /
    
    # Spring Boot内嵌服务器配置部分
    server:
      # 内嵌服务器监听的端口号
      port: 8092
    
    

    依赖

        <dependencies>
            <dependency>
                <groupId>com.leixingroupId>
                <artifactId>service_utilartifactId>
                <version>1.0-SNAPSHOTversion>
            dependency>
        dependencies>
    

    消费者

    消费者主要用于处理一些消息,文件如下:

    │  YyyApplication.java
    │
    └─listener
            YyyMQListener.java
    

    YyyMQListener

    这里就是用于处理消息

    package com.leixin.mq.listener;
    
    import com.leixin.mq.MQConstant.RabbitMQConstant;
    import com.leixin.mq.vo.OrderVo;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @Classname YyyMQListener
     * @Description TODO
     * @Date 2023-08-07 23:54
     * @Created by LeiXin
     */
    
    @Component
    public class YyyMQListener {
    
        // 使用 @RabbitListener 注解标记这个方法作为消息监听器
        // bindings 属性用于定义队列、交换机以及路由关系
        @RabbitListener(bindings = {
                @QueueBinding(
                        value = @Queue(name = RabbitMQConstant.QUEUE_ORDER, durable = "true"), // 创建队列
                        exchange = @Exchange(name = RabbitMQConstant.EXCHANGE_DIRECT_ORDER), // 创建交换机
                        key = RabbitMQConstant.ROUTING_ORDER
                )
        })
        // consume 方法用于处理收到的消息
        public void consume(OrderVo order, Message message, Channel channel) {
            // 从传入的 OrderVo 对象中获取订单数量,并将其减少
            Integer count = order.getCount();
            count--;
    
            // 输出调试信息,展示订单数量的变化
            System.out.println(count);
    
            // 在这里可以添加你的业务逻辑,处理订单相关的操作
            // 例如,更新数据库中的订单状态、发送通知等
            // 最后,可能需要手动确认消息的处理,通过 channel.basicAck 方法
        }
    }
    

    配置文件

    application.yml

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
    server:
      port: 8093
    

    依赖

        <dependencies>
            <dependency>
                <groupId>com.leixingroupId>
                <artifactId>service_utilartifactId>
                <version>1.0-SNAPSHOTversion>
            dependency>
        dependencies>
    
  • 相关阅读:
    我做了几年的Android应用层开发,为什么还要去学习安卓系统知识?
    【Python21天学习挑战赛】- 函数进阶
    如何获取 mysql 外键索引的元数据信息?
    号码吉凶查询易语言代码
    win10打不开蓝牙,怎么办?几招解决
    物联网:用python调入机器学习分析物联网数据入侵检测模块
    【光学】Matlab模拟等倾干涉仿真
    JavaSE之多线程、线程安全问题和synchronized代码块
    c语言实现通讯录(用三种方法来实现一个属于你的通讯录)
    112-curl http请求监测应用情况
  • 原文地址:https://www.cnblogs.com/flyleixin/p/17613630.html