• Redis的延时队列


    延时队列

    redis的list数据结构用来做一部消息队列,使用rpush/lpush操作入队列。 使用lpop/rpop来出队列。

    > rpush notify-queue apple banana pear 
    (integer) 3 
    > llen notify-queue 
    (integer) 3 
    > lpop notify-queue 
    "apple"
    > llen notify-queue 
    (integer) 2 
    > lpop notify-queue 
    "banana"
    > llen notify-queue 
    (integer) 1 
    > lpop notify-queue 
    "pear"
    > llen notify-queue 
    (integer) 0 
    > lpop notify-queue 
    (nil)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    如果队列空了,客户端会陷入pop的死循环,不停的pop,没有数据,接着pop,又没有数据。空轮询会拉高客户端的CPU,redisde QPS也会被拉高。

    阻塞读在队列没有数据时,会立即进入休眠状态,一旦数据到来,则立刻醒来,消息的延迟几乎为0,用blpop/brpop替代前面的lpop/rpop。可解决上面问题。

    如果线程一直阻塞,redis客户端链接成了闲置链接,久了服务器会自动断开链接,以减少资源的占用。此时blpop/brpop会抛出异常

    实现

    演示队列可以通过redis的zset(有序列表)来实现。我们将消息序列化成一个字符串作为zset的value,这个消息的到期处理时间作为score,然后用多个线程轮询zset活到期任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,所以需要考虑并发争抢任务,确保任务不能被多次执行。

    import java.lang.reflect.Type; 
    import java.util.Set; 
    import java.util.UUID;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference; 
    import redis.clients.jedis.Jedis; 
    public class RedisDelayingQueue<T> { 
    static class TaskItem<T> { 
    public String id; 
    public T msg; 
    } 
    // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference 
    private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); 
    private Jedis jedis; 
    private String queueKey; 
    public RedisDelayingQueue(Jedis jedis, String queueKey) { 
    this.jedis = jedis; 
    this.queueKey = queueKey; 
    } 
    public void delay(T msg) { 
    TaskItem task = new TaskItem(); 
    task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid 
    task.msg = msg; 
    String s = JSON.toJSONString(task); // fastjson 序列化
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
    } 
    public void loop() { 
    while (!Thread.interrupted()) {
    // 只取一条
    Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); 
    if (values.isEmpty()) { 
    try { 
    Thread.sleep(500); // 歇会继续
    } 
    catch (InterruptedException e) { 
    break;
    } 
    continue; 
    } 
    String s = values.iterator().next(); 
    if (jedis.zrem(queueKey, s) > 0) { // 抢到了
    TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化
    this.handleMsg(task.msg); 
    } 
    } 
    } 
    public void handleMsg(T msg) { 
    System.out.println(msg); 
    } 
    public static void main(String[] args) { 
    Jedis jedis = new Jedis(); 
    RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo"); 
    Thread producer = new Thread() { 
    public void run() { 
    for (int i = 0; i < 10; i++) { 
    queue.delay("codehole" + i); 
    } 
    } 
    }; 
    Thread consumer = new Thread() { 
    public void run() { 
    queue.loop(); 
    } 
    }; 
    producer.start(); 
    consumer.start(); 
    try { 
    producer.join(); 
    Thread.sleep(6000); 
    consumer.interrupt(); 
    consumer.join(); 
    } 
    catch (InterruptedException e) { 
    } 
    } 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    Redis 作为消息队列为什么不能保证 100% 的可靠性?

    因为没有ask机制,当消费端崩溃后消息丢失。pop出消息后,list 中就没这个消息了,如果处理消息的程序拿到消息还未处理就挂掉了,那消息就丢失了,所以是不可靠队列

    Redis延时队列的优势

    1.Redis zset支持高性能的 score 排序。

    2.Redis是在内存上进行操作的,速度非常快。

    3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。

    4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性

    Redis延时队列劣势
    1. 使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题
    2. 没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等
    3. 没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了,如果对消息可靠性要求较高, 推荐使用 MQ 来实现.
  • 相关阅读:
    ubuntu 清理缓存
    C++学习:函数重载
    设计模式【5】——观察者模式
    uniapp自定义组件
    LCM Sum (hard version)(树状数组,筛因子)
    Java 入门基础知识
    英语——分享篇——每日100词——701-800
    vscode在docker镜像环境编程
    递归:判断一个数是否是2的幂
    【笔记】Android 漫游定制SPN定制有关字段
  • 原文地址:https://blog.csdn.net/qq_51347907/article/details/126563828