• rabbitmq 延时队列和死信队列的分析


    rabbitmq 延时队列和死信队列的分析

    一、生产者服务配置:

    1、pom

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2、服务器信息配置:

    spring:
      rabbitmq:
        host: 94.191.83.120
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true
        virtual-host: /

    3、类实例化配置:

    1. import lombok.extern.slf4j.Slf4j;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    5. import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    6. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    7. import org.springframework.beans.factory.annotation.Qualifier;
    8. import org.springframework.context.annotation.Bean;
    9. import org.springframework.context.annotation.Configuration;
    10. import org.springframework.retry.backoff.ExponentialBackOffPolicy;
    11. import org.springframework.retry.policy.SimpleRetryPolicy;
    12. import org.springframework.retry.support.RetryTemplate;
    13. import java.util.HashMap;
    14. import java.util.List;
    15. import java.util.Map;
    16. @Slf4j
    17. @Configuration
    18. public class RabbitConfig {
    19. public final static String EXCHANGE_SECKILL = "order.seckill.delay.exchange";
    20. public final static String KEY_SECKILL = "order.seckill.delay.routingkey";
    21. public final static String EXCHANGE_SECKILL_DEAD = "order.seckill.dead.exchange";
    22. public final static String KEY_SECKILL_DEAD = "order.seckill.dead.routingkey";
    23. @Bean(name = "queueDelayMessage")
    24. public Queue queueDelayMessage() {
    25. Map<String,Object> arguments = new HashMap<>();
    26. arguments.put("x-message-ttl",1000 * 60 * 10);
    27. // arguments.put("x-expires",1000 * 60);
    28. // arguments.put("x-max-length",10000);
    29. // arguments.put("x-max-length-bytes",50*1024);
    30. arguments.put("x-dead-letter-exchange",EXCHANGE_SECKILL_DEAD);
    31. arguments.put("x-dead-letter-routing-key",KEY_SECKILL_DEAD);
    32. return new Queue("order.seckill.delay.queue", true, false, false,arguments);
    33. }
    34. @Bean(name = "exchange")
    35. public DirectExchange exchange() {
    36. return new DirectExchange(EXCHANGE_SECKILL, true, false);
    37. }
    38. @Bean
    39. public Binding bindingSecKillExchangeMessage(@Qualifier("queueDelayMessage") Queue queueMessage,
    40. @Qualifier("exchange") DirectExchange exchange) {
    41. return BindingBuilder
    42. .bind(queueMessage)
    43. .to(exchange)
    44. .with(KEY_SECKILL);
    45. }
    46. @Bean(name = "deadqueueMessage")
    47. public Queue deadqueueMessage() {
    48. return new Queue("order.seckill.dead.queue",true);
    49. }
    50. @Bean(name = "deadexchange")
    51. public DirectExchange deadexchange() {
    52. return new DirectExchange(EXCHANGE_SECKILL_DEAD,true,false);
    53. }
    54. @Bean
    55. Binding bindingDeadExchangeMessage(@Qualifier("deadqueueMessage") Queue deadqueueMessage,
    56. @Qualifier("deadexchange") DirectExchange deadexchange) {
    57. return BindingBuilder.bind(deadqueueMessage).to(deadexchange).
    58. with(KEY_SECKILL_DEAD);
    59. }
    60. @Bean
    61. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory, List<SimpleMessageListenerContainer> list) {
    62. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    63. factory.setConnectionFactory(connectionFactory);
    64. factory.setConcurrentConsumers(2);
    65. //抓取参数非常关键,一次抓取的消息多了,消费速度一慢,就会造成响应延迟,抓取少了又会导致并发量低,消息堵塞
    66. factory.setPrefetchCount(10);
    67. /*
    68. * AcknowledgeMode.NONE:自动确认
    69. * AcknowledgeMode.AUTO:根据情况确认
    70. * AcknowledgeMode.MANUAL:手动确认
    71. */
    72. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    73. /* factory.setDefaultRequeueRejected(false);
    74. factory.setAdviceChain(
    75. RetryInterceptorBuilder
    76. .stateless()
    77. .recoverer(new RejectAndDontRequeueRecoverer())
    78. .retryOperations(retryTemplate())
    79. .build()
    80. );*/
    81. return factory;
    82. }
    83. @Bean
    84. public RetryTemplate retryTemplate() {
    85. RetryTemplate retryTemplate = new RetryTemplate();
    86. /* retryTemplate.registerListener(new RetryListener() {
    87. @Override
    88. public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
    89. return false;
    90. }
    91. @Override
    92. public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
    93. }
    94. @Override
    95. public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
    96. }
    97. });*/
    98. retryTemplate.setBackOffPolicy(backOffPolicy());
    99. retryTemplate.setRetryPolicy(retryPolicy());
    100. return retryTemplate;
    101. }
    102. @Bean
    103. public ExponentialBackOffPolicy backOffPolicy() {
    104. ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    105. backOffPolicy.setInitialInterval(1000);
    106. backOffPolicy.setMaxInterval(10000);
    107. return backOffPolicy;
    108. }
    109. @Bean
    110. public SimpleRetryPolicy retryPolicy() {
    111. SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    112. retryPolicy.setMaxAttempts(3);
    113. return retryPolicy;
    114. }
    115. }

    4、发送信息的工具类

    1. import cn.nan.config.RabbitConfig;
    2. import cn.nan.mall.vo.KillOrderVo;
    3. import com.alibaba.fastjson.JSON;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.springframework.amqp.core.Message;
    6. import org.springframework.amqp.core.MessageProperties;
    7. import org.springframework.amqp.rabbit.connection.CorrelationData;
    8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    9. import org.springframework.beans.factory.annotation.Autowired;
    10. import org.springframework.stereotype.Component;
    11. import java.util.UUID;
    12. @Slf4j
    13. @Component
    14. public class SecKillSender {
    15. @Autowired
    16. private RabbitTemplate rabbitTemplate;
    17. public void send(KillOrderVo vo) {
    18. String msg1 = JSON.toJSONString(vo);
    19. log.info("TopicSender send the 1st : " + msg1);
    20. this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_SECKILL, RabbitConfig.KEY_SECKILL, msg1);
    21. }
    22. public void send(KillOrderVo vo,CorrelationData correlationData) {
    23. String msg1 = JSON.toJSONString(vo);
    24. log.info("TopicSender send the 1st : " + msg1);
    25. this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_SECKILL, RabbitConfig.KEY_SECKILL, msg1,correlationData);
    26. }
    27. public String sendAndReceive(KillOrderVo vo) {
    28. String msg1 = JSON.toJSONString(vo);
    29. //设置消息唯一id
    30. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
    31. //直接发送message对象
    32. MessageProperties messageProperties = new MessageProperties();
    33. //过期时间10秒,也是为了减少消息挤压的可能
    34. // messageProperties.setExpiration("10000");
    35. messageProperties.setCorrelationId(correlationId.getId());
    36. Message message = new Message(msg1.getBytes(), messageProperties);
    37. log.info("TopicSender send the 1st : " + msg1);
    38. //设置消息唯一id
    39. Message message1 = rabbitTemplate.sendAndReceive(RabbitConfig.EXCHANGE_SECKILL, RabbitConfig.KEY_SECKILL, message, correlationId);
    40. return new String(message1.getBody());
    41. }
    42. }

    5、业务生产信息的伪代码

    1. @Autowired
    2. private SecKillSender secKillSender;
    3. @RabbitListener(queues = "order.seckill.producer"/*,errorHandler = "rabbitConsumerListenerErrorHandler"*/)
    4. @RabbitHandler // 此注解加上之后可以接受对象型消息
    5. public void process(Message message, Channel channel, @Headers Map<String, Object> headers) throws Exception {
    6. try {
    7. String msg = new String(message.getBody());
    8. log.info("UserReceiver>>>>>>>接收到消息:" + msg);
    9. try {
    10. KillOrderVo vo = JSON.parseObject(msg, KillOrderVo.class);
    11. Long orderId = orderService.killOrder(vo);
    12. //把订单信息存储到缓存中
    13. // setOrderToRedis(vo);
    14. //发送消息到延迟队列
    15. secKillSender.send(vo);
    16. log.info("UserReceiver>>>>>>消息已消费");
    17. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条
    18. } catch (Exception e) {
    19. System.out.println(e.getMessage());
    20. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此订单
    21. log.info("UserReceiver>>>>>>拒绝消息,直接忽略");
    22. throw e;
    23. }
    24. } catch (Exception e) {
    25. log.info(e.getMessage());
    26. }
    27. }

    二、客户端消费者服务配置

    1、pom文件配置同上

    2、配置文件信息

    spring:
      rabbitmq:
        host: 94.191.83.120
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true
        virtual-host: /
        publisher-returns: true

    3、监听消费伪代码:

    1. @RabbitListener(queues = "order.seckill.dead.queue"/*,errorHandler = "rabbitConsumerListenerErrorHandler"*/)
    2. @RabbitHandler // 此注解加上之后可以接受对象型消息
    3. public void processDead(Message message, Channel channel, @Headers Map<String, Object> headers) throws Exception {
    4. try {
    5. String msg = new String(message.getBody());
    6. log.info("order.seckill.dead.queue>>>>>>>consumer:" + msg);
    7. try {
    8. KillOrderVo vo = JSON.parseObject(msg, KillOrderVo.class);
    9. //1、校验订单是否已经支付,查询该订单的支付状态
    10. Order order = orderService.search(vo.getOrderId());
    11. //2、如果未支付就把订单取消,修改订单状态
    12. if(order.getPayStatus() != null && "0".equals(order.getPayStatus().toString())) {
    13. orderService.selfCancel(vo.getOrderId(),vo.getUserId());
    14. //3、把库存+1操作
    15. String killGoodCount = KillConstants.KILL_GOOD_COUNT + vo.getKillGoodsSpecPriceDetailVo().getId();
    16. //返回的数值,执行了lua脚本
    17. Long stock = stock(killGoodCount, 1, STOCK_LUA_INCR);
    18. if(stock > 0) {
    19. log.info("---------增加库存成功---stock:" + stock);
    20. }
    21. }
    22. log.info("UserReceiver>>>>>>消息已消费");
    23. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条
    24. } catch (Exception e) {
    25. log.info(e.getMessage());
    26. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此订单
    27. log.info("UserReceiver>>>>>>拒绝消息,直接忽略");
    28. throw e;
    29. }
    30. } catch (Exception e) {
    31. log.info(e.getMessage());
    32. }
    33. }

     至此,RabbitMQ 延时队列和死信队列的分析完毕。小伙伴们多多练习,定会早日掌握。

  • 相关阅读:
    下载视频号安装,下载视频号安装到手机上?
    Teamtalk登录流程详解,客户端和服务器交互流程分析
    LeetCode 1114 按序打印
    记一次惊险的CDH6.3.2集群断电后重启的过程
    TCP和UDP协议(深信服X计划)
    【router-view】切换组件 深刻理解用法 vue的设计思想
    C++ 指针调用
    【记一次vsan数据救援的经历】
    性能优化——动画优化笔记
    redisson springboot配置
  • 原文地址:https://blog.csdn.net/nandao158/article/details/125447909