• Redis之zset在异步队列上的应用


    当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。

    一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训获取zset中的任务并进行处理。

    需要提前考虑一个问题:

    如何避免一个任务被多次处理?

    一种解决方案是当多个线程获取到任务时,调用redis的zrem命令,将该任务从指定的zset中移除(利用了redis处理命令时是顺序执行的)。

    环境

    • JDK17
    • 两个jar包
      • Jedis
      • fastjson2

    代码

    import com.alibaba.fastjson2.JSON;
    import com.alibaba.fastjson2.TypeReference;
    import redis.clients.jedis.Jedis;
    
    import java.lang.reflect.Type;
    import java.util.List;
    import java.util.UUID;
    
    // 基于Redis实现的延迟队列
    public class RedisDelayingQueue<T> {
        static class TaskItem<T> {
            public String id;
            public T msg;
        }
        // fastjson序列化对象时如果存在泛型,需要使用TypeReference
        private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();
    
        private Jedis jedis;
        private String queueKey;
    
        public RedisDelayingQueue(Jedis jedis, String queueKey) {
            this.jedis = jedis;
            this.queueKey = queueKey;
        }
    
        // 将任务添加到 zset 中
        // 分数是延时的时间
        public void delay(T msg) {
            TaskItem<T> task = new TaskItem<T>();
            task.id = UUID.randomUUID().toString();
            task.msg = msg;
            // 序列化任务
            String s = JSON.toJSONString(task);
            jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s);
        }
    
        public void loop() {
            while(!Thread.interrupted()) {
                // 从zset中取出一个任务
                List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
                if(values.isEmpty()) {
                    try {
                        Thread.sleep(500);
                    } catch(InterruptedException e) {
                        break;
                    }
                    continue;
                }
                String s = values.iterator().next();
                if(jedis.zrem(queueKey, s) > 0) {
                    TaskItem<T> task = JSON.parseObject(s, TaskType);
                    this.handleMsg(task.msg);
                }
            }
        }
    
        public void handleMsg(T msg) {
            System.out.println(msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    优化

    通过上面loop中代码,多个线程获取到values时,可能会被多个线程同时取到,然后再调用zrem命令去竞争的删除该值,所以会有很多无用的网络请求发送到redis。更容易想到的方案是将取值然后删除的操作变成原子性的,两种实现方案:

    • 通过对代码块进行加锁的方式
    • 利用redis中lua脚本的原子执行的特点

    代码块加锁

    这种方案不太好,如果两个命令之间发生了网络错误或者延迟,将造成其它线程的阻塞

        public void synchronizedLoop() {
            while(!Thread.interrupted()) {
                synchronized(this) {
                    // 从zset中取出一个任务
                    List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
                    if(values.isEmpty()) {
                        try {
                            Thread.sleep(500);
                        } catch(InterruptedException e) {
                            break;
                        }
                        continue;
                    }
                    String s = values.iterator().next();
                    if(jedis.zrem(queueKey, s) > 0) {
                        TaskItem<T> task = JSON.parseObject(s, TaskType);
                        this.handleMsg(task.msg);
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    Lua脚本

    local key = KEYS[1]
    local task = redis.call('ZPOPMIN', key)
    if task and next(task) != nil then
      redis.call('ZREM', key, task[1])
      return task[1]
    else
      return nil
    end
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    通过查阅文档发现,ZRANGEBYSCORE从6 版本开始已经过时了,所以这里使用ZPOPMIN来获取分数最小的value,可以达到相同的效果。

    通过Jedis的eval函数,调用redis执行lua脚本的命令。

        public void luaLoop() {
            while(!Thread.interrupted()) {
                Object ans = jedis.eval(script, 1, queueKey);
                if(ans != null) {
                    String task = (String) ans;
                    TaskItem<T> taskItem = JSON.parseObject(task, TaskType);
                    this.handleMsg(taskItem.msg);
                }else{
                    try{
                        Thread.sleep(500);
                    }catch(Exception e) {
                        break;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    为什么可以优化

    • 使用lua脚本的方式,使得一个线程如果zset中有任务都会成功获取任务,而不会多个线程同时拿到同一个任务,再去竞争删除,减少了无效的网络IO

    测试程序

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    public class Main {
        public static void main(String[] args) {
            JedisPool jedisPool = new JedisPool("url-of-redis", 6379, "username", "pass");
            Jedis jedis = jedisPool.getResource();
    
            RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
    
            // 创建一个线程充当生产者,并向redis中存10个异步任务
            Thread producer = new Thread() {
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        queue.delay("codehole" + i);
                    }
                }
            };
    
            // 创建一个线程充当消费者,不断从redis中取任务并执行
            Thread consumer = new Thread() {
                public void run() {
                    queue.luaLoop();
                }
            };
    
            producer.start();
            consumer.start();
            try {
                // 等待生产者线程执行结束
                producer.join();
                Thread.sleep(6000);
                consumer.interrupt();
                consumer.join();
            }catch(InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    一些问题

    这个问题是关于Jedis的问题,因为我通过上面的方式发起redis请求实际上是存在并发问题的,如果将上述代码中的延时去掉,这个问题发生的概率将大大发生,主要是因为Jedis不是线程安全的,换句话说,通过JedisPool获取redis连接的实例,并发访问是是通过同一个socket发送数据的。

    这里使用时,最好是每个线程都用有一个Jedis的实例,避免数据竞争问题.这里只是用了两个线程,所以简单手动使用两个redis实例,如果有多个消费者存在的情况下,还是每个线程单独持有一个Jedis才能解决问题。

        private Jedis readJedis;
        private Jedis writeJedis;
    
    • 1
    • 2

    总结

    本篇文章记录了使用zset实现一个简单异步队列的过程,然后对于第一次实现存在的一个问题,使用lua或者锁的方式优化网络IO。使用锁的方式会降低程序的并发度,所以一般使用lua脚本的方式来实现。

  • 相关阅读:
    【云原生 · Kubernetes】apiserver高可用
    【限时优惠】RHCE9.0培训考证-红帽官方授权中心
    GORM 中SQL、慢SQL打印日志传递 trace ID, Kratos框架输出日志trace id
    【软考-中级】系统集成项目管理工程师 【16 变更管理】
    Lua使用方式介绍
    WinFrom、C# 学习记录五 开发一个鼠标自动点击小软件
    QLabel中显示图片,让图片保持比例进行放大缩小
    鸿鹄工程项目管理系统em Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统
    docker配置nginx
    idea Transparent-native-to-ascii 是否需要勾选?
  • 原文地址:https://blog.csdn.net/Erictr/article/details/133956517