• rabbitmq发送消息通用接口


    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


    前言

    在项目中我们经常会使用异步消息,然而每次发送消息都需要写很多重复性工作。


    0. 项目结构:

    在这里插入图片描述

    一、公共类

    提示:公共依赖的对象

    代码如下

    /**
     * 功能描述:
     * 〈〉
     *
     * @author : huan
     * @date   : 2022/11/1
     */
    public interface BaseEventTypeEnum<E, V, T, X ,D> {
    
        E getEvent();
    
        X getExchange();
    
        T getType();
    
        V getServiceName();
    
        D getDesc();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    import cn.hutool.extra.spring.SpringUtil;
    
    /**
     * 

    描述:spring上下文工具类

    * * @author huan */
    public class SpringContextUtils { /** * 获取指定的Bean * @param beanName * @param clz * @param * @return */ public static <T> T getBean(String beanName, Class<T> clz) { return SpringUtil.getBean(beanName, clz); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    二、消息体Bean对象

    代码如下:

    import lombok.Data;
    import lombok.experimental.Accessors;
    
    import java.io.Serializable;
    
    /**
     * 功能描述: 消息体
     * 〈〉
     *
     * @author : huan
     * @date   : 2022/11/1
     */
    @Data
    @Accessors(chain = true)
    public class EventMessageModel<T> implements Serializable {
    
        /** 业务id */
        private String businessId;
    
        /** 业务类型 上下架、更新信息 */
        private String businessType;
    
        /** 业务时间 */
        private Long businessTime;
    
        /** 业务数据 */
        private T businessData;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    import lombok.Data;
    import lombok.experimental.Accessors;
    
    import java.io.Serializable;
    
    /**
     * 功能描述: 事件对象
     * 〈〉
     *
     * @author : huan
     * @date   : 2022/10/31
     */
    @Data
    @Accessors(chain = true)
    public class EventModel<T> implements Serializable {
    
        /** 事件id */
        private String eventId;
    
        /** 事件类型  */
        private String eventType;
    
        /** 时间 */
        private Long eventTime;
    
        /** 业务数据 */
        private T data;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    三、定义构建消息体接口

    提示:此方法用于构建发送消息的消息体

    代码如下:

    import cn.hutool.core.lang.UUID;
    import com.saas.common.rabbitmq.enums.BaseEventTypeEnum;
    import com.saas.common.rabbitmq.model.EventMessageModel;
    import com.saas.common.rabbitmq.model.EventModel;
    
    /**
     * 功能描述: 课程消息生产服务
     * 〈〉
     *
     * @author : huan
     * @date   : 2022/11/1
     */
    public interface MessageProduceService<T, E> {
    
        /**
         * 功能描述: 前置处理
         * 

    控制消息是否进行发送

    * * @param e * @return : Boolean * @author : huan * @date : 2022/11/1 */
    Boolean preHandle(E e); /** * 功能描述: 后置处理 *

    消息发送异常后会调用此方法

    * * @param eventModel * @param typeEnum * @return : void * @author : huan * @date : 2022/11/10 */
    void postHandle(EventModel<EventMessageModel<T>> eventModel, BaseEventTypeEnum typeEnum); /** * 功能描述: 发送业务消息 * 〈〉 * * @param businessId * @param typeEnum * @return : void * @author : huan * @date : 2022/11/1 */ EventModel<EventMessageModel<T>> buildEventModel(String businessId, BaseEventTypeEnum typeEnum); /** * 功能描述: 初始化-消息对象 *

    无特殊要求:请不要重写此方法

    * * @param messageModel * @return : EventModel> * @author : huan * @date : 2022/11/1 */
    default EventModel<EventMessageModel<T>> initModel(EventMessageModel<T> messageModel, BaseEventTypeEnum typeEnum) { return new EventModel<EventMessageModel<T>>() .setEventId(UUID.fastUUID().toString()) .setEventType(typeEnum.getEvent().toString()) .setEventTime(System.currentTimeMillis()) .setData(messageModel); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    四、发送消息接口及实现类

    提示:此方法是一个公共发送消息的接口

    代码如下:

    /**
     * 功能描述: 发送消息
     * 〈〉
     *
     * @author : huan
     * @date   : 2022/11/1
     */
    public interface MessageSendService<T> {
    
        /**
         * 功能描述: 发送消息
         * 〈〉
         *
         * @param businessId    业务id
         * @param t             业务数据(用于拦截消息发送)
         * @param typeEnum      消息枚举
         * @return : void
         * @author : huan
         * @date   : 2022/11/2
         */
        void send(String businessId, T t, BaseEventTypeEnum typeEnum);
    
        /**
         * 功能描述: 发送消息
         * 〈〉
         *
         * @param businessId
         * @param typeEnum
         * @return : void
         * @author : huan
         * @date   : 2022/11/2
         */
        void send(String businessId, BaseEventTypeEnum typeEnum);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    提示:此方法是一个公共发送消息的实现类

    代码如下:

    import cn.hutool.json.JSONUtil;
    import com.saas.common.core.utils.SpringContextUtils;
    import com.saas.common.rabbitmq.enums.BaseEventTypeEnum;
    import com.saas.common.rabbitmq.model.EventMessageModel;
    import com.saas.common.rabbitmq.model.EventModel;
    import com.saas.common.rabbitmq.service.MessageProduceService;
    import com.saas.common.rabbitmq.service.MessageSendService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    /**
     * 功能描述: 通用发送消息业务层
     * 〈〉
     *
     * @author : huan
     * @date   : 2022/11/1
     */
    @Slf4j
    @Component
    public class MessageSendServiceImpl<T> implements MessageSendService {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void send(String businessId, Object obj, BaseEventTypeEnum typeEnum) {
            log.info("【消息】 发送参数 -> businessId:{}, obj:{}, typeEnum:{}", businessId, obj, typeEnum.getEvent());
    
            MessageProduceService produceService = getService(typeEnum.getServiceName().toString());
            log.info("【消息】 获取到的实例:{}", produceService.getClass().getName());
    
            // 是否允许发送
            Boolean sendFlag = produceService.preHandle(obj);
            if(!sendFlag) {
                log.warn("【消息】 禁止发送消息-> businessId:{}", businessId);
                return;
            }
    
            // 构建消息体
            EventModel eventModel = produceService.buildEventModel(businessId, typeEnum);
    
            // 发送消息
            Optional.ofNullable(eventModel).ifPresent(model -> {
                doSend(eventModel, typeEnum, produceService);
            });
        }
    
        @Override
        public void send(String businessId, BaseEventTypeEnum typeEnum) {
            send(businessId, null, typeEnum);
        }
    
        /**
         * 功能描述: 获取指定service
         * 〈〉
         *
         * @param beanName
         * @return : MessageProduceService
         * @author : huan
         * @date   : 2022/11/2
         */
        protected MessageProduceService getService(String beanName) {
            return SpringContextUtils.getBean(beanName, MessageProduceService.class);
        }
    
        /**
         * 功能描述: 发送消息幂等
         * 〈〉
         *
         * @param eventModel
         * @return : void
         * @author : huan
         * @date   : 2022/11/1
         */
        private void doSend(EventModel<EventMessageModel> eventModel, BaseEventTypeEnum typeEnum, MessageProduceService produceService) {
            String desc = typeEnum.getDesc().toString();
            String event = typeEnum.getEvent().toString();
            String exchange = typeEnum.getExchange().toString();
    
            try{
                String msg = JSONUtil.toJsonStr(eventModel);
                rabbitTemplate.convertAndSend(exchange, event, msg);
                log.info("【{}】异步发送成功, exchange:{}, routingKey:{}, msg:{}", desc, exchange, event, msg);
            }catch (Exception e){
                // 消息发送失败处理-- 比如使用redis保存发送失败的消息
                produceService.postHandle(eventModel, typeEnum);
                log.error("【{}】 -> eventId:{} 异步发送失败 error:{}", desc, eventModel.getEventId(), e.getMessage());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95

    如果 MessageSendServiceImpl 是在公共jar中,需要使用spring.factories 将bean加载到spring容器中(在resources文件夹中创建META-INF文件夹,在META-INF创建spring.factories文件)

    spring.factories文件内容如下:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      com.test.common.rabbitmq.service.impl.MessageSendServiceImpl
    
    • 1
    • 2

    在这里插入图片描述

    五、消息类型扩展

    1.自定义ContentEventTypeEnum枚举

    提示:不同的消息只需要实现 BaseEventTypeEnum,按ContentEventTypeEnum自定义即可

    代码如下:

    /**
     * 功能描述: 内容事件类型
     * 〈〉
     *
     * @author : huan
     * @date   : 2022/11/7
     */
    
    @AllArgsConstructor
    public enum ContentEventTypeEnum implements BaseEventTypeEnum {
    
        CREATE("content_add_event", "test_content","create","contentEventProducer","内容创建事件"),
    
        UPDATE_CONTENT("content_update_event", "test_content","update","contentEventProducer","内容修改事件"),
        UPDATE_SHELF("content_update_event", "test_content","shelf","contentEventProducer","内容上下架事件"),
        UPDATE_AUDIT("content_update_event", "test_content","audit","contentEventProducer","内容审核事件"),
    
        DELETE("content_update_event", "test_content","delete","contentEventProducer","内容删除事件"),
        ;
    
    	/** 事件 */
        @Getter
        private String event;
    	/** 交换机 */
        @Getter
        private String exchange;
    	/** 事件类型 */
        @Getter
        private String type;
    	/** 处理Bean的名称 */
        @Getter
        private String serviceName;
        /** 备注 */
        @Getter
        private String desc;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    2.消息bean生产者

    代码如下:

    
    /**
     * 功能描述: 消息事件生产者
     * 〈〉
     *
     * @author : huan
     * @date   : 2022/11/8
     */
    @Slf4j
    @Component
    public class ContentEventProducer implements MessageProduceService<TestBean, Boolean> {
    
        @Autowired
        private TestMapper testMapper;
    
        @Override
        public Boolean preHandle(Boolean flag) {
            return Optional.ofNullable(flag).orElse(Boolean.TRUE);
        }
    
        @Override
        public void postHandle(EventModel<EventMessageModel<TestBean>> eventModel, BaseEventTypeEnum typeEnum) {
            log.info("【内容】消息发送失败了!!!");
        }
    
        @Override
        public EventModel<EventMessageModel<TestBean>> buildEventModel(String businessId, BaseEventTypeEnum typeEnum) {
            // 获取业务数据
            TestBean testBean = testMapper.selectById(businessId);
            log.info("【内容】从库中查询到的信息:{}", cmsQuestion);
            if(!Optional.ofNullable(testBean).isPresent()) {
                log.error("【内容】从库中未查询到信息,终止消息发送 -> businessId:{}", businessId);
                return null;
            }
    
            Date businessDate = Optional.ofNullable(testBean.getUpdateTime())
                    .orElse(testBean.getCreateTime());
            // 构建消息体
            EventMessageModel messageModel = new EventMessageModel<TestBean>()
                    .setBusinessId(businessId)
                    .setBusinessType(typeEnum.getType().toString())
                    .setBusinessTime(businessDate.getTime())
                    .setBusinessData(testBean);
    
            // 设置类型
            EventModel<EventMessageModel<TestBean>> eventModel = initModel(messageModel, typeEnum);
    
            log.info("【内容】发送的消息体:{}", eventModel);
            return eventModel;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    3.消息发送

    代码如下:

    	@Resource
        private MessageSendService sendService;
    
        // 异步消息 - 具体消息类型,通过枚举ContentEventTypeEnum控制
        sendService.send(test.getId().toString(), ContentEventTypeEnum.DELETE);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4.消息消费

    代码如下:

    /**
     * 功能描述: 接口消息处理
     * 

    ** 注意 **

    *

    测试消费消息

    * * @author : huan * @date : 2022/11/2 */
    @Deprecated @Slf4j @Component public class ContentEventConsumer { @RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory", bindings = @QueueBinding( // @Queue 属性value: 绑定的队列名称 - 自定义 value = @Queue(value = "test_content_update_queue", durable = "true"), // @Exchange 属性name: 交换器名称 // 对应 ContentEventTypeEnum中的 exchange exchange = @Exchange(name = "test_content", durable = "true", type = "topic"), // key: 路由key, 也可以理解为topic, 需与发送端保持一致, 否则无法监听到消息 // 对应 ContentEventTypeEnum中的event key = "content_update_event" )) public void handle(Channel channel, Message message, @Headers Map<String, Object> headers) { long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); try { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("【测试】消息消费成功: msg:{}", msg ); }catch(Exception e){ log.error("【测试】消息消费失败, errMsg:{}", e.getMessage()); e.printStackTrace(); }finally { try { channel.basicAck(deliveryTag,Boolean.FALSE); } catch (IOException e) { e.printStackTrace(); } } } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    总结

    1. 通过实现BaseEventTypeEnum 来实现自定义的消息类型
    2. 消息生产时候,我们只需要关注消息体的构建 buildEventModel方法,控制是否发送消息通过preHandle方法, 消息发送失败后,通过postHandle方法处理
  • 相关阅读:
    小满nestjs(第十六章 nestjs 响应拦截器)
    c# --- 泛型
    计算机设计大赛 深度学习疲劳检测 驾驶行为检测 - python opencv cnn
    神经网络与深度学习——第7章 网络优化与正则化
    android 代码设置静态Ip地址的方法
    Vue中...(扩展运算符)的作用
    Kali生成windows木马程序
    numpy中的seed
    11.20 至 11.27 五道典型题记录: 贪心 | 应用题 | 脑筋急转弯 | 区间问题 | 双指针
    九、react生命周期钩子(旧)
  • 原文地址:https://blog.csdn.net/qq_40915333/article/details/127772964