Ch3nyang's blog collections_bookmark

post

person

about

category

category

local_offer

tag

rss_feed

rss

深入 Redis 源码 | (6)
Redis 事件循环

calendar_month 2025-01
archive 中间件
tag redis

There are 6 posts in series 深入 Redis 源码.

本文将会从源码层面解读 Redis 的各项机制。我们会从最基础的内容开始,尽量只看 Redis 最核心的部分,而剥离掉一些不太重要的内容,所有的源码都给出了 GitHub 的链接。

写作本文时,Redis CE 还在 7.4 版本;Redis 8.0 仍然在 Pre-Release。因此,本文主要基于 Redis 7.4,其余版本可能有所不同。本文使用的环境为 Debian 12。

事件循环

函数入口

对于一个 Redis 实例来说,其启动后会首先运行 server.c 中的 main 函数

点击查看源码
int main(int argc, char **argv) {
    struct timeval tv;
    int j;
    char config_from_stdin = 0;

    // 测试相关逻辑
    /* ... */

    // 初始化随机数种子
    /* ... */

    // 获取可执行文件名称,以检查是否需要从快照中恢复数据
    char *exec_name = strrchr(argv[0], '/');
    if (exec_name == NULL) exec_name = argv[0];

    // 检查是否存在哨兵模式
    server.sentinel_mode = checkForSentinelMode(argc,argv, exec_name);

    // 初始化服务器配置
    initServerConfig();

    // 初始化 ACL(即用户认证与管理)
    ACLInit();

    // 初始化自定义模块
    moduleInitModulesSystem();

    // 初始化连接类型系统(即支持的网络协议等)
    connTypeInitialize();

    server.executable = getAbsolutePath(argv[0]);
    server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
    server.exec_argv[argc] = NULL;
    for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);

    // 初始化哨兵模式的配置
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }

    // 检查是否需要从快照中恢复数据
    if (strstr(exec_name,"redis-check-rdb") != NULL)
        redis_check_rdb_main(argc,argv,NULL);
    else if (strstr(exec_name,"redis-check-aof") != NULL)
        redis_check_aof_main(argc,argv);

    // 处理命令行参数
    /* ... */

    // 检查系统需求(内存限制等)
    /* ... */

    // 检测是否在受监督模式下(如 systemd)运行
    // 如果是,则以守护进程的方式运行
    /* ... */

    // 记录启动信息,包括版本、位数、提交信息等
    /* ... */

    // 初始化 Redis 服务器的核心组件
    initServer();

    // 记录当前进程的 PID
    if (background || server.pidfile) createPidFile();
    if (server.set_proc_title) redisSetProcTitle(NULL);
    // 打印 ASCII 艺术图案
    redisAsciiArt();
    // 检测 TCP Backlog 设置
    checkTcpBacklogSettings();

    // 集群模式初始化
    if (server.cluster_enabled) {
        clusterInit();
    }

    // 加载自定义模块
    if (!server.sentinel_mode) {
        moduleInitModulesSystemLast();
        moduleLoadFromQueue();
    }

    // 加载 ACL 信息
    ACLLoadUsersAtStartup();

    // 初始化网络监听器
    initListeners();

    // 完成初始化的所有工作
    if (server.cluster_enabled) {
        clusterInitLast();
    }
    InitServerLast();

    if (!server.sentinel_mode) {
        // 加载并初始化 AOF 和 RDB 文件,并验证集群是否一致
        serverLog(LL_NOTICE,"Server initialized");
        aofLoadManifestFromDisk();
        loadDataFromDisk();
        aofOpenIfNeededOnServerStart();
        aofDelHistoryFiles();
        if (server.cluster_enabled) {
            serverAssert(verifyClusterConfigWithData() == C_OK);
        }

        // 监听客户端连接
        for (j = 0; j < CONN_TYPE_MAX; j++) {
            connListener *listener = &server.listeners[j];
            if (listener->ct == NULL)
                continue;

            serverLog(LL_NOTICE,"Ready to accept connections %s", listener->ct->get_type(NULL));
        }

        // 监督模式下告知 systemd 服务器已准备好
        /* ... */
    } else {
        // Sentinel 模式下的初始化逻辑
        /* ... */
    }

    // 内存限制检查
    /* ... */

    // 设置 CPU 亲和性
    redisSetCpuAffinity(server.server_cpulist);
    // 调整进程 OOM 优先级
    setOOMScoreAdj(-1);

    // 启动事件循环
    aeMain(server.el);

    // 清理事件循环资源
    aeDeleteEventLoop(server.el);

    return 0;
}

容易看出,在启动一个 Redis 实例后,程序依次进行了如下工作:

  • 连接初始化
  • 配置文件加载与初始化
  • 快照数据恢复
  • 集群和哨兵初始化
  • 事件循环主逻辑

类似 Node.js,Redis 是事件驱动的。

从事件处理角度来说,Redis 是单线程的,它使用了一个事件循环来处理事件。事件循环是一个无限循环,不断地从事件队列中取出事件,然后执行事件对应的回调函数。

Redis 事件主要包含两类:

  • 文件事件:Redis 通过 socket 与客户端通信,通过文件事件来处理客户端的请求;
  • 时间事件:Redis 通过时间事件来执行定时任务。

事件循环通过 aeMain 作为入口启动。

点击查看源码
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

它调用了 aeProcessEvents 函数。

点击查看源码
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    /* flags 包含了处理哪些、如何处理事件 */
    // processed 为已经处理的事件数
    // numevents 为需要处理的事件数
    int processed = 0, numevents;

    // 事件事件和文件事件都不需要处理,则直接返回
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) &&
        !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp = NULL;
        int64_t usUntilTimer;

        if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
            eventLoop->beforesleep(eventLoop);

        // 根据 AE_DONT_WAIT 或时间事件的最近触发时间,设置调用多路复用 API 的超时时间
        // 如果没有文件事件且没有等待时间,则多路复用会无限期阻塞
        if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        } else if (flags & AE_TIME_EVENTS) {
            usUntilTimer = usUntilEarliestTimer(eventLoop);
            if (usUntilTimer >= 0) {
                tv.tv_sec = usUntilTimer / 1000000;
                tv.tv_usec = usUntilTimer % 1000000;
                tvp = &tv;
            }
        }
        
        // 多路复用处理 socket
        numevents = aeApiPoll(eventLoop, tvp);

        // 如果不需要处理文件事件,则不处理文件事件
        if (!(flags & AE_FILE_EVENTS)) {
            numevents = 0;
        }

        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // 处理文件事件
        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0;

            // 先处理读事件,再处理写事件;如果设置了 AE_BARRIER 则反转顺序
            int invert = fe->mask & AE_BARRIER;

            // 读事件
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd];
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            // 反转顺序的情况
            if (invert) {
                fe = &eventLoop->events[fd];
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    // 处理时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    // 返回已经处理的事件数
    return processed;
}

这里看到了几个关键点:

  • Redis 使用多路复用处理 socket
  • 事件循环先处理文件事件,再处理时间事件
  • 事件循环有阻塞和非阻塞两种模式
  • 用户可以通过 beforesleepaftersleep 回调函数来处理一些自定义的逻辑,如日志或统计等

这里的文件事件和时间事件是 Redis 事件循环的核心:

  • 文件事件是指与文件描述符相关的事件,如读取 socket 数据、向 socket 写入数据等。Redis 使用 I/O 多路复用技术来处理文件事件;
  • 时间事件是指需要在特定时间点或时间间隔内执行的事件,如清理过期键、持久化、统计和监控等。

接下来,我们会具体来看它们的实现。

I/O 多路复用

对于 Redis 的使用场景来讲,网络带宽和内存带宽才是真正的瓶颈。如果使用 readwrite 系统调用,那么可能会导致阻塞,从而浪费 CPU 时间。因此,Redis 使用了非阻塞的 I/O 多路复用技术,如 epollselect 等,来监听多个文件描述符的可读、可写和异常事件。

epoll 是 Linux 下的一种 I/O 多路复用机制。它的使用方法如下:

  • 使用 epoll_create 创建一个 epoll 实例:

    int epoll_create(int size);
    

    epoll_create 的返回值是一个文件描述符,用于后续的操作。

    epoll_create1epoll_create 的一个变种,可以传入一个 flags 参数。假如 flagsEPOLL_CLOEXEC,则在 exec 时关闭文件描述符。

  • 使用 epoll_ctl 添加/删除/修改监听的文件描述符:

    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    
  • 使用 epoll_wait 等待事件发生:

    int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);
    

Redis 支持多种 I/O 多路复用程序,其实现在 ae_epoll.cae_evport.cae_kqueue.cae_select.c 中。Redis 为它们包装了统一的接口,并会自动选择性能最好的使用。

点击查看源码
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

我们以 ae_epoll 为例。

点击查看源码
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    /* ... */
    // 分配 epoll_event
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    /* ... */
    // 创建 epoll 文件描述符
    state->epfd = epoll_create(1024);
    /* ... */
}

static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    /* ... */
    // 重新分配 epoll_event
    state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
    /* ... */
}

static void aeApiFree(aeEventLoop *eventLoop) {
    /* ... */
    // 关闭 epoll 文件描述符
    close(state->epfd);
    /* ... */
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    /* ... */
    // 根据文件描述符是否已经被监控,确定是 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    /* ... */
    // 添加监听的文件描述符
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    /* ... */
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    /* ... */
    // 删除监听的文件描述符
    epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    /* ... */
}

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    /* ... */
    // 等待事件发生
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        // 遍历返回的事件
        for (j = 0; j < numevents; j++) {
            /* ... */
        }
    }
    /* ... */
}

/* ... */

可以看到,它将开启、关闭、增加、删除、处理这五个事件进行了封装处理。

时间事件

处理事件事件的主要逻辑在 processTimeEvents 函数中。

点击查看源码
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0; // 处理的事件数量
    aeTimeEvent *te; // 时间事件链表的头部
    long long maxId; // 当前循环中可处理的最大时间事件 ID

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    monotime now = getMonotonicUs();

    // 遍历所有的时间事件节点
    while(te) {
        long long id;

        // 如果事件被标记为删除,则根据引用计数决定是否释放内存
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            
            if (te->refcount) {
                te = next;
                continue;
            }
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                now = getMonotonicUs();
            }
            zfree(te);
            te = next;
            continue;
        }

        // 检查事件 ID 是否超出最大值
        // 防止在当前迭代中处理新创建的时间事件
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

        // 检查当前事件是否到期
        if (te->when <= now) {
            int retval;

            id = te->id;
            te->refcount++;

            // 调用时间处理函数
            retval = te->timeProc(eventLoop, id, te->clientData);

            te->refcount--;
            processed++;
            now = getMonotonicUs();
            if (retval != AE_NOMORE) {
                // 事件需要重新调度,更新触发时间
                te->when = now + (monotime)retval * 1000;
            } else {
                // 标记事件为删除
                te->id = AE_DELETED_EVENT_ID;
            }
        }

        // 移动到下一个事件
        te = te->next;
    }

    // 返回处理的事件数量
    return processed;
}

可以看到,Redis 的时间事件是一个双向链表,timeEventHead 是链表的头部。每个时间事件都有一个 ID 和一个触发时间 when。当 when 小于等于当前时间时,就会调用对应的回调函数。

同时,函数会检查引用计数和最大 ID,确保能够正确地处理事件。

Series 深入 Redis 源码

Comments

Share This Post