Ch3nyang's blog web_stories

post

assignment_ind

about

data_object

github

local_offer

tag

rss_feed

rss

如何让一台服务器接受尽可能多的连接?

早在 1999 年,Dan Kegel 就提出了 C10K 问题1,旨在解决如何让一台服务器同时处理 \(10,000\) 个连接的问题;到了 2010 年前后,这个问题变为了 C10M——即如何用 1U 服务器处理 \(10,000,000\) 个连接;而如今,随着服务器性能和网络技术的发展,这个数字还在不断攀升。

在最基本的 socket 编程模型中,服务器通常采用阻塞 I/O 模型,即每个连接都对应一个线程或进程。当面临成千上万个连接时,我们就不得不开启成千上万个线程或进程,这会消耗大量的系统资源,并导致频繁的上下文切换,严重影响性能。

为了解决这个问题,操作系统提供了多种 I/O 模型,使得单个线程或进程能够同时处理多个连接,从而大幅提升服务器的并发处理能力。今天我们就来聊聊,是什么技术,使得这一切成为可能。

select

基本使用

select 是 Unix/Linux 系统中最传统的 ​​I/O 多路复用机制之一,它允许程序同时监视多个 fd(如 socket、管道等),并在其中任何一个 fd 就绪(读 / 写 / 异常)时通知应用程序进行处理。

select 的函数原型在 sys/select.h 中:

int select (int nfds,
            fd_set *readfds,
            fd_set *writefds,
            fd_set *exceptfds,
            struct timeval *timeout);

其中,

  • nfds 是需要监视的 fd 的数量;
  • readfds 是一个 fd 集合,用于监视可读事件;
  • writefds 是一个 fd 集合,用于监视可写事件;
  • exceptfds 是一个 fd 集合,用于监视异常事件;
  • timeout 是一个指向 struct timeval 的指针,用于设置超时时间。设置为 NULL 表示无限等待,设置为 {0, 0} 表示立即返回。

select 函数会阻塞当前线程,直到其中有至少一个 fd 就绪,或者长时间没有 fd 就绪而导致超时。它的返回值:

  • 正常情况下,返回就绪的 fd 数量;
  • 如果超时,返回 0
  • 如果出错,返回 -1

fd_set 是一个特殊的结构体,用于表示 fd 集合。它被定义为:

#define __FD_SETSIZE 1024

typedef long int __fd_mask;
#define __NFDBITS (sizeof(__fd_mask) * 8)

typedef struct {
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
} fd_set;

fd_set 是一个 bitmap,每个 fd 对应一个比特位,比特位为 1 表示 fd 已就绪。它总共有 __FD_SETSIZE 即 1024 位,因此 select 最多只能监视 1024 个 fd 。

fd_set 主要依靠四个宏来操作:

  • FD_ZERO(fdsetp):将 fdsetp 中的所有比特位置 0
  • FD_SET(fd, fdsetp):将 fdsetp 中对应 fd 的比特位置 1
  • FD_CLR(fd, fdsetp):将 fdsetp 中对应 fd 的比特位置 0
  • FD_ISSET(fd, fdsetp):检查 fdsetp 中对应 fd 的比特位是否为 1

这些宏的实现在 select.h 中。

点击查看源码
#define __FDS_BITS(set) ((set)->fds_bits)

#define __FD_ZERO(s)                                                 \
    do {                                                             \
        unsigned int __i;                                            \
        fd_set *__arr = (s);                                         \
        for (__i = 0; __i < sizeof(fd_set)/sizeof(__fd_mask); ++__i) \
            __FDS_BITS[__arr](__i) = 0;                              \
    } while (0)
#define __FD_SET(d, s)                                               \
    ((void)(__FDS_BITS[s](__FD_ELT(d)) |= __FD_MASK(d)))
#define __FD_CLR(d, s)                                               \
    ((void)(__FDS_BITS[s](__FD_ELT(d)) &= ~__FD_MASK(d)))
#define __FD_ISSET(d, s)                                             \
    ((__FDS_BITS[s](__FD_ELT(d))&__FD_MASK(d)) != 0)

#define FD_SET(fd, fdsetp)   __FD_SET (fd, fdsetp)
#define FD_CLR(fd, fdsetp)   __FD_CLR (fd, fdsetp)
#define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp)      __FD_ZERO (fdsetp)

在使用 select 时,需要依次:

  1. 使用 FD_ZERO 清空 fd_set
  2. 使用 FD_SET 将需要监视的 fd 添加到 fd_set
  3. 调用 select 函数进行监视
  4. 对返回的 fd_set 进行处理,使用 FD_ISSET 检查哪些 fd 就绪

下面是一个典型的 select 使用示例2

点击查看代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/time.h>

int
main(void)
{
    int             retval;
    fd_set          rfds;
    struct timeval  tv;

    // 监控 stdin (fd 0) 是否有输入
    FD_ZERO(&rfds);
    FD_SET(0, &rfds);

    // 超时时间设置为 5 秒
    tv.tv_sec = 5;
    tv.tv_usec = 0;

    retval = select(1, &rfds, NULL, NULL, &tv);
    // 此时,tv 可能已经被修改

    if (retval == -1)
        perror("select()");
    else if (retval)
        printf("Data is available now.\n");
        // FD_ISSET(0, &rfds) 为真
    else
        printf("No data within five seconds.\n");

    exit(EXIT_SUCCESS);
}

源码解读

select 内部执行时经过了以下步骤:

  1. 将传入的 fd_set 拷贝至内核态
  2. 再新建三个 fd_set 作为输出(in / out / except)
  3. 遍历每一位,如果设置了监视且有事件发生,则将输出的对应位设置为 1
  4. 如果没有事件发生,则阻塞等待
  5. 如果有事件发生或超时,则返回输出
flowchart LR
  F[将 fd_set<br>拷贝至<br>内核态] --> G[新建三个<br>fd_set<br>作为输出] --> H[遍历<br>每一位<br>检查状态] --> I{fd 设置了监视<br>且有事件<br>发生?}
  I -->|是| J[将输出的<br>对应位<br>设置为 1]
  I -->|否| K{所有位<br>都检查<br>完了?}
  
  J --> K
  K -->|否| H
  K -->|是| N{超时或<br>有事件<br>发生?}
  N -->|否| H
  N -->|是| O[返回<br>输出]

为了合理调度资源,select 还引入了许多优化机制:

  • 栈空间优先

    select 需要处理的 fd 数量较少(少于 256)时,内核会优先使用栈空间来存储 fd_set,这样可以减少内存分配的开销。而当数量较多时,才会使用堆空间分配。

  • 忙等待

    忙等待主要被用于网络驱动中。对于高频小包网络应用,可以减少上下文切换和中断处理的开销,从而提高性能。

    忙等待的触发需要同时满足以下条件:

    • 网络子系统启用忙等待
    • 当前没有更高优先级的任务需要调度
    • 至少有一个socket设置了 SO_BUSY_POLL 选项
    • 没有检测到任何就绪事件

    在每次主循环检查时,会计算当前时间与上次忙等待开始的时间差,如果超过了忙等待的超时时间(通常设置为 2 个 CPU 周期),则会执行一次正常的检查流程。

  • 延迟唤醒

    在初始化时,wait->_qproc 会指向 __pollwait() 函数。这个函数相当于一个回调函数,当有事件发生时,内核会调用这个函数来唤醒等待的进程。

    当文件描述符就绪时,会将 _qproc 置为 NULL,这样后续检查时,可以避免重复唤醒。

以目前最新的 Linux 6.16 为例,select 的实现在 fs/select.c3

点击查看源码解析

系统调用的入口:

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
                fd_set __user *, exp, struct __kernel_old_timeval __user *, tvp)
{
    return kern_select(n, inp, outp, exp, tvp);
}

核心处理函数:

static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,
                       fd_set __user *exp, struct __kernel_old_timeval __user *tvp)
{
    struct timespec64 end_time, *to = NULL;
    struct __kernel_old_timeval tv;
    int ret;

    // 如果设置了超时时间,则拷贝进内核态并转换为 timespec64
    if (tvp) {
        if (copy_from_user(&tv, tvp, sizeof(tv)))
            return -EFAULT;

        to = &end_time;
        if (poll_select_set_timeout(to,
                                    tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
                                     (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
            return -EINVAL;
    }

    // 处理核心逻辑(见下文)
    ret = core_sys_select(n, inp, outp, exp, to);
    // 处理结果
    return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);
}

核心处理逻辑:

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
                    fd_set __user *exp, struct timespec64 *end_time)
{
    fd_set_bits fds;
    void *bits;
    int ret, max_fds;
    size_t size, alloc_size;
    struct fdtable *fdt;
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

    ret = -EINVAL;
    if (unlikely(n < 0))
        goto out_nofds;

    rcu_read_lock();
    fdt = files_fdtable(current->files);
    max_fds = fdt->max_fds;
    rcu_read_unlock();
    if (n > max_fds)
        n = max_fds;

    // 总共需要 6 个 bitmap,分别存储 in/out/ex 的输入和输出
    size = FDS_BYTES(n);
    // 如果数量较少,则使用栈分配
    bits = stack_fds;
    if (size > sizeof(stack_fds) / 6) {
        ret = -ENOMEM;
        if (size > (SIZE_MAX / 6))
            goto out_nofds;
        alloc_size = 6 * size;
        // 如果数量较多,则使用堆分配
        bits = kvmalloc(alloc_size, GFP_KERNEL);
        if (!bits)
            goto out_nofds;
    }
    fds.in      = bits;
    fds.out     = bits +   size;
    fds.ex      = bits + 2*size;
    fds.res_in  = bits + 3*size;
    fds.res_out = bits + 4*size;
    fds.res_ex  = bits + 5*size;

    // 将用户态的 fd_set 拷贝至内核态
    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    // 清空输出
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);

    // 调用 do_select 进行实际监控(见下文)
    ret = do_select(n, &fds, end_time);

    if (ret < 0)
        goto out;
    if (!ret) {
        ret = -ERESTARTNOHAND;
        if (signal_pending(current))
            goto out;
        ret = 0;
    }

    // 将结果拷贝回用户态
    if (set_fd_set(n, inp, fds.res_in) ||
        set_fd_set(n, outp, fds.res_out) ||
        set_fd_set(n, exp, fds.res_ex))
        ret = -EFAULT;

 out:
     if (bits != stack_fds)
         kvfree(bits);
 out_nofds:
       return ret;
}

关键监控逻辑:

// 可读事件包括:普通数据可读、优先级带数据可读、数据可读、挂起、错误、无效
#define POLLIN_SET  (EPOLLRDNORM | EPOLLRDBAND | EPOLLIN | EPOLLHUP | EPOLLERR | \
                     EPOLLNVAL)
// 可写事件包括:普通数据可写、优先级带数据可写、数据可写、错误、无效
#define POLLOUT_SET (EPOLLWRBAND | EPOLLWRNORM | EPOLLOUT | EPOLLERR |           \
                     EPOLLNVAL)
// 异常事件包括:高优先级数据可读、无效
#define POLLEX_SET  (EPOLLPRI | EPOLLNVAL)

static noinline_for_stack int do_select(int n, fd_set_bits *fds, struct timespec64*end_time)
{
    ktime_t expire, *to = NULL;
    struct poll_wqueues table;
    poll_table*wait;
    int retval, i, timed_out = 0;
    u64 slack = 0;
    __poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
    unsigned long busy_start = 0;

    // 获取最大的有效 fd
    rcu_read_lock();
    retval = max_select_fd(n, fds);
    rcu_read_unlock();
    if (retval < 0)
        return retval;
    n = retval;

    // 初始化等待队列
    poll_initwait(&table);
    wait = &table.pt;

    // 处理已经超时的情况
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
        wait->_qproc = NULL;
        timed_out = 1;
    }
    if (end_time && !timed_out)
        slack = select_estimate_accuracy(end_time);
    retval = 0;

    // 循环检查文件描述符状态
    for (;;) {
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
        bool can_busy_loop = false;

        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

        // 遍历所有文件描述符
        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
            unsigned long in, out, ex, all_bits, bit = 1, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            __poll_t mask;

            // 每次处理 BITS_PER_LONG 位 fd
            in = *inp++; out = *outp++; ex = *exp++;
            all_bits = in | out | ex;
            // 如果这一段 fd 没有设置任何监控,则跳过
            if (all_bits == 0) {
                i += BITS_PER_LONG;
                continue;
            }

            // 检查这一段 fd 的每个 bit 位
            for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
                if (i >= n)
                    break;
                // 如果该 bit 位没有设置监控,则跳过
                if (!(bit & all_bits))
                    continue;
                // 如果该 bit 位设置了监控,则调用 select_poll_one 检查状态(见下文)
                mask = select_poll_one(i, wait, in, out, bit, busy_flag);
                // 检测到可读事件
                if ((mask & POLLIN_SET) && (in & bit)) {
                    res_in |= bit;
                    retval++;
                    wait->_qproc = NULL;
                }
                // 检测到可写事件
                if ((mask & POLLOUT_SET) && (out & bit)) {
                    res_out |= bit;
                    retval++;
                    wait->_qproc = NULL;
                }
                // 检测到异常事件
                if ((mask & POLLEX_SET) && (ex & bit)) {
                    res_ex |= bit;
                    retval++;
                    wait->_qproc = NULL;
                }
                if (retval) {
                    // 如果检测到事件,则结束忙等待
                    can_busy_loop = false;
                    busy_flag = 0;
                } else if (busy_flag & mask)
                    can_busy_loop = true;
            }
            // 保存结果
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
            // 重新调度
            cond_resched();
        }
        // 清空等待队列
        wait->_qproc = NULL;
        // 检查是否有就绪的文件描述符或超时
        if (retval || timed_out || signal_pending(current))
            break;
        if (table.error) {
            retval = table.error;
            break;
        }

        // 检查是否可以进入忙等待
        if (can_busy_loop && !need_resched()) {
            if (!busy_start) {
                busy_start = busy_loop_current_time();
                continue;
            }
            if (!busy_loop_timeout(busy_start))
                continue;
        }
        busy_flag = 0;

        if (end_time && !to) {
            expire = timespec64_to_ktime(*end_time);
            to = &expire;
        }

        // 进入等待
        if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
                       to, slack))
            timed_out = 1;
    }

    poll_freewait(&table);

    return retval;
}

具体检查某一个 fd 的实现:

static inline __poll_t select_poll_one(int fd, poll_table *wait, unsigned long in,
                                       unsigned long out, unsigned long bit,
                                       __poll_t ll_flag)
{
    CLASS(fd, f)(fd);

    if (fd_empty(f))
        return EPOLLNVAL;
    
    // 异常或者低延迟总是会进行检测
    wait->_key = POLLEX_SET | ll_flag;
    // 设置可读和可写的监控位
    if (in & bit)
        wait->_key |= POLLIN_SET;
    if (out & bit)
        wait->_key |= POLLOUT_SET;
    
    // 读取 fd 的状态
    return vfs_poll(fd_file(f), wait);
}

根据这段源码,除了前文所述的几个优化点,还可以发现,尽管在运行时区分了输入的 fd_set 和输出的 fd_set,但返回结果时,输出会覆盖掉原来的输入。因此,select 在每次使用后,都必须重新设置所有传入的参数

讨论

综合以上讨论,我们得出结论,select 存在如下问题:

  • 最多只能监视 1024 个 fd
  • 每次调用 select 都需要将用户传入的集合拷贝进出内核,开销较大
  • 每次返回结果后,都需要重新设置参数
  • 运行时需要遍历集合的每一位,效率较低

那么,有没有更好的改进方案呢?

poll

基本使用

poll 是对 select 的改进,它解决了 select 的一些限制,比如 fd 数量的限制和集合拷贝的问题。

poll 的函数原型在 poll.h 中:

int poll (struct pollfd *fds,
          nfds_t nfds,
          int timeout)

类似的,其中:

  • fds 是一个指向 pollfd 结构体数组的指针,每个结构体表示一个 fd 及其感兴趣的事件

    pollfd 结构体定义在 sys/poll.h 中:

    struct pollfd {
        int fd;            // 要监视的文件描述符
        short int events;  // 感兴趣的事件类型
        short int revents; // 就绪的事件类型
    };
    

    eventsrevents 字段均为 bitmap,用于表示事件类型。不同的系统可能支持不同的类型,例如对于 GNU/Linux 系统,支持的事件类型定义在 bits/poll.h 中:

    // 所有系统共有
    #define POLLIN     0x001  // 可读事件
    #define POLLPRI    0x002  // 紧急数据事件
    #define POLLOUT    0x004  // 可写事件
    #define POLLERR    0x008  // 错误事件
    #define POLLHUP    0x010  // 挂起事件
    #define POLLNVAL   0x020  // 无效请求
    // GNU 特有
    #define POLLMSG    0x400  // 消息事件
    #define POLLREMOVE 0x1000 // 移除事件
    #define POLLRDHUP  0x2000 // 读挂起事件
    
  • nfds 是数组中元素的数量;
  • timeout 是等待事件发生的最长时间(毫秒),可以设置为 -1 表示无限等待。

下面是一个典型的 poll 使用示例4

点击查看代码
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \
                        } while (0)

int
main(int argc, char *argv[])
{
    int            ready;
    char           buf[10];
    nfds_t         num_open_fds, nfds;
    ssize_t        s;
    struct pollfd  *pfds;

    if (argc < 2) {
      fprintf(stderr, "Usage: %s file...\n", argv[0]);
      exit(EXIT_FAILURE);
    }

    num_open_fds = nfds = argc - 1;
    pfds = calloc(nfds, sizeof(struct pollfd));
    if (pfds == NULL)
        errExit("malloc");

    /* Open each file on command line, and add it to 'pfds' array. */

    for (nfds_t j = 0; j < nfds; j++) {
        pfds[j].fd = open(argv[j + 1], O_RDONLY);
        if (pfds[j].fd == -1)
            errExit("open");

        printf("Opened \"%s\" on fd %d\n", argv[j + 1], pfds[j].fd);

        pfds[j].events = POLLIN;
    }

    /* Keep calling poll() as long as at least one file descriptor is
      open. */

    while (num_open_fds > 0) {
        printf("About to poll()\n");
        ready = poll(pfds, nfds, -1);
        if (ready == -1)
            errExit("poll");

        printf("Ready: %d\n", ready);

        /* Deal with array returned by poll(). */

        for (nfds_t j = 0; j < nfds; j++) {
            if (pfds[j].revents != 0) {
                printf("  fd=%d; events: %s%s%s\n", pfds[j].fd,
                       (pfds[j].revents & POLLIN)  ? "POLLIN "  : "",
                       (pfds[j].revents & POLLHUP) ? "POLLHUP " : "",
                       (pfds[j].revents & POLLERR) ? "POLLERR " : "");

                if (pfds[j].revents & POLLIN) {
                    s = read(pfds[j].fd, buf, sizeof(buf));
                    if (s == -1)
                        errExit("read");
                    printf("    read %zd bytes: %.*s\n",
                          s, (int) s, buf);
                } else {                /* POLLERR | POLLHUP */
                    printf("    closing fd %d\n", pfds[j].fd);
                    if (close(pfds[j].fd) == -1)
                        errExit("close");
                    num_open_fds--;
                }
            }
        }
    }

    printf("All file descriptors closed; bye\n");
    exit(EXIT_SUCCESS);
}

源码解读

poll 的工作原理与 select 类似,都是通过阻塞等待 fd 的状态变化来实现 I/O 多路复用。不同之处在于,poll 不再使用位图来表示 fd,而是使用数组,这样就可以支持更多的 fd。

更确切地讲,用户设置的 pollfd 数组进入内核态处理后会被转换为 poll_list 节点组成的链表,每个节点包含多个 pollfd 结构体。fd 的处理以 poll_list 节点为单位,这样可以减少内存拷贝的开销。

poll_list 节点如下所示:

struct poll_list {
    struct poll_list *next;
    unsigned int len;
    struct pollfd entries[] __counted_by(len);
};

以目前最新的 Linux 6.16 为例,poll 的实现同样也在 fs/select.c3

点击查看源码解析

系统调用的入口:

SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,
                int, timeout_msecs)
{
    struct timespec64 end_time, *to = NULL;
    int ret;

    // 设置超时时间
    if (timeout_msecs >= 0) {
        to = &end_time;
        poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,
                                NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));
    }

    // 调用核心处理函数(见下文)
    ret = do_sys_poll(ufds, nfds, to);

    // 如果被信号中断,设置重启块
    if (ret == -ERESTARTNOHAND) {
        struct restart_block *restart_block;

        restart_block = &current->restart_block;
        restart_block->poll.ufds = ufds;
        restart_block->poll.nfds = nfds;

        if (timeout_msecs >= 0) {
            restart_block->poll.tv_sec = end_time.tv_sec;
            restart_block->poll.tv_nsec = end_time.tv_nsec;
            restart_block->poll.has_timeout = 1;
        } else
            restart_block->poll.has_timeout = 0;

        ret = set_restart_fn(restart_block, do_restart_poll);
    }
    return ret;
}

核心处理函数:

static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,
                       struct timespec64 *end_time)
{
    struct poll_wqueues table;
    int err = -EFAULT, fdcount;
    // 使用栈空间存储最开始的一段 poll_list,以减少内存分配
    long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
    struct poll_list *const head = (struct poll_list *)stack_pps;
    struct poll_list *walk = head;
    unsigned int todo = nfds;
    unsigned int len;

    // 检查文件描述符数量限制
    if (nfds > rlimit(RLIMIT_NOFILE))
        return -EINVAL;

    // 根据栈空间大小,计算首次能够处理的 pollfd 最大数量
    len = min_t(unsigned int, nfds, N_STACK_PPS);
    // 将用户传入的 pollfd 数组构建为 poll_list 链表
    for (;;) {
        walk->next = NULL;
        walk->len = len;
        if (!len)
            break;

        // 从用户态拷贝数据到内核态
        if (copy_from_user(walk->entries, ufds + nfds-todo,
                           sizeof(struct pollfd) * walk->len))
            goto out_fds;

        if (walk->len >= todo)
            break;
        todo -= walk->len;

        // 如果栈空间不足,则动态分配内存
        len = min(todo, POLLFD_PER_PAGE);
        walk = walk->next = kmalloc(struct_size(walk, entries, len),
                                    GFP_KERNEL);
        if (!walk) {
            err = -ENOMEM;
            goto out_fds;
        }
    }

    // 初始化等待队列
    poll_initwait(&table);
    // 核心处理逻辑(见下文)
    fdcount = do_poll(head, &table, end_time);
    poll_freewait(&table);

    if (!user_write_access_begin(ufds, nfds * sizeof(*ufds)))
        goto out_fds;

    for (walk = head; walk; walk = walk->next) {
        struct pollfd *fds = walk->entries;
        unsigned int j;

        // 将内核态的结果拷贝回用户态
        for (j = walk->len; j; fds++, ufds++, j--)
            unsafe_put_user(fds->revents, &ufds->revents, Efault);
      }
    user_write_access_end();

    err = fdcount;
out_fds:
    walk = head->next;
    while (walk) {
        struct poll_list *pos = walk;
        walk = walk->next;
        kfree(pos);
    }
    return err;

Efault:
    user_write_access_end();
    err = -EFAULT;
    goto out_fds;
}

核心处理逻辑:

static int do_poll(struct poll_list *list, struct poll_wqueues *wait,
                   struct timespec64 *end_time)
{
    poll_table* pt = &wait->pt;
    ktime_t expire, *to = NULL;
    int timed_out = 0, count = 0;
    u64 slack = 0;
    __poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
    unsigned long busy_start = 0;

    // 处理已经超时的情况
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
        pt->_qproc = NULL;
        timed_out = 1;
    }

    if (end_time && !timed_out)
        slack = select_estimate_accuracy(end_time);

    // 主循环
    for (;;) {
        struct poll_list *walk;
        bool can_busy_loop = false;

        // 遍历所有的 poll_list 节点
        for (walk = list; walk != NULL; walk = walk->next) {
            struct pollfd * pfd, * pfd_end;

            pfd = walk->entries;
            pfd_end = pfd + walk->len;
            // 遍历 poll_list 节点中的每个 pollfd
            for (; pfd != pfd_end; pfd++) {
                __poll_t mask;
                // 关键监控逻辑(见下文)
                mask = do_pollfd(pfd, pt, &can_busy_loop, busy_flag);
                // 处理就绪的事件
                pfd->revents = mangle_poll(mask);
                if (mask) {
                    count++;
                    pt->_qproc = NULL;
                    busy_flag = 0;
                    can_busy_loop = false;
                }
            }
        }

        pt->_qproc = NULL;
        if (!count) {
            count = wait->error;
            if (signal_pending(current))
                count = -ERESTARTNOHAND;
        }

        // 检查是否超时
        if (count || timed_out)
            break;

        // 检查是否需要进入忙等待
        if (can_busy_loop && !need_resched()) {
            if (!busy_start) {
                busy_start = busy_loop_current_time();
                continue;
            }
            if (!busy_loop_timeout(busy_start))
                continue;
        }
        busy_flag = 0;

        if (end_time && !to) {
            expire = timespec64_to_ktime(*end_time);
            to = &expire;
        }

        // 进入等待
        if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
            timed_out = 1;
    }
    return count;
}

关键监控逻辑:

static inline __poll_t do_pollfd(struct pollfd *pollfd, poll_table *pwait,
                                 bool *can_busy_poll,
                                 __poll_t busy_flag)
{
    int fd = pollfd->fd;
    __poll_t mask, filter;

    if (unlikely(fd < 0))
        return 0;

    CLASS(fd, f)(fd);
    if (fd_empty(f))
        return EPOLLNVAL;

    filter = demangle_poll(pollfd->events) | EPOLLERR | EPOLLHUP;
    pwait->_key = filter | busy_flag;
    mask = vfs_poll(fd_file(f), pwait);
    if (mask & busy_flag)
        *can_busy_poll = true;
    return mask & filter;
}

讨论

可以看到,poll 的使用方式与 select 类似,但它相比之下做了很多优化:

  • fd 的数量不再有限制,仅需控制在最大内存范围内;
  • 不需要每次设置集合;
  • 支持更多的事件类型。

然而,它仍然存在一些不足之处:

  • 仍然需要遍历整个数组来检查就绪的 fd,效率较低;
  • 水平触发模式(即只要 fd 就绪但没有处理,poll 就会一直返回它)可能导致应用程序频繁被唤醒,浪费 CPU 资源。

Linux 2.5.44 内核中首次引入的 epoll,改进了上述问题。

epoll

基本使用

epoll 是 Linux 特有的 I/O 多路复用机制,它在 sys/epoll.h 中总共有 3 个系统调用:

// 创建 epoll 实例
int epoll_create(int size);   // 旧版本
int epoll_create1(int flags); // 新版本

// 控制 epoll 实例
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

// 等待事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,

  • epoll_create
    • size:指定要监视的 fd 的数量,通常设置为 1
  • epoll_create1
    • flags:当设置为 0 时,表现和 epoll_create 相同;如果设置为 EPOLL_CLOEXEC,则在执行 exec 时自动关闭该 fd ,防止 fd 泄漏。
  • epoll_ctl
    • epfdepoll 实例的 fd ;
    • op:操作类型(ADD / DEL / MOD);
    • fd:要监视的 fd ;
    • event:感兴趣的事件类型。
  • epoll_wait
    • epfdepoll 实例的 fd ;
    • events:就绪事件的数组;
    • maxevents:数组的最大容量;
    • timeout:超时时间(毫秒)。

epoll_create v.s. epoll_create1

epoll_create 是较早的版本,参数 size 在现代 Linux 系统中已经被忽略,可以随意设置;而 epoll_create1 是较新的版本,增加了 flags 参数,允许设置 fd 的关闭行为,更加灵活和安全。

epoll 的工作原理与 selectpoll 类似,但它使用了事件驱动的方式,避免了遍历所有 fd 的开销。

epoll 为了解决 poll 的水平触发问题,提供了两种触发模式:

  • 水平触发 (Level-Triggered):当 fd 就绪时,epoll_wait 会一直返回它,直到应用程序处理完毕。
  • 边缘触发 (Edge-Triggered):当 fd 状态发生变化时,epoll_wait 只会在状态变化的瞬间返回一次,之后需要再次注册才能继续接收事件。

下面是一个典型的 epoll 使用示例5

点击查看代码
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;

// 创建 epoll 实例
epollfd = epoll_create1(0);
if (epollfd == -1) {
    perror("epoll_create1");
    exit(EXIT_FAILURE);
}

// 添加监听 socket
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    perror("epoll_ctl: listen_sock");
    exit(EXIT_FAILURE);
}

for (;;) {
    // 等待事件发生
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }

    // 处理就绪的事件
    for (int n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            // 接受新连接
            conn_sock = accept(listen_sock,
                             (struct sockaddr *) &addr, &addrlen);
            if (conn_sock == -1) {
                perror("accept");
                exit(EXIT_FAILURE);
            }
            setnonblocking(conn_sock);
            // 添加新连接到 epoll
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                        &ev) == -1) {
                perror("epoll_ctl: conn_sock");
                exit(EXIT_FAILURE);
            }
        } else {
            // 处理数据
            do_use_fd(events[n].data.fd);
        }
    }
}

源码解读

epoll 的核心数据结构是一个红黑树(用于管理所有监视的 fd)和一个就绪队列(用于存储就绪的 fd)。

主要数据结构如下:

struct eventpoll {
    spinlock_t lock;
    struct mutex mtx;
    wait_queue_head_t wq;       // 等待队列
    wait_queue_head_t poll_wait; // poll 等待队列
    struct list_head rdllist;   // 就绪队列
    struct rb_root_cached rbr;  // 红黑树根节点
    struct epitem *ovflist;     // 溢出列表
    // ...
};

struct epitem {
    union {
        struct rb_node rbn;     // 红黑树节点
        struct rcu_head rcu;
    };
    struct list_head rdllink;   // 就绪队列链接
    struct epitem *next;
    struct epoll_filefd ffd;    // 文件描述符信息
    struct eventpoll *ep;       // 所属的 eventpoll
    struct epoll_event event;   // 用户设置的事件
    // ...
};

以目前最新的 Linux 6.16 为例,epoll 的实现在 fs/eventpoll.c6

点击查看源码解析

创建 epoll 实例:

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    return do_epoll_create(flags);
}

static int do_epoll_create(int flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;

    // 检查 flags 参数
    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;

    // 分配 eventpoll 结构
    error = ep_alloc(&ep);
    if (error < 0)
        return error;

    // 获取未使用的文件描述符
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    if (fd < 0) {
        error = fd;
        goto out_free_ep;
    }

    // 创建匿名 inode 文件
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                             O_RDWR | (flags & O_CLOEXEC));
    if (IS_ERR(file)) {
        error = PTR_ERR(file);
        goto out_free_fd;
    }
    ep->file = file;
    fd_install(fd, file);
    return fd;

out_free_fd:
    put_unused_fd(fd);
out_free_ep:
    ep_free(ep);
    return error;
}

控制 epoll 实例:

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
                struct epoll_event __user *, event)
{
    struct epoll_event epds;

    if (ep_op_has_event(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        return -EFAULT;

    return do_epoll_ctl(epfd, op, fd, &epds, false);
}

int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
                bool nonblock)
{
    int error;
    struct fd f, tf;
    struct eventpoll *ep;
    struct epitem *epi;
    struct eventpoll *tep = NULL;

    // 获取 epoll 文件描述符
    f = fdget(epfd);
    if (!f.file)
        return -EBADF;

    // 获取目标文件描述符
    tf = fdget(fd);
    if (!tf.file) {
        error = -EBADF;
        goto error_fput;
    }

    // 验证是否为 epoll 实例
    if (!is_file_epoll(f.file)) {
        error = -EINVAL;
        goto error_tgt_fput;
    }

    ep = f.file->private_data;

    mutex_lock_nested(&ep->mtx, 0);
    // 在红黑树中查找对应的 epitem
    epi = ep_find(ep, tf.file, fd);

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds->events |= EPOLLERR | EPOLLHUP;
            error = ep_insert(ep, epds, tf.file, fd, full_check);
        }
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds->events |= EPOLLERR | EPOLLHUP;
            error = ep_modify(ep, epi, epds);
        }
        break;
    }
    mutex_unlock(&ep->mtx);

error_tgt_fput:
    fdput(tf);
error_fput:
    fdput(f);
    return error;
}

等待事件:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
                int, maxevents, int, timeout)
{
    return do_epoll_wait(epfd, events, maxevents, timeout);
}

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
                        int maxevents, int timeout)
{
    int error;
    struct fd f;
    struct eventpoll *ep;

    // 参数检查
    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
        return -EINVAL;

    if (!access_ok(events, maxevents * sizeof(struct epoll_event)))
        return -EFAULT;

    // 获取 epoll 文件描述符
    f = fdget(epfd);
    if (!f.file)
        return -EBADF;

    if (!is_file_epoll(f.file)) {
        error = -EINVAL;
        goto error_fput;
    }

    ep = f.file->private_data;

    // 核心等待逻辑
    error = ep_poll(ep, events, maxevents, timeout);

error_fput:
    fdput(f);
    return error;
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                  int maxevents, long timeout)
{
    int res = 0, eavail, timed_out = 0;
    u64 slack = 0;
    wait_queue_entry_t wait;
    ktime_t expires, *to = NULL;

    // 设置超时时间
    if (timeout > 0) {
        struct timespec64 end_time = ep_set_mstimeout(timeout);
        slack = select_estimate_accuracy(&end_time);
        to = &expires;
        *to = timespec64_to_ktime(end_time);
    } else if (timeout == 0) {
        timed_out = 1;
        write_lock_irq(&ep->lock);
        eavail = ep_events_available(ep);
        write_unlock_irq(&ep->lock);
        goto send_events;
    }

fetch_events:
    // 检查是否有就绪事件
    if (!ep_events_available(ep))
        ep_busy_loop(ep, timed_out);

    eavail = ep_events_available(ep);
    if (eavail)
        goto send_events;

    // 进入等待
    if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
        timed_out = 1;

    eavail = ep_events_available(ep);
    if (!eavail)
        goto fetch_events;

send_events:
    // 扫描就绪队列并发送事件
    res = ep_send_events(ep, events, maxevents);

    return res;
}

讨论

epoll 相比 selectpoll 有以下优势:

  • 无 fd 数量限制:可以监视任意数量的 fd(受系统资源限制);
  • 高效的事件通知:使用红黑树管理 fd,使用就绪队列存储就绪的 fd,无需遍历所有 fd;
  • 支持边缘触发:避免水平触发模式下的频繁唤醒;
  • 减少数据拷贝:使用内存映射(mmap)可以减少用户态和内核态之间的数据拷贝。

然而,epoll 仍然存在一些不足:

  • 仅适用于 Linux:不具有跨平台性;
  • 仍需系统调用:每次操作都需要陷入内核;
  • 同步 I/O:必须等待事件发生后才能处理。

为了进一步提升性能,Linux 5.1 引入了全新的异步 I/O 框架——io_uring

io_uring

io_uring 是 Linux 在 2019 年引入的全新异步 I/O 接口,由 Jens Axboe 设计开发7。它的设计目标是提供一个高性能、低延迟的异步 I/O 框架,解决传统 I/O 模型的性能瓶颈。

io_uring 的核心思想是通过两个无锁环形队列(Submission Queue 和 Completion Queue)在用户态和内核态之间传递 I/O 请求和完成事件,从而大幅减少系统调用开销。

graph LR
    A[用户空间] -->|提交 I/O 请求| B[SQ<br/>Submission Queue]
    B --> C[内核空间<br/>io_uring]
    C -->|完成事件| D[CQ<br/>Completion Queue]
    D --> A

主要特点:

  • 零拷贝:使用共享内存映射,避免数据在用户态和内核态之间拷贝;
  • 批量操作:可以一次性提交多个 I/O 请求,减少系统调用次数;
  • 真正的异步:支持真正的异步 I/O,无需轮询或阻塞等待;
  • 高性能:通过无锁队列和轮询模式,实现极低的延迟。

基本使用

io_uring 的使用需要通过三个系统调用:

// 设置 io_uring 实例
int io_uring_setup(unsigned entries, struct io_uring_params *p);

// 注册文件描述符或缓冲区
int io_uring_register(unsigned int fd, unsigned int opcode,
                     void *arg, unsigned int nr_args);

// 提交 I/O 请求和获取完成事件
int io_uring_enter(unsigned int fd, unsigned int to_submit,
                  unsigned int min_complete, unsigned int flags,
                  sigset_t *sig);

不过在实际使用中,通常会使用 liburing 库来简化操作8

下面是一个使用 liburing 的简单示例:

点击查看代码
#include <liburing.h>
#include <stdio.h>
#include <string.h>

#define QUEUE_DEPTH 1
#define BLOCK_SIZE  4096

int main(int argc, char *argv[]) {
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    char buffer[BLOCK_SIZE];
    int fd;

    // 初始化 io_uring
    if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
        perror("io_uring_queue_init");
        return 1;
    }

    // 打开文件
    fd = open(argv[1], O_RDONLY);
    if (fd < 0) {
        perror("open");
        return 1;
    }

    // 获取一个 SQE(提交队列条目)
    sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        fprintf(stderr, "Could not get SQE.\n");
        return 1;
    }

    // 准备读取操作
    io_uring_prep_read(sqe, fd, buffer, BLOCK_SIZE, 0);

    // 提交请求
    io_uring_submit(&ring);

    // 等待完成
    io_uring_wait_cqe(&ring, &cqe);

    // 检查结果
    if (cqe->res < 0) {
        fprintf(stderr, "Async read failed: %s\n", strerror(-cqe->res));
        return 1;
    }

    printf("Read %d bytes\n", cqe->res);

    // 标记完成
    io_uring_cqe_seen(&ring, cqe);

    // 清理
    close(fd);
    io_uring_queue_exit(&ring);

    return 0;
}

源码解读

io_uring 的核心数据结构包括:

struct io_ring_ctx {
    struct {
        struct io_rings *rings;           // 共享内存区域
        struct io_ring_sq sq;             // 提交队列
        struct io_ring_cq cq;             // 完成队列
    } ____cacheline_aligned_in_smp;
    
    struct io_wq *io_wq;                  // 工作队列
    struct io_sq_data *sq_data;           // SQ 线程数据
    // ...
};

struct io_rings {
    struct io_uring_sq sq;                // SQ 环形缓冲区
    struct io_uring_cq cq;                // CQ 环形缓冲区
};

io_uring 的实现在 io_uring/ 目录下9

点击查看关键流程

初始化流程:

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
                struct io_uring_params __user *, params)
{
    return io_uring_setup(entries, params);
}

static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
    struct io_uring_params p;
    int ret;

    if (copy_from_user(&p, params, sizeof(p)))
        return -EFAULT;

    // 创建 io_uring 实例
    ret = io_uring_create(entries, &p, params);
    if (ret < 0)
        return ret;

    return ret;
}

提交和完成流程:

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                u32, min_complete, u32, flags, const void __user *, argp,
                size_t, argsz)
{
    struct io_ring_ctx *ctx;
    struct file *file;
    long ret;

    // 获取 io_uring 实例
    file = fget(fd);
    if (unlikely(!file))
        return -EBADF;

    ctx = file->private_data;

    // 提交 I/O 请求
    if (to_submit) {
        ret = io_submit_sqes(ctx, to_submit);
        if (ret < 0)
            goto out;
    }

    // 等待完成事件
    if (min_complete) {
        ret = io_cqring_wait(ctx, min_complete, argp, argsz, flags);
    }

out:
    fput(file);
    return ret;
}

讨论

io_uring 代表了 I/O 多路复用的最新发展方向:

  • 优势:

    • 极高的性能:通过减少系统调用和上下文切换,实现了比 epoll 更高的吞吐量;
    • 真正的异步:支持完全异步的 I/O 操作,包括文件 I/O、网络 I/O 等;
    • 灵活性强:支持多种 I/O 操作类型(readwriteacceptconnectfsync 等);
    • 可扩展性好:持续添加新特性,如链式操作、缓冲区选择等。
  • 挑战:

    • 复杂性高:API 相对复杂,学习曲线陡峭;
    • 兼容性:仅在较新的 Linux 内核(5.1+)中可用;
    • 安全问题:早期版本存在一些安全漏洞,需要谨慎使用。

总结

selectio_uring,我们见证了 I/O 多路复用技术的演进:

timeline
    title I/O 多路复用技术演进
    1983 : select
    1997 : poll
    2002 : epoll
    2019 : io_uring
特性 select poll epoll io_uring
fd 数量限制 \(1024\) 无限制 无限制 无限制
数据拷贝 每次调用 每次调用 仅注册时 零拷贝
遍历开销 \(O(n)\) \(O(n)\) \(O(1)\) \(O(1)\)
触发模式 水平触发 水平触发 水平/边缘 异步
跨平台 🟢 🟢 🔴 🔴
系统调用 每次轮询 每次轮询 每次轮询 批量提交

选择合适的 I/O 模型需要根据具体场景:

  • 兼容性优先:使用 selectpoll
  • 高并发场景:使用 epoll
  • 极致性能:使用 io_uring

随着硬件性能的提升和应用需求的增长,I/O 多路复用技术仍在不断发展。从 C10K 到 C10M,再到未来可能的 C100M,这场技术演进的故事还将继续。