Redis 设计与实现:多机数据库的实现
1 主从复制
初次主从同步,后续命令传播。
# 只能完整重同步
sync
# 可以部分重同步
psync
# 初次复制,执行完整重同步
psync ? -1
# 之前已经复制过,则根据主服务器runid和从服务器偏移量offset信息判断,选择部分重同步/完整重同步
psync <runid> <offset>
1.1 部分重同步的实现
- 主从服务器的复制偏移量,主从服务器分别会维护一个复制偏移量。如果主从偏移量相等,则主从服务器处于一致状态;反之,主从服务器处于不一致状态,此时从服务器将向主服务器发送psync同步命令
- 主服务器的复制积压缓冲区,由主服务器维护的一个固定长度(可配置,根据需要调整缓冲区大小)的先进先出队列,里面保存着部分最近传播的命令。主服务器会检查从服务器发送psync命令中的偏移量,如果该偏移量之后的数据存在于复制积压缓冲区内,则执行部分重同步;反之不在复制积压缓冲区内,则执行完整重同步
- 服务器运行ID,每个Redis服务器,无论主从,都有自己的运行ID(run ID)。主从服务器初次复制时,主服务器会将运行ID发送给每个从服务器保存好,当某个主从连接断开重连后,从服务器会将之前保存的主服务器运行ID发送给现在连接的主服务器,主服务器判断自己的运行ID与之是否相同,从而决定执行部分重同步还是完整重同步。
1.2 复制的实现
- 从服务器根据slaveof命令信息,设置主服务器的地址和端口(本例为127.0.0.1:6379)
- 从服务器建立套接字连接主服务器
- 从服务器向主服务器发送ping命令
- 主从服务器身份验证
- 发送从服务器端口信息,从服务器向主服务器发送自己的监听端口号(本例为12345)
- 同步,从服务器发送psync命令给主服务器,执行同步操作,主从服务器状态达到一致
- 命令传播,主服务器向每个从服务器发送自己接收到的写命令,每个从服务器执行来自于主服务器的写命令,保证主从服务器状态始终一致
# 主服务器:127.0.0.1:6379
# 从服务器:127.0.0.1:12345
# 客户端向从服务器发送slaveof命令
slaveof <master_ip> <master_port>
slaveof 127.0.0.1 6379
1.3 心跳检测
在命令传播阶段,从服务器以一定频率向主服务器发送心跳检测命令replconf ack slave_offset,其主要有三大作用:
- 检查主从服务器的连接状态
- 辅助实现min_slaves配置选项
- 检测命令丢失,通过对比主从服务器的复制偏移量,不相等则存在命令丢失,此时主服务器会向从服务器补发丢失的命令数据
/* src/replication.c */
/* Send a REPLCONF ACK command to the master to inform it about the current
* processed offset. If we are not connected with a master, the command has
* no effects. */
void replicationSendAck(void) {
client *c = server.master;
if (c != NULL) {
c->flags |= CLIENT_MASTER_FORCE_REPLY;
addReplyArrayLen(c,3);
addReplyBulkCString(c,"REPLCONF");
addReplyBulkCString(c,"ACK");
addReplyBulkLongLong(c,c->reploff);
c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
}
}
2 Sentinel(哨兵)
**Sentinel 本质上只是一个运行在特殊模式(哨兵模式)下的Redis服务器。**多个Sentinel实例组成Sentinel监控系统,能够监视任意多个主服务器和主服务器下的从服务器,并在主服务器下线时,从主服务器下的从服务器中选择一个升级为新的主服务器,替代已下线的旧主服务器继续处理命令。
2.1 Sentinel 启动与初始化
# 启动初始化Sentinel
redis-sentinel /path/to/your/sentinel.conf
redis-server /path/to/your/sentinel.conf --sentinel
主要步骤:
- 初始化一个普通的redis服务器
- 将普通redis服务器代码替换成sentinel专用代码,如命令表、部分函数的实现等
- 初始化sentinel状态,初始化一个
sentinel.c/sentinelState结构,来保存服务器中与sentinel功能相关的状态 - 根据配置文件,初始化sentinel监视的主服务器列表,即
sentinel.c/sentinelState结构中的masters字典,这里面保存了sentinel监视的所有主服务器的信息 - 创建连向主服务器的异步网络连接,分为命令连接和订阅连接两种,命令连接用于收发命令,订阅连接用于订阅主服务器的
__sentinel__:hello频道
/* src/sentinel.c */
/* Main state. */
struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
uint64_t current_epoch; /* Current epoch. */
dict *masters; /* Dictionary of master sentinelRedisInstances.
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
int tilt; /* Are we in TILT mode? */
int running_scripts; /* Number of scripts in execution right now. */
mstime_t tilt_start_time; /* When TITL started. */
mstime_t previous_time; /* Last time we ran the time handler. */
list *scripts_queue; /* Queue of user scripts to execute. */
char *announce_ip; /* IP addr that is gossiped to other sentinels if
not NULL. */
int announce_port; /* Port that is gossiped to other sentinels if
non zero. */
unsigned long simfailure_flags; /* Failures simulation. */
int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
paths at runtime? */
char *sentinel_auth_pass; /* Password to use for AUTH against other sentinel */
char *sentinel_auth_user; /* Username for ACLs AUTH against other sentinel. */
int resolve_hostnames; /* Support use of hostnames, assuming DNS is well configured. */
int announce_hostnames; /* Announce hostnames instead of IPs when we have them. */
} sentinel;
2.2 Sentinel 与主从服务器及其他sentinel的通信
- sentinel 定时发送info命令,获取主服务器信息及其相关从服务器信息
- sentinel 定时发送info命令,获取从服务器信息,也会与新的从服务器建立命令连接和订阅连接
- sentinel定时向主从服务器发送频道消息,以此向其他监控同一主从服务器的sentinel宣布自己的存在
- 每个sentinel从频道接受其他sentinel发送的消息,更新sentinels字典,创建连向其他sentinel的命令连接
- 检测主观下线状态,sentinel以一定频率向其他实例(主服务器、从服务器、其他sentinel)发送ping命令,以此判断其他实例是否在线
- 检查客观下线状态,sentinel会统计其他sentinel认为主观下线的数量,当达到一定标准即可认为主服务器已客观下线
- 当主服务客观下线后。sentinel系统会选举领头sentinel,在同一纪元获得过半票数即为领头sentinel(Raft选举算法),对下线的主服务器进行故障转移。
2.3 故障转移
- 选出新的主服务器,从已下线的主服务器的从服务器中,选择一个作为新的主服务器(会经过各种条件过滤与排序,最后执行 slaveof no one)
- 修改从服务器的复制目标,领头sentinel发送salveof命令,告诉所有从服务器去复制新的主服务器
- 将已下线的旧主服务器变成从服务器
3 Cluster(集群)
Redis 集群实现了对redis的水平扩容,即启动N个redis节点,将整个数据库分布式的存储在这N个节点中,每个节点存储总数据的1/N。Redis 集群通过分区(partition)来提供一定程度的可用性: 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。
3.1 节点
一个节点就是一个运行在集群模式下的Redis服务器,在启动服务器时,会根据cluster-enabled选项来确定是集群模式(cluster)还是单机模式(stand alone)运行Redis服务器。一个Redis集群由多个节点组成。连接各个节点可使用命令:
cluster meet <ip> <port>
# 假设有三个独立节点:127.0.0.1:6380、127.0.0.1:6381、127.0.0.1:6382
# 则连接三个节点组成一个集群,可以在6380节点服务器执行:
cluster meet 127.0.0.1 6381
cluster meet 127.0.0.1 6382
# 可查看到一共有三个节点
cluster nodes
3.2 集群数据结构
1. clusterNode
clusterNode 结构保存了一个节点的当前状态,每个节点都使用一个clusterNode记录自己的当前状态,并为集群中的其他节点(包括主从节点)分别创建一个clusterNode结构,以此来记录其他节点的状态。
/* src/cluster.h */
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
sds hostname; /* The known hostname for this node */
int port; /* Latest known clients port (TLS or plain). */
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
2. clusterLink
clusterNode 结构中的link属性是一个clusterLink结构,该结构保存了链接节点所需的信息。
/* src/cluster.h */
/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
connection *conn; /* Connection to remote node */
sds sndbuf; /* Packet send buffer */
char *rcvbuf; /* Packet reception buffer */
size_t rcvbuf_len; /* Used size of rcvbuf */
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
struct clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
int inbound; /* 1 if this link is an inbound link accepted from the related node */
} clusterLink;
3. clusterState
每个节点都保存了一个clusterState结构,记录着在当前节点视角下,整个集群目前的状态信息。
/* src/cluster.h */
typedef struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
rax *slots_to_channels;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason; /* Why a slave is currently not able to
failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF
or -1 if still not received. */
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The following fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
/* Stats */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
} clusterState;
3.3 槽指派
Redis 集群通过分片的方式来保存数据库中的键值对,集群的整个数据库被分为16384个槽(slot),每个键值对都属于其中一个槽,集群中的每个节点服务器可处理0~16384个槽。当集群中16384个槽都有节点处理时,集群处于上线状态(ok),当有任何一个槽没有节点处理时,集群则处于下线状态(fail)。
/* src/cluster.h */
#define CLUSTER_SLOTS 16384
可以给每个节点分配一些槽:
cluster addslots <slot_number>
# 假设有三个节点服务器,在各个节点执行分配命令:
cluster addslots 0 1 2 3 4 ... 5000
cluster addslots 5001 5002 5003 5004 ... 10000
cluster addslots 10001 10002 10003 10004 ... 16383
# 可查看槽指派后的集群信息
cluster info
clusterState.slots 数组记录了集群所有16384个槽的指派信息,而 clusterNode.slots 数组只记录了 clusterNode 结构所代表节点的槽指派信息。
3.4 在集群中执行命令
对数据库的163841个槽都指派后(注意:集群模式下节点服务器只能使用0号数据库),集群即处于上线状态,各个节点可以开始处理命令请求。当节点接收到一个命令,会先计算键所属槽(slot),然后判断该槽是否由本节点负责,如果是就直接执行命令,反之不是则通过moved错误重定向到负责该槽的目标节点进行处理。
3.5 重新分片
即将已经分配给某个节点的一些槽重新分配给一个新的节点,且这些槽相关的键值对数据也会被移动到新节点。重新分片由Redis集群管理程序redis-trib负责执行。
3.6 moved 和 ask 错误
move错误表示槽(slot)的负责权已经由一个节点转移到另一个节点,是在集群中执行命令时出现,表明该命令请求的键不在当前节点负责的槽中,需要通过moved命令转移到负责该槽的目标节点进行处理。
而ask错误是在重新分片时,两个节点在迁移槽的过程中出现的一种临时措施,因为此时该槽的一部分键值对还在源节点,而另一部分键值对已经移动到目标节点,在源节点没能找到该键值对时,通过ask错误转到目标节点继续寻找该键值对。
3.7 主从复制与故障转移
Redis 集群中的节点也分主节点(master)和从节点(slave)。主节点负责槽和处理命令请求,从节点用于复制主节点,并在主节点下线时替代其位置继续处理命令请求。
# 设置当前节点为master_node_id的从节点
cluster replicate <master_node_id>
集群中的每个节点会定时向其他节点发送ping消息,以检测对方是否在线,未能正常回复的节点会被标识为疑似下线状态(probable fail,pfail)。其余故障检测和转移过程,跟sentinel系统类似(同采用Raft选举算法)。
3.8 消息与通信
集群中的节点通过发送和接收消息来进行通信,常见消息有:meet、ping、pong、publish、fail。消息分由消息头和消息正文组成,每个消息头都是一个cluster.h/clusterMsg结构表示,记录了消息发送者自身的一些信息;每个消息正文都是一个cluster.h/clusterMsg结构表示。
/* src/cluster.h */
typedef struct {
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to 1. */
uint16_t port; /* TCP base port number. */
uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last
epoch advertised by its master if it is a
slave. */
uint64_t offset; /* Master replication offset if node is a master or
processed replication offset if node is a slave. */
char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
uint16_t extensions; /* Number of extensions sent along with this packet. */
char notused1[30]; /* 30 bytes reserved for future usage. */
uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data;
} clusterMsg;
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
/* Extension data that can optionally be sent for ping/meet/pong
* messages. We can't explicitly define them here though, since
* the gossip array isn't the real length of the gossip data. */
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE */
struct {
clusterMsgModule msg;
} module;
};