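Scaling the cluster (for example, adding shards) changes the number of shards and therefore the data distribution: the 16384 slots that were previously spread over N shards are now spread over M shards.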
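To make the N → M change concrete, here is a small Java sketch. The class and method names are hypothetical. It assumes slots are spread evenly both before and after scaling out, and it counts the minimum number of slots the new shards must receive. For contrast, it also measures how many hash values would change owner under naive "hash % shards" placement, which is what a fixed slot count avoids.

```java
// Hypothetical helper, for illustration only. It assumes an even slot layout
// before and after a scale-out from n to m shards.
class SlotRebalance {
    static final int SLOTS = 16384;

    // Minimum number of slots that must migrate onto the (m - n) new shards so
    // that each of the m shards holds SLOTS / m slots. The remainder
    // (SLOTS % m) is ignored in this sketch.
    static int minSlotsToMove(int n, int m) {
        if (m <= n) {
            throw new IllegalArgumentException("this sketch only covers scale-out (m > n)");
        }
        return (SLOTS / m) * (m - n);
    }

    // For comparison: the fraction of hash values whose shard changes when data
    // is placed with naive `hash % shards` instead of fixed slots.
    static double moduloMovedFraction(int n, int m) {
        int period = n * m; // h % n and h % m both repeat every n * m values
        int moved = 0;
        for (int h = 0; h < period; h++) {
            if (h % n != h % m) {
                moved++;
            }
        }
        return (double) moved / period;
    }

    public static void main(String[] args) {
        System.out.println(minSlotsToMove(3, 4));      // 4096 of 16384 slots
        System.out.println(moduloMovedFraction(3, 4)); // 0.75
    }
}
```

Going from 3 to 4 shards, about a quarter of the slots (and only the keys inside them) have to move. With "hash % shards", three quarters of all keys would be relocated.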
Main steps:
# 60-second TTL
/* -----------------------------------------------------------------------------
* CLUSTER nodes blacklist
*
* The nodes blacklist is just a way to ensure that a given node with a given
* Node ID is not readded before some time elapsed (this time is specified
* in seconds in CLUSTER_BLACKLIST_TTL).
*
* This is useful when we want to remove a node from the cluster completely:
* when CLUSTER FORGET is called, it also puts the node into the blacklist so
* that even if we receive gossip messages from other nodes that still remember
* about the node we want to remove, we don't re-add it before some time.
*
* Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute, this means
* that redis-trib has 60 seconds to send CLUSTER FORGET messages to nodes
* in the cluster without dealing with the problem of other nodes re-adding
* back the node to nodes we already sent the FORGET command to.
*
* The data structure used is a hash table with an sds string representing
* the node ID as key, and the time when it is ok to re-add the node as
* value.
* -------------------------------------------------------------------------- */
#define CLUSTER_BLACKLIST_TTL 60 /* 1 minute. */
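The blacklist described in the comment above is essentially a map with per-entry expiry. Below is a minimal Java sketch of that idea, not Redis's actual code. The class name, the method names and the explicit "now" parameter (passed in to keep it testable) are all hypothetical. Expired entries are removed lazily on lookup, which is a design choice of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the node blacklist: node ID -> time (in seconds)
// after which it is OK to re-add the node.
class NodeBlacklist {
    static final long BLACKLIST_TTL_SECONDS = 60; // mirrors CLUSTER_BLACKLIST_TTL

    private final Map<String, Long> reAddAllowedAt = new HashMap<>();

    // On CLUSTER FORGET <id>: (re)start the TTL for this node ID.
    void add(String nodeId, long nowSeconds) {
        reAddAllowedAt.put(nodeId, nowSeconds + BLACKLIST_TTL_SECONDS);
    }

    // Lazily drop entries whose TTL has elapsed.
    private void cleanup(long nowSeconds) {
        reAddAllowedAt.values().removeIf(expire -> expire < nowSeconds);
    }

    // Gossip handling would check this before re-adding a node it hears about.
    boolean contains(String nodeId, long nowSeconds) {
        cleanup(nowSeconds);
        return reAddAllowedAt.containsKey(nodeId);
    }

    public static void main(String[] args) {
        NodeBlacklist blacklist = new NodeBlacklist();
        blacklist.add("node-a", 0);
        System.out.println(blacklist.contains("node-a", 30)); // true: still blacklisted
        System.out.println(blacklist.contains("node-a", 61)); // false: TTL elapsed
    }
}
```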
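Redis Cluster changes the client protocol considerably: the official clients connect directly to the cluster nodes.
A key's slot is computed by applying CRC16 to the effective part of the key (the hash tag between the first { and the next }, if it is non-empty; otherwise the whole key) and taking the result modulo 16384. Because 16384 = 2^14, the code uses & 0x3FFF instead of %.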
unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* No '{' ? Hash the whole key. This is the base case. */
if (s == keylen) return crc16(key,keylen) & 0x3FFF;
/* '{' found? Check if we have the corresponding '}'. */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key+s+1,e-s-1) & 0x3FFF;
}
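For readers following the Java client side, here is a line-by-line Java port of keyHashSlot above, for illustration. The class name is hypothetical. CRC16 is the XMODEM variant (polynomial 0x1021, initial value 0) named in the Redis Cluster specification. Jedis ships its own implementation (JedisClusterCRC16), so this is only meant to show the hash-tag rules.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical Java port of keyHashSlot, for illustration only.
class HashSlot {
    // CRC16/XMODEM: polynomial 0x1021, initial value 0, no reflection.
    static int crc16(byte[] buf, int off, int len) {
        int crc = 0;
        for (int i = off; i < off + len; i++) {
            crc ^= (buf[i] & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = (crc & 0x8000) != 0 ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    static int keyHashSlot(String key) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        int s, e;
        for (s = 0; s < k.length; s++) if (k[s] == '{') break;
        // No '{': hash the whole key.
        if (s == k.length) return crc16(k, 0, k.length) & 0x3FFF;
        for (e = s + 1; e < k.length; e++) if (k[e] == '}') break;
        // No '}' or an empty "{}": hash the whole key.
        if (e == k.length || e == s + 1) return crc16(k, 0, k.length) & 0x3FFF;
        // Hash only what is between the first '{' and the following '}'.
        return crc16(k, s + 1, e - s - 1) & 0x3FFF;
    }

    public static void main(String[] args) {
        System.out.println(keyHashSlot("123456789")); // 12739 (CRC16/XMODEM check value 0x31C3)
        System.out.println(keyHashSlot("{user1000}.following")
                == keyHashSlot("{user1000}.followers")); // true: same hash tag
    }
}
```

Keys that share a hash tag always land in the same slot. This is what makes multi-key commands on them possible in a cluster.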
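Clients generally fall into two kinds: dummy clients, which do not track slot ownership and simply follow the redirection the server returns, and smart clients, which cache the slot-to-node mapping locally and send each command straight to the node that owns the key's slot.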
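Both kinds of client must recognize the MOVED and ASK error replies, which have the form "MOVED <slot> <host>:<port>" and "ASK <slot> <host>:<port>". A dummy client just reconnects to the address in the reply. A smart client additionally uses MOVED to correct its slot cache. The helper below is hypothetical and only parses such a reply. It assumes the RESP "-" prefix has already been stripped.

```java
// Hypothetical parser for MOVED/ASK error replies, for illustration only.
class Redirect {
    final boolean ask;   // true for ASK, false for MOVED
    final int slot;
    final String host;
    final int port;

    private Redirect(boolean ask, int slot, String host, int port) {
        this.ask = ask;
        this.slot = slot;
        this.host = host;
        this.port = port;
    }

    // Returns null if the error is not a well-formed MOVED/ASK redirection.
    static Redirect parse(String error) {
        String[] parts = error.trim().split(" ");
        if (parts.length != 3 || !(parts[0].equals("MOVED") || parts[0].equals("ASK"))) {
            return null;
        }
        int colon = parts[2].lastIndexOf(':'); // the port follows the last ':'
        if (colon < 0) {
            return null;
        }
        return new Redirect(parts[0].equals("ASK"), Integer.parseInt(parts[1]),
                parts[2].substring(0, colon), Integer.parseInt(parts[2].substring(colon + 1)));
    }

    public static void main(String[] args) {
        Redirect r = Redirect.parse("MOVED 3999 127.0.0.1:6381");
        System.out.println(r.ask + " " + r.slot + " " + r.host + ":" + r.port); // false 3999 127.0.0.1:6381
    }
}
```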
References:
Documentation: https://javadoc.io/doc/redis.clients/jedis/latest/index.html
Code: https://github.com/redis/jedis (the code below is taken from the master branch at the time of writing; the latest release then was 4.2.3)
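JedisCluster picks one node and sends CLUSTER SLOTS to obtain the slot-to-node mapping.
JedisCluster parses the CLUSTER SLOTS reply, caches it locally, and creates a dedicated connection pool for each node (JedisPool in older versions, ConnectionPool in 4.x).
The CLUSTER SLOTS reply is parsed and cached in JedisClusterInfoCache.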
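Conceptually, the cached result is a table indexed by slot. The sketch below is a hypothetical, simplified version of such a cache, not JedisClusterInfoCache itself. It stores one "host:port" string per slot, filled from CLUSTER SLOTS-style ranges, so that lookup is a single array access.

```java
import java.util.Arrays;

// Hypothetical, simplified slot cache: one entry per slot pointing at the
// master that serves it ("host:port").
class SlotCache {
    static final int SLOTS = 16384;
    private final String[] owner = new String[SLOTS];

    // One CLUSTER SLOTS range: slots [start, end] (inclusive) are served by node.
    void assign(int start, int end, String node) {
        Arrays.fill(owner, start, end + 1, node); // toIndex is exclusive
    }

    // O(1) lookup; null means the slot is not covered by any known range.
    String nodeForSlot(int slot) {
        return owner[slot];
    }

    public static void main(String[] args) {
        SlotCache cache = new SlotCache();
        cache.assign(0, 5460, "127.0.0.1:7000");
        cache.assign(5461, 10922, "127.0.0.1:7001");
        cache.assign(10923, 16383, "127.0.0.1:7002");
        System.out.println(cache.nodeForSlot(5461)); // 127.0.0.1:7001
    }
}
```

A real client would map each node address to its connection pool and would rebuild the table when the topology changes.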
How JedisCluster executes a key command:
public final <T> T executeCommand(CommandObject<T> commandObject) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);
JedisRedirectionException redirect = null;
int consecutiveConnectionFailures = 0;
Exception lastException = null;
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
Connection connection = null;
try {
if (redirect != null) {
// Get a connection to the redirection target node
connection = provider.getConnection(redirect.getTargetNode());
if (redirect instanceof JedisAskDataException) {
// TODO: Pipeline asking with the original command to make it faster....
connection.executeCommand(Protocol.Command.ASKING);
}
} else {
connection = provider.getConnection(commandObject.getArguments());
}
return connection.executeCommand(commandObject);
} catch (JedisClusterOperationException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
// After consecutive connection failures, handleConnectionProblem renews the slot cache,
// trying other nodes via discoverClusterSlots; the slot-to-node refresh is done under a lock
lastException = jce;
++consecutiveConnectionFailures;
log.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
if (reset) {
consecutiveConnectionFailures = 0;
redirect = null;
}
} catch (JedisRedirectionException jre) {
// avoid updating lastException if it is a connection exception
if (lastException == null || lastException instanceof JedisRedirectionException) {
lastException = jre;
}
log.debug("Redirected by server to {}", jre.getTargetNode());
consecutiveConnectionFailures = 0;
redirect = jre;
// if MOVED redirection occurred, refresh the cached slot-to-node mapping
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
provider.renewSlotCache(connection);
}
} finally {
IOUtils.closeQuietly(connection);
}
if (Instant.now().isAfter(deadline)) {
throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
}
}
JedisClusterOperationException maxAttemptsException
= new JedisClusterOperationException("No more cluster attempts left.");
maxAttemptsException.addSuppressed(lastException);
throw maxAttemptsException;
}
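The redirect handling in the loop above can be reduced to a small model, shown below. It is a hypothetical, heavily simplified sketch. A single cached owner stands in for the whole slot cache, "nodes" are lambdas, and MOVED just overwrites the cached owner where Jedis calls renewSlotCache. Connection failures and the maxTotalRetriesDuration deadline are omitted. The point is the asymmetry: MOVED updates the cache, while ASK only retries once against the target with the asking flag set.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the retry/redirect loop, for illustration only.
class RedirectLoop {
    static final class Reply {
        final String value;  // non-null on success
        final String kind;   // "MOVED" or "ASK" on redirection, else null
        final String target; // redirection target node
        private Reply(String value, String kind, String target) {
            this.value = value;
            this.kind = kind;
            this.target = target;
        }
        static Reply ok(String v) { return new Reply(v, null, null); }
        static Reply moved(String t) { return new Reply(null, "MOVED", t); }
        static Reply ask(String t) { return new Reply(null, "ASK", t); }
    }

    interface Node { Reply execute(String key, boolean asking); }

    final Map<String, Node> nodes = new HashMap<>();
    String cachedOwner; // stands in for the slot -> node cache
    int maxAttempts = 5;

    String get(String key) {
        String target = cachedOwner;
        boolean asking = false;
        for (int attemptsLeft = maxAttempts; attemptsLeft > 0; attemptsLeft--) {
            Reply r = nodes.get(target).execute(key, asking);
            if (r.kind == null) {
                return r.value;
            }
            target = r.target;
            if (r.kind.equals("MOVED")) {
                cachedOwner = target; // permanent move: update the cache
                asking = false;
            } else {
                asking = true;        // ASK: one-off redirect, cache untouched
            }
        }
        throw new IllegalStateException("No more cluster attempts left.");
    }

    public static void main(String[] args) {
        RedirectLoop client = new RedirectLoop();
        client.nodes.put("A", (k, asking) -> Reply.moved("B")); // A no longer owns the slot
        client.nodes.put("B", (k, asking) -> Reply.ok("value-from-B"));
        client.cachedOwner = "A";
        System.out.println(client.get("k") + " " + client.cachedOwner); // value-from-B B
    }
}
```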
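3. With this JedisCluster design, the client keeps one connection pool per node, so on a large cluster it must maintain many connections and consumes a lot of memory (with N nodes and a per-pool limit of maxTotal, a single client instance can hold up to N × maxTotal connections).
4. Improvements to renewSlotCache (which delegates to JedisClusterInfoCache.renewClusterSlots below) in the latest version compared with earlier ones:
1. rediscoverLock ensures that only one thread refreshes the slot cache at a time. Because it uses tryLock, other threads that need a refresh return immediately instead of queuing behind the one already refreshing.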
public void renewClusterSlots(Connection jedis) {
// If rediscovering is already in process - no need to start one more same rediscovering, just return
if (rediscoverLock.tryLock()) {
try {
// First, if jedis is available, use jedis renew.
if (jedis != null) {
try {
discoverClusterSlots(jedis);
return;
} catch (JedisException e) {
// try nodes from all pools
}
}
// Then, we use startNodes to try, as long as startNodes is available,
// whether it is vip, domain, or physical ip, it will succeed.
if (startNodes != null) {
for (HostAndPort hostAndPort : startNodes) {
try (Connection j = new Connection(hostAndPort, clientConfig)) {
discoverClusterSlots(j);
return;
} catch (JedisConnectionException e) {
// try next nodes
}
}
}
// Finally, we go back to the ShuffledNodesPool and try the remaining physical nodes.
for (ConnectionPool jp : getShuffledNodesPool()) {
try (Connection j = jp.getResource()) {
// If already tried in startNodes, skip this node.
if (startNodes != null && startNodes.contains(j.getHostAndPort())) {
continue;
}
discoverClusterSlots(j);
return;
} catch (JedisConnectionException e) {
// try next nodes
}
}
} finally {
rediscoverLock.unlock();
}
}
}
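The tryLock pattern used by renewClusterSlots can be isolated in a few lines. The class below is a hypothetical sketch, not Jedis code. A caller that finds a refresh already in progress returns false right away instead of blocking, so a burst of MOVED replies triggers one rediscovery rather than one per thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical "single-flight" refresh, mirroring the rediscoverLock idea.
class SingleFlightRefresh {
    private final ReentrantLock rediscoverLock = new ReentrantLock();
    final AtomicInteger refreshCount = new AtomicInteger();

    // Returns true if this call ran the refresh, false if another thread was
    // already refreshing (the caller just moves on).
    boolean renew(Runnable discoverSlots) {
        if (!rediscoverLock.tryLock()) {
            return false;
        }
        try {
            discoverSlots.run();
            refreshCount.incrementAndGet();
            return true;
        } finally {
            rediscoverLock.unlock();
        }
    }

    public static void main(String[] args) {
        SingleFlightRefresh r = new SingleFlightRefresh();
        boolean[] inner = new boolean[1];
        boolean outer = r.renew(() -> {
            // While this refresh is running, a second thread asks for one too.
            Thread t = new Thread(() -> inner[0] = r.renew(() -> { }));
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(outer + " " + inner[0] + " " + r.refreshCount.get()); // true false 1
    }
}
```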
JedisCluster has many constructors; here is one of them:
public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout, int maxAttempts,
final GenericObjectPoolConfig<Connection> poolConfig) {
this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, poolConfig);
}
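The Jedis client refreshes its slot cache when it receives a MOVED redirection, but not on an ASK redirection, because ASK only reflects a temporary state while a slot is being migrated. On the server side the ASKING flag is likewise one-shot: resetClient clears it after the next command, unless the client is inside MULTI or the command just executed was ASKING itself.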
/* resetClient prepare the client to process the next command */
void resetClient(client *c) {
redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
freeClientArgv(c);
c->reqtype = 0;
c->multibulklen = 0;
c->bulklen = -1;
/* We clear the ASKING flag as well if we are not inside a MULTI, and
* if what we just executed is not the ASKING command itself. */
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
c->flags &= ~CLIENT_ASKING;
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
* to the next command will be sent, but set the flag if the command
* we just processed was "CLIENT REPLY SKIP". */
c->flags &= ~CLIENT_REPLY_SKIP;
if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
c->flags |= CLIENT_REPLY_SKIP;
c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
}
}
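Putting the server side together, the sketch below models how a node might choose between serving a single-key command, MOVED and ASK. It follows the Redis Cluster specification's description. It is a hypothetical simplification, not the logic of Redis's getNodeByQuery: replicas, multi-key commands and TRYAGAIN are ignored. The second method mirrors the resetClient rule for the one-shot ASKING flag.

```java
// Hypothetical, single-key simplification of cluster request routing.
class SlotRouting {
    enum SlotState { OWNED, MIGRATING, IMPORTING, NOT_OWNED }

    static String route(SlotState state, boolean keyExists, boolean askingFlag) {
        switch (state) {
            case OWNED:
                return "SERVE";
            case MIGRATING:
                // Key still here: serve it. Otherwise it may already be on the
                // target node, so send the client there for this one request.
                return keyExists ? "SERVE" : "ASK";
            case IMPORTING:
                // The slot is not ours yet; only a client that just sent ASKING may use it.
                return askingFlag ? "SERVE" : "MOVED";
            default:
                return "MOVED";
        }
    }

    // Mirrors resetClient: ASKING is cleared after each command unless we are
    // inside MULTI or the command just executed was ASKING itself.
    static boolean askingAfter(boolean asking, boolean inMulti, String executedCommand) {
        if (!inMulti && !executedCommand.equalsIgnoreCase("ASKING")) {
            return false;
        }
        return asking;
    }

    public static void main(String[] args) {
        System.out.println(route(SlotState.MIGRATING, false, false)); // ASK
        System.out.println(route(SlotState.IMPORTING, false, true));  // SERVE
        System.out.println(askingAfter(true, false, "GET"));          // false
    }
}
```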