rabbitmq 延时队列和死信队列的分析
一、生产者服务配置:
1、pom
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2、服务器信息配置:
- spring:
- rabbitmq:
- host: 94.191.83.120
- port: 5672
- username: admin
- password: admin
- publisher-confirms: true
- virtual-host: /
3、类实例化配置:
-
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.retry.backoff.ExponentialBackOffPolicy;
- import org.springframework.retry.policy.SimpleRetryPolicy;
- import org.springframework.retry.support.RetryTemplate;
-
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-
-
- @Slf4j
- @Configuration
- public class RabbitConfig {
-
- public final static String EXCHANGE_SECKILL = "order.seckill.delay.exchange";
- public final static String KEY_SECKILL = "order.seckill.delay.routingkey";
-
- public final static String EXCHANGE_SECKILL_DEAD = "order.seckill.dead.exchange";
- public final static String KEY_SECKILL_DEAD = "order.seckill.dead.routingkey";
-
- @Bean(name = "queueDelayMessage")
- public Queue queueDelayMessage() {
- Map<String,Object> arguments = new HashMap<>();
- arguments.put("x-message-ttl",1000 * 60 * 10);
- // arguments.put("x-expires",1000 * 60);
- // arguments.put("x-max-length",10000);
- // arguments.put("x-max-length-bytes",50*1024);
- arguments.put("x-dead-letter-exchange",EXCHANGE_SECKILL_DEAD);
- arguments.put("x-dead-letter-routing-key",KEY_SECKILL_DEAD);
- return new Queue("order.seckill.delay.queue", true, false, false,arguments);
- }
-
- @Bean(name = "exchange")
- public DirectExchange exchange() {
- return new DirectExchange(EXCHANGE_SECKILL, true, false);
- }
-
- @Bean
- public Binding bindingSecKillExchangeMessage(@Qualifier("queueDelayMessage") Queue queueMessage,
- @Qualifier("exchange") DirectExchange exchange) {
- return BindingBuilder
- .bind(queueMessage)
- .to(exchange)
- .with(KEY_SECKILL);
- }
-
- @Bean(name = "deadqueueMessage")
- public Queue deadqueueMessage() {
- return new Queue("order.seckill.dead.queue",true);
- }
-
- @Bean(name = "deadexchange")
- public DirectExchange deadexchange() {
- return new DirectExchange(EXCHANGE_SECKILL_DEAD,true,false);
- }
-
- @Bean
- Binding bindingDeadExchangeMessage(@Qualifier("deadqueueMessage") Queue deadqueueMessage,
- @Qualifier("deadexchange") DirectExchange deadexchange) {
- return BindingBuilder.bind(deadqueueMessage).to(deadexchange).
- with(KEY_SECKILL_DEAD);
- }
-
-
- @Bean
- public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory, List<SimpleMessageListenerContainer> list) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setConcurrentConsumers(2);
- //抓取参数非常关键,一次抓取的消息多了,消费速度一慢,就会造成响应延迟,抓取少了又会导致并发量低,消息堵塞
- factory.setPrefetchCount(10);
-
- /*
- * AcknowledgeMode.NONE:自动确认
- * AcknowledgeMode.AUTO:根据情况确认
- * AcknowledgeMode.MANUAL:手动确认
- */
- factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- /* factory.setDefaultRequeueRejected(false);
- factory.setAdviceChain(
- RetryInterceptorBuilder
- .stateless()
- .recoverer(new RejectAndDontRequeueRecoverer())
- .retryOperations(retryTemplate())
- .build()
- );*/
- return factory;
- }
-
- @Bean
- public RetryTemplate retryTemplate() {
- RetryTemplate retryTemplate = new RetryTemplate();
- /* retryTemplate.registerListener(new RetryListener() {
- @Override
- public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
- return false;
- }
- @Override
- public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
- }
- @Override
- public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
- }
- });*/
- retryTemplate.setBackOffPolicy(backOffPolicy());
- retryTemplate.setRetryPolicy(retryPolicy());
- return retryTemplate;
- }
-
- @Bean
- public ExponentialBackOffPolicy backOffPolicy() {
- ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
- backOffPolicy.setInitialInterval(1000);
- backOffPolicy.setMaxInterval(10000);
- return backOffPolicy;
- }
-
- @Bean
- public SimpleRetryPolicy retryPolicy() {
- SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
- retryPolicy.setMaxAttempts(3);
- return retryPolicy;
- }
-
- }
4、发送信息的工具类
-
-
- import cn.nan.config.RabbitConfig;
- import cn.nan.mall.vo.KillOrderVo;
- import com.alibaba.fastjson.JSON;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.UUID;
-
-
- @Slf4j
- @Component
- public class SecKillSender {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void send(KillOrderVo vo) {
- String msg1 = JSON.toJSONString(vo);
- log.info("TopicSender send the 1st : " + msg1);
- this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_SECKILL, RabbitConfig.KEY_SECKILL, msg1);
- }
-
- public void send(KillOrderVo vo,CorrelationData correlationData) {
- String msg1 = JSON.toJSONString(vo);
- log.info("TopicSender send the 1st : " + msg1);
- this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_SECKILL, RabbitConfig.KEY_SECKILL, msg1,correlationData);
- }
-
- public String sendAndReceive(KillOrderVo vo) {
- String msg1 = JSON.toJSONString(vo);
- //设置消息唯一id
- CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
- //直接发送message对象
- MessageProperties messageProperties = new MessageProperties();
- //过期时间10秒,也是为了减少消息挤压的可能
- // messageProperties.setExpiration("10000");
- messageProperties.setCorrelationId(correlationId.getId());
- Message message = new Message(msg1.getBytes(), messageProperties);
- log.info("TopicSender send the 1st : " + msg1);
- //设置消息唯一id
- Message message1 = rabbitTemplate.sendAndReceive(RabbitConfig.EXCHANGE_SECKILL, RabbitConfig.KEY_SECKILL, message, correlationId);
- return new String(message1.getBody());
- }
- }
5、业务生产信息的伪代码
- @Autowired
- private SecKillSender secKillSender;
-
- @RabbitListener(queues = "order.seckill.producer"/*,errorHandler = "rabbitConsumerListenerErrorHandler"*/)
- @RabbitHandler // 此注解加上之后可以接受对象型消息
- public void process(Message message, Channel channel, @Headers Map<String, Object> headers) throws Exception {
- try {
- String msg = new String(message.getBody());
- log.info("UserReceiver>>>>>>>接收到消息:" + msg);
- try {
- KillOrderVo vo = JSON.parseObject(msg, KillOrderVo.class);
- Long orderId = orderService.killOrder(vo);
- //把订单信息存储到缓存中
- // setOrderToRedis(vo);
- //发送消息到延迟队列
- secKillSender.send(vo);
- log.info("UserReceiver>>>>>>消息已消费");
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条
- } catch (Exception e) {
- System.out.println(e.getMessage());
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此订单
- log.info("UserReceiver>>>>>>拒绝消息,直接忽略");
- throw e;
- }
- } catch (Exception e) {
- log.info(e.getMessage());
- }
- }
二、客户端消费者服务配置
1、pom文件配置同上
2、配置文件信息
- spring:
- rabbitmq:
- host: 94.191.83.120
- port: 5672
- username: admin
- password: admin
- publisher-confirms: true
- virtual-host: /
- publisher-returns: true
3、监听消费伪代码:
-
- @RabbitListener(queues = "order.seckill.dead.queue"/*,errorHandler = "rabbitConsumerListenerErrorHandler"*/)
- @RabbitHandler // 此注解加上之后可以接受对象型消息
- public void processDead(Message message, Channel channel, @Headers Map<String, Object> headers) throws Exception {
- try {
- String msg = new String(message.getBody());
- log.info("order.seckill.dead.queue>>>>>>>consumer:" + msg);
- try {
- KillOrderVo vo = JSON.parseObject(msg, KillOrderVo.class);
- //1、校验订单是否已经支付,查询该订单的支付状态
- Order order = orderService.search(vo.getOrderId());
- //2、如果未支付就把订单取消,修改订单状态
- if(order.getPayStatus() != null && "0".equals(order.getPayStatus().toString())) {
- orderService.selfCancel(vo.getOrderId(),vo.getUserId());
- //3、把库存+1操作
- String killGoodCount = KillConstants.KILL_GOOD_COUNT + vo.getKillGoodsSpecPriceDetailVo().getId();
- //返回的数值,执行了lua脚本
- Long stock = stock(killGoodCount, 1, STOCK_LUA_INCR);
- if(stock > 0) {
- log.info("---------增加库存成功---stock:" + stock);
- }
- }
- log.info("UserReceiver>>>>>>消息已消费");
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条
- } catch (Exception e) {
- log.info(e.getMessage());
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此订单
- log.info("UserReceiver>>>>>>拒绝消息,直接忽略");
- throw e;
- }
- } catch (Exception e) {
- log.info(e.getMessage());
- }
- }
到此、rabbitmq 延时队列和死信队列的分析完毕,小伙伴多多联系,定会早日掌握。