• RabbitMQ应用:利用Rabbitmq做一个日志小框架,实现自动日志入库功能。


    𝑰’𝒎 𝒉𝒉𝒈, 𝑰 𝒂𝒎 𝒂 𝒈𝒓𝒂𝒅𝒖𝒂𝒕𝒆 𝒔𝒕𝒖𝒅𝒆𝒏𝒕 𝒇𝒓𝒐𝒎 𝑵𝒂𝒏𝒋𝒊𝒏𝒈, 𝑪𝒉𝒊𝒏𝒂.

    • 🏫 𝑺𝒉𝒄𝒐𝒐𝒍: 𝑯𝒐𝒉𝒂𝒊 𝑼𝒏𝒊𝒗𝒆𝒓𝒔𝒊𝒕𝒚
    • 🌱 𝑳𝒆𝒂𝒓𝒏𝒊𝒏𝒈: 𝑰’𝒎 𝒄𝒖𝒓𝒓𝒆𝒏𝒕𝒍𝒚 𝒍𝒆𝒂𝒓𝒏𝒊𝒏𝒈 𝒅𝒆𝒔𝒊𝒈𝒏 𝒑𝒂𝒕𝒕𝒆𝒓𝒏, 𝑳𝒆𝒆𝒕𝒄𝒐𝒅𝒆, 𝒅𝒊𝒔𝒕𝒓𝒊𝒃𝒖𝒕𝒆𝒅 𝒔𝒚𝒔𝒕𝒆𝒎, 𝒎𝒊𝒅𝒅𝒍𝒆𝒘𝒂𝒓𝒆 𝒂𝒏𝒅 𝒔𝒐 𝒐𝒏.
    • 💓 𝑯𝒐𝒘 𝒕𝒐 𝒓𝒆𝒂𝒄𝒉 𝒎𝒆:𝑽𝑿
    • 📚 𝑴𝒚 𝒃𝒍𝒐𝒈: 𝒉𝒕𝒕𝒑𝒔://𝒉𝒉𝒈𝒚𝒚𝒅𝒔.𝒃𝒍𝒐𝒈.𝒄𝒔𝒅𝒏.𝒏𝒆𝒕/
    • 💼 𝑷𝒓𝒐𝒇𝒆𝒔𝒔𝒊𝒐𝒏𝒂𝒍 𝒔𝒌𝒊𝒍𝒍𝒔:𝒎𝒚 𝒅𝒓𝒆𝒂𝒎

    前言

      之前一直都说rabbitmq可以用来做日志,我心里的思路就是把对象发送到mq里面,然后再另一个服务里面接受,入库就行,感觉整体流程应该就这样,差不多,于是开始了我的实现。

    环境说明:

    • RabbitMQ 3.10.7
    • Erlang 25.0.4
    • Ruoyi 3.4.0 因为我是直接改的若以,这样方便点,有现成的项目,可以直接改。

    1 rabbitmq docker部署

    1.1 yml文件

    root@hhg-Lenovo-Y430P:/home/hhg/docker/rabbitmq_test2# cat docker-compose.yml 
    version: '3'
    services:
      rabbitmq1:
        image: rabbitmq:management
        container_name: rabbitmq1
        restart: 'no'
        hostname: rabbitmq1
        ports:
          - "5675:5672"
          - "15673:15672"
        volumes:
          - ./data:/var/lib/rabbitmq
        environment:
          - RABBITMQ_DEFAULT_USER=rabbitmq
          - RABBITMQ_DEFAULT_PASS=rabbitmq
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    docker-compose启动就行,没什么好说的。

    2 Springboot中进行测试

      因为最后要模块化,我们先在一个springboot中去测试成功了,再用进去。上来就弄很多模块给你,你会乱的。

    2.1 application.yml

    server:
      port: 8021
    spring:
      #给项目来个名字
      application:
        name: rabbitmq-provider
      #配置rabbitMq 服务器
      rabbitmq:
        host: 192.168.2.151
        port: 5675
        username: rabbitmq
        password: rabbitmq
        #虚拟host 可以不设置,使用server默认host
        virtual-host: /
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2.2 Rabbitmq配置

    @Configuration
    public class RabbitmqConfig implements RabbitListenerConfigurer {
        @Autowired
        private CachingConnectionFactory connectionFactory;
        //自动装配消息监听器所在的容器工厂配置类实例
    //    @Autowired
    //    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainer() {
            //定义消息监听器所在的容器工厂
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            //设置容器工厂所用的实例
            factory.setConnectionFactory(connectionFactory);
            //设置消息在传输中的格式,在这里采用JSON的格式进行传输
            factory.setMessageConverter(producerJackson2MessageConverter());
    //        //设置并发消费者实例的初始数量。在这里为1个
    //        factory.setConcurrentConsumers(1);
    //        //设置并发消费者实例的最大数量。在这里为1个
    //        factory.setMaxConcurrentConsumers(1);
    //        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
            factory.setPrefetchCount(1);
            // 关闭自动应答
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
            // 设置批量消费
            factory.setBatchListener(true);
            factory.setBatchSize(3);
            factory.setConsumerBatchEnabled(true);
            return factory;
        }
    
        @Bean
        public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
            return rabbitTemplate;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        }
    
        @Bean
        MessageHandlerMethodFactory messageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
            messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
            return messageHandlerMethodFactory;
        }
    
        @Bean
        public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
            return new MappingJackson2MessageConverter();
        }
        //队列 起名:springDirectQueue
        @Bean
        public Queue springDirectQueue() {
            // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,不然你电脑关机的时候队列就会消失。
            // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
            // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    
            //一般设置一下队列的持久化就好,其余两个就是默认false
            return new Queue("springDirectQueue", true);
        }
    
        //Direct交换机 起名:springDirectExchange
        @Bean
        public DirectExchange TestDirectExchange() {
            return new DirectExchange("springDirectExchange", true, false);
        }
    
        //用两个队列,将队列和交换机绑定, 并设置用于匹配键:springDirectRouting1, springDirectRouting2,用来实验routingKey的作用
        @Bean
        public Binding bindingDirectQueue1() {
            return BindingBuilder.bind(springDirectQueue()).to(TestDirectExchange()).with("springDirectRouting1");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    这些配置的作用:

    1. 保证rabbitmq消费者代码能够直接序列化对象,就是将消息直接封装到你的对象里面。而不是直接操作Message 对象,为什么呢?因为我们如果先将对象序列化放进message的body里面,再从message中序列化弄出来,其实挺麻烦的。
    2. 设置了一个简单的队列和exchange,然后进行了绑定。
    3. 开启了批量消费,因为日志来一个message就处理一条,就往数据库里面插一条数据,那我能不能把几条消息拉到一起,形成一个list呢?然后通过批量插入呢?批量消费就能够实现这样的事情。

    2.3 测试代码

    2.3.1 消费单条message 自动转成对象

        /**
         * 如果是一条一条消费,这么做是可以的,可以直接封装进对象里面,然后进行手动ACK
         * @param message
         * @param channel
         * @param user
         * @throws Exception
         */
        @RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
        public void processMessage2(Message message, Channel channel, SysUser user) throws Exception {
            System.out.println(user);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2.3.2 消费多条message转成对象

    在这里插入图片描述
    官网对于批量消费,给出了这几种方案。

    • the first is called with the raw, unconverted org.springframework.amqp.core.Message s received.
    • the second is called with the org.springframework.messaging.Message s with converted payloads and mapped headers/properties.
    • the third is called with the converted payloads, with no access to headers/properteis.

    You can also add a Channel parameter, often used when using MANUAL ack mode. This is not very useful with the third example because you don’t have access to the delivery_tag property.
    所以我们都测试一下。

        /**
         * 批量消费, 直接把消费的信息装进list里面,形成usersList,缺点就是,没办法进行手动ACK了
         *
         * @param users
         * @throws Exception
         */
        @RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
        public void processMessage(List<SysUser> users) throws Exception {
    //        System.out.println(users);
            for (SysUser user : users) {
                System.out.println(user);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
        /**
         * 批量消费,将几条消息用一个list来存储,对list进行遍历,将body提出来,转成list,同时进行手动ACK
         *
         * @param messages
         * @param channel
         * @throws Exception
         */
        @RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
        public void processMessage3(List<Message> messages, Channel channel) throws Exception {
            List<SysUser> collect = messages.stream().map(message -> {
                try {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return JSON.parseObject(new String(message.getBody()), SysUser.class);
    
            }).collect(Collectors.toList());
            System.out.println(collect);
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    我的理想型,就是上面的这种,可以同时ACK以及,封装userList,但是需要多几行代码才行。

    3 ruoyi部分:通过AOP+RabbitMQ去实现日志记录

    3.1 模块说明

    模块:

    • ruoyi-common-log
      • RabbitmqLogQueueConfig:把日志需要的队列和exchange的声明放在这个里面。
      • service:在service里面设置发送具体消息的方法,比如发送到哪个exchange里面。
    • ruoyi-common-mq
      • RabbitMQConfig:把通用配置,序列化等,放在这个模块里面,这样可以多个模块公用。
      • ProduceService:利用mq进行send消息的通用方法。供其他模块使用。
    • ruoyi-modules-system
      • OperListener:监听的方法,用来接收消息,并且调用本模块里面的service实现批量入库这个功能。

    3.2 ruoyi-common-mq模块代码

    • RabbitmqConfig.java
    package com.ruoyi.common.mq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.converter.MappingJackson2MessageConverter;
    import org.springframework.messaging.converter.MessageConverter;
    import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
    import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
    
    /**
     * @author xx
     * @date 2021/10/5 16:24
     */
    @Configuration
    public class RabbitmqConfig implements RabbitListenerConfigurer {
        @Autowired
        private CachingConnectionFactory connectionFactory;
        //自动装配消息监听器所在的容器工厂配置类实例
    //    @Autowired
    //    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainer() {
            //定义消息监听器所在的容器工厂
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            //设置容器工厂所用的实例
            factory.setConnectionFactory(connectionFactory);
            //设置消息在传输中的格式,在这里采用JSON的格式进行传输
            factory.setMessageConverter(producerJackson2MessageConverter());
    //        //设置并发消费者实例的初始数量。在这里为1个
    //        factory.setConcurrentConsumers(1);
    //        //设置并发消费者实例的最大数量。在这里为1个
    //        factory.setMaxConcurrentConsumers(1);
    //        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
            factory.setPrefetchCount(1);
            // 关闭自动应答
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
            // 设置批量消费
            factory.setBatchListener(true);
            factory.setBatchSize(3);
            factory.setConsumerBatchEnabled(true);
            return factory;
        }
    
        @Bean
        public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
            return rabbitTemplate;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        }
    
        @Bean
        MessageHandlerMethodFactory messageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
            messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
            return messageHandlerMethodFactory;
        }
    
        @Bean
        public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
            return new MappingJackson2MessageConverter();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    ruoyi-common-mq就是一个总体的配置设置,这样其他就可以不用再配了,直接使用。

    • ProduceService.java
    package com.ruoyi.common.mq.producer;
    
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    @Component
    public class ProduceService {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public <T> void send(String exchange, String routingKey, T object) {
    //        Map msgMap = new HashMap<>();
    //        msgMap.put("message", msg);
    //        String messageJson = JSONObject.toJSONString(msgMap);
    //        Message message = MessageBuilder
    //                .withBody(messageJson.getBytes())
    //                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
    //                .setContentEncoding("utf-8")
    //                .setMessageId(UUID.randomUUID() + "")
    //                .build();
    //        log.info("生产者发送:" + message);
            rabbitTemplate.convertAndSend(exchange, routingKey, object);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    3.3 ruoyi-common-log

    • LogAspect
    package com.ruoyi.common.log.aspect;
    
    import java.util.Collection;
    import java.util.Map;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.AfterReturning;
    import org.aspectj.lang.annotation.AfterThrowing;
    import org.aspectj.lang.annotation.Aspect;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.HttpMethod;
    import org.springframework.stereotype.Component;
    import org.springframework.validation.BindingResult;
    import org.springframework.web.multipart.MultipartFile;
    import com.alibaba.fastjson.JSON;
    import com.ruoyi.common.core.utils.ServletUtils;
    import com.ruoyi.common.core.utils.StringUtils;
    import com.ruoyi.common.core.utils.ip.IpUtils;
    import com.ruoyi.common.log.annotation.Log;
    import com.ruoyi.common.log.enums.BusinessStatus;
    import com.ruoyi.common.log.service.AsyncLogService;
    import com.ruoyi.common.security.utils.SecurityUtils;
    import com.ruoyi.system.api.domain.SysOperLog;
    
    /**
     * 操作日志记录处理
     * 
     * @author ruoyi
     */
    @Aspect
    @Component
    public class LogAspect
    {
        private static final Logger log = LoggerFactory.getLogger(LogAspect.class);
        
        @Autowired
        private AsyncLogService asyncLogService;
    
        /**
         * 处理完请求后执行
         *
         * @param joinPoint 切点
         */
        @AfterReturning(pointcut = "@annotation(controllerLog)", returning = "jsonResult")
        public void doAfterReturning(JoinPoint joinPoint, Log controllerLog, Object jsonResult)
        {
            handleLog(joinPoint, controllerLog, null, jsonResult);
        }
    
        /**
         * 拦截异常操作
         * 
         * @param joinPoint 切点
         * @param e 异常
         */
        @AfterThrowing(value = "@annotation(controllerLog)", throwing = "e")
        public void doAfterThrowing(JoinPoint joinPoint, Log controllerLog, Exception e)
        {
            handleLog(joinPoint, controllerLog, e, null);
        }
    
        protected void handleLog(final JoinPoint joinPoint, Log controllerLog, final Exception e, Object jsonResult)
        {
            try
            {
                // *========数据库日志=========*//
                SysOperLog operLog = new SysOperLog();
                operLog.setStatus(BusinessStatus.SUCCESS.ordinal());
                // 请求的地址
                String ip = IpUtils.getIpAddr(ServletUtils.getRequest());
                operLog.setOperIp(ip);
                operLog.setOperUrl(ServletUtils.getRequest().getRequestURI());
                String username = SecurityUtils.getUsername();
                if (StringUtils.isNotBlank(username))
                {
                    operLog.setOperName(username);
                }
    
                if (e != null)
                {
                    operLog.setStatus(BusinessStatus.FAIL.ordinal());
                    operLog.setErrorMsg(StringUtils.substring(e.getMessage(), 0, 2000));
                }
                // 设置方法名称
                String className = joinPoint.getTarget().getClass().getName();
                String methodName = joinPoint.getSignature().getName();
                operLog.setMethod(className + "." + methodName + "()");
                // 设置请求方式
                operLog.setRequestMethod(ServletUtils.getRequest().getMethod());
                // 处理设置注解上的参数
                getControllerMethodDescription(joinPoint, controllerLog, operLog, jsonResult);
                // 保存数据库
                asyncLogService.saveSysLog(operLog);
            }
            catch (Exception exp)
            {
                // 记录本地异常日志
                log.error("==前置通知异常==");
                log.error("异常信息:{}", exp.getMessage());
                exp.printStackTrace();
            }
        }
    
        /**
         * 获取注解中对方法的描述信息 用于Controller层注解
         * 
         * @param log 日志
         * @param operLog 操作日志
         * @throws Exception
         */
        public void getControllerMethodDescription(JoinPoint joinPoint, Log log, SysOperLog operLog, Object jsonResult) throws Exception
        {
            // 设置action动作
            operLog.setBusinessType(log.businessType().ordinal());
            // 设置标题
            operLog.setTitle(log.title());
            // 设置操作人类别
            operLog.setOperatorType(log.operatorType().ordinal());
            // 是否需要保存request,参数和值
            if (log.isSaveRequestData())
            {
                // 获取参数的信息,传入到数据库中。
                setRequestValue(joinPoint, operLog);
            }
            // 是否需要保存response,参数和值
            if (log.isSaveResponseData() && StringUtils.isNotNull(jsonResult))
            {
                operLog.setJsonResult(StringUtils.substring(JSON.toJSONString(jsonResult), 0, 2000));
            }
        }
    
        /**
         * 获取请求的参数,放到log中
         * 
         * @param operLog 操作日志
         * @throws Exception 异常
         */
        private void setRequestValue(JoinPoint joinPoint, SysOperLog operLog) throws Exception
        {
            String requestMethod = operLog.getRequestMethod();
            if (HttpMethod.PUT.name().equals(requestMethod) || HttpMethod.POST.name().equals(requestMethod))
            {
                String params = argsArrayToString(joinPoint.getArgs());
                operLog.setOperParam(StringUtils.substring(params, 0, 2000));
            }
        }
    
        /**
         * 参数拼装
         */
        private String argsArrayToString(Object[] paramsArray)
        {
            String params = "";
            if (paramsArray != null && paramsArray.length > 0)
            {
                for (Object o : paramsArray)
                {
                    if (StringUtils.isNotNull(o) && !isFilterObject(o))
                    {
                        try
                        {
                            Object jsonObj = JSON.toJSON(o);
                            params += jsonObj.toString() + " ";
                        }
                        catch (Exception e)
                        {
                        }
                    }
                }
            }
            return params.trim();
        }
    
        /**
         * 判断是否需要过滤的对象。
         * 
         * @param o 对象信息。
         * @return 如果是需要过滤的对象,则返回true;否则返回false。
         */
        @SuppressWarnings("rawtypes")
        public boolean isFilterObject(final Object o)
        {
            Class<?> clazz = o.getClass();
            if (clazz.isArray())
            {
                return clazz.getComponentType().isAssignableFrom(MultipartFile.class);
            }
            else if (Collection.class.isAssignableFrom(clazz))
            {
                Collection collection = (Collection) o;
                for (Object value : collection)
                {
                    return value instanceof MultipartFile;
                }
            }
            else if (Map.class.isAssignableFrom(clazz))
            {
                Map map = (Map) o;
                for (Object value : map.entrySet())
                {
                    Map.Entry entry = (Map.Entry) value;
                    return entry.getValue() instanceof MultipartFile;
                }
            }
            return o instanceof MultipartFile || o instanceof HttpServletRequest || o instanceof HttpServletResponse
                    || o instanceof BindingResult;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211

      它这个AOP作用就在于@Log这个注解,只要遇到它就去执行,然后提取一些信息,通过log里面的代码去入库。

    • RabbitmqLogQueueConfig.java
    package com.ruoyi.common.log.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author xx
     * @date 2021/10/5 16:24
     */
    @Configuration
    public class RabbitmqLogQueueConfig {
    
        //队列 起名:springDirectQueue
        @Bean
        public Queue springDirectQueue() {
            // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,不然你电脑关机的时候队列就会消失。
            // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
            // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    
            //一般设置一下队列的持久化就好,其余两个就是默认false
            return new Queue("springOperLogQueue", true);
        }
    
        //Direct交换机 起名:springDirectExchange
        @Bean
        public DirectExchange TestDirectExchange() {
            return new DirectExchange("springOperLogExchange", true, false);
        }
    
        //用两个队列,将队列和交换机绑定, 并设置用于匹配键:springDirectRouting1, springDirectRouting2,用来实验routingKey的作用
        @Bean
        public Binding bindingDirectQueue1() {
            return BindingBuilder.bind(springDirectQueue()).to(TestDirectExchange()).with("springOperLog");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    把要用到的队列什么的配置好。

    • AsyncLogService.java
    package com.ruoyi.common.log.service;
    
    import com.ruoyi.common.mq.producer.ProduceService;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    import com.ruoyi.common.core.constant.SecurityConstants;
    import com.ruoyi.system.api.RemoteLogService;
    import com.ruoyi.system.api.domain.SysOperLog;
    
    import java.util.List;
    
    /**
     * 异步调用日志服务
     *
     * @author ruoyi
     */
    @Service
    public class AsyncLogService {
        @Autowired
        private RemoteLogService remoteLogService;
        @Autowired
        private ProduceService produceService;
    
        /**
         * 保存系统日志记录
         */
        @Async
        public void saveSysLog(SysOperLog sysOperLog) {
    //        remoteLogService.saveLog(sysOperLog, SecurityConstants.INNER);
            saveSysLogList(sysOperLog);
        }
    
        public void saveSysLogList(SysOperLog sysOperLog) {
            produceService.send("springOperLogExchange", "springOperLog", sysOperLog);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

      之前用的remoteService,也就是openfeign,我们这里就不用了,因为我们借助了消息队列,不需要远程服务了。所以我们改一下,换成向mq中发送消息的方式。

    3.4 ruoyi-modules-system消费端

    • OperListener
    package com.ruoyi.system.rabbitmqListener;
    
    import com.alibaba.fastjson.JSON;
    import com.rabbitmq.client.Channel;
    import com.ruoyi.system.api.domain.SysOperLog;
    import com.ruoyi.system.api.domain.SysUser;
    import com.ruoyi.system.service.ISysOperLogService;
    import org.apache.logging.log4j.Logger;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.stream.Collectors;
    
    /**
     * 利用rabbitmq实现异步log处理
     */
    @Component
    public class OperListener {
    
    
        @Autowired
        private ISysOperLogService sysOperLogService;
    
    
        @RabbitListener(queues = "springOperLogQueue", containerFactory = "singleListenerContainer")
        @Transactional(rollbackFor = Exception.class)
        public void processMessage3(List<Message> messages, Channel channel) throws Exception {
            System.out.println("开始消费");
            List<SysOperLog> collect = messages.stream().map(message -> {
                try {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return JSON.parseObject(new String(message.getBody()), SysOperLog.class);
            }).collect(Collectors.toList());
            sysOperLogService.inserOperlogList(collect);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    利用之前springboot中的测试代码,我们这里直接用起来,然后入库就行了。

    4 结语

      到这里就结束了,这个方法是可行的,然后代码我都测试过,没有问题,最后可以入库的,算是RabbitMQ的第一次应用吧,这里成功实现了日志入库的功能。

  • 相关阅读:
    Vue中模板语法与el 和 data 的两种写法
    4项简化IT服务台任务的ChatGPT功能
    十六、Lua 文件 I/O的学习
    Windows 下 bat 脚本调用 Git bash 环境 sh 脚本
    高德地图实现gps轨迹坐标定位代码
    Springboot整合Redis
    被Gartner列入十大战略技术趋势的“行业云”,不再是个伪命题?
    【MCAL_CANDriver】-1.5-图解CANFD如何兼容经典Classical CAN 2.0及其解决方案
    查看Linux系统信息的常用命令
    基于Prometheus+Grafana搭建可视化监控服务 (一) Prometheus监控
  • 原文地址:https://blog.csdn.net/qq_41376740/article/details/126531260