• Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息


    1,版本说明

    erlang 和 rabbitmq 版本说明
    https://www.rabbitmq.com/which-erlang.html
    确认需要安装的mq版本以及对应的erlang版本。

    2,下载安装文件

    RabbitMQ下载地址:
    https://packagecloud.io/rabbitmq/rabbitmq-server

    Erlang下载地址:
    https://packagecloud.io/rabbitmq/erlang

    RabbitMQ延迟消息插件下载
    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    下载文件如图

    在这里插入图片描述

    3,安装步骤

    3.1, 查询是否有安装过erlang、rabbitmq, 查询到有的话需要删除。

    	rpm -qa | grep rabbitmq-server
    	rpm -qa | grep erlang
    	# 删除
    	yum -y remove rabbitmq-server.noarch
    
    • 1
    • 2
    • 3
    • 4

    3.2, 本地安装erlang

    	yum localinstall erlang-23.2.7-2.el7.x86_64.rpm
    	# 查询安装的版本
    	erl -version
    	# Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version xxx
    
    • 1
    • 2
    • 3
    • 4

    3.3, 本地安装rabbitmq

    	yum localinstall rabbitmq-server-3.9.0-1.el7.noarch.rpm
    
    • 1
    	# 启动rabbitmq
    	systemctl start rabbitmq-server
    
    	# 查看rabbitmq状态
    	systemctl status rabbitmq-server
    
    	# 设置rabbitmq服务开机自启动
    	systemctl enable rabbitmq-server
    
    	# 关闭rabbitmq服务
    	systemctl stop rabbitmq-server
    
    	# 重启rabbitmq服务
    	systemctl restart rabbitmq-server
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3.4, mq 端口开放:

    	firewall-cmd --zone=public --add-port=5672/tcp --permanent
    	firewall-cmd --zone=public --add-port=15672/tcp --permanent
    	firewall-cmd --reload
    	firewall-cmd --zone=public --list-ports
    
    • 1
    • 2
    • 3
    • 4

    3.5, 安装mq管理界面

    	
    	# 启用管理界面插件
    	rabbitmq-plugins enable rabbitmq_management
    
    	curl http://localhost:15672 就可以打开web管理页面
    
    	# rabbitmq有一个默认的账号密码guest,但该情况仅限于本机localhost进行访问,所以需要添加一个远程登录的用户
    
    	# 添加用户
    	rabbitmqctl add_user 用户名 密码
    
    	rabbitmqctl add_user admin 123456
    
    	# 设置用户角色,分配操作权限
    	rabbitmqctl set_user_tags 用户名 角色
    
    	rabbitmqctl set_user_tags admin administrator
    
    	# 为用户添加资源权限(授予访问虚拟机根节点的所有权限)
    	rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"
    
    	rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
    
    	# 角色有四种:
    	# administrator:可以登录控制台、查看所有信息、并对rabbitmq进行管理
    	# monToring:监控者;登录控制台,查看所有信息
    	# policymaker:策略制定者;登录控制台指定策略
    	# managment:普通管理员;登录控制
    
    	# 修改密码
    	rabbitmqctl change_ password 用户名 新密码
    
    	# 删除用户
    	rabbitmqctl delete_user 用户名
    
    	# 查看用户清单
    	rabbitmqctl list_users
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    3.6, 延迟消息插件安装:

        # 把插件包先复制到	 /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins
        cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins/
    	rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    	#重启mq		
    	systemctl restart rabbitmq-server
    	rabbitmq-plugins list
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.7,登录测试

    访问地址: ip:15672 账号密码: admin 123456
    登录界面

    找到交换机 exchange,看看类型是否有延迟消息类型的
    在这里插入图片描述

    然后就可以写代码去连接发消息了。

    4, Java代码

    4.1, pom 引入:

             <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
    
    • 1
    • 2
    • 3
    • 4

    4.2, 配置类:

    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    4.3, 消息定义配置类:

    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class OrderRabbitMQConfig {
    
        @Autowired
        private RabbitAdmin rabbitAdmin;
        //================================订单延时=================================
        @Bean
        CustomExchange order_pay_delay_exchange() {
            HashMap<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange("order_pay_delay_exchange", "x-delayed-message", true, false, args);
        }
        @Bean
        public Queue order_pay_delay_queue() {
            Queue queue = new Queue("order_pay_delay_queue", true, false, false);
            rabbitAdmin.declareQueue(queue);
            return queue;
        }
        @Bean
        public Binding order_pay_delay_binding() {
            return BindingBuilder.bind(order_pay_delay_queue())
                    .to(order_pay_delay_exchange()).with("order_pay_delay_routing").noargs();
        }
    
        //================================订单支付通知======================================
        @Bean
        public DirectExchange order_pay_notify_exchange() {
            return new DirectExchange("order_pay_notify_exchange", true, false);
        }
        @Bean
        public Queue order_pay_notify_direct_queue() {
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-max-priority", 5);
            Queue queue = new Queue("order_pay_notify_queue", true, false, false, argsMap);
            rabbitAdmin.declareQueue(queue);
            return queue;
        }
        @Bean
        public Binding ctc_bidding_auction_pay_notify_binding() {
            return BindingBuilder.bind(order_pay_notify_direct_queue())
                    .to(order_pay_notify_exchange()).with("order_pay_notify_routing");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    4.4, 消息发送类:

    
    import cn.hutool.json.JSONUtil;
    import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class RabbitMQSendUtils {
    
        private static RabbitTemplate rabbitTemplate;
    
        @Autowired
        public RabbitMQSendUtils(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        /**
         * 订单支付延时通知、发送MQ消息
         */
        public static void sendPayDelayMessage(PayOrderNotifyDto dto, final Integer delayTimes) {
            //给延迟队列发送消息
            String msg = JSONUtil.toJsonStr(dto);
            log.info("订单支付延时通知、发送MQ消息: {}, delayTimes={}", msg, delayTimes);
            rabbitTemplate.convertAndSend("order_pay_delay_exchange", "order_pay_delay_routing", msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //给消息设置延迟毫秒值
                    message.getMessageProperties().setDelay(delayTimes);
                    return message;
                }
            });
        }
    
        /**
         * 订单支付通知,发送MQ消息
         */
        public static void sendPayNotifyMsg(PayOrderNotifyDto dto) {
            log.info("订单支付通知,发送MQ消息: {}", dto);
            rabbitTemplate.convertAndSend("order_pay_notify_exchange", "order_pay_notify_routing", JSONUtil.toJsonStr(dto));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    4.5, 消息监听消费类:

    
    import cn.hutool.json.JSONUtil;
    import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * MQ消费监听
     */
    @Slf4j
    @Component
    public class OrderMQListener {
        /**
         * 订单延时通知 消息
         */
        @RabbitListener(queues = {"order_pay_delay_queue"})
        public void payDelayNotify(Message message) {
            try {
                String msg = new String(message.getBody());
                log.info("【消费】订单延时通知 MQ 消息内容: {}, Message={}", msg, message);
                //支付订单改成超时未支付》取消
                PayOrderNotifyDto dto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
    
            } catch (Exception e) {
                log.error("订单延时通知 消息消费失败:", e);
            }
        }
        /**
         * 订单支付通知 消息
         */
        @RabbitListener(queues = {"order_pay_notify_queue"})
        public void payNotify(Message message) {
            try {
                String msg = new String(message.getBody());
                log.info("订单支付通知 MQ 消息内容:{}, {}", msg, message);
                PayOrderNotifyDto payOrderNotifyDto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
            } catch (Exception e) {
                log.error("订单支付通知 消息消费失败:", e);
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
  • 相关阅读:
    【Python入门】文件内容操作
    网络库OKHttp(1)流程+拦截器
    shell 监听指定日志变化进行相关业务处理
    Spring Cloud Gateway系列【5】GatewayFilter网关过滤器详解
    Power Modeling and Analysis
    EXCEL 求解线性规划问题
    如何把图片转换成pdf格式?图片转PDF方法分享
    微博、虎牙挺进兴趣社区:同行不同路
    如何解决 Redis 的并发竞争 key 问题
    利用PaddleDetection 训练自定义COCO数据集进行目标检测
  • 原文地址:https://blog.csdn.net/huweijun_2012/article/details/134060447