• 记录----RabbitMQ踩坑(一)


    业务场景

    用户触发某种行为动作,根据行为类型,分别执行延迟30s处理数据动作,和立即处理数据重做两种业务

    思路

    方案一:Redis key监听器,但是考虑实际key数量,放弃该方式
    方案二:方案RabbitMQ延迟队列实现,程序同时监听两种队列即可,简单方便

    踩坑记录

    因为不熟悉RabbitMQ相关配置,导致很多奇奇怪怪的问题,特此记录

    消费者篇

    一、需要手动确认ACK + 动态设置消费者上限

    理由:

    1. 手动ACK是为了确保消息处理完毕,但是!消息处理性能一定要跟上,否则就需要做幂等控制
    2. 动态设置消费者上限:是因为机器配置不一样,设置过大,可能导致低配的机器负载离谱

    application.yml 如下

    spring:
      # rabbitmq相关配置
      rabbitmq:
        # 链接超时时间
        connection-timeout: 1000
        cache:
          channel:
            # 要保留在高速缓存中的通道数
            size: 200
            # 如果已达到高速缓存大小,则等待获取通道的持续时间。如果为0,则始终创建一个新通道。
            checkout-timeout: 500
          connection:
            mode: channel
        publisher-returns: true
        publisher-confirm-type: simple
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    配置类如下

    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Objects;
    
    
    
    @Configuration
    public class RabbitMqConfig {
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                                   MessageConverter messageConverter) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(messageConverter);
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 根据CPU数量,动态调整消费者最大值
            int i = Runtime.getRuntime().availableProcessors();
            factory.setMaxConcurrentConsumers(i * 5);
            return factory;
        }
    
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    二、同时监听多个队列

    消费者代码如下

    
    import com.alibaba.fastjson.JSONObject;
    import com.qlm.yfb.util.log.LogUtils;
    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.handler.annotation.Header;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    
    /**
     * @author chunyang.leng
     * @date 2022-06-17 9:52 PM
     */
    @Configuration
    public class WechatDeadLetterConsumer {
    
        private static final Logger logger = LoggerFactory.getLogger(WechatDeadLetterConsumer.class);
    
        @PostConstruct
        private void postConstruct(){
            LogUtils.info(logger,"启动 ========> 微信延迟队列消费者");
        }
    
        @RabbitListener(
                autoStartup = "true",
                bindings = {
                        @QueueBinding(
                            value = @Queue(name = "subscribeDelayQueue"),
                            exchange = @Exchange(name = "deadLetterExchange"),
                            key = {"subscribeDelayQueueRoutingKey"}
                        ),
                        @QueueBinding(
                                value = @Queue(name = "clickMenuQueue"),
                                exchange = @Exchange(name = "wechatPushExchange"),
                                key = {"clickMenuQueueRoutingKey"}
                        )
                }
        )
        // 幂等检查注解
        @IdempotenceCheck(redisKeyPrefix = "delay:strategy:")
        public void enterpriseWechatDeadLetterHandler(EnterpriseWechatDelayDTO enterpriseWechatDelayDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag){
            LogUtils.debug(logger,"消费到EnterpriseWechat死信队列数据:{}", JSONObject.toJSONString(enterpriseWechatDelayDTO));
            try {
                // todo 处理业务
          
            } catch (Exception e) {
                logger.error("rabbitmq 消费信息出现异常", e);
            } finally {
                try {
                    channel.basicAck(tag, false);
                } catch (IOException e) {
                    logger.error("rabbitmq ack出现异常", e);
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    生产者篇

    因生产者和消费者框架不同,配置方式不太一样,生产者为Spring MVC

    配置类

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Recoverable;
    import com.rabbitmq.client.RecoveryListener;
    import com.rabbitmq.client.ShutdownSignalException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.*;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;
    
    /**
     * @author chunyang.leng
     * @date 2021-04-13 9:32 上午
     */
    @Configuration
    @PropertySource("classpath:rabbit-push.properties")
    public class RabbitMqConfig {
        private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
    
        @Value("${rabbit.push.address:192.168.1.2}")
        private String rabbitPushAddress;
        @Value("${rabbit.push.username:xxx}")
        private String rabbitPushUsername ;
        @Value("${rabbit.push.password:xxxxxx}")
        private String rabbitPushPassword;
        @Value("${rabbit.consumesize:10}")
        private Integer consumesize;
    
    
        @Bean
        public ConnectionFactory wechatPushConnectionFactory(ConnectionListener wechatPushConnectionListener,
                                                             RecoveryListener wechatPushRecoveryListener,
                                                             ChannelListener wechatPushChannelListener) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(rabbitPushAddress);
            connectionFactory.setUsername(rabbitPushUsername);
            connectionFactory.setPassword(rabbitPushPassword);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
    		// 坑一,正确值应该设置大一些,比如200
            connectionFactory.setChannelCacheSize(5);
            // 坑二,正确值应该设置大一些,比如2000,单位毫秒
            connectionFactory.setChannelCheckoutTimeout(0);
            connectionFactory.setPublisherReturns(true);
            connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
            connectionFactory.addConnectionListener(wechatPushConnectionListener);
            connectionFactory.addChannelListener(wechatPushChannelListener);
            connectionFactory.setRecoveryListener(wechatPushRecoveryListener);
            connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(500);
            return connectionFactory;
    
        }
    
        @Bean
        public RabbitListenerContainerFactory yfbPushListenerContainerFactory(ConnectionFactory wechatPushConnectionFactory,
                                                                              MessageConverter jsonMessageConverter) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(wechatPushConnectionFactory);
            factory.setMessageConverter(jsonMessageConverter);
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            factory.setConcurrentConsumers(consumesize);
            factory.setMaxConcurrentConsumers(consumesize);
            factory.setPrefetchCount(1000);
            factory.setChannelTransacted(false);
            factory.setDefaultRequeueRejected(false);
            factory.setErrorHandler(new ConditionalRejectingErrorHandler());
            return factory;
        }
    
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    业务触发入口

    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.stereotype.Service;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.util.Map;
    
    
    /**
     * 类名: MessageEventService </br>
     * 描述: 核心服务类 </br>
     */
    
    @Slf4j
    @RestController
    public class MessageEvent {
    
        @Autowired
        private SubscribeEventListener subscribeEventListener;
        @Autowired
        private ClickMenuMessageEventListener clickMenuMessageEventListener;
    
        /**
         * 处理微信发来的请求
         * @param request
         * @return xml
         */
        @PostMapping("handle")
        public String processRequest(HttpServletRequest request, HttpServletResponse response) {          
    	       // todo 用户关注事件
    	     subscribeEventListener.onApplicationEvent(new SubscribeEvent(openid));
    	                  
    	      // todo 用户点击菜单事件
    	     clickMenuMessageEventListener.onApplicationEvent(new ClickMenuEvent(openid));
          }              
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    Listener 超级大坑

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationListener;
    import org.springframework.stereotype.Component;
    
    /**
     *
     *
     * @author chunyang.leng
     * @date 2022-06-20 1:19 PM
     */
    @Component
    public class ClickMenuMessageEventListener implements ApplicationListener<ClickMenuEvent> {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void onApplicationEvent(ClickMenuEvent event) {
       
            String openId = event.getOpenId();
    
            EnterpriseWechatDelayDTO dto = new EnterpriseWechatDelayDTO();
            dto.setOpenId(openId);
            dto.setSource(EnterpriseWechatSource.CLICK_MENU_EVENT);
            dto.setDelayStrategy(DelayStrategyEnum.ENTERPRISE_WECHAT_CUSTOMER_SERVICE_PUSH);
            // 发送到mq,直接消费,不做延迟处理
            rabbitTemplate.convertAndSend("wechatPushExchange","clickMenuQueueRoutingKey",dto);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    生产者踩坑详解

    以上代码上线之后,出现服务器内存增加,连接数暴增等现象

    连接数异常

    在这里插入图片描述

    内存增长

    在这里插入图片描述

    问题分析过程

    1. 检查上线代码:查看是否存在链接泄漏等情况(无)
    2. 转储内存镜像(jmap命令)
    3. 转储线程运行情况(jstack命令)

    分析内存镜像,发现有大量http链接被卡住,导致内存增加
    在这里插入图片描述

    分析线程运行情况,发现大量amqp线程处理等待状态
    在这里插入图片描述
    查看amqp线程发现全部都在创建链接
    在这里插入图片描述

    查看源代码
    在这里插入图片描述
    在这里插入图片描述
    最后发现,ThreadLocal
    在这里插入图片描述
    tomcat因为是BIO模式,因此每次请求都是一个线程在处理,而在投递信息的时候,直接调用的send,因此这里会一直是null,直到。。。tomcat所有线程执行完毕,并且每个线程都通过ThreadLocal绑定channel,才不会在继续创建channel

    而且,tomcat设置的线程数,,嗯.配置如下,后果自己脑补吧
    在这里插入图片描述
    解决方案,,,在线程池中使用amqpTemplate即可
    修改后的示例代码

    
    rabbitMqThreadPool.execute(()->{
     	// 发布用户关注事件
         subscribeEventListener.onApplicationEvent(new SubscribeEvent(openid));
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5

    线程池配置

     @Bean
        public ThreadPoolTaskExecutor rabbitMqThreadPool(){
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //获取到服务器的cpu内核
            int i = Runtime.getRuntime().availableProcessors();
            //核心池大小
            executor.setCorePoolSize(i);
            //最大线程数
            executor.setMaxPoolSize(i * 2);
            //队列长度
            executor.setQueueCapacity(60000);
            //线程空闲时间
            executor.setKeepAliveSeconds(1000);
            //线程前缀名称
            executor.setThreadNamePrefix("rabbitMq-thread-pool-");
            //配置拒绝策略
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            return executor;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这样 channel的数量,就是线程池的maxCore数量了,搞定收工

  • 相关阅读:
    DSA之查找(3):哈希表的查找
    Docker搭建ctfd平台
    redis异常:OOM command not allowed when used memory > ‘maxmemory‘
    fastadmin后台中文章分类ID改为名称显示
    均匀传输线的串扰和饱和长度
    Gradient Domain High Dynamic Range Compression
    2021年全球IB考试99人得满分,55考生来自新加坡
    @TableField(fill = FieldFill.INSERT)这个注解的作用
    7、Docker网络
    正弦信号的平均功率和峰值电压计算举例
  • 原文地址:https://blog.csdn.net/weixin_42321034/article/details/125436122