如何让一台服务器接受尽可能多的连接?
The C10k problem is the problem of optimizing network sockets to handle a large number of clients at the same time. The name C10k is a numeronym for concurrently handling ten thousand connections. Handling many concurrent connections is a different problem from handling many requests per second: the latter requires high throughput (processing them quickly), while the former does not have to be fast, but requires efficient scheduling of connections.
早在 1999 年,Dan Kegel 就提出了 C10K
问题1,旨在解决如何让一台服务器同时处理 \(10,000\) 个连接的问题;到了 2010 年前后,这个问题变为了 C10M
——即如何用 1U 服务器处理 \(10,000,000\) 个连接;而如今,随着服务器性能和网络技术的发展,这个数字还在不断攀升。
在最基本的 socket 编程模型中,服务器通常采用阻塞 I/O 模型,即每个连接都对应一个线程或进程。当面临成千上万个连接时,我们就不得不开启成千上万个线程或进程,这会消耗大量的系统资源,并导致频繁的上下文切换,严重影响性能。
为了解决这个问题,操作系统提供了多种 I/O 模型,使得单个线程或进程能够同时处理多个连接,从而大幅提升服务器的并发处理能力。今天我们就来聊聊,是什么技术,使得这一切成为可能。
select
基本使用
select
是 Unix/Linux 系统中最传统的 I/O 多路复用机制之一,它允许程序同时监视多个 fd(如 socket、管道等),并在其中任何一个 fd 就绪(读 / 写 / 异常)时通知应用程序进行处理。
select
的函数原型在 sys/select.h
中:
int select (int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
其中,
-
nfds
是需要监视的 fd 的数量; -
readfds
是一个 fd 集合,用于监视可读事件; -
writefds
是一个 fd 集合,用于监视可写事件; -
exceptfds
是一个 fd 集合,用于监视异常事件; -
timeout
是一个指向struct timeval
的指针,用于设置超时时间。设置为NULL
表示无限等待,设置为{0, 0}
表示立即返回。
select
函数会阻塞当前线程,直到其中有至少一个 fd 就绪,或者长时间没有 fd 就绪而导致超时。它的返回值:
- 正常情况下,返回就绪的 fd 数量;
- 如果超时,返回
0
; - 如果出错,返回
-1
。
fd_set
是一个特殊的结构体,用于表示 fd 集合。它被定义为:#define __FD_SETSIZE 1024 typedef long int __fd_mask; #define __NFDBITS (sizeof(__fd_mask) * 8) typedef struct { __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; } fd_set;
fd_set
是一个 bitmap,每个 fd 对应一个比特位,比特位为1
表示 fd 已就绪。它总共有__FD_SETSIZE
即 1024 位,因此select
最多只能监视 1024 个 fd 。
fd_set
主要依靠四个宏来操作:
FD_ZERO(fdsetp)
:将fdsetp
中的所有比特位置0
FD_SET(fd, fdsetp)
:将fdsetp
中对应fd
的比特位置1
FD_CLR(fd, fdsetp)
:将fdsetp
中对应fd
的比特位置0
FD_ISSET(fd, fdsetp)
:检查fdsetp
中对应fd
的比特位是否为1
这些宏的实现在
select.h
中。点击查看源码
#define __FDS_BITS(set) ((set)->fds_bits) #define __FD_ZERO(s) \ do { \ unsigned int __i; \ fd_set *__arr = (s); \ for (__i = 0; __i < sizeof(fd_set)/sizeof(__fd_mask); ++__i) \ __FDS_BITS[__arr](__i) = 0; \ } while (0) #define __FD_SET(d, s) \ ((void)(__FDS_BITS[s](__FD_ELT(d)) |= __FD_MASK(d))) #define __FD_CLR(d, s) \ ((void)(__FDS_BITS[s](__FD_ELT(d)) &= ~__FD_MASK(d))) #define __FD_ISSET(d, s) \ ((__FDS_BITS[s](__FD_ELT(d))&__FD_MASK(d)) != 0) #define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp) #define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp) #define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp) #define FD_ZERO(fdsetp) __FD_ZERO (fdsetp)
在使用 select
时,需要依次:
- 使用
FD_ZERO
清空fd_set
- 使用
FD_SET
将需要监视的 fd 添加到fd_set
中 - 调用
select
函数进行监视 - 对返回的
fd_set
进行处理,使用FD_ISSET
检查哪些 fd 就绪
下面是一个典型的 select
使用示例2。
点击查看代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/time.h>
int
main(void)
{
int retval;
fd_set rfds;
struct timeval tv;
// 监控 stdin (fd 0) 是否有输入
FD_ZERO(&rfds);
FD_SET(0, &rfds);
// 超时时间设置为 5 秒
tv.tv_sec = 5;
tv.tv_usec = 0;
retval = select(1, &rfds, NULL, NULL, &tv);
// 此时,tv 可能已经被修改
if (retval == -1)
perror("select()");
else if (retval)
printf("Data is available now.\n");
// FD_ISSET(0, &rfds) 为真
else
printf("No data within five seconds.\n");
exit(EXIT_SUCCESS);
}
源码解读
select
内部执行时经过了以下步骤:
- 将传入的
fd_set
拷贝至内核态 - 再新建三个
fd_set
作为输出(in / out / except) - 遍历每一位,如果设置了监视且有事件发生,则将输出的对应位设置为
1
- 如果没有事件发生,则阻塞等待
- 如果有事件发生或超时,则返回输出
flowchart LR
F[将 fd_set<br>拷贝至<br>内核态] --> G[新建三个<br>fd_set<br>作为输出] --> H[遍历<br>每一位<br>检查状态] --> I{fd 设置了监视<br>且有事件<br>发生?}
I -->|是| J[将输出的<br>对应位<br>设置为 1]
I -->|否| K{所有位<br>都检查<br>完了?}
J --> K
K -->|否| H
K -->|是| N{超时或<br>有事件<br>发生?}
N -->|否| H
N -->|是| O[返回<br>输出]
为了合理调度资源,select
还引入了许多优化机制:
-
栈空间优先
当
select
需要处理的 fd 数量较少(少于 256)时,内核会优先使用栈空间来存储fd_set
,这样可以减少内存分配的开销。而当数量较多时,才会使用堆空间分配。 -
忙等待
忙等待主要被用于网络驱动中。对于高频小包网络应用,可以减少上下文切换和中断处理的开销,从而提高性能。
忙等待的触发需要同时满足以下条件:
- 网络子系统启用忙等待
- 当前没有更高优先级的任务需要调度
- 至少有一个socket设置了
SO_BUSY_POLL
选项 - 没有检测到任何就绪事件
在每次主循环检查时,会计算当前时间与上次忙等待开始的时间差,如果超过了忙等待的超时时间(通常设置为 2 个 CPU 周期),则会执行一次正常的检查流程。
-
延迟唤醒
在初始化时,
wait->_qproc
会指向__pollwait()
函数。这个函数相当于一个回调函数,当有事件发生时,内核会调用这个函数来唤醒等待的进程。当文件描述符就绪时,会将
_qproc
置为NULL
,这样后续检查时,可以避免重复唤醒。
以目前最新的 Linux 6.16 为例,select
的实现在 fs/select.c
中3。
点击查看源码解析
系统调用的入口:
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct __kernel_old_timeval __user *, tvp)
{
return kern_select(n, inp, outp, exp, tvp);
}
核心处理函数:
static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct __kernel_old_timeval __user *tvp)
{
struct timespec64 end_time, *to = NULL;
struct __kernel_old_timeval tv;
int ret;
// 如果设置了超时时间,则拷贝进内核态并转换为 timespec64
if (tvp) {
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;
to = &end_time;
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
// 处理核心逻辑(见下文)
ret = core_sys_select(n, inp, outp, exp, to);
// 处理结果
return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);
}
核心处理逻辑:
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec64 *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
size_t size, alloc_size;
struct fdtable *fdt;
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
if (unlikely(n < 0))
goto out_nofds;
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;
// 总共需要 6 个 bitmap,分别存储 in/out/ex 的输入和输出
size = FDS_BYTES(n);
// 如果数量较少,则使用栈分配
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
ret = -ENOMEM;
if (size > (SIZE_MAX / 6))
goto out_nofds;
alloc_size = 6 * size;
// 如果数量较多,则使用堆分配
bits = kvmalloc(alloc_size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
// 将用户态的 fd_set 拷贝至内核态
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
// 清空输出
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
// 调用 do_select 进行实际监控(见下文)
ret = do_select(n, &fds, end_time);
if (ret < 0)
goto out;
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
// 将结果拷贝回用户态
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
out:
if (bits != stack_fds)
kvfree(bits);
out_nofds:
return ret;
}
关键监控逻辑:
// 可读事件包括:普通数据可读、优先级带数据可读、数据可读、挂起、错误、无效
#define POLLIN_SET (EPOLLRDNORM | EPOLLRDBAND | EPOLLIN | EPOLLHUP | EPOLLERR | \
EPOLLNVAL)
// 可写事件包括:普通数据可写、优先级带数据可写、数据可写、错误、无效
#define POLLOUT_SET (EPOLLWRBAND | EPOLLWRNORM | EPOLLOUT | EPOLLERR | \
EPOLLNVAL)
// 异常事件包括:高优先级数据可读、无效
#define POLLEX_SET (EPOLLPRI | EPOLLNVAL)
static noinline_for_stack int do_select(int n, fd_set_bits *fds, struct timespec64*end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table*wait;
int retval, i, timed_out = 0;
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;
// 获取最大的有效 fd
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
// 初始化等待队列
poll_initwait(&table);
wait = &table.pt;
// 处理已经超时的情况
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);
retval = 0;
// 循环检查文件描述符状态
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
// 遍历所有文件描述符
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
__poll_t mask;
// 每次处理 BITS_PER_LONG 位 fd
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
// 如果这一段 fd 没有设置任何监控,则跳过
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}
// 检查这一段 fd 的每个 bit 位
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
if (i >= n)
break;
// 如果该 bit 位没有设置监控,则跳过
if (!(bit & all_bits))
continue;
// 如果该 bit 位设置了监控,则调用 select_poll_one 检查状态(见下文)
mask = select_poll_one(i, wait, in, out, bit, busy_flag);
// 检测到可读事件
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
// 检测到可写事件
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
// 检测到异常事件
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
if (retval) {
// 如果检测到事件,则结束忙等待
can_busy_loop = false;
busy_flag = 0;
} else if (busy_flag & mask)
can_busy_loop = true;
}
// 保存结果
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
// 重新调度
cond_resched();
}
// 清空等待队列
wait->_qproc = NULL;
// 检查是否有就绪的文件描述符或超时
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
// 检查是否可以进入忙等待
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
// 进入等待
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
}
poll_freewait(&table);
return retval;
}
具体检查某一个 fd 的实现:
static inline __poll_t select_poll_one(int fd, poll_table *wait, unsigned long in,
unsigned long out, unsigned long bit,
__poll_t ll_flag)
{
CLASS(fd, f)(fd);
if (fd_empty(f))
return EPOLLNVAL;
// 异常或者低延迟总是会进行检测
wait->_key = POLLEX_SET | ll_flag;
// 设置可读和可写的监控位
if (in & bit)
wait->_key |= POLLIN_SET;
if (out & bit)
wait->_key |= POLLOUT_SET;
// 读取 fd 的状态
return vfs_poll(fd_file(f), wait);
}
根据这段源码,除了前文所述的几个优化点,还可以发现,尽管在运行时区分了输入的 fd_set
和输出的 fd_set
,但返回结果时,输出会覆盖掉原来的输入。因此,select
在每次使用后,都必须重新设置所有传入的参数。
讨论
综合以上讨论,我们得出结论,select
存在如下问题:
- 最多只能监视 1024 个 fd
- 每次调用
select
都需要将用户传入的集合拷贝进出内核,开销较大 - 每次返回结果后,都需要重新设置参数
- 运行时需要遍历集合的每一位,效率较低
那么,有没有更好的改进方案呢?
poll
基本使用
poll
是对 select
的改进,它解决了 select
的一些限制,比如 fd 数量的限制和集合拷贝的问题。
poll
的函数原型在 poll.h
中:
int poll (struct pollfd *fds,
nfds_t nfds,
int timeout)
类似的,其中:
-
fds
是一个指向pollfd
结构体数组的指针,每个结构体表示一个 fd 及其感兴趣的事件pollfd
结构体定义在sys/poll.h
中:struct pollfd { int fd; // 要监视的文件描述符 short int events; // 感兴趣的事件类型 short int revents; // 就绪的事件类型 };
events
和revents
字段均为 bitmap,用于表示事件类型。不同的系统可能支持不同的类型,例如对于 GNU/Linux 系统,支持的事件类型定义在bits/poll.h
中:// 所有系统共有 #define POLLIN 0x001 // 可读事件 #define POLLPRI 0x002 // 紧急数据事件 #define POLLOUT 0x004 // 可写事件 #define POLLERR 0x008 // 错误事件 #define POLLHUP 0x010 // 挂起事件 #define POLLNVAL 0x020 // 无效请求 // GNU 特有 #define POLLMSG 0x400 // 消息事件 #define POLLREMOVE 0x1000 // 移除事件 #define POLLRDHUP 0x2000 // 读挂起事件
-
nfds
是数组中元素的数量; -
timeout
是等待事件发生的最长时间(毫秒),可以设置为-1
表示无限等待。
下面是一个典型的 poll
使用示例4。
点击查看代码
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \
} while (0)
int
main(int argc, char *argv[])
{
int ready;
char buf[10];
nfds_t num_open_fds, nfds;
ssize_t s;
struct pollfd *pfds;
if (argc < 2) {
fprintf(stderr, "Usage: %s file...\n", argv[0]);
exit(EXIT_FAILURE);
}
num_open_fds = nfds = argc - 1;
pfds = calloc(nfds, sizeof(struct pollfd));
if (pfds == NULL)
errExit("malloc");
/* Open each file on command line, and add it to 'pfds' array. */
for (nfds_t j = 0; j < nfds; j++) {
pfds[j].fd = open(argv[j + 1], O_RDONLY);
if (pfds[j].fd == -1)
errExit("open");
printf("Opened \"%s\" on fd %d\n", argv[j + 1], pfds[j].fd);
pfds[j].events = POLLIN;
}
/* Keep calling poll() as long as at least one file descriptor is
open. */
while (num_open_fds > 0) {
printf("About to poll()\n");
ready = poll(pfds, nfds, -1);
if (ready == -1)
errExit("poll");
printf("Ready: %d\n", ready);
/* Deal with array returned by poll(). */
for (nfds_t j = 0; j < nfds; j++) {
if (pfds[j].revents != 0) {
printf(" fd=%d; events: %s%s%s\n", pfds[j].fd,
(pfds[j].revents & POLLIN) ? "POLLIN " : "",
(pfds[j].revents & POLLHUP) ? "POLLHUP " : "",
(pfds[j].revents & POLLERR) ? "POLLERR " : "");
if (pfds[j].revents & POLLIN) {
s = read(pfds[j].fd, buf, sizeof(buf));
if (s == -1)
errExit("read");
printf(" read %zd bytes: %.*s\n",
s, (int) s, buf);
} else { /* POLLERR | POLLHUP */
printf(" closing fd %d\n", pfds[j].fd);
if (close(pfds[j].fd) == -1)
errExit("close");
num_open_fds--;
}
}
}
}
printf("All file descriptors closed; bye\n");
exit(EXIT_SUCCESS);
}
源码解读
poll
的工作原理与 select
类似,都是通过阻塞等待 fd 的状态变化来实现 I/O 多路复用。不同之处在于,poll
不再使用位图来表示 fd,而是使用数组,这样就可以支持更多的 fd。
更确切地讲,用户设置的 pollfd
数组进入内核态处理后会被转换为 poll_list
节点组成的链表,每个节点包含多个 pollfd
结构体。fd 的处理以 poll_list
节点为单位,这样可以减少内存拷贝的开销。
poll_list
节点如下所示:
struct poll_list {
struct poll_list *next;
unsigned int len;
struct pollfd entries[] __counted_by(len);
};
以目前最新的 Linux 6.16 为例,poll
的实现同样也在 fs/select.c
中3。
点击查看源码解析
系统调用的入口:
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,
int, timeout_msecs)
{
struct timespec64 end_time, *to = NULL;
int ret;
// 设置超时时间
if (timeout_msecs >= 0) {
to = &end_time;
poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,
NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));
}
// 调用核心处理函数(见下文)
ret = do_sys_poll(ufds, nfds, to);
// 如果被信号中断,设置重启块
if (ret == -ERESTARTNOHAND) {
struct restart_block *restart_block;
restart_block = ¤t->restart_block;
restart_block->poll.ufds = ufds;
restart_block->poll.nfds = nfds;
if (timeout_msecs >= 0) {
restart_block->poll.tv_sec = end_time.tv_sec;
restart_block->poll.tv_nsec = end_time.tv_nsec;
restart_block->poll.has_timeout = 1;
} else
restart_block->poll.has_timeout = 0;
ret = set_restart_fn(restart_block, do_restart_poll);
}
return ret;
}
核心处理函数:
static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,
struct timespec64 *end_time)
{
struct poll_wqueues table;
int err = -EFAULT, fdcount;
// 使用栈空间存储最开始的一段 poll_list,以减少内存分配
long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
struct poll_list *const head = (struct poll_list *)stack_pps;
struct poll_list *walk = head;
unsigned int todo = nfds;
unsigned int len;
// 检查文件描述符数量限制
if (nfds > rlimit(RLIMIT_NOFILE))
return -EINVAL;
// 根据栈空间大小,计算首次能够处理的 pollfd 最大数量
len = min_t(unsigned int, nfds, N_STACK_PPS);
// 将用户传入的 pollfd 数组构建为 poll_list 链表
for (;;) {
walk->next = NULL;
walk->len = len;
if (!len)
break;
// 从用户态拷贝数据到内核态
if (copy_from_user(walk->entries, ufds + nfds-todo,
sizeof(struct pollfd) * walk->len))
goto out_fds;
if (walk->len >= todo)
break;
todo -= walk->len;
// 如果栈空间不足,则动态分配内存
len = min(todo, POLLFD_PER_PAGE);
walk = walk->next = kmalloc(struct_size(walk, entries, len),
GFP_KERNEL);
if (!walk) {
err = -ENOMEM;
goto out_fds;
}
}
// 初始化等待队列
poll_initwait(&table);
// 核心处理逻辑(见下文)
fdcount = do_poll(head, &table, end_time);
poll_freewait(&table);
if (!user_write_access_begin(ufds, nfds * sizeof(*ufds)))
goto out_fds;
for (walk = head; walk; walk = walk->next) {
struct pollfd *fds = walk->entries;
unsigned int j;
// 将内核态的结果拷贝回用户态
for (j = walk->len; j; fds++, ufds++, j--)
unsafe_put_user(fds->revents, &ufds->revents, Efault);
}
user_write_access_end();
err = fdcount;
out_fds:
walk = head->next;
while (walk) {
struct poll_list *pos = walk;
walk = walk->next;
kfree(pos);
}
return err;
Efault:
user_write_access_end();
err = -EFAULT;
goto out_fds;
}
核心处理逻辑:
static int do_poll(struct poll_list *list, struct poll_wqueues *wait,
struct timespec64 *end_time)
{
poll_table* pt = &wait->pt;
ktime_t expire, *to = NULL;
int timed_out = 0, count = 0;
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;
// 处理已经超时的情况
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
pt->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);
// 主循环
for (;;) {
struct poll_list *walk;
bool can_busy_loop = false;
// 遍历所有的 poll_list 节点
for (walk = list; walk != NULL; walk = walk->next) {
struct pollfd * pfd, * pfd_end;
pfd = walk->entries;
pfd_end = pfd + walk->len;
// 遍历 poll_list 节点中的每个 pollfd
for (; pfd != pfd_end; pfd++) {
__poll_t mask;
// 关键监控逻辑(见下文)
mask = do_pollfd(pfd, pt, &can_busy_loop, busy_flag);
// 处理就绪的事件
pfd->revents = mangle_poll(mask);
if (mask) {
count++;
pt->_qproc = NULL;
busy_flag = 0;
can_busy_loop = false;
}
}
}
pt->_qproc = NULL;
if (!count) {
count = wait->error;
if (signal_pending(current))
count = -ERESTARTNOHAND;
}
// 检查是否超时
if (count || timed_out)
break;
// 检查是否需要进入忙等待
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
// 进入等待
if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
timed_out = 1;
}
return count;
}
关键监控逻辑:
static inline __poll_t do_pollfd(struct pollfd *pollfd, poll_table *pwait,
bool *can_busy_poll,
__poll_t busy_flag)
{
int fd = pollfd->fd;
__poll_t mask, filter;
if (unlikely(fd < 0))
return 0;
CLASS(fd, f)(fd);
if (fd_empty(f))
return EPOLLNVAL;
filter = demangle_poll(pollfd->events) | EPOLLERR | EPOLLHUP;
pwait->_key = filter | busy_flag;
mask = vfs_poll(fd_file(f), pwait);
if (mask & busy_flag)
*can_busy_poll = true;
return mask & filter;
}
讨论
可以看到,poll
的使用方式与 select
类似,但它相比之下做了很多优化:
- fd 的数量不再有限制,仅需控制在最大内存范围内;
- 不需要每次设置集合;
- 支持更多的事件类型。
然而,它仍然存在一些不足之处:
- 仍然需要遍历整个数组来检查就绪的 fd,效率较低;
- 水平触发模式(即只要 fd 就绪但没有处理,
poll
就会一直返回它)可能导致应用程序频繁被唤醒,浪费 CPU 资源。
Linux 2.5.44 内核中首次引入的 epoll
,改进了上述问题。
epoll
基本使用
epoll
是 Linux 特有的 I/O 多路复用机制,它在 sys/epoll.h
中总共有 3 个系统调用:
// 创建 epoll 实例
int epoll_create(int size); // 旧版本
int epoll_create1(int flags); // 新版本
// 控制 epoll 实例
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
其中,
-
epoll_create
-
size
:指定要监视的 fd 的数量,通常设置为1
。
-
-
epoll_create1
-
flags
:当设置为0
时,表现和epoll_create
相同;如果设置为EPOLL_CLOEXEC
,则在执行exec
时自动关闭该 fd ,防止 fd 泄漏。
-
-
epoll_ctl
-
epfd
:epoll
实例的 fd ; -
op
:操作类型(ADD / DEL / MOD); -
fd
:要监视的 fd ; -
event
:感兴趣的事件类型。
-
-
epoll_wait
-
epfd
:epoll
实例的 fd ; -
events
:就绪事件的数组; -
maxevents
:数组的最大容量; -
timeout
:超时时间(毫秒)。
-
epoll_create
v.s.epoll_create1
epoll_create
是较早的版本,参数size
在现代 Linux 系统中已经被忽略,可以随意设置;而epoll_create1
是较新的版本,增加了flags
参数,允许设置 fd 的关闭行为,更加灵活和安全。
epoll
的工作原理与 select
和 poll
类似,但它使用了事件驱动的方式,避免了遍历所有 fd 的开销。
epoll
为了解决 poll
的水平触发问题,提供了两种触发模式:
-
水平触发 (Level-Triggered):当 fd 就绪时,
epoll_wait
会一直返回它,直到应用程序处理完毕。 -
边缘触发 (Edge-Triggered):当 fd 状态发生变化时,
epoll_wait
只会在状态变化的瞬间返回一次,之后需要再次注册才能继续接收事件。
下面是一个典型的 epoll
使用示例5。
点击查看代码
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
// 创建 epoll 实例
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
// 添加监听 socket
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
// 等待事件发生
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
// 处理就绪的事件
for (int n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
// 接受新连接
conn_sock = accept(listen_sock,
(struct sockaddr *) &addr, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(EXIT_FAILURE);
}
setnonblocking(conn_sock);
// 添加新连接到 epoll
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(EXIT_FAILURE);
}
} else {
// 处理数据
do_use_fd(events[n].data.fd);
}
}
}
源码解读
epoll
的核心数据结构是一个红黑树(用于管理所有监视的 fd)和一个就绪队列(用于存储就绪的 fd)。
主要数据结构如下:
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
wait_queue_head_t wq; // 等待队列
wait_queue_head_t poll_wait; // poll 等待队列
struct list_head rdllist; // 就绪队列
struct rb_root_cached rbr; // 红黑树根节点
struct epitem *ovflist; // 溢出列表
// ...
};
struct epitem {
union {
struct rb_node rbn; // 红黑树节点
struct rcu_head rcu;
};
struct list_head rdllink; // 就绪队列链接
struct epitem *next;
struct epoll_filefd ffd; // 文件描述符信息
struct eventpoll *ep; // 所属的 eventpoll
struct epoll_event event; // 用户设置的事件
// ...
};
以目前最新的 Linux 6.16 为例,epoll
的实现在 fs/eventpoll.c
中6。
点击查看源码解析
创建 epoll 实例:
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
return do_epoll_create(flags);
}
static int do_epoll_create(int flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
// 检查 flags 参数
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
// 分配 eventpoll 结构
error = ep_alloc(&ep);
if (error < 0)
return error;
// 获取未使用的文件描述符
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
// 创建匿名 inode 文件
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file);
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
控制 epoll 实例:
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct epoll_event epds;
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
return -EFAULT;
return do_epoll_ctl(epfd, op, fd, &epds, false);
}
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock)
{
int error;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct eventpoll *tep = NULL;
// 获取 epoll 文件描述符
f = fdget(epfd);
if (!f.file)
return -EBADF;
// 获取目标文件描述符
tf = fdget(fd);
if (!tf.file) {
error = -EBADF;
goto error_fput;
}
// 验证是否为 epoll 实例
if (!is_file_epoll(f.file)) {
error = -EINVAL;
goto error_tgt_fput;
}
ep = f.file->private_data;
mutex_lock_nested(&ep->mtx, 0);
// 在红黑树中查找对应的 epitem
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, epds, tf.file, fd, full_check);
}
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
break;
case EPOLL_CTL_MOD:
if (epi) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, epds);
}
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
fdput(tf);
error_fput:
fdput(f);
return error;
}
等待事件:
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
return do_epoll_wait(epfd, events, maxevents, timeout);
}
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
int error;
struct fd f;
struct eventpoll *ep;
// 参数检查
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
if (!access_ok(events, maxevents * sizeof(struct epoll_event)))
return -EFAULT;
// 获取 epoll 文件描述符
f = fdget(epfd);
if (!f.file)
return -EBADF;
if (!is_file_epoll(f.file)) {
error = -EINVAL;
goto error_fput;
}
ep = f.file->private_data;
// 核心等待逻辑
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fdput(f);
return error;
}
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
// 设置超时时间
if (timeout > 0) {
struct timespec64 end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec64_to_ktime(end_time);
} else if (timeout == 0) {
timed_out = 1;
write_lock_irq(&ep->lock);
eavail = ep_events_available(ep);
write_unlock_irq(&ep->lock);
goto send_events;
}
fetch_events:
// 检查是否有就绪事件
if (!ep_events_available(ep))
ep_busy_loop(ep, timed_out);
eavail = ep_events_available(ep);
if (eavail)
goto send_events;
// 进入等待
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
eavail = ep_events_available(ep);
if (!eavail)
goto fetch_events;
send_events:
// 扫描就绪队列并发送事件
res = ep_send_events(ep, events, maxevents);
return res;
}
讨论
epoll
相比 select
和 poll
有以下优势:
- 无 fd 数量限制:可以监视任意数量的 fd(受系统资源限制);
- 高效的事件通知:使用红黑树管理 fd,使用就绪队列存储就绪的 fd,无需遍历所有 fd;
- 支持边缘触发:避免水平触发模式下的频繁唤醒;
- 减少数据拷贝:使用内存映射(mmap)可以减少用户态和内核态之间的数据拷贝。
然而,epoll
仍然存在一些不足:
- 仅适用于 Linux:不具有跨平台性;
- 仍需系统调用:每次操作都需要陷入内核;
- 同步 I/O:必须等待事件发生后才能处理。
为了进一步提升性能,Linux 5.1 引入了全新的异步 I/O 框架——io_uring
。
io_uring
io_uring
是 Linux 在 2019 年引入的全新异步 I/O 接口,由 Jens Axboe 设计开发7。它的设计目标是提供一个高性能、低延迟的异步 I/O 框架,解决传统 I/O 模型的性能瓶颈。
io_uring
的核心思想是通过两个无锁环形队列(Submission Queue 和 Completion Queue)在用户态和内核态之间传递 I/O 请求和完成事件,从而大幅减少系统调用开销。
graph LR
A[用户空间] -->|提交 I/O 请求| B[SQ<br/>Submission Queue]
B --> C[内核空间<br/>io_uring]
C -->|完成事件| D[CQ<br/>Completion Queue]
D --> A
主要特点:
- 零拷贝:使用共享内存映射,避免数据在用户态和内核态之间拷贝;
- 批量操作:可以一次性提交多个 I/O 请求,减少系统调用次数;
- 真正的异步:支持真正的异步 I/O,无需轮询或阻塞等待;
- 高性能:通过无锁队列和轮询模式,实现极低的延迟。
基本使用
io_uring
的使用需要通过三个系统调用:
// 设置 io_uring 实例
int io_uring_setup(unsigned entries, struct io_uring_params *p);
// 注册文件描述符或缓冲区
int io_uring_register(unsigned int fd, unsigned int opcode,
void *arg, unsigned int nr_args);
// 提交 I/O 请求和获取完成事件
int io_uring_enter(unsigned int fd, unsigned int to_submit,
unsigned int min_complete, unsigned int flags,
sigset_t *sig);
不过在实际使用中,通常会使用 liburing
库来简化操作8。
下面是一个使用 liburing
的简单示例:
点击查看代码
#include <liburing.h>
#include <stdio.h>
#include <string.h>
#define QUEUE_DEPTH 1
#define BLOCK_SIZE 4096
int main(int argc, char *argv[]) {
struct io_uring ring;
struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;
char buffer[BLOCK_SIZE];
int fd;
// 初始化 io_uring
if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
perror("io_uring_queue_init");
return 1;
}
// 打开文件
fd = open(argv[1], O_RDONLY);
if (fd < 0) {
perror("open");
return 1;
}
// 获取一个 SQE(提交队列条目)
sqe = io_uring_get_sqe(&ring);
if (!sqe) {
fprintf(stderr, "Could not get SQE.\n");
return 1;
}
// 准备读取操作
io_uring_prep_read(sqe, fd, buffer, BLOCK_SIZE, 0);
// 提交请求
io_uring_submit(&ring);
// 等待完成
io_uring_wait_cqe(&ring, &cqe);
// 检查结果
if (cqe->res < 0) {
fprintf(stderr, "Async read failed: %s\n", strerror(-cqe->res));
return 1;
}
printf("Read %d bytes\n", cqe->res);
// 标记完成
io_uring_cqe_seen(&ring, cqe);
// 清理
close(fd);
io_uring_queue_exit(&ring);
return 0;
}
源码解读
io_uring
的核心数据结构包括:
struct io_ring_ctx {
struct {
struct io_rings *rings; // 共享内存区域
struct io_ring_sq sq; // 提交队列
struct io_ring_cq cq; // 完成队列
} ____cacheline_aligned_in_smp;
struct io_wq *io_wq; // 工作队列
struct io_sq_data *sq_data; // SQ 线程数据
// ...
};
struct io_rings {
struct io_uring_sq sq; // SQ 环形缓冲区
struct io_uring_cq cq; // CQ 环形缓冲区
};
io_uring
的实现在 io_uring/
目录下9。
点击查看关键流程
初始化流程:
SYSCALL_DEFINE2(io_uring_setup, u32, entries,
struct io_uring_params __user *, params)
{
return io_uring_setup(entries, params);
}
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
struct io_uring_params p;
int ret;
if (copy_from_user(&p, params, sizeof(p)))
return -EFAULT;
// 创建 io_uring 实例
ret = io_uring_create(entries, &p, params);
if (ret < 0)
return ret;
return ret;
}
提交和完成流程:
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
u32, min_complete, u32, flags, const void __user *, argp,
size_t, argsz)
{
struct io_ring_ctx *ctx;
struct file *file;
long ret;
// 获取 io_uring 实例
file = fget(fd);
if (unlikely(!file))
return -EBADF;
ctx = file->private_data;
// 提交 I/O 请求
if (to_submit) {
ret = io_submit_sqes(ctx, to_submit);
if (ret < 0)
goto out;
}
// 等待完成事件
if (min_complete) {
ret = io_cqring_wait(ctx, min_complete, argp, argsz, flags);
}
out:
fput(file);
return ret;
}
讨论
io_uring
代表了 I/O 多路复用的最新发展方向:
-
优势:
-
极高的性能:通过减少系统调用和上下文切换,实现了比
epoll
更高的吞吐量; - 真正的异步:支持完全异步的 I/O 操作,包括文件 I/O、网络 I/O 等;
-
灵活性强:支持多种 I/O 操作类型(
read
、write
、accept
、connect
、fsync
等); - 可扩展性好:持续添加新特性,如链式操作、缓冲区选择等。
-
极高的性能:通过减少系统调用和上下文切换,实现了比
-
挑战:
- 复杂性高:API 相对复杂,学习曲线陡峭;
- 兼容性:仅在较新的 Linux 内核(5.1+)中可用;
- 安全问题:早期版本存在一些安全漏洞,需要谨慎使用。
总结
从 select
到 io_uring
,我们见证了 I/O 多路复用技术的演进:
timeline
title I/O 多路复用技术演进
1983 : select
1997 : poll
2002 : epoll
2019 : io_uring
特性 | select | poll | epoll | io_uring |
---|---|---|---|---|
fd 数量限制 | \(1024\) | 无限制 | 无限制 | 无限制 |
数据拷贝 | 每次调用 | 每次调用 | 仅注册时 | 零拷贝 |
遍历开销 | \(O(n)\) | \(O(n)\) | \(O(1)\) | \(O(1)\) |
触发模式 | 水平触发 | 水平触发 | 水平/边缘 | 异步 |
跨平台 | 🟢 | 🟢 | 🔴 | 🔴 |
系统调用 | 每次轮询 | 每次轮询 | 每次轮询 | 批量提交 |
选择合适的 I/O 模型需要根据具体场景:
-
兼容性优先:使用
select
或poll
-
高并发场景:使用
epoll
-
极致性能:使用
io_uring
随着硬件性能的提升和应用需求的增长,I/O 多路复用技术仍在不断发展。从 C10K 到 C10M,再到未来可能的 C100M,这场技术演进的故事还将继续。
Comments