redis的zset实现延迟队列
延迟队列是什么?
延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。
实现思路:
要实现延时队列Q,可以使用redis zSet的一些命令,比如生产者生成消息,就加入队列里,先简单用定时任务,通过当前的时间戳获取所有的消息,到期的消息自动消费.
发送消息,添加到队列里
key为队列的名称,score为当前的时间戳加上延迟时间,value为消息体
zadd key score value
根据当前时间戳获取所有的消息数据
key为队列的名称,min为0,max为当前的时间戳
zrangebyscore key min max
1,生产者实现
可以看到生产者很简单,其实就是利用zset的特性,给一个zset添加元素而已,而时间就是它的score。
public void produce(Integer taskId, long exeTime) {
System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + exeTime + ", 当前时间:" + LocalDateTime.now());
RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
}
2,消费者实现
消费者的代码也不难,就是把已经过期的zset中的元素给删除掉,然后处理数据。
public void consumer() {
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
while (true) {
Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
if (taskIdSet == null || taskIdSet.isEmpty()) {
System.out.println("没有任务");
} else {
taskIdSet.forEach(id -> {
long result = RedisOps.getJedis().zrem(RedisOps.key, id);
if (result == 1L) {
System.out.println("从延时队列中获取到任务,taskId:" + id + " , 当前时间:" + LocalDateTime.now());
}
});
}
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}