/* Cluster node flags and macros. */
#define CLUSTER_NODE_MASTER 1 /* The node is a master */
#define CLUSTER_NODE_SLAVE 2 /* The node is a slave */
#define CLUSTER_NODE_PFAIL 4 /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL 8 /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF 16 /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR 64 /* We don't know the address of this node */
#define CLUSTER_NODE_MEET 128 /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO 256 /* Master eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
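These flags are combined and tested with plain bitwise operations on node->flags. The helper macros used later in this section (nodeIsMaster, nodeTimedOut, nodeFailed, and so on) are essentially one-line bit tests; a minimal sketch, following the pattern used in cluster.h:
/* Flag-testing helpers: each one is a simple bit test on node->flags. */
#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL)       /* In PFAIL state. */
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)          /* In FAIL state. */
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)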
/* This is executed 10 times every second */
void clusterCron(void) {
...
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
...
/* Check if this node looks unreachable.
* Note that if we already received the PONG, then node->ping_sent
* is zero, so we can't reach this code at all, and we don't risk
* checking for a PONG delay if we never sent the PING.
*
* We also consider every incoming data as proof of liveness, since
* our cluster bus link is also used for data: under heavy data
* load pong delays are possible. */
mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
data_delay;
if (node_delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);
...
}
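The two delays compared above are computed in the elided part of the loop. A minimal sketch of that part, simplified from the upstream clusterCron() (the loop first skips nodes that have no PING pending):
/* Sketch of the elided delay computation inside the node loop:
 * ping_delay is how long we have been waiting for a PONG,
 * data_delay is how long since any byte arrived on the cluster bus. */
mstime_t now = mstime();              /* Sampled once at the top of clusterCron(). */
if (node->ping_sent == 0) continue;   /* No PING pending for this node. */
mstime_t ping_delay = now - node->ping_sent;
mstime_t data_delay = now - node->data_received;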
/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
struct clusterNode *node; /* Node reporting the failure condition. */
mstime_t time; /* Time of the last report from this node. */
} clusterNodeFailReport;
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
sds hostname; /* The known hostname for this node */
int port; /* Latest known clients port (TLS or plain). */
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
#define CLUSTER_FAIL_REPORT_VALIDITY_MULT 2 /* Fail report validity. */
/* Remove failure reports that are too old, where too old means reasonably
* older than the global node timeout. Note that anyway for a node to be
* flagged as FAIL we need to have a local PFAIL state that is at least
* older than the global node timeout, so we don't just trust the number
* of failure reports from other nodes. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
list *l = node->fail_reports;
listNode *ln;
listIter li;
clusterNodeFailReport *fr;
mstime_t maxtime = server.cluster_node_timeout *
CLUSTER_FAIL_REPORT_VALIDITY_MULT;
mstime_t now = mstime();
listRewind(l,&li);
while ((ln = listNext(&li)) != NULL) {
fr = ln->value;
if (now - fr->time > maxtime) listDelNode(l,ln);
}
}
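markNodeAsFailingIfNeeded() below counts the still-valid reports through clusterNodeFailureReportsCount(); in the upstream source that is little more than a cleanup followed by a list-length read, roughly:
/* Number of failure reports that are still valid for this node:
 * prune stale entries first, then return how many remain. */
int clusterNodeFailureReportsCount(clusterNode *node) {
    clusterNodeCleanupFailureReports(node);
    return listLength(node->fail_reports);
}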
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;
int needed_quorum = (server.cluster->size / 2) + 1;
if (!nodeTimedOut(node)) return; /* We can reach it. */
if (nodeFailed(node)) return; /* Already FAILing. */
failures = clusterNodeFailureReportsCount(node);
/* Also count myself as a voter if I'm a master. */
if (nodeIsMaster(myself)) failures++;
if (failures < needed_quorum) return; /* No weak agreement from masters. */
serverLog(LL_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);
/* Mark the node as failing. */
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();
/* Broadcast the failing node name to everybody, forcing all the other
* reachable nodes to flag the node as FAIL.
* We do that even if this node is a replica and not a master: anyway
* the failing state is triggered collecting failure reports from masters,
* so here the replica is only helping propagating this status. */
clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
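For example, in a cluster with three masters, needed_quorum is 3/2 + 1 = 2; since the local PFAIL observation counts as one vote when myself is a master, a single still-valid failure report from another master is enough to promote the node from PFAIL to FAIL and broadcast the FAIL message.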
Once the failing node is marked as objectively down (FAIL), if it is a master that owns slots, one of its replicas has to be chosen to replace it.
A replica whose link with its master has been down for longer than server.cluster_node_timeout * server.cluster_slave_validity_factor is not eligible to fail over.
The larger a replica's repl_offset, the lower its replication lag and the more data it holds, so it deserves a higher election priority. That priority is expressed as an election delay: on top of a fixed 500 ms plus a random 0-500 ms, each candidate adds one second per rank. For example, with three replicas A, B and C where A's repl_offset > B's repl_offset > C's repl_offset, A has rank 0 and adds no extra delay, B (rank 1) adds 1 s, and C (rank 2) adds 2 s, so A gets to start the election first.
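The rank used for this delay comes from clusterGetSlaveRank(), called in the function below; roughly, following the upstream implementation, it counts the sibling replicas that have a larger replication offset:
/* Rank 0 = the replica with the largest known repl_offset among the
 * failover-capable replicas of my master; every sibling with a larger
 * offset pushes my rank, and therefore my election delay, up by one. */
int clusterGetSlaveRank(void) {
    long long myoffset;
    int j, rank = 0;
    clusterNode *master = myself->slaveof;

    if (master == NULL) return 0; /* Only called by slaves with a known master. */
    myoffset = replicationGetSlaveOffset();
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] != myself &&
            !nodeCantFailover(master->slaves[j]) &&
            master->slaves[j]->repl_offset > myoffset) rank++;
    return rank;
}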
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
...
auth_timeout = server.cluster_node_timeout*2;
...
/* If the previous failover attempt timed out and the retry time has
* elapsed, we can set up a new one. */
if (auth_age > auth_retry_time) {
server.cluster->failover_auth_time = mstime() +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
random() % 500; /* Random delay between 0 and 500 milliseconds. */
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetSlaveRank();
/* We add another delay that is proportional to the slave rank.
* Specifically 1 second * rank. This way slaves that probably have a
* less updated replication offset are penalized. */
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
/* However if this is a manual failover, no delay is needed. */
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
serverLog(LL_WARNING,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset());
/* Now that we have a scheduled election, broadcast our offset
* to all the other slaves so that they'll update their offsets
* if our offset is better. */
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}
...
/* Return ASAP if we still can't start the election. */
if (mstime() < server.cluster->failover_auth_time) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
return;
}
/* Return ASAP if the election is too old to be valid. */
if (auth_age > auth_timeout) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
return;
}
...
}
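One of the checks elided from clusterHandleSlaveFailover() is the eligibility rule mentioned earlier: a replica whose data is too old does not even try. A minimal sketch of that check, following the upstream logic (data_age is the time since the replication link went down, and manual_failover is a local flag derived from server.cluster->mf_end; manual failovers skip the check):
/* Sketch of the eligibility check: refuse to fail over automatically if
 * our data is older than cluster_node_timeout * cluster_slave_validity_factor
 * (plus the usual replication ping period). */
if (server.cluster_slave_validity_factor &&
    data_age >
    (((mstime_t)server.repl_ping_slave_period * 1000) +
     (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
{
    if (!manual_failover) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
        return;
    }
}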
const char *clusterGetMessageTypeString(int type) {
switch(type) {
case CLUSTERMSG_TYPE_PING: return "ping";
case CLUSTERMSG_TYPE_PONG: return "pong";
case CLUSTERMSG_TYPE_MEET: return "meet";
case CLUSTERMSG_TYPE_FAIL: return "fail";
case CLUSTERMSG_TYPE_PUBLISH: return "publish";
case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
case CLUSTERMSG_TYPE_UPDATE: return "update";
case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
case CLUSTERMSG_TYPE_MODULE: return "module";
}
return "unknown";
}
The concept of the configuration epoch: every master carries a configEpoch, and the cluster keeps a global currentEpoch; when two nodes disagree about slot ownership, the claim tagged with the higher configEpoch wins.
How the configuration epoch is updated: a replica that wins a failover election bumps currentEpoch and adopts the new value as its configEpoch (see "configEpoch set to 9 after successful failover" in the logs below); if two masters ever end up with the same configEpoch, the collision is resolved as follows.
void clusterHandleConfigEpochCollision(clusterNode *sender) {
/* Prerequisites: nodes have the same configEpoch and are both masters. */
if (sender->configEpoch != myself->configEpoch ||
!nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
/* Don't act if the colliding node has a smaller Node ID. */
/*
* memcmp() returns a value < 0 when the first string is smaller,
* > 0 when the second string is smaller, and 0 when they are equal,
* so only the node whose name is lexicographically smaller than the
* sender's bumps its epoch here.
*/
if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
/* Get the next ID available at the best of this node knowledge. */
server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterSaveConfigOrDie(1);
serverLog(LL_VERBOSE,
"WARNING: configEpoch collision with node %.40s."
" configEpoch set to %llu",
sender->name,
(unsigned long long) myself->configEpoch);
}
/* Vote for the node asking for our vote if there are the conditions. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
...
/* Refuse to vote if we already voted for a slave of this master within
* the last two node timeouts. This is not strictly needed for correctness
* of the algorithm but makes the base case more linear. */
if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
{
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"can't vote about this master before %lld milliseconds",
node->name,
(long long) ((server.cluster_node_timeout*2)-
(mstime() - node->slaveof->voted_time)));
return;
}
...
}
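When every check passes (the elided parts verify, among other things, the request's currentEpoch, that the slave's master is actually failing or a manual failover is in progress, and that we have not already voted in this epoch), the function ends by recording the vote and answering with an ACK; roughly, following the upstream tail of the function:
/* Sketch of the tail of clusterSendFailoverAuthIfNeeded(): remember that we
 * voted in this epoch and for a slave of this master, persist the config,
 * and send the FAILOVER_AUTH_ACK back to the requesting slave. */
server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
node->slaveof->voted_time = mstime();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
clusterSendFailoverAuth(node);
serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
    node->name, (unsigned long long) server.cluster->currentEpoch);
This is the "Failover auth granted to ... for epoch 9" line that shows up in the logs of shards B and C below.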
The total failover time is made up of three parts: the time to detect the master as subjectively down (at most cluster-node-timeout), the time for the PFAIL reports to spread through gossip and reach a quorum (at most about cluster-node-timeout/2, since every node pings any peer it has not contacted for half the timeout), and the replica election delay (at most about 1000 ms of fixed plus random delay).
Summing up: failover time <= cluster-node-timeout + cluster-node-timeout/2 + 1000 (milliseconds).
cluster-node-timeout defaults to 15 s. It should not be set too low either: if it is too low, the failure reports may expire before reports from more than half of the masters have been collected.
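For example, with the default cluster-node-timeout of 15000 ms the bound is 15000 + 7500 + 1000 = 23500 ms, about 23.5 s; in the logs below the whole failover takes roughly 16 s (the replica loses its master at 12:10:07 and wins the election at 12:10:23), well within that bound.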
Log of shard A's master: after being restarted it comes back as a detached node (no cluster configuration found), waiting for other nodes to connect to it.
│ Could not connect to Redis at 127.0.0.1:6379: Connection refused
│ 6:C 26 Jun 2022 12:10:20.431 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
│ 6:C 26 Jun 2022 12:10:20.431 # Redis version=5.0.0, bits=64, commit=00000000, modified=0, pid=6, just started
│ 6:C 26 Jun 2022 12:10:20.431 # Configuration loaded
│ 6:M 26 Jun 2022 12:10:20.432 * No cluster configuration found, I'm 84b64a150d13d61217f461955bac1a50b6dce6a4
│ 6:M 26 Jun 2022 12:10:20.435 * Running mode=cluster, port=6379.
│ 6:M 26 Jun 2022 12:10:20.435 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
│ 6:M 26 Jun 2022 12:10:20.435 # Server initialized
│ 6:M 26 Jun 2022 12:10:20.435 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never
│ 6:M 26 Jun 2022 12:10:20.435 * Ready to accept connections
Log of shard A's replica:
│ 6:S 26 Jun 2022 12:10:07.338 # Connection with master lost. # the link to the master drops
│ 6:S 26 Jun 2022 12:10:07.338 * Caching the disconnected master state.
│ 6:S 26 Jun 2022 12:10:08.030 * Connecting to MASTER 10.0.129.86:6379
│ 6:S 26 Jun 2022 12:10:08.030 * MASTER <-> REPLICA sync started
│ 6:S 26 Jun 2022 12:10:22.836 * FAIL message received from d84e8d7fc9596bea211746dc02fe2328a1b87c97 about 3dc7feb695bbddf6791be551a04dfd9d0c70937e
│ 6:S 26 Jun 2022 12:10:22.836 # Cluster state changed: fail
│ 6:S 26 Jun 2022 12:10:22.857 # Start of election delayed for 912 milliseconds (rank #0, offset 14728). # election scheduled
│ 6:S 26 Jun 2022 12:10:23.861 # Starting a failover election for epoch 9. # bumps the epoch and starts the election
│ 6:S 26 Jun 2022 12:10:23.877 # Failover election won: I'm the new master. # becomes the new master
│ 6:S 26 Jun 2022 12:10:23.877 # configEpoch set to 9 after successful failover
│ 6:M 26 Jun 2022 12:10:23.877 # Setting secondary replication ID to a4370f1980adcc4e6805998e28bfc5f6f5b7b6b8, valid up to offset: 14729. New replication ID is e642e383ddac1b4464a90e416491f8298aa15030
│ 6:M 26 Jun 2022 12:10:23.877 * Discarding previously cached master state.
│ 6:M 26 Jun 2022 12:10:23.877 # Cluster state changed: ok
Master of another shard, B:
│ 6:M 26 Jun 2022 12:10:22.836 * Marking node 3dc7feb695bbddf6791be551a04dfd9d0c70937e as failing (quorum reached). # considers shard A's master objectively down (FAIL)
│ 6:M 26 Jun 2022 12:10:22.836 # Cluster state changed: fail
│ 6:M 26 Jun 2022 12:10:23.864 # Failover auth granted to f3425ec033cdafcc94ae8081f5d20724d76c2ef2 for epoch 9 # votes for shard A's replica to become the new master
│ 6:M 26 Jun 2022 12:10:23.879 # Cluster state changed: ok
Master of another shard, C:
│ 6:M 26 Jun 2022 12:10:22.838 * Marking node 3dc7feb695bbddf6791be551a04dfd9d0c70937e as failing (quorum reached). # considers shard A's master objectively down (FAIL)
│ 6:M 26 Jun 2022 12:10:22.838 # Cluster state changed: fail
│ 6:M 26 Jun 2022 12:10:23.866 # Failover auth granted to f3425ec033cdafcc94ae8081f5d20724d76c2ef2 for epoch 9 # votes for shard A's replica to become the new master
│ 6:M 26 Jun 2022 12:10:23.881 # Cluster state changed: ok