Back to Blogs
Redis
设计与实现
多机数据库的实现

Redis 设计与实现:多机数据库的实现

Soloman
2022-09-24

Redis 设计与实现:多机数据库的实现

1 主从复制

初次主从同步,后续命令传播。

# 只能完整重同步
sync

# 可以部分重同步
psync

# 初次复制,执行完整重同步
psync ? -1

# 之前已经复制过,则根据主服务器runid和从服务器偏移量offset信息判断,选择部分重同步/完整重同步
psync <runid> <offset>

1.1 部分重同步的实现

  1. 主从服务器的复制偏移量,主从服务器分别会维护一个复制偏移量。如果主从偏移量相等,则主从服务器处于一致状态;反之,主从服务器处于不一致状态,此时从服务器将向主服务器发送psync同步命令
  2. 主服务器的复制积压缓冲区,由主服务器维护的一个固定长度(可配置,根据需要调整缓冲区大小)的先进先出队列,里面保存着部分最近传播的命令。主服务器会检查从服务器发送psync命令中的偏移量,如果该偏移量之后的数据存在于复制积压缓冲区内,则执行部分重同步;反之不在复制积压缓冲区内,则执行完整重同步
  3. 服务器运行ID,每个Redis服务器,无论主从,都有自己的运行ID(run ID)。主从服务器初次复制时,主服务器会将运行ID发送给每个从服务器保存好,当某个主从连接断开重连后,从服务器会将之前保存的主服务器运行ID发送给现在连接的主服务器,主服务器判断自己的运行ID与之是否相同,从而决定执行部分重同步还是完整重同步。

1.2 复制的实现

  1. 从服务器根据slaveof命令信息,设置主服务器的地址和端口(本例为127.0.0.1:6379)
  2. 从服务器建立套接字连接主服务器
  3. 从服务器向主服务器发送ping命令
  4. 主从服务器身份验证
  5. 发送从服务器端口信息,从服务器向主服务器发送自己的监听端口号(本例为12345)
  6. 同步,从服务器发送psync命令给主服务器,执行同步操作,主从服务器状态达到一致
  7. 命令传播,主服务器向每个从服务器发送自己接收到的写命令,每个从服务器执行来自于主服务器的写命令,保证主从服务器状态始终一致
# 主服务器:127.0.0.1:6379
# 从服务器:127.0.0.1:12345

# 客户端向从服务器发送slaveof命令
slaveof <master_ip> <master_port>

slaveof 127.0.0.1 6379

1.3 心跳检测

在命令传播阶段,从服务器以一定频率向主服务器发送心跳检测命令replconf ack slave_offset,其主要有三大作用:

  1. 检查主从服务器的连接状态
  2. 辅助实现min_slaves配置选项
  3. 检测命令丢失,通过对比主从服务器的复制偏移量,不相等则存在命令丢失,此时主服务器会向从服务器补发丢失的命令数据
/* src/replication.c */

/* Send a REPLCONF ACK command to the master to inform it about the current
 * processed offset. If we are not connected with a master, the command has
 * no effects. */
void replicationSendAck(void) {
    client *c = server.master;

    if (c != NULL) {
        c->flags |= CLIENT_MASTER_FORCE_REPLY;
        addReplyArrayLen(c,3);
        addReplyBulkCString(c,"REPLCONF");
        addReplyBulkCString(c,"ACK");
        addReplyBulkLongLong(c,c->reploff);
        c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
    }
}

2 Sentinel(哨兵)

**Sentinel 本质上只是一个运行在特殊模式(哨兵模式)下的Redis服务器。**多个Sentinel实例组成Sentinel监控系统,能够监视任意多个主服务器和主服务器下的从服务器,并在主服务器下线时,从主服务器下的从服务器中选择一个升级为新的主服务器,替代已下线的旧主服务器继续处理命令。

2.1 Sentinel 启动与初始化

# 启动初始化Sentinel
redis-sentinel /path/to/your/sentinel.conf

redis-server /path/to/your/sentinel.conf --sentinel

主要步骤:

  1. 初始化一个普通的redis服务器
  2. 将普通redis服务器代码替换成sentinel专用代码,如命令表、部分函数的实现等
  3. 初始化sentinel状态,初始化一个sentinel.c/sentinelState结构,来保存服务器中与sentinel功能相关的状态
  4. 根据配置文件,初始化sentinel监视的主服务器列表,即sentinel.c/sentinelState结构中的masters字典,这里面保存了sentinel监视的所有主服务器的信息
  5. 创建连向主服务器的异步网络连接,分为命令连接和订阅连接两种,命令连接用于收发命令,订阅连接用于订阅主服务器的__sentinel__:hello频道
/* src/sentinel.c */

/* Main state. */
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
    uint64_t current_epoch;         /* Current epoch. */
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    int tilt;           /* Are we in TILT mode? */
    int running_scripts;    /* Number of scripts in execution right now. */
    mstime_t tilt_start_time;       /* When TITL started. */
    mstime_t previous_time;         /* Last time we ran the time handler. */
    list *scripts_queue;            /* Queue of user scripts to execute. */
    char *announce_ip;  /* IP addr that is gossiped to other sentinels if
                           not NULL. */
    int announce_port;  /* Port that is gossiped to other sentinels if
                           non zero. */
    unsigned long simfailure_flags; /* Failures simulation. */
    int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
                                  paths at runtime? */
    char *sentinel_auth_pass;    /* Password to use for AUTH against other sentinel */
    char *sentinel_auth_user;    /* Username for ACLs AUTH against other sentinel. */
    int resolve_hostnames;       /* Support use of hostnames, assuming DNS is well configured. */
    int announce_hostnames;      /* Announce hostnames instead of IPs when we have them. */
} sentinel;

2.2 Sentinel 与主从服务器及其他sentinel的通信

  • sentinel 定时发送info命令,获取主服务器信息及其相关从服务器信息
  • sentinel 定时发送info命令,获取从服务器信息,也会与新的从服务器建立命令连接和订阅连接
  • sentinel定时向主从服务器发送频道消息,以此向其他监控同一主从服务器的sentinel宣布自己的存在
  • 每个sentinel从频道接受其他sentinel发送的消息,更新sentinels字典,创建连向其他sentinel的命令连接
  • 检测主观下线状态,sentinel以一定频率向其他实例(主服务器、从服务器、其他sentinel)发送ping命令,以此判断其他实例是否在线
  • 检查客观下线状态,sentinel会统计其他sentinel认为主观下线的数量,当达到一定标准即可认为主服务器已客观下线
  • 当主服务客观下线后。sentinel系统会选举领头sentinel,在同一纪元获得过半票数即为领头sentinel(Raft选举算法),对下线的主服务器进行故障转移。

2.3 故障转移

  1. 选出新的主服务器,从已下线的主服务器的从服务器中,选择一个作为新的主服务器(会经过各种条件过滤与排序,最后执行 slaveof no one)
  2. 修改从服务器的复制目标,领头sentinel发送salveof命令,告诉所有从服务器去复制新的主服务器
  3. 将已下线的旧主服务器变成从服务器

3 Cluster(集群)

Redis 集群实现了对redis的水平扩容,即启动N个redis节点,将整个数据库分布式的存储在这N个节点中,每个节点存储总数据的1/N。Redis 集群通过分区(partition)来提供一定程度的可用性: 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

3.1 节点

一个节点就是一个运行在集群模式下的Redis服务器,在启动服务器时,会根据cluster-enabled选项来确定是集群模式(cluster)还是单机模式(stand alone)运行Redis服务器。一个Redis集群由多个节点组成。连接各个节点可使用命令:

cluster meet <ip> <port>

# 假设有三个独立节点:127.0.0.1:6380、127.0.0.1:6381、127.0.0.1:6382
# 则连接三个节点组成一个集群,可以在6380节点服务器执行:
cluster meet 127.0.0.1 6381
cluster meet 127.0.0.1 6382

# 可查看到一共有三个节点
cluster nodes

3.2 集群数据结构

1. clusterNode

clusterNode 结构保存了一个节点的当前状态,每个节点都使用一个clusterNode记录自己的当前状态,并为集群中的其他节点(包括主从节点)分别创建一个clusterNode结构,以此来记录其他节点的状态。

/* src/cluster.h */

typedef struct clusterNode {
    mstime_t ctime; /* Node object creation time. */
    char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
    int flags;      /* CLUSTER_NODE_... */
    uint64_t configEpoch; /* Last configEpoch observed for this node */
    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
    uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
    int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
    int numslots;   /* Number of slots handled by this node */
    int numslaves;  /* Number of slave nodes, if this is a master */
    struct clusterNode **slaves; /* pointers to slave nodes */
    struct clusterNode *slaveof; /* pointer to the master node. Note that it
                                    may be NULL even if the node is a slave
                                    if we don't have the master node in our
                                    tables. */
    unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */
    mstime_t ping_sent;      /* Unix time we sent latest ping */
    mstime_t pong_received;  /* Unix time we received the pong */
    mstime_t data_received;  /* Unix time we received any data */
    mstime_t fail_time;      /* Unix time when FAIL flag was set */
    mstime_t voted_time;     /* Last time we voted for a slave of this master */
    mstime_t repl_offset_time;  /* Unix time we received offset for this node */
    mstime_t orphaned_time;     /* Starting time of orphaned master condition */
    long long repl_offset;      /* Last known repl offset for this node. */
    char ip[NET_IP_STR_LEN];    /* Latest known IP address of this node */
    sds hostname;               /* The known hostname for this node */
    int port;                   /* Latest known clients port (TLS or plain). */
    int pport;                  /* Latest known clients plaintext port. Only used
                                   if the main clients port is for TLS. */
    int cport;                  /* Latest known cluster port of this node. */
    clusterLink *link;          /* TCP/IP link established toward this node */
    clusterLink *inbound_link;  /* TCP/IP link accepted from this node */
    list *fail_reports;         /* List of nodes signaling this as failing */
} clusterNode;

2. clusterLink

clusterNode 结构中的link属性是一个clusterLink结构,该结构保存了链接节点所需的信息。

/* src/cluster.h */

/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
    mstime_t ctime;             /* Link creation time */
    connection *conn;           /* Connection to remote node */
    sds sndbuf;                 /* Packet send buffer */
    char *rcvbuf;               /* Packet reception buffer */
    size_t rcvbuf_len;          /* Used size of rcvbuf */
    size_t rcvbuf_alloc;        /* Allocated size of rcvbuf */
    struct clusterNode *node;   /* Node related to this link. Initialized to NULL when unknown */
    int inbound;                /* 1 if this link is an inbound link accepted from the related node */
} clusterLink;

3. clusterState

每个节点都保存了一个clusterState结构,记录着在当前节点视角下,整个集群目前的状态信息。

/* src/cluster.h */

typedef struct clusterState {
    clusterNode *myself;  /* This node */
    uint64_t currentEpoch;
    int state;            /* CLUSTER_OK, CLUSTER_FAIL, ... */
    int size;             /* Num of master nodes with at least one slot */
    dict *nodes;          /* Hash table of name -> clusterNode structures */
    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
    clusterNode *migrating_slots_to[CLUSTER_SLOTS];
    clusterNode *importing_slots_from[CLUSTER_SLOTS];
    clusterNode *slots[CLUSTER_SLOTS];
    rax *slots_to_channels;
    /* The following fields are used to take the slave state on elections. */
    mstime_t failover_auth_time; /* Time of previous or next election. */
    int failover_auth_count;    /* Number of votes received so far. */
    int failover_auth_sent;     /* True if we already asked for votes. */
    int failover_auth_rank;     /* This slave rank for current auth request. */
    uint64_t failover_auth_epoch; /* Epoch of the current election. */
    int cant_failover_reason;   /* Why a slave is currently not able to
                                   failover. See the CANT_FAILOVER_* macros. */
    /* Manual failover state in common. */
    mstime_t mf_end;            /* Manual failover time limit (ms unixtime).
                                   It is zero if there is no MF in progress. */
    /* Manual failover state of master. */
    clusterNode *mf_slave;      /* Slave performing the manual failover. */
    /* Manual failover state of slave. */
    long long mf_master_offset; /* Master offset the slave needs to start MF
                                   or -1 if still not received. */
    int mf_can_start;           /* If non-zero signal that the manual failover
                                   can start requesting masters vote. */
    /* The following fields are used by masters to take state on elections. */
    uint64_t lastVoteEpoch;     /* Epoch of the last vote granted. */
    int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
    /* Stats */
    /* Messages received and sent by type. */
    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
    long long stats_pfail_nodes;    /* Number of nodes in PFAIL status,
                                       excluding nodes without address. */
    unsigned long long stat_cluster_links_buffer_limit_exceeded;  /* Total number of cluster links freed due to exceeding buffer limit */
} clusterState;

3.3 槽指派

Redis 集群通过分片的方式来保存数据库中的键值对,集群的整个数据库被分为16384个槽(slot),每个键值对都属于其中一个槽,集群中的每个节点服务器可处理0~16384个槽。当集群中16384个槽都有节点处理时,集群处于上线状态(ok),当有任何一个槽没有节点处理时,集群则处于下线状态(fail)。

/* src/cluster.h */

#define CLUSTER_SLOTS 16384

可以给每个节点分配一些槽:

cluster addslots <slot_number>

# 假设有三个节点服务器,在各个节点执行分配命令:
cluster addslots 0 1 2 3 4 ... 5000
cluster addslots 5001 5002 5003 5004 ... 10000
cluster addslots 10001 10002 10003 10004 ... 16383

# 可查看槽指派后的集群信息
cluster info

clusterState.slots 数组记录了集群所有16384个槽的指派信息,而 clusterNode.slots 数组只记录了 clusterNode 结构所代表节点的槽指派信息。

3.4 在集群中执行命令

对数据库的163841个槽都指派后(注意:集群模式下节点服务器只能使用0号数据库),集群即处于上线状态,各个节点可以开始处理命令请求。当节点接收到一个命令,会先计算键所属槽(slot),然后判断该槽是否由本节点负责,如果是就直接执行命令,反之不是则通过moved错误重定向到负责该槽的目标节点进行处理。

3.5 重新分片

即将已经分配给某个节点的一些槽重新分配给一个新的节点,且这些槽相关的键值对数据也会被移动到新节点。重新分片由Redis集群管理程序redis-trib负责执行。

3.6 moved 和 ask 错误

move错误表示槽(slot)的负责权已经由一个节点转移到另一个节点,是在集群中执行命令时出现,表明该命令请求的键不在当前节点负责的槽中,需要通过moved命令转移到负责该槽的目标节点进行处理。

而ask错误是在重新分片时,两个节点在迁移槽的过程中出现的一种临时措施,因为此时该槽的一部分键值对还在源节点,而另一部分键值对已经移动到目标节点,在源节点没能找到该键值对时,通过ask错误转到目标节点继续寻找该键值对。

3.7 主从复制与故障转移

Redis 集群中的节点也分主节点(master)和从节点(slave)。主节点负责槽和处理命令请求,从节点用于复制主节点,并在主节点下线时替代其位置继续处理命令请求。

# 设置当前节点为master_node_id的从节点

cluster replicate <master_node_id>

集群中的每个节点会定时向其他节点发送ping消息,以检测对方是否在线,未能正常回复的节点会被标识为疑似下线状态(probable fail,pfail)。其余故障检测和转移过程,跟sentinel系统类似(同采用Raft选举算法)。

3.8 消息与通信

集群中的节点通过发送和接收消息来进行通信,常见消息有:meet、ping、pong、publish、fail。消息分由消息头和消息正文组成,每个消息头都是一个cluster.h/clusterMsg结构表示,记录了消息发送者自身的一些信息;每个消息正文都是一个cluster.h/clusterMsg结构表示。

/* src/cluster.h */

typedef struct {
    char sig[4];        /* Signature "RCmb" (Redis Cluster message bus). */
    uint32_t totlen;    /* Total length of this message */
    uint16_t ver;       /* Protocol version, currently set to 1. */
    uint16_t port;      /* TCP base port number. */
    uint16_t type;      /* Message type */
    uint16_t count;     /* Only used for some kind of messages. */
    uint64_t currentEpoch;  /* The epoch accordingly to the sending node. */
    uint64_t configEpoch;   /* The config epoch if it's a master, or the last
                               epoch advertised by its master if it is a
                               slave. */
    uint64_t offset;    /* Master replication offset if node is a master or
                           processed replication offset if node is a slave. */
    char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
    unsigned char myslots[CLUSTER_SLOTS/8];
    char slaveof[CLUSTER_NAMELEN];
    char myip[NET_IP_STR_LEN];    /* Sender IP, if not all zeroed. */
    uint16_t extensions; /* Number of extensions sent along with this packet. */
    char notused1[30];   /* 30 bytes reserved for future usage. */
    uint16_t pport;      /* Sender TCP plaintext port, if base port is TLS */
    uint16_t cport;      /* Sender TCP cluster bus port */
    uint16_t flags;      /* Sender node flags */
    unsigned char state; /* Cluster state from the POV of the sender */
    unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
    union clusterMsgData data;
} clusterMsg;

union clusterMsgData {
    /* PING, MEET and PONG */
    struct {
        /* Array of N clusterMsgDataGossip structures */
        clusterMsgDataGossip gossip[1];
        /* Extension data that can optionally be sent for ping/meet/pong
         * messages. We can't explicitly define them here though, since
         * the gossip array isn't the real length of the gossip data. */
    } ping;

    /* FAIL */
    struct {
        clusterMsgDataFail about;
    } fail;

    /* PUBLISH */
    struct {
        clusterMsgDataPublish msg;
    } publish;

    /* UPDATE */
    struct {
        clusterMsgDataUpdate nodecfg;
    } update;

    /* MODULE */
    struct {
        clusterMsgModule msg;
    } module;
};

4 Redis 相关技术文章

1.Redis 设计与实现:数据结构与对象

2.Redis 设计与实现:单机数据库的实现

3.Redis 数据库基础知识:5大数据类型对象

4.如何在Python中使用Redis