• RabbitMQ实现秒杀场景示例


    本文章通过MQ队列来实现秒杀场景

    整体的设计如下图,整个流程中对于发送发MQ失败和发送到死信队列的数据未做后续处理

    1、首先先创建MQ的配置文件

    1. @Configuration
    2. public class RabbitConfig {
    3. public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";
    4. public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.#";
    5. public static final String DEAD_LETTER_QUEUEA_NAME = "deadQueue";
    6. @Autowired
    7. private RabbitTemplate rabbitTemplate;
    8. @Autowired
    9. private ConnectionFactory connectionFactory;
    10. @Bean
    11. public TopicExchange topicExchange(){
    12. return new TopicExchange("seckill_topic",true,false);
    13. }
    14. // 声明死信Exchange
    15. @Bean("deadLetterExchange")
    16. public DirectExchange deadLetterExchange(){
    17. return new DirectExchange(DEAD_LETTER_EXCHANGE);
    18. }
    19. @Bean("seckillQueue")
    20. public Queue seckillQueue(){
    21. Map args = new HashMap<>();
    22. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    23. // x-dead-letter-routing-key 这里声明当前队列的死信路由key
    24. args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
    25. return QueueBuilder.durable("seckillQueue").withArguments(args).build();
    26. }
    27. @Bean("deadQueue")
    28. public Queue binding(){
    29. return new Queue(DEAD_LETTER_QUEUEA_NAME);
    30. }
    31. @Bean
    32. public Binding bindingExchange(){
    33. return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with("seckill.#");
    34. }
    35. // 声明死信队列绑定关系
    36. @Bean
    37. public Binding deadLetterBinding(@Qualifier("deadQueue") Queue queue,
    38. @Qualifier("deadLetterExchange") DirectExchange exchange){
    39. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    40. }
    41. //配置会覆盖yml的重试次数
    42. //RabbitMQ监听容器
    43. /*@Bean
    44. public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    45. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    46. factory.setConnectionFactory(connectionFactory);
    47. //设置并发
    48. factory.setConcurrentConsumers(1);
    49. SimpleMessageListenerContainer s=new SimpleMessageListenerContainer();
    50. //最大并发
    51. factory.setMaxConcurrentConsumers(1);
    52. //消息接收——手动确认
    53. factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    54. //设置超时
    55. factory.setReceiveTimeout(2000L);
    56. //设置重试间隔
    57. factory.setFailedDeclarationRetryInterval(3000L);
    58. //监听自定义格式转换
    59. //factory.setMessageConverter(jsonMessageConverter);
    60. return factory;
    61. }*/
    62. }

    2、配置yml文件

    1. spring:
    2. redis:
    3. database: 0
    4. host: xxx
    5. port: 6379
    6. password: xxx
    7. timeout: 60
    8. jedis:
    9. pool:
    10. max-active: 8
    11. max-wait: -1
    12. max-idle: 8
    13. min-idle: 0
    14. rabbitmq:
    15. username: admin
    16. password: admin
    17. virtual-host: /
    18. host: xxxx
    19. port: 12345
    20. publisher-confirms: true
    21. publisher-returns: true
    22. template:
    23. mandatory: true
    24. listener:
    25. simple:
    26. concurrency: 1
    27. max-concurrency: 3
    28. # 消费者预取1条数据到内存,默认为250
    29. prefetch: 1
    30. # 确定机制
    31. acknowledge-mode: manual
    32. retry:
    33. enabled: true #是否支持重试
    34. max-attempts: 2
    35. # 重试间隔(ms)
    36. initial-interval: 5000

    这里有一点需要注意的是在做死信队列的时候如果Config文件中配置了监听容器,在yml文件中的一些属性要在容器里面进行配置,当时测试重试的时候发现没有在Config文件中配置,只在yml文件中配置了重试次数,结果会无限期的重试,MQ的默认方式就是无限期的重试,所以这点很容易踩坑

    3、实现交换机的ACK,实现 RabbitTemplate.ConfirmCallback接口

    1. @Component
    2. public class ConfirmCallBackHandler implements RabbitTemplate.ConfirmCallback {
    3. @Autowired
    4. private RabbitMessageMapper rabbitMessageMapper;
    5. @Autowired
    6. private RabbitTemplate rabbitTemplate;
    7. //注入
    8. //PostConstruct注解会在Component、Autowired注解完成后再执行
    9. @PostConstruct
    10. public void init(){
    11. rabbitTemplate.setConfirmCallback(this);
    12. }
    13. @Override
    14. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    15. if(!ack){
    16. RabbitMessage rabbitMessage = new RabbitMessage();
    17. rabbitMessage.setUniqueKey(correlationData.getId().toString());
    18. rabbitMessage.setSuccessFlag("N");
    19. rabbitMessageMapper.updateSuccessFlag(rabbitMessage);
    20. System.out.println("失败原因:"+cause);
    21. }
    22. }
    23. }

    4、实现队列的ACK,实现 RabbitTemplate.ReturnCallback

    1. @Component
    2. public class ReturnCallBackHandler implements RabbitTemplate.ReturnCallback {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. //注入
    6. //PostConstruct注解会在Component、Autowired注解完成后再执行
    7. @PostConstruct
    8. public void init(){
    9. rabbitTemplate.setReturnCallback(this);
    10. }
    11. @Override
    12. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    13. System.out.println("消息主体 message:"+message);
    14. System.out.println("应答码 replyCode: :"+replyCode);
    15. System.out.println("原因描述 replyText:"+replyText);
    16. System.out.println("交换机 exchange:"+exchange);
    17. System.out.println("消息使用的路由键 routingKey:"+routingKey);
    18. }
    19. }

    5、消费者方面,实现 ChannelAwareMessageListener 接口

    1. @Component
    2. public class AckListener implements ChannelAwareMessageListener {
    3. @Autowired
    4. private RabbitMqService rabbitMqService;
    5. @RabbitListener(queues = "seckillQueue")
    6. @Override
    7. public void onMessage(Message messagex, Channel channel) throws Exception {
    8. try {
    9. String result = new String(messagex.getBody(),"utf-8");
    10. rabbitMqService.receive(result);
    11. channel.basicAck(messagex.getMessageProperties().getDeliveryTag(), false);
    12. }catch (Exception exception){
    13. channel.basicNack(messagex.getMessageProperties().getDeliveryTag(), false, false);
    14. }
    15. }
    16. }

  • 相关阅读:
    算法入门之队列
    在业务开发中遇到的树形结构(部门、区域、职位),递归处理。
    【Linux】信号简介与触发信号的几种方式
    vite搭建vue3项目
    【R语言实战】——金融时序ARIMA建模
    【深度学习】 自编码器(AutoEncoder)
    下个牛市来临时,哪些跨链应用有望成为新独角兽并值得提前布局?
    蓝桥杯:翻转旋转变换(矩阵旋转)
    python中的闭包函数&装饰器
    常见消息队列分析对比
  • 原文地址:https://blog.csdn.net/Yuanhaoxin/article/details/133208853