本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。
写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。
事件循环
函数入口
对于一个 Redis 实例来说,其启动后会首先运行 server.c
中的 main
函数。
点击查看源码
int main(int argc, char **argv) {
struct timeval tv;
int j;
char config_from_stdin = 0;
// 测试相关逻辑
/* ... */
// 初始化随机数种子
/* ... */
// 获取可执行文件名称,以检查是否需要从快照中恢复数据
char *exec_name = strrchr(argv[0], '/');
if (exec_name == NULL) exec_name = argv[0];
// 检查是否存在哨兵模式
server.sentinel_mode = checkForSentinelMode(argc,argv, exec_name);
// 初始化服务器配置
initServerConfig();
// 初始化 ACL(即用户认证与管理)
ACLInit();
// 初始化自定义模块
moduleInitModulesSystem();
// 初始化连接类型系统(即支持的网络协议等)
connTypeInitialize();
server.executable = getAbsolutePath(argv[0]);
server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
server.exec_argv[argc] = NULL;
for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
// 初始化哨兵模式的配置
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
// 检查是否需要从快照中恢复数据
if (strstr(exec_name,"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
else if (strstr(exec_name,"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);
// 处理命令行参数
/* ... */
// 检查系统需求(内存限制等)
/* ... */
// 检测是否在受监督模式下(如 systemd)运行
// 如果是,则以守护进程的方式运行
/* ... */
// 记录启动信息,包括版本、位数、提交信息等
/* ... */
// 初始化 Redis 服务器的核心组件
initServer();
// 记录当前进程的 PID
if (background || server.pidfile) createPidFile();
if (server.set_proc_title) redisSetProcTitle(NULL);
// 打印 ASCII 艺术图案
redisAsciiArt();
// 检测 TCP Backlog 设置
checkTcpBacklogSettings();
// 集群模式初始化
if (server.cluster_enabled) {
clusterInit();
}
// 加载自定义模块
if (!server.sentinel_mode) {
moduleInitModulesSystemLast();
moduleLoadFromQueue();
}
// 加载 ACL 信息
ACLLoadUsersAtStartup();
// 初始化网络监听器
initListeners();
// 完成初始化的所有工作
if (server.cluster_enabled) {
clusterInitLast();
}
InitServerLast();
if (!server.sentinel_mode) {
// 加载并初始化 AOF 和 RDB 文件,并验证集群是否一致
serverLog(LL_NOTICE,"Server initialized");
aofLoadManifestFromDisk();
loadDataFromDisk();
aofOpenIfNeededOnServerStart();
aofDelHistoryFiles();
if (server.cluster_enabled) {
serverAssert(verifyClusterConfigWithData() == C_OK);
}
// 监听客户端连接
for (j = 0; j < CONN_TYPE_MAX; j++) {
connListener *listener = &server.listeners[j];
if (listener->ct == NULL)
continue;
serverLog(LL_NOTICE,"Ready to accept connections %s", listener->ct->get_type(NULL));
}
// 监督模式下告知 systemd 服务器已准备好
/* ... */
} else {
// Sentinel 模式下的初始化逻辑
/* ... */
}
// 内存限制检查
/* ... */
// 设置 CPU 亲和性
redisSetCpuAffinity(server.server_cpulist);
// 调整进程 OOM 优先级
setOOMScoreAdj(-1);
// 启动事件循环
aeMain(server.el);
// 清理事件循环资源
aeDeleteEventLoop(server.el);
return 0;
}
容易看出,在启动一个 Redis 实例后,程序依次进行了如下工作:
- 连接初始化
- 配置文件加载与初始化
- 快照数据恢复
- 集群和哨兵初始化
- 事件循环主逻辑
类似 Node.js,Redis 是事件驱动的。
从事件处理角度来说,Redis 是单线程的,它使用了一个事件循环来处理事件。事件循环是一个无限循环,不断地从事件队列中取出事件,然后执行事件对应的回调函数。
Redis 事件主要包含两类:
- 文件事件:Redis 通过 socket 与客户端通信,通过文件事件来处理客户端的请求;
- 时间事件:Redis 通过时间事件来执行定时任务。
事件循环通过 aeMain
作为入口启动。
点击查看源码
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
它调用了 aeProcessEvents
函数。
点击查看源码
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
/* flags 包含了处理哪些、如何处理事件 */
// processed 为已经处理的事件数
// numevents 为需要处理的事件数
int processed = 0, numevents;
// 事件事件和文件事件都不需要处理,则直接返回
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) &&
!(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp = NULL;
int64_t usUntilTimer;
if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
eventLoop->beforesleep(eventLoop);
// 根据 AE_DONT_WAIT 或时间事件的最近触发时间,设置调用多路复用 API 的超时时间
// 如果没有文件事件且没有等待时间,则多路复用会无限期阻塞
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}
// 多路复用处理 socket
numevents = aeApiPoll(eventLoop, tvp);
// 如果不需要处理文件事件,则不处理文件事件
if (!(flags & AE_FILE_EVENTS)) {
numevents = 0;
}
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 处理文件事件
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0;
// 先处理读事件,再处理写事件;如果设置了 AE_BARRIER 则反转顺序
int invert = fe->mask & AE_BARRIER;
// 读事件
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd];
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
// 反转顺序的情况
if (invert) {
fe = &eventLoop->events[fd];
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
// 处理时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
// 返回已经处理的事件数
return processed;
}
这里看到了几个关键点:
- Redis 使用多路复用处理 socket
- 事件循环先处理文件事件,再处理时间事件
- 事件循环有阻塞和非阻塞两种模式
- 用户可以通过
beforesleep
和aftersleep
回调函数来处理一些自定义的逻辑,如日志或统计等
这里的文件事件和时间事件是 Redis 事件循环的核心:
- 文件事件是指与文件描述符相关的事件,如读取 socket 数据、向 socket 写入数据等。Redis 使用 I/O 多路复用技术来处理文件事件;
- 时间事件是指需要在特定时间点或时间间隔内执行的事件,如清理过期键、持久化、统计和监控等。
接下来,我们会具体来看它们的实现。
I/O 多路复用
对于 Redis 的使用场景来讲,网络带宽和内存带宽才是真正的瓶颈。如果使用 read
和 write
系统调用,那么可能会导致阻塞,从而浪费 CPU 时间。因此,Redis 使用了非阻塞的 I/O 多路复用技术,如 epoll
、select
等,来监听多个文件描述符的可读、可写和异常事件。
epoll
是 Linux 下的一种 I/O 多路复用机制。它的使用方法如下:
使用
epoll_create
创建一个 epoll 实例:int epoll_create(int size);
epoll_create
的返回值是一个文件描述符,用于后续的操作。
epoll_create1
是epoll_create
的一个变种,可以传入一个flags
参数。假如flags
为EPOLL_CLOEXEC
,则在exec
时关闭文件描述符。使用
epoll_ctl
添加/删除/修改监听的文件描述符:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
使用
epoll_wait
等待事件发生:int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);
Redis 支持多种 I/O 多路复用程序,其实现在 ae_epoll.c
、ae_evport.c
、ae_kqueue.c
、ae_select.c
中。Redis 为它们包装了统一的接口,并会自动选择性能最好的使用。
点击查看源码
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
我们以 ae_epoll
为例。
点击查看源码
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
/* ... */
// 分配 epoll_event
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
/* ... */
// 创建 epoll 文件描述符
state->epfd = epoll_create(1024);
/* ... */
}
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
/* ... */
// 重新分配 epoll_event
state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
/* ... */
}
static void aeApiFree(aeEventLoop *eventLoop) {
/* ... */
// 关闭 epoll 文件描述符
close(state->epfd);
/* ... */
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
/* ... */
// 根据文件描述符是否已经被监控,确定是 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
/* ... */
// 添加监听的文件描述符
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
/* ... */
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
/* ... */
// 删除监听的文件描述符
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
/* ... */
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
/* ... */
// 等待事件发生
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
// 遍历返回的事件
for (j = 0; j < numevents; j++) {
/* ... */
}
}
/* ... */
}
/* ... */
可以看到,它将开启、关闭、增加、删除、处理这五个事件进行了封装处理。
时间事件
处理事件事件的主要逻辑在 processTimeEvents
函数中。
点击查看源码
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0; // 处理的事件数量
aeTimeEvent *te; // 时间事件链表的头部
long long maxId; // 当前循环中可处理的最大时间事件 ID
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
// 遍历所有的时间事件节点
while(te) {
long long id;
// 如果事件被标记为删除,则根据引用计数决定是否释放内存
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
zfree(te);
te = next;
continue;
}
// 检查事件 ID 是否超出最大值
// 防止在当前迭代中处理新创建的时间事件
if (te->id > maxId) {
te = te->next;
continue;
}
// 检查当前事件是否到期
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
// 调用时间处理函数
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
if (retval != AE_NOMORE) {
// 事件需要重新调度,更新触发时间
te->when = now + (monotime)retval * 1000;
} else {
// 标记事件为删除
te->id = AE_DELETED_EVENT_ID;
}
}
// 移动到下一个事件
te = te->next;
}
// 返回处理的事件数量
return processed;
}
可以看到,Redis 的时间事件是一个双向链表,timeEventHead
是链表的头部。每个时间事件都有一个 ID 和一个触发时间 when
。当 when
小于等于当前时间时,就会调用对应的回调函数。
同时,函数会检查引用计数和最大 ID,确保能够正确地处理事件。
Comments