本文章通过MQ队列来实现秒杀场景
整体的设计如下图。注意:整个流程中,对于发送到MQ失败的数据以及进入死信队列的数据,均未做后续处理

1、首先创建MQ的配置类
- @Configuration
- public class RabbitConfig {
-
-
- public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";
- public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.#";
- public static final String DEAD_LETTER_QUEUEA_NAME = "deadQueue";
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Autowired
- private ConnectionFactory connectionFactory;
-
- @Bean
- public TopicExchange topicExchange(){
- return new TopicExchange("seckill_topic",true,false);
- }
-
- // 声明死信Exchange
- @Bean("deadLetterExchange")
- public DirectExchange deadLetterExchange(){
- return new DirectExchange(DEAD_LETTER_EXCHANGE);
- }
-
- @Bean("seckillQueue")
- public Queue seckillQueue(){
- Map
args = new HashMap<>(); - args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
- // x-dead-letter-routing-key 这里声明当前队列的死信路由key
- args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
- return QueueBuilder.durable("seckillQueue").withArguments(args).build();
-
- }
-
- @Bean("deadQueue")
- public Queue binding(){
- return new Queue(DEAD_LETTER_QUEUEA_NAME);
-
- }
-
- @Bean
- public Binding bindingExchange(){
- return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with("seckill.#");
-
- }
-
- // 声明死信队列绑定关系
- @Bean
- public Binding deadLetterBinding(@Qualifier("deadQueue") Queue queue,
- @Qualifier("deadLetterExchange") DirectExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
- }
-
- //配置会覆盖yml的重试次数
- //RabbitMQ监听容器
- /*@Bean
- public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- //设置并发
- factory.setConcurrentConsumers(1);
- SimpleMessageListenerContainer s=new SimpleMessageListenerContainer();
- //最大并发
- factory.setMaxConcurrentConsumers(1);
- //消息接收——手动确认
- factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
- //设置超时
- factory.setReceiveTimeout(2000L);
- //设置重试间隔
- factory.setFailedDeclarationRetryInterval(3000L);
- //监听自定义格式转换
- //factory.setMessageConverter(jsonMessageConverter);
- return factory;
- }*/
- }
2、配置yml文件
spring:
  redis:
    database: 0
    host: xxx
    port: 6379
    password: xxx
    # NOTE(review): a bare number is presumably read as milliseconds - confirm 60 ms is intended
    timeout: 60
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: xxxx
    port: 12345
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        concurrency: 1
        max-concurrency: 3
        # Each consumer prefetches 1 message into memory (default is 250)
        prefetch: 1
        # Acknowledgement mode
        acknowledge-mode: manual
        retry:
          enabled: true # whether retry is enabled
          max-attempts: 2
          # retry interval (ms)
          initial-interval: 5000
这里有一点需要注意:在做死信队列的时候,如果Config文件中配置了监听容器,那么yml文件中的一些属性(例如重试次数)就要在容器里面进行配置。当时测试重试的时候,只在yml文件中配置了重试次数,没有在Config文件的容器中配置,结果消息会无限期地重试。MQ的默认方式就是无限期地重试,所以这点很容易踩坑
3、实现交换机的ACK,实现 RabbitTemplate.ConfirmCallback接口
- @Component
- public class ConfirmCallBackHandler implements RabbitTemplate.ConfirmCallback {
-
- @Autowired
- private RabbitMessageMapper rabbitMessageMapper;
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //注入
- //PostConstruct注解会在Component、Autowired注解完成后再执行
- @PostConstruct
- public void init(){
- rabbitTemplate.setConfirmCallback(this);
- }
-
-
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if(!ack){
- RabbitMessage rabbitMessage = new RabbitMessage();
- rabbitMessage.setUniqueKey(correlationData.getId().toString());
- rabbitMessage.setSuccessFlag("N");
- rabbitMessageMapper.updateSuccessFlag(rabbitMessage);
- System.out.println("失败原因:"+cause);
- }
- }
- }
4、实现队列的ACK(消息无法路由到队列时触发回调),实现 RabbitTemplate.ReturnCallback 接口
- @Component
- public class ReturnCallBackHandler implements RabbitTemplate.ReturnCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //注入
- //PostConstruct注解会在Component、Autowired注解完成后再执行
- @PostConstruct
- public void init(){
- rabbitTemplate.setReturnCallback(this);
- }
-
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println("消息主体 message:"+message);
- System.out.println("应答码 replyCode: :"+replyCode);
- System.out.println("原因描述 replyText:"+replyText);
- System.out.println("交换机 exchange:"+exchange);
- System.out.println("消息使用的路由键 routingKey:"+routingKey);
- }
- }
5、消费者方面,实现 ChannelAwareMessageListener 接口
- @Component
- public class AckListener implements ChannelAwareMessageListener {
-
- @Autowired
- private RabbitMqService rabbitMqService;
-
- @RabbitListener(queues = "seckillQueue")
- @Override
- public void onMessage(Message messagex, Channel channel) throws Exception {
- try {
- String result = new String(messagex.getBody(),"utf-8");
- rabbitMqService.receive(result);
-
- channel.basicAck(messagex.getMessageProperties().getDeliveryTag(), false);
- }catch (Exception exception){
- channel.basicNack(messagex.getMessageProperties().getDeliveryTag(), false, false);
- }
- }
- }