本文给出的只是核心代码,完整源码请fork源码仓库查看:
https://gitee.com/DaHuYuXiXi/deley-task
如果对mq不太了解的,建议先看一下我在源码仓库中提供的Rabbitmq小书:
基于mq来实现延迟任务方案,相信各位很容易就可以想到mq中的延迟队列,延时队列就是用来存放需要在指定时间被处理的元素的队列。
基于mq延迟队列实现的大体思路如下:
package com.delayTask.rabbitmq;
/**
* @author zdh
*/
public class RabbitmqConstants {
//--------------------EXCHANGE--------------------------
public static final String ORDER_EXCHANGE = "orderExchange";
public static final String ORDER_DEAD_EXCHANGE = "orderDeadExchange";
//--------------------QUEUE--------------------------
public static final String ORDER_QUEUE = "orderQueue";
public static final String ORDER_DEAD_QUEUE = "orderDeadQueue";
//------------------ROUTE_KEY-----------------------------
public static final String ORDER_ROUTE_KEY="orderKey";
public static final String ORDER_DEAD_ROUTE_KEY="orderDeadKey";
}
package com.delayTask.rabbitmq;
import com.delayTask.DelayTaskEvent;
import com.delayTask.DelayTaskQueue;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import static com.delayTask.rabbitmq.RabbitmqConstants.*;
/**
* @author zdh
*/
@Data
@Builder
@Slf4j
@Component
@RequiredArgsConstructor
public class MqDelayQueue implements DelayTaskQueue<OrderDelayEvent,OrderDelayEvent> {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper=new ObjectMapper();
/**
* 存放被取消的延迟任务集合
*/
private final Set<Long> cancelDelayTask=new ConcurrentSkipListSet<>();
/**
*
* 生成一个延迟任务加入延迟队列中去
*
*
* @param delayTaskEvent
* @return 可以定位此次延迟任务的标记
*/
@Override
public OrderDelayEvent produce(DelayTaskEvent delayTaskEvent) {
try {
rabbitTemplate.convertAndSend(ORDER_EXCHANGE,ORDER_ROUTE_KEY,objectMapper.writeValueAsString(delayTaskEvent),msg-> {
msg.getMessageProperties().setExpiration(String.valueOf(delayTaskEvent.getDelay(TimeUnit.MILLISECONDS)));
return msg;
});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return (OrderDelayEvent) delayTaskEvent;
}
/**
* 处理到期的延迟任务
*
* @param taskId
*/
@Override
public void consume(OrderDelayEvent taskId) {}
@RabbitListener(queues = ORDER_DEAD_QUEUE)
public void consume(Message message, Channel channel) throws Exception {
OrderDelayEvent orderDelayEvent = objectMapper.readValue(new String(message.getBody()), OrderDelayEvent.class);
log.info("消息队列中接收到一条消息: {}",orderDelayEvent);
//被取消的延迟任务,不再进行处理
if(cancelDelayTask.contains(orderDelayEvent.getId())){
cancelDelayTask.remove(orderDelayEvent.getId());
log.info("当前任务已被客户提交: {}",orderDelayEvent);
return;
}
orderDelayEvent.handleDelayEvent();
}
/**
*
* 取消taskId对应的延迟任务
*
*
* @param taskId 延迟任务标记
*/
@Override
public void cancel(OrderDelayEvent taskId) {
cancelDelayTask.add(taskId.getId());
taskId.getOrder().submitOrder();
}
}
package com.dhy.mq;
import com.delayTask.DelayTaskMain;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.rabbitmq.MqDelayQueue;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* @author 大忽悠
* @create 2022/9/19 9:51
*/
@SpringBootTest(classes = DelayTaskMain.class)
public class MqDelayQueueTest {
@Resource
private MqDelayQueue mqDelayQueue;
@Test
public void testMqDelayQueue() throws InterruptedException {
OrderDelayEvent orderDelay = OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 10L);
OrderDelayEvent orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 15L);
mqDelayQueue.produce(orderDelay);
mqDelayQueue.produce(orderDelay1);
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
mqDelayQueue.cancel(orderDelay1);
Thread.sleep(TimeUnit.SECONDS.toMillis(30L));
}
}
目前看起来似乎没什么问题,但是如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。
为了解决这个问题,我们可以使用mq官方提供的插件来实现,该插件实现的思路是利用交换机来控制延迟消息何时推送给对应的队列。
延迟交换机:
通过给每个消息指定延迟发送时间,延迟交换机拿到这些消息后,不会立刻将其路由到某个队列,而是先保存起来,然后等待消息的延迟时间结束后,再将消息发送到指定的队列中去。
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.0/rabbitmq_delayed_message_exchange-3.10.0.ez
#如果下载速度比较慢,大家可以尝试使用下面的镜像连接进行下载
wget http://110.40.155.17/download/rabbitmq_delayed_message_exchange-3.10.0.ez
#解压
tar -zxvf 3.10.0.tar.gz
#宿主机文件 #容器ID(可缩写)或者容器名:容器内插件目录
docker cp rabbitmq_delayed_message_exchange-3.10.0.ez cf:/plugins
docker exec -it cf rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker restart cf
使用延迟交换机之后,我们整体的设计如下:
package com.delayTask.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import static com.delayTask.rabbitmq.RabbitmqConstants.*;
/**
* @author zdh
*/
@Configuration
public class RabbitConfigAdvanced {
//--------------------EXCHANGE--------------------------
@Bean(ORDER_DELAYED_EXCHANGE)
public CustomExchange orderExchange()
{
Map<String, Object> arguments = new HashMap<>();
//自定义交换机的类型
arguments.put("x-delayed-type","direct");
/*
* 1.交换机的名称
* 2.交换机的类型
* 3.是否需要持久化
* 4.是否需要自动删除
* 5.其他的参数
* */
return new CustomExchange(ORDER_DELAYED_EXCHANGE,
//延迟交换机类型
"x-delayed-message",
false,false,arguments);
}
//--------------------QUEUE--------------------------
@Bean(ORDER_DELAYED_QUEUE)
public Queue orderQueue(){
return QueueBuilder.nonDurable(ORDER_DELAYED_QUEUE).build();
}
//--------------------bind--------------------------
@Bean
public Binding orderDelayedBinding(@Qualifier(ORDER_DELAYED_QUEUE) Queue delayedQueue,
@Qualifier(ORDER_DELAYED_EXCHANGE) CustomExchange orderDelayedExchange){
return BindingBuilder.bind(delayedQueue).to(orderDelayedExchange).with(ORDER_DELAYED_ROUTE_KEY).noargs();
}
}
package com.delayTask.rabbitmq;
import com.delayTask.DelayTaskEvent;
import com.delayTask.DelayTaskQueue;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import static com.delayTask.rabbitmq.RabbitmqConstants.*;
/**
* @author zdh
*/
@Data
@Builder
@Slf4j
@Component
@RequiredArgsConstructor
public class MqDelayQueueAdvanced implements DelayTaskQueue<OrderDelayEvent,OrderDelayEvent> {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper=new ObjectMapper();
/**
* 存放被取消的延迟任务集合
*/
private final Set<Long> cancelDelayTask=new ConcurrentSkipListSet<>();
/**
*
* 生成一个延迟任务加入延迟队列中去
*
*
* @param delayTaskEvent
* @return 可以定位此次延迟任务的标记
*/
@Override
public OrderDelayEvent produce(DelayTaskEvent delayTaskEvent) {
try {
rabbitTemplate.convertAndSend(ORDER_DELAYED_EXCHANGE,ORDER_DELAYED_ROUTE_KEY,objectMapper.writeValueAsString(delayTaskEvent),msg-> {
// 发送消息的时候 延迟时长 单位ms
msg.getMessageProperties().setDelay((int) delayTaskEvent.getDelay(TimeUnit.MILLISECONDS));
return msg;
});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return (OrderDelayEvent) delayTaskEvent;
}
/**
* 处理到期的延迟任务
*
* @param taskId
*/
@Override
public void consume(OrderDelayEvent taskId) {}
@RabbitListener(queues = ORDER_DELAYED_QUEUE)
public void consume(Message message) throws Exception {
OrderDelayEvent orderDelayEvent = objectMapper.readValue(new String(message.getBody()), OrderDelayEvent.class);
log.info("消息队列中接收到一条消息: {}",orderDelayEvent);
//被取消的延迟任务,不再进行处理
if(cancelDelayTask.contains(orderDelayEvent.getId())){
cancelDelayTask.remove(orderDelayEvent.getId());
log.info("当前任务已被客户提交: {}",orderDelayEvent);
return;
}
orderDelayEvent.handleDelayEvent();
}
/**
*
* 取消taskId对应的延迟任务
*
*
* @param taskId 延迟任务标记
*/
@Override
public void cancel(OrderDelayEvent taskId) {
cancelDelayTask.add(taskId.getId());
taskId.getOrder().submitOrder();
}
}
package com.dhy.mq;
import com.delayTask.DelayTaskMain;
import com.delayTask.delayQueue.OrderDelayEvent;
import com.delayTask.delayQueue.OrderDelayFactory;
import com.delayTask.rabbitmq.MqDelayQueueAdvanced;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* @author 大忽悠
* @create 2022/9/19 10:34
*/
@SpringBootTest(classes = DelayTaskMain.class)
public class MqDelayQueueAdvancedTest {
@Resource
private MqDelayQueueAdvanced mqDelayQueueAdvanced;
@Test
public void test() throws InterruptedException {
OrderDelayEvent orderDelay = OrderDelayFactory.newOrderDelay("大忽悠", "小风扇", 13.4, 15L);
OrderDelayEvent orderDelay1 = OrderDelayFactory.newOrderDelay("小朋友", "冰箱", 3000.0, 10L);
mqDelayQueueAdvanced.produce(orderDelay);
mqDelayQueueAdvanced.produce(orderDelay1);
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
mqDelayQueueAdvanced.cancel(orderDelay1);
Thread.sleep(TimeUnit.SECONDS.toMillis(30L));
}
}
本文只是简单的利用mq实现了一些延迟任务,代码细节上还存在诸多漏洞,希望能够给大家带来一些启发。