• redis集群系列二


    1. 集群伸缩

    集群伸缩会导致分片数的增加,从而影响数据分布。以前16384个槽对应N个分片,现在对应M个分片。

    1.1 扩容集群

    扩容节点加入集群

    • 启动新的节点redis-server redis.conf
    • 加入集群,使用cluster meet(这里如果新节点已经加入了别的集群,那操作需慎重)
    • 集群节点通过一段时间的ping/pong通信之后,所有节点会发现新节点并将状态保存在本地nodes.conf
      • 其他节点是否发现了新节点,可以用cluster nodes命令check,程序中为了获取这个命令的结果,会调用clusterGenNodesDescription函数,使用内存中的最新信息生成结果返回
      • 其他节点本地的nodes.conf是通过clusterSaveConfig函数更新
    • 新节点刚开始都是主节点状态,后续对它的处理有两种
      • 迁移槽和数据实现扩容
      • 变成其他master节点的从

    迁移槽和数据

    1. 明确新节点负责的槽位(迁移需保证每个分片负责的槽数量大致相同)
    2. 逐个迁移槽位,每个槽位的迁移流程如下
      1. 向目标节点发送cluster setslot {slot} importing {sourceNodeId}
      2. 向源节点发送cluster setslot {slot} migrating {targetNodeId}
      3. 源节点循环执行cluster getkeysinslot {slot} {count} 获取slot上count个key
      4. 在源节点上执行migrate {targetIp} {targetPort} “” 0 {timeout} keys {keys…}。源节点批量迁移的migrate命令在Redis3.0.6以上版本提供。批量迁移可以减少节点之间的网络I/O次数
      5. 重复3和4,直到所有的slot就位
      6. 向集群内所有节点发送cluster setslot {slot} node {targetNodeId},通知大家槽分配给目标节点了(为了确保槽分配被及时传播)
    3. redis-trib有槽重新分片的功能

    扩容节点也需要增加从节点,保证高可靠

    1.2 收缩集群

    主要流程:

    1. 处理即将收缩节点所负责的槽(参考上述的迁移槽和数据的过程)
    2. 下线节点不负责槽位或者为从节点时,才可通知集群内的其他节点下线该节点
      1. cluster forget {downNodeId},当其他节点收到该命令,会将downNodeId节点加入禁用列表中。在禁用列表中的节点不会发送gossip消息,但禁用列表的有效期是60s,超过60s之后会再次进行消息交换。【在60s的时间,需要让集群内的所有节点忘记下线节点】
      2. 如果只下线主节点,要安排好从节点的归属
      3. 如果下线主从节点,建议先下线从节点,再下线主节点
    # 60s 有效期
    /* -----------------------------------------------------------------------------
     * CLUSTER nodes blacklist
     *
     * The nodes blacklist is just a way to ensure that a given node with a given
     * Node ID is not readded before some time elapsed (this time is specified
     * in seconds in CLUSTER_BLACKLIST_TTL).
     *
     * This is useful when we want to remove a node from the cluster completely:
     * when CLUSTER FORGET is called, it also puts the node into the blacklist so
     * that even if we receive gossip messages from other nodes that still remember
     * about the node we want to remove, we don't re-add it before some time.
     *
     * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
     * that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
     * in the cluster without dealing with the problem of other nodes re-adding
     * back the node to nodes we already sent the FORGET command to.
     *
     * The data structure used is a hash table with an sds string representing
     * the node ID as key, and the time when it is ok to re-add the node as
     * value.
     * -------------------------------------------------------------------------- */
    
    #define CLUSTER_BLACKLIST_TTL 60      /* 1 minute. */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2. 请求路由

    Redis集群对客户端通信协议做了比较大的修改,官方的客户端连接redis集群会直连

    2.1 请求重定向

    1. 集群模式下,当redis节点接收到key相关的命令,会先计算对应的slot,再根据slot找到节点。如果发现节点就是自己,直接操作;如果发现节点是别人,则给客户端返回{moved 正确节点}重定向错误。客户端收到之后,将连接正确节点进行操作。
    2. 使用命令cluster keyslot {key} 返回key对应的槽
    3. redis-cli 命令加上-c会自动支持重定向,redis-cli其实本质也是一个连接集群的客户端,在内部实现了如果key不属于所连接的节点,则向正确的节点再次发起请求

    槽的计算

    根据键的有效部分使用CRC16计算出散列值,然后%16384

    • 源代码见下,很清晰
    • 键的有效部分:如果没有{}则是全部key;如果有{}则是{}中间的key【该内容称之为hash_tag】
      • 通过hash_tag可以实现不同的key属于相同的slot,常用于RedisIO优化
        • 当我们想在集群模式下使用mget执行批量调用,键列表必须属于相同的slot否则会报错,这时可以使用hash_tag
        • pipeline只能向一个节点批量发送命令,也需要pipeline中涉及的key属于相同的slot
    unsigned int keyHashSlot(char *key, int keylen) {
        int s, e; /* start-end indexes of { and } */
    
        for (s = 0; s < keylen; s++)
            if (key[s] == '{') break;
    
        /* No '{' ? Hash the whole key. This is the base case. */
        if (s == keylen) return crc16(key,keylen) & 0x3FFF;
    
        /* '{' found? Check if we have the corresponding '}'. */
        for (e = s+1; e < keylen; e++)
            if (key[e] == '}') break;
    
        /* No '}' or nothing between {} ? Hash the whole key. */
        if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
    
        /* If we are here there is both a { and a } on its right. Hash
         * what is in the middle between { and }. */
        return crc16(key+s+1,e-s-1) & 0x3FFF;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2.2 Redis集群客户端

    一般分为两种dummy client和smart client

    • dummy client:每次都随机连接一个节点,如果返回moved则重新发送请求
    • smart client:在内部维护slot和node的映射关系

    Java的Jedis框架

    相关的资料:

    1. 文档:https://javadoc.io/doc/redis.clients/jedis/latest/index.html

    2. 代码:https://github.com/redis/jedis (以下代码举例为当前master分支,最新的发版版本为4.2.3)

    3. jediscluster会选择一个节点,发送cluster slots获取槽与节点的映射关系

    4. jediscluster会解析cluster slots的结果缓存在本地,并为每一个节点创建唯一的jedisPool链接池

      1. 解析cluster slots的结果在JedisClusterInfoCache

        1. https://javadoc.io/doc/redis.clients/jedis/latest/redis/clients/jedis/JedisClusterInfoCache.html
      2. JedisCluster执行键命令的过程

      public final <T> T executeCommand(CommandObject<T> commandObject) {
        Instant deadline = Instant.now().plus(maxTotalRetriesDuration);
    
        JedisRedirectionException redirect = null;
        int consecutiveConnectionFailures = 0;
        Exception lastException = null;
        for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
          Connection connection = null;
          try {
            if (redirect != null) {
               // 获取目标节点的连接
              connection = provider.getConnection(redirect.getTargetNode());
              if (redirect instanceof JedisAskDataException) {
                // TODO: Pipeline asking with the original command to make it faster....
                connection.executeCommand(Protocol.Command.ASKING);
              }
            } else {
              connection = provider.getConnection(commandObject.getArguments());
            }
    
            return connection.executeCommand(commandObject);
    
          } catch (JedisClusterOperationException jnrcne) {
            throw jnrcne;
          } catch (JedisConnectionException jce) {
            // 如果上次连接失败,则handleConnectionProblem函数中会遍历别的节点discoverClusterSlots然后刷新ta
            // 在刷新slot和node映射的过程中,需要加锁
            lastException = jce;
            ++consecutiveConnectionFailures;
            log.debug("Failed connecting to Redis: {}", connection, jce);
            // "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
            boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
            if (reset) {
              consecutiveConnectionFailures = 0;
              redirect = null;
            }
          } catch (JedisRedirectionException jre) {
            // avoid updating lastException if it is a connection exception
            if (lastException == null || lastException instanceof JedisRedirectionException) {
              lastException = jre;
            }
            log.debug("Redirected by server to {}", jre.getTargetNode());
            consecutiveConnectionFailures = 0;
            redirect = jre;
            // if MOVED redirection occurred, moved发生,则刷新slot和node映射的缓存
            if (jre instanceof JedisMovedDataException) {
              // it rebuilds cluster's slot cache recommended by Redis cluster specification
              provider.renewSlotCache(connection);
            }
          } finally {
            IOUtils.closeQuietly(connection);
          }
          if (Instant.now().isAfter(deadline)) {
            throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
          }
        }
    
        JedisClusterOperationException maxAttemptsException
            = new JedisClusterOperationException("No more cluster attempts left.");
        maxAttemptsException.addSuppressed(lastException);
        throw maxAttemptsException;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
      3. jedisCluster的这种方式,当集群规模大,客户端需要维护很多连接并消耗很多内存
    
      4. jedisCluster执行renewSlotCache的过程中,最新的版本相比以前的优化点
    
         1. 使用rediscoverLock保证同一时刻只有一个线程更新slots缓存
    
    • 1
    • 2
    • 3
    • 4
    • 5
    public void renewClusterSlots(Connection jedis) {
        // If rediscovering is already in process - no need to start one more same rediscovering, just return
        if (rediscoverLock.tryLock()) {
          try {
            // First, if jedis is available, use jedis renew.
            if (jedis != null) {
              try {
                discoverClusterSlots(jedis);
                return;
              } catch (JedisException e) {
                // try nodes from all pools
              }
            }
    
            // Then, we use startNodes to try, as long as startNodes is available,
            // whether it is vip, domain, or physical ip, it will succeed.
            if (startNodes != null) {
              for (HostAndPort hostAndPort : startNodes) {
                try (Connection j = new Connection(hostAndPort, clientConfig)) {
                  discoverClusterSlots(j);
                  return;
                } catch (JedisConnectionException e) {
                  // try next nodes
                }
              }
            }
    
            // Finally, we go back to the ShuffledNodesPool and try the remaining physical nodes.
            for (ConnectionPool jp : getShuffledNodesPool()) {
              try (Connection j = jp.getResource()) {
                // If already tried in startNodes, skip this node.
                if (startNodes != null && startNodes.contains(j.getHostAndPort())) {
                  continue;
                }
                discoverClusterSlots(j);
                return;
              } catch (JedisConnectionException e) {
                // try next nodes
              }
            }
    
          } finally {
            rediscoverLock.unlock();
          }
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    JedisCluster客户端定义

    1. JedisCluster的初始化方法有很多,举例其中之一

      1. soTimeout:读写超时
      2. poolConfig:JedisCluter会为RedisCluster的每个节点创建连接池
        1. 注意:JedisCluster一般不要执行close()操作,该操作会对所有的JedisPool执行destroy操作
         public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout, int maxAttempts,
             final GenericObjectPoolConfig<Connection> poolConfig) {
           this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, poolConfig);
         }
    
    • 1
    • 2
    • 3
    • 4
    1. 一些特殊的命令需要到多个节点上去执行,比如keys、flushall、删除指定模式的键等
      1. 代码实现中,需要获取所有节点的JedisPool,然后逐个在节点上实现
    2. 事务和Lua脚本,可以使用hash_tag标记key到同一个slot,然后获取指定slot的连接池执行

    JedisCluster遇到ASK重定向

    客户端ASK重定向流程
    1. ASK重定向发生在redis集群数据迁移的过程中。当一个slot数据从A节点迁移到B节点,可能有一部分key存在于A节点,另外一部分key存在于B节点
    2. 客户端会根据本地slots缓存发送命令到源节点。如果key还在源节点A则直接返回,如果key在目标节点B,源节点A会返回ASK重定向异常
    3. 客户端再根据ASK重定向异常提取目标节点的信息,发送asking命令到目标节点打开客户端连接标识,再执行键命令

    JedisClient收到moved重定向会更新slots缓存,但收到ask重定向并不会更新slots缓存,因为ask重定向是一个临时迁移中状态

    redis节点内部如何处理
    1. 当节点收到键命令,会根据clusterState内的迁移属性对命令进行处理。
    2. 需注意asking是一次性命令,每次执行完客户端标识都会修改回原状态,可以参考以下代码
    /* resetClient prepare the client to process the next command */
    void resetClient(client *c) {
        redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
    
        freeClientArgv(c);
        c->reqtype = 0;
        c->multibulklen = 0;
        c->bulklen = -1;
    
        /* We clear the ASKING flag as well if we are not inside a MULTI, and
         * if what we just executed is not the ASKING command itself. */
        if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
            c->flags &= ~CLIENT_ASKING;
    
        /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
         * to the next command will be sent, but set the flag if the command
         * we just processed was "CLIENT REPLY SKIP". */
        c->flags &= ~CLIENT_REPLY_SKIP;
        if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
            c->flags |= CLIENT_REPLY_SKIP;
            c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    1. 在redis集群中,当槽处于迁移状态的时候,对于批量操作的case怎么办?(比如pipeline或者mget)
      1. jediscluster执行mget(如果key不存在同一个节点,则会报错)
      2. jediscluster执行pipeline:会逐个执行,并将结果返回到结果集。如果结果集里面有JedisAskDataException则客户端可以重写逻辑使得重新发送请求。为什么可以这么做呢
        1. 因为pipeline的结果是严格按照顺序返回的,即使有异常也是如此
        2. 通过ASK重定向的原理,我们可以手动发起ASK相关流程保证pipeline的正确执行
  • 相关阅读:
    PDF怎么合并?这些合并方法你该知道
    Linux编译器 -- gcc/g++ 调试器 -- gdb
    一次有趣的 DNS 导致 Node 服务故障问题分析实录
    十、Spring Boot 安全管理(4)
    HTML(基本标签)
    贪心 Leetcode 135 分发糖果
    新型攻击方法分析
    你真的会报bug吗?常见10条错误
    内向的软件开发工程师如何在职场站稳阵脚?
    JAVA工程师面试专题-《Redis》篇
  • 原文地址:https://blog.csdn.net/Alexia23/article/details/125475008