• 【实践篇】redis管道pipeline使用详解



    在这里插入图片描述

    0. 前言

    Redis管道(Pipeline)是一种批量执行Redis命令的机制。通常情况下,客户端向Redis发送一个命令时,需要等待Redis服务器执行完该命令并返回结果后才能发送下一个命令。但使用管道可以在客户端一次性发送多个命令,然后等待Redis服务器一次性返回所有命令的结果,从而减少了多次网络往返的开销。

    在管道中,客户端发送的每个命令都会被Redis服务器缓存起来,而不是立即执行。当客户端调用执行命令的方法(如EXEC)时,Redis服务器会按照命令发送的顺序依次执行这些命令,并将执行结果一次性返回给客户端。这样就实现了一次性发送多个命令并一次性接收多个结果的效果。

    通过使用管道,可以大大提高Redis的性能和吞吐量。因为管道允许客户端一次性发送多个命令,减少了网络延迟和连接开销。此外,管道还支持原子性的事务操作,可以将多个命令封装在一个事务中进行执行。

    由于管道是无序的,所以返回结果的顺序可能与命令的发送顺序不一致。在使用管道时,客户端需要根据实际情况进行结果的解析和处理。

    什么场景下使用redis管道特性

    Redis管道特性在以下场景下可以发挥作用:

    1. 批量操作:当需要执行大量Redis操作时,使用管道可以将多个操作合并到一次网络往返中,减少了网络延迟和连接开销,大大提高了性能。例如批量写入、批量读取、批量删除等。

    2. 高吞吐量:管道可以并行地执行多个Redis操作,提高了系统的并发处理能力,从而实现高吞吐量的数据处理。

    3. 数据流水线:当需要依次执行多个Redis命令,并且后续命令的执行依赖于前面命令的结果时,可以使用管道来实现数据的流水线处理。例如计算、过滤、排序等操作可以通过管道一次性完成。

    4. 批量查询:当需要查询多个键对应的值时,使用管道可以将多次查询合并为一次查询,减少了多次网络往返的开销,提高了查询效率。

    5. 事务操作:在Redis中,事务是通过MULTI/EXEC命令实现的,使用管道可以将多个命令一起发送给Redis服务器,实现原子性的事务操作。这样可以减少网络往返的次数,提高事务的执行效率。

    Redis管道特性适用于需要高性能、高并发、批量操作的场景,可以有效地提升Redis的操作效率和系统的整体性能。但管道操作是无序的,返回结果的顺序可能与命令的发送顺序不一致,因此在使用管道时需要根据实际情况进行结果的解析和处理。

    1.原理

    Redis管道可以与TCP连接复用一起使用。在使用管道时,可以选择在同一个TCP连接上发送多个命令,而无需每次都建立和关闭连接。这样可以减少连接的开销,提高数据传输的效率。

    从TCP连接的角度来解释Redis管道提升性能原理,主要涉及到以下几个方面:

    1. 建立TCP连接:在客户端与Redis服务器建立连接时,会通过TCP三次握手来建立可靠的连接。客户端与服务器之间的通信会通过该TCP连接进行。

    2. 消息传输:在Redis管道中,客户端发送的多个命令请求会被缓存到请求队列中,而不是立即发送到Redis服务器。这些请求会在客户端本地进行缓存。

    3. 批量发送:当客户端调用执行命令的方法(如EXEC)时,客户端会将请求队列中的命令一次性发送到Redis服务器。这样就减少了网络往返的次数,提高了性能。客户端通过TCP连接将命令一批一批地发送给服务器。

    4. 执行命令:Redis服务器收到客户端发送的命令后,会按照请求队列中的命令顺序依次执行这些命令。执行完所有命令后,将执行结果一次性返回给客户端。

    5. 关闭连接:当管道中的所有命令执行完毕后,客户端可以选择关闭TCP连接或者继续发送其他命令。如果不关闭连接,客户端可以在后续的操作中继续使用该TCP连接进行通信。

    Redis管道的原理基于TCP连接的建立和消息传输机制。通过在客户端本地缓存多个命令请求,然后一次性发送到Redis服务器,减少网络往返的开销,提高性能。同时,通过TCP连接的保持,可以在不关闭连接的情况下继续使用管道进行后续的操作。

    1.1 redis管道特性的处理机制

    Redis的管道实现原理主要依靠以下两个机制:

    1. 请求队列:客户端发送的命令请求会先进入一个请求队列,而不是立即发送到Redis服务器。请求队列会按照命令的发送顺序缓存请求,保持命令的顺序不变。

    2. 批量执行:当客户端调用执行命令的方法(如EXEC)时,Redis服务器会按照请求队列中的命令顺序依次执行这些命令,并将执行结果一次性返回给客户端。这样就实现了一次性发送多个命令并一次性接收多个结果的效果。

    具体的实现流程如下:

    1. 客户端发送命令:客户端将多个命令发送到Redis服务器。这些命令会被添加到请求队列中。

    2. REDIS_PIPELINE 指令:客户端发送一个特殊的REDIS_PIPELINE指令给Redis服务器,表示开始使用管道。

    3. 请求队列缓存命令:Redis服务器收到REDIS_PIPELINE指令后,将客户端发送的命令添加到请求队列中。

    4. 执行命令并返回结果:当客户端调用执行命令的方法(如EXEC)时,Redis服务器会按照请求队列中的命令顺序依次执行这些命令。执行完所有命令后,将执行结果一次性返回给客户端。

    通过使用请求队列和批量执行机制,Redis的管道实现了一次性发送多个命令并一次性接收多个结果的效果,从而减少了多次网络往返的开销,提高了性能和吞吐量。同时,管道还支持原子性的事务操作,可以将多个命令封装在一个事务中进行执行。

    使用redis管道优化示例

    通常在项目中发现大量使用循环操作redis的示例。使用循环单独更新缓存这种在大数据量情况下存在很大性能问题。

    举个简单示例

    假设有一个需求是根据用户ID列表批量更新用户的缓存信息。原始的实现方式是使用循环单独更新每个用户的缓存信息,如下所示:

    import redis.clients.jedis.Jedis;
    
    public class CacheUpdater {
        private Jedis jedis;
    
        public CacheUpdater() {
            jedis = new Jedis("localhost", 6379);
        }
    
        public void updateCache(List<String> userIds) {
            for (String userId : userIds) {
                User user = getUserFromDB(userId); // 从数据库获取用户信息
                updateCache(user); // 更新缓存信息
            }
        }
    
        private void updateCache(User user) {
            String cacheKey = "user:" + user.getId();
            jedis.set(cacheKey, user.toJson());
        }
    
        private User getUserFromDB(String userId) {
            // 从数据库中获取用户信息的逻辑
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    上面代码中,每次循环都会单独更新一个用户的缓存信息,导致每次都要进行网络通信,性能较差。

    使用Redis管道优化后的代码如下所示:

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.Pipeline;
    
    public class CacheUpdater {
        private Jedis jedis;
    
        public CacheUpdater() {
            jedis = new Jedis("localhost", 6379);
        }
    
        public void updateCache(List<String> userIds) {
            Pipeline pipeline = jedis.pipelined();
            for (String userId : userIds) {
                User user = getUserFromDB(userId); // 从数据库获取用户信息
                updateCache(user, pipeline); // 更新缓存信息
            }
            pipeline.syncAndReturnAll();
        }
    
        private void updateCache(User user, Pipeline pipeline) {
            String cacheKey = "user:" + user.getId();
            pipeline.set(cacheKey, user.toJson());
        }
    
        private User getUserFromDB(String userId) {
            // 从数据库中获取用户信息的逻辑
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在优化后的代码中,创建了一个Redis管道Pipeline对象pipeline,然后在循环中使用管道的set命令批量更新缓存信息。最后,通过pipeline.syncAndReturnAll()一次性执行所有的管道命令。

    通过使用Redis管道优化,可以减少网络通信的开销,提高性能。

    3. springboot使用redis管道示例

    在Spring Boot中使用Redis管道,可以借助RedisTemplate和SessionCallback来实现。

    1. 如何使用Redis管道进行批量查询车辆最新位置GPS信息。
      注入RedisTemplate在getLatestGpsInfo方法中使用executePipelined方法来执行Redis管道操作。在管道中,遍历车辆ID列表,使用RedisConnection对象的get方法来获取对应车辆的GPS信息。
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    import java.util.ArrayList;
    import java.util.List;
    
    @Service
    public class VehicleService {
        
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        public List<GpsInfo> getLatestGpsInfo(List<String> vehicleIds) {
            List<GpsInfo> gpsInfoList = redisTemplate.executePipelined(new RedisCallback<List<GpsInfo>>() {
                @Override
                public List<GpsInfo> doInRedis(RedisConnection connection) {
                    for (String vehicleId : vehicleIds) {
                        connection.get(redisTemplate.getStringSerializer().serialize("gps:" + vehicleId));
                    }
                    return null;
                }
            });
    
            // 解析返回结果
            List<GpsInfo> result = new ArrayList<>();
            for (Object gpsInfoObj : gpsInfoList) {
                if (gpsInfoObj != null) {
                    String gpsStr = redisTemplate.getStringSerializer().deserialize((byte[]) gpsInfoObj);
                    GpsInfo gpsInfo = parseGpsInfo(gpsStr);
                    result.add(gpsInfo);
                }
            }
    
            return result;
        }
    
        private GpsInfo parseGpsInfo(String gpsStr) {
            // 解析GPS信息的逻辑
            // ...
            return gpsInfo;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    4. 参考资料

    1. Redis官方文档https://redis.io/topics/pipelining

    2. Redis协议https://redis.io/topics/protocol

    3. Redis管道的优势和用途https://blog.csdn.net/u012439416/article/details/80291006

    4. Redis管道的实现原理https://juejin.cn/post/6844903955324334093

    5. 源码地址

    https://github.com/wangshuai67/icepip-springboot-action-examples
    https://github.com/wangshuai67/Redis-Tutorial-2023

    6. Redis从入门到精通系列文章

  • 相关阅读:
    【0236】聊一聊PG内核中的命令标签(Command Tags、CommandTag、tag_behavior)
    C#进阶09——值类型和引用类型2
    蓝桥杯构造法|两道例题(C++)
    反射(类加载、加载流程、加载的五个阶段、获取类结构信息、反射暴破创建实例、操作属性、操作方法)
    从理解路由到实现一套Router(路由)
    开发者生态:共享知识,携手共进,共创技术辉煌
    基于SSM框架的人力资源管理系统毕业设计源码060936
    神经网络中神经元的权重更新
    SpringBoot SpringBoot 原理篇 1 自动配置 1.12 bean 的加载控制【注解式】
    4.物联网射频识别,RFID开发【智能门禁项目】
  • 原文地址:https://blog.csdn.net/wangshuai6707/article/details/132842311