• SpringBoot 集成RabbitMQ 实现钉钉日报定时发送功能


    一、RabbitMq 下载安装

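    官网:https://www.rabbitmq.com/docs

    注意:下文使用的 x-delayed-message 类型交换机依赖 rabbitmq_delayed_message_exchange 插件。需要先下载与 RabbitMQ 版本匹配的插件,并执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 启用,否则声明延迟交换机时会失败。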

    二、开发步骤:

    1.MAVEN 配置

       		
            
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.7.7</version>
            </dependency>
            
    

    2. RabbitMqConfig 配置

    package com.lq.common.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    
    /**
     * Declares the RabbitMQ topology used for delayed delivery: one custom
     * exchange of type {@code x-delayed-message}, one durable queue, and the
     * binding between them.
     *
     * <p>NOTE(review): the {@code x-delayed-message} exchange type is provided by
     * the RabbitMQ delayed-message-exchange plugin; confirm it is enabled on the
     * target broker.
     */
    @Configuration
    public class RabbitMqConfig {
        /** Name of the delayed-message exchange. */
        public static final String  DELAY_EXCHANGE="DelayExchange";
        /** Name of the queue that receives messages once their delay has elapsed. */
        public static final String  DELAY_QUEUE="DelayQueue";
        /** Routing key that binds {@link #DELAY_QUEUE} to {@link #DELAY_EXCHANGE}. */
        public static final String ROUTING_KEY="delay";


        /**
         * Creates the durable, non-auto-delete delayed exchange.
         *
         * @return a custom exchange of type {@code x-delayed-message} that routes
         *         like a direct exchange
         */
        @Bean
        public CustomExchange customExchange(){
            // Typed map instead of a raw Map: avoids the unchecked call into CustomExchange.
            Map<String, Object> arguments = new HashMap<>();
            // Routing behaviour the exchange applies after the delay.
            arguments.put("x-delayed-type","direct");
            return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
        }

        /**
         * Creates the durable queue consumed by the delayed-message listener.
         *
         * @return the durable queue named {@link #DELAY_QUEUE}
         */
        @Bean
        public Queue delayQueue(){
            return new Queue(DELAY_QUEUE,true);
        }

        /**
         * Binds the delay queue to the delayed exchange with {@link #ROUTING_KEY}.
         * The method name is kept as-is because it doubles as the bean name.
         *
         * @return the queue-to-exchange binding, without extra arguments
         */
        @Bean
        public Binding DelayBinding(){
            return BindingBuilder.bind(delayQueue()).to(customExchange()).with(ROUTING_KEY).noargs();
        }

    }
    

    3. RabbitMqUtil 工具类

    package com.lq.common.util;
    
    import com.lq.common.config.RabbitMqConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    
    @Service
    @Slf4j
    public class RabbitMqUtil {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private DateTimeFormatter formatterDateTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        @PostConstruct
        public void init(){
            /**
             * 消息发送到交换机成功回调函数
             */
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
    
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if (ack){
                        log.info("消息投递到交换机成功");
                    }else {
                        log.error("消息投递到交换机失败,原因->{}",cause);
                    }
                }
            });
            /**交换机投递到队列失败回调函数**/
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returned) {
                    log.error("投递到队列失败,错误原因->{}",returned);
                }
            });
    
        }
    
        /**
         * @Description 发送延迟消息
         * @param content 延迟内容
         * @param delayTime 延迟时间 ,单位ms;  例如 5000 代表 5 秒
         * @Author hqd
         * @Date 2024-10-21
         */
        public Boolean sendDelayMessage(String content,Integer delayTime){
            log.info("消息发送时间->{}",LocalDateTime.now().format(formatterDateTime));
    
            rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE, RabbitMqConfig.ROUTING_KEY, content, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    log.info("延迟时间->{}",delayTime);
                    //这个底层就是setHeader("x-delay",i);是一样的 设置延时时间
                    message.getMessageProperties().setDelay(delayTime);//单位毫秒
                    return message;
                }
            });
            return true;
    
        }
    
    
    }
    

    4. DailyDelaySendConsumer 消费者监听

    package com.lq.daily.mq.consumer;
    
    import cn.hutool.core.util.ObjectUtil;
    import cn.hutool.core.util.StrUtil;
    import com.alibaba.fastjson.JSONObject;
    import com.lq.common.config.RabbitMqConfig;
    import com.lq.daily.dto.DailyDelaySendDTO;
    import com.lq.daily.service.ILqDailyService;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    /**
     * @Description 日报延迟发送消费者
     * @Author hqd
     * @Date 2024-10-21 16:04
     */
    @Slf4j
    @Component
    public class DailyDelaySendConsumer {
        @Autowired
        private ILqDailyService lqDailyService;
    
        private DateTimeFormatter formatterDateTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        @RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE)
        public void dailyDelaySendListener(String content, Channel channel, Message message) throws IOException, InterruptedException{
            log.info("消息接收时间->{}", LocalDateTime.now().format(formatterDateTime));
            log.info("接收消息内容是->{}",content);
            log.info("{}",message.getMessageProperties().getDeliveryTag());
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    
            //处理日报发送业务逻辑
            if (StrUtil.isNotBlank(content)&& content.startsWith("{")){
                DailyDelaySendDTO dto = JSONObject.parseObject(content, DailyDelaySendDTO.class);
                if (ObjectUtil.isNotEmpty(dto)){
                    lqDailyService.updateDailyDelaySend(dto.getDailyCode(), LocalDateTime.parse(dto.getDelaySendTime(),DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")));
                }
            }
    
        }
    }
    

    5. 测试延迟发送

       /**
        * Test endpoint: publishes one delayed daily-report message whose delay is
        * the time remaining until the DTO's {@code delaySendTime}.
        *
        * @throws ArithmeticException if the remaining time exceeds
        *         {@code Integer.MAX_VALUE} milliseconds (about 24.8 days)
        */
        @PassToken
        @GetMapping("/testDelayMq")
        @ApiOperation("测试Mq 延迟消息发送")
        public void testDelayMq(){
            DailyDelaySendDTO dto = new DailyDelaySendDTO();
            dto.setDailyCode("DC2024101015135400001");
            dto.setDelaySendTime("2024-10-22 10:58");

            LocalDateTime sendTime = LocalDateTime.parse(dto.getDelaySendTime()+":00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            // A target time already in the past must not produce a negative delay.
            long delayMillis = Math.max(0L, ChronoUnit.MILLIS.between(LocalDateTime.now(), sendTime));

            // toIntExact fails loudly instead of silently wrapping to a wrong delay,
            // which is what the previous new Long(..).intValue() narrowing did.
            rabbitMqUtil.sendDelayMessage(JSON.toJSONString(dto), Math.toIntExact(delayMillis));
        }
    

    (原文此处为一张图片,抓取时未能保留)

  • 相关阅读:
    RabbitMQ消费失败重试策略、及重试策略应用场景详解
    在 Android 中创建静态应用程序快捷方式
    Android应用性能优化
    一条SQL引起的系统不可用
    Spring中自定义类型转换器
    CleanMyMac X免费电脑清理加速软件-清理内存磁盘缓存注册表
    ununtu中vim的使用
    硬件开发趋势与技术探索
    将 List 转换为 String
    万字长文,手把手教你重构
  • 原文地址:https://blog.csdn.net/u014212540/article/details/143144665