Linux epoll 实现原理与开发教程

内核进程与用户进程的协作方式

1 I/O 多路复用机制的概念

有过 Linux C 开发经验的同学都知道,在最基础的网络 I/O 模型中,每个用户进程进入内核态后,如果发现监听的 socket 对象的接收队列没有数据,就会阻塞等待。当内核发现数据到达时,再唤醒等待的进程,继续处理 I/O 数据。每次等待、唤醒都需要切换两次进程上下文。如果在客户端这么做还能够接受,但在高并发服务器上,频繁的上下文切换将严重降低网络吞吐量。为了避免由这种上下文切换带来的资源浪费,I/O 多路复用机制应运而生。

I/O 多路复用是一种用户进程与内核进程之间的特殊协作方式,它允许单个进程处理多个网络连接,而不再局限于一对一的处理方式。因此,这个批量处理连接的进程需要一种高效的方式来检测哪些连接上有可读数据,哪些连接可写数据了。通常,这部分工作由内核来完成。以 Linux 平台为例,它支持三种多路复用方式:包括 POSIX 标准定义的 select 和 poll,以及 Linux 平台独有的 epoll,而 epoll 在性能方面的表现最为出色。

本文内容基于 Linux-3.12,不同版本的源码不尽相同,但核心逻辑不变,不影响理解。

2 epoll 开发教程

在介绍 epoll 的实现原理之前,我们先通过编写一个 epoll 程序来加深对它的理解。

2.1 epoll 常用接口

epoll 有以下三个常用接口:

  • int epoll_create(int size):创建一个 epoll 句柄,其中的 size 参数用于通知内核这个监听的 socket 文件描述符数目总共有多大,需要传递的是待监听的最大 fd 值 +1。

    注意:一旦 epoll 句柄被成功创建,它就会占用一个文件描述符,我们可以通过查看路径 /proc/#{process_id}/fd/ 找到这个 fd。因此,在使用完 epoll 后,务必要调用 close() 函数来关闭它,以免占用文件描述符。

  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):epoll 的事件注册函数,负责注册要监听的事件类型。
    • 第一个参数使用 epoll_create() 的返回值;

    • 第二个参数表示动作,用三个宏来表示:

      • EPOLL_CTL_ADD:注册新的 socket fd 到 epfd 中;
      • EPOLL_CTL_MOD:修改已经注册的 socket fd 的监听事件;
      • EPOLL_CTL_DEL:从 epfd 中删除一个 socket fd;
    • 第三个参数是需要监听的 socket fd;

    • 第四个参数是告诉内核需要监听什么事件,struct epoll_event 结构如下:

      1
      2
      3
      4
      
      struct epoll_event {
          __poll_t events; //表示事件类型
          __u64 data;  // 用户数据
      } EPOLL_PACKED;

      events 可以是多个宏的组合,常用的宏如下:

      • EPOLLIN:表示文件描述符上有数据可读,即输入事件;
      • EPOLLPRI:表示有紧急数据可读,通常是带外数据 (out-of-band data);
      • EPOLLOUT:表示文件描述符可写,即输出事件;
      • EPOLLERR:表示对应的文件描述符发生错误,例如连接错误;
      • EPOLLHUP:表示对应的文件描述符被挂起;
      • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket,需要重新把它加入 epoll 队列;
      • EPOLLET:将 epoll 设为边缘触发 (Edge Triggered) 模式,这是相对于水平触发 (Level Triggered) 来说的。
  • int epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, int timeout):等待事件的产生。
    • 参数 events 用来从内核得到事件的集合;
    • 参数 maxevents 告之内核这个 events 有多大,这个 maxevents 的值不能大于创建 epoll_create() 时的 size
    • 参数 timeout 是超时时间,单位毫秒:
      • 0 会立即返回;
      • 如果值小于 0 将不确定,也有说法说是永久阻塞。
    • 该函数的返回值为需要处理的事件数目,如返回 0 表示已超时。

2.2 epoll 工作模式

epoll 对文件描述符的操作有两种模式:边缘触发 (Edge-Triggered) 和水平触发 (Level-Triggered)。

  • 边缘触发(Edge-Triggered)模式:

    在边缘触发模式下,epoll 仅在文件描述符状态发生变化时通知应用程序,而不会反复通知相同的事件。也就是说,如果一个文件描述符从未就绪变为就绪状态,或者从就绪状态变为未就绪状态,epoll 将会通知应用程序。因此,应用程序需要一次性将所有可读或可写的数据处理完,以免错过状态变化的通知。该模式适用于需要及时响应文件描述符状态变化的场景。

  • 水平触发(Level-Triggered)模式:

    在水平触发模式下,epoll 会持续通知应用程序文件描述符仍然处于就绪状态,直到应用程序读取或写入操作导致文件描述符不再就绪。该模式适用于需要持续监测文件描述符状态并逐步处理数据的场景。

2.3 epoll 使用示例

下面我们基于 epoll 编写一个服务端 echo 程序,首先我们声明以下宏以及函数:

epoll demo

2.3.1 事件注册函数实现

然后着手实现上述函数,首先是增删改事件的相关函数:

epoll crud event

这些函数相当重要,负责根据需求操作被监听的 fd 上的事件。

2.3.2 socket_bind

接下来我们实现 TCP 服务端资源初始化函数 socket_bind

epoll socket_bind

socket_bind 函数就是普通的 C 语言 TCP 服务端注册,这里不做过多介绍了。

2.3.3 事件循环器实现

接下来我们实现 do_epoll 函数:

epoll do_epoll

do_epoll 是整个程序的核心,内部封装了 epoll 事件处理循环,统筹调度整个程序。该函数首先调用 add_event() 监听连接事件,当有请求到达时通过 handle_events 事件处理器分发相应的事件。

2.3.4 事件处理函数实现

handle_events() 函数实现如下:

epoll handle_events

handle_events 会遍历就绪列表,然后判断各个就绪事件对应的监听类型,并将其分发给不同的事件处理器处理。各类事件处理器的实现如下:

epoll handle_accpet

可见,处理器在同步处理完 I/O 事件后,紧接着会根据业务修改要监听的事件。最终,我们在 main 函数中使用上述函数:

1
2
3
4
5
6
int main() {
    int listenfd = socket_bind(IPADDRESS, PORT);
    listen(listenfd, LISTENQ);
    do_epoll(listenfd);
    return 0;
}

main 函数首先调用 socket_bind() 函数初始化 TCP 服务端资源,然后调用 listen() 初始化接收队列并监听端口,最终调用 do_epoll 循环处理事件。

3 epoll 实现原理

3.1 epoll_create 创建内核对象

在用户进程调用 epoll_create 时,内核会创建一个 struct eventpoll 类型的内核对象,并把它关联到当前进程已打开的文件列表中,相关结构体如下图所示:

epoll 内核对象

对于 struct eventpoll 对象,更详细的结构如下图所示:

eventpoll 结构体
  • wq:等待队列链表。当数据就绪触发软中断时,会通过 wq 来寻找阻塞在 epoll 对象上的用户进程;
  • rbr:一颗红黑树,用于管理用户进程添加的所有 TCP 连接,可以实现高效的增、删、查询操作;
  • rdllist:就绪的文件描述符列表。当有连接就绪时,内核会将这些就绪的连接添加到这个链表中,这样用户进程就可以直接获取已就绪的所有事件,而无需像使用 POSIX API 那样遍历所有描述符。

熟悉了主要的数据结构之后,我们再来查看 fs/eventpoll.c 中 epoll_create 的源码实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error, fd;
	struct eventpoll *ep = NULL;
	struct file *file;
    ...
    // [1] 创建并初始化 eventpoll 对象
    error = ep_alloc(&ep);
    ...
    // [2] 获取一个未使用的文件描述符,作为 epfd
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    ...
    // [3] 创建 epoll 的 file 实例
    //  file 的 f_op 被设置为 eventpoll_fops
    //  file 的 private_data 设置为上边的 eventpoll 对象,即 ep 
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
    ...
    // [4] 建立 fd 和 file 的关联关系
    fd_install(fd, file);
    return fd;
}
  • [1]:首先调用 ep_alloc 创建一个 eventpoll 结构体,完成等待队列、就绪队列等参数的初始化:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    static int ep_alloc(struct eventpoll **pep)
    {
        struct eventpoll *ep;
        // 申请内存空间
        ep = kzalloc(sizeof(*ep), GFP_KERNEL);
        ...
        // 初始化等待队列头
        init_waitqueue_head(&ep->wq);
        ...
        // 初始化就绪队列
        INIT_LIST_HEAD(&ep->rdllist);
        ...
        // 初始化红黑树
        ep->rbr = RB_ROOT;
        ...
    }
  • [2]:获取一个未使用的文件描述符,作为 epfd
  • [3]:创建 epoll file 实例,以及匿名 inode 节点和 dentry 等数据结构:
    • epoll 文件的 file->f_op 会被设置为 eventpoll_fops,否则将不支持 epoll:
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      
      static const struct file_operations eventpoll_fops = {
      #ifdef CONFIG_PROC_FS
          .show_fdinfo	= ep_show_fdinfo,
      #endif
          .release	= ep_eventpoll_release,
           // poll 指针不为空,表示支持 epoll 操作
           //  因此普通文件不支持这个特性
          .poll		= ep_eventpoll_poll,
          .llseek		= noop_llseek,
      };
    • epoll 文件的 file->private_data 会被设置为参数传入的 eventpoll 变量 (ep);
  • [4]:最后建立 fdfile 之间的关联关系。

3.2 epoll_ctl 添加 socket

当用户进程调用 epoll_ctl 向 epoll 添加 socket 时,内核会做以下三件事情:

  1. 初始化一个红黑树节点对象 epitem
  2. 将监听的事件添加到 socket 的等待队列中,其回调函数是 ep_poll_callback
  3. epitem 节点插入 epoll 对象的红黑树。

源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	struct fd f, tf;
	struct eventpoll *ep;
	struct epitem *epi;
	struct epoll_event epds;
	...
    // [1] 根据 epfd 找到其封装了 file 的 fd 内核对象
	f = fdget(epfd);
	// 根据需要被监听 socket 文件描述符,找到其封装了 file 的 fd 内核对象
	tf = fdget(fd);
	/* 确认被监听的 tf 支持 epoll,即文件的 fops 为 struct file_operations */
	if (!tf.file->f_op || !tf.file->f_op->poll) 
    ...

	// [2] 找到 epfd 关联的 eventpoll 内核对象
	ep = f.file->private_data;
    ...
	switch (op) {
	case EPOLL_CTL_ADD:
		if (!epi) { // 新增操作,需要确保 epitem 不存在
			epds.events |= POLLERR | POLLHUP;
    // [3] 完成 socket 注册操作
			error = ep_insert(ep, &epds, tf.file, fd);
		} else ...
		clear_tfile_check_list();
		break;
	...
	}
	return error;
}

epoll_ctl 首先根据文件描述符,找到被监听 fd 关联的内核文件对象 tf.file,以及 epfd 关联的 epollevent,然后调用 ep_insert 完成注册操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
    // [1] 初始化一个 epitem
	...
	struct epitem *epi;
    struct ep_pqueue epq;
	...
	if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
		return -ENOMEM;
	/* Item initialization follow here ... */
	...
	INIT_LIST_HEAD(&epi->pwqlist);
	epi->ep = ep;
    // epitem->fdd 中存储了被监听 socket 的 fd 描述符和 struct file 对象
	ep_set_ffd(&epi->ffd, tfile, fd);
	...
	epq.epi = epi;
    // [2] 初始化 socket 等待队列,将回调函数 ep_poll_callback 设置为数据就绪时的回调
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
	revents = ep_item_poll(epi, &epq.pt);
    ...
    // [3] 插入红黑树
	ep_rbtree_insert(ep, epi);
	...
error_xxx:
    // 各种错误处理
	return error;
}
  • [1]:初始化一个 epitemepitem 中的 fdd 存储了被监听 socket 的 fd 描述符和 struct file 对象地址;
  • [2]:调用 init_poll_funcptr() 函数,将 ep_ptable_queue_proc() 函数注册为 poll 模式的回调,在 ep_ptable_queue_proc() 里再把 ep_poll_callback() 设置为监听事件触发时真正的回调;
  • [3]:将这个 epitem 插入红黑树。

接下来,我们将对这三个步骤进行详细分析。

3.2.1 分配并初始化 epitem

对于每一个 socket,调用 epoll_ctl 注册事件时,都会为其创建一个 epitem 对象,该结构体的数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
struct epitem {
	// 红黑树节点
	struct rb_node rbn;
	// 被监听 socket 的文件描述符信息
	struct epoll_filefd ffd;
    // 所属的 eventpoll 对象
	struct eventpoll *ep;
    // 等待队列头节点
    struct list_head pwqlist;
    ...
};

初始化完成后的 epitem 关联关系如下:

epitem 内核数据结构关系

到这一步,epitem 已经实现了 eventpoll 文件和被监听 socket 文件的关联

3.2.2 设置 socket 等待队列

创建并初始化 epitem 后,紧接着就是初始化 socket 对象上的等待队列,并把 fs/eventpoll.c 中的 ep_epoll_callback() 设置为数据就绪时的回调函数。这里的实现比较复杂,是通过 poll 模式完成的,首先来看 ep_item_poll

1
2
3
4
5
static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
	pt->_key = epi->event.events;
	return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}

这里调用到了 socket (epi->ffd) 下的 file->f_op->poll(),该函数指针指向的是 net/socket.c 中的 sock_poll()

1
2
3
4
5
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
    ...
    return busy_flag | sock->ops->poll(file, sock, wait);
}

其中的 sock->ops->poll() 又指向 tcp_poll()

1
2
3
4
5
6
7
8
unsigned int tcp_poll(struct file *file, poll_table *wait)
{
    ...
    struct sock *sk = sock->sk;
    ...
    sock_poll_wait(file, sk_sleep(sk), wait);
    ...
}

在调用 sock_poll_wait() 之前,先在第二个参数这里调用了 sk_sleep() 函数。sk_sleep() 函数会返回 sock 对象里等待队列的头节点 wait_queue_head_t,稍后等待队列项将插入这里,实现如下:

1
2
3
4
5
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
	BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
	return &rcu_dereference_raw(sk->sk_wq)->wait;
}
注意
这里的等待队列是 socket 中的,而非 epoll 的等待队列。

接着真正进入 include/net/sock.h 中的 sock_poll_wait() 函数:

1
2
3
4
5
6
7
static inline void sock_poll_wait(struct file *filp,
		wait_queue_head_t *wait_address, poll_table *p)
{
    ...
    poll_wait(filp, wait_address, p);
    ...
}

其核心实现是 include/net/poll.h 中的 poll_wait() 函数:

1
2
3
4
5
static inline void poll_wait(struct file *filp,
        wait_queue_head_t *wait_address, poll_table *p) {
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p)
}

_qproc 函数指针在上文 ep_insert 函数执行时会被 init_poll_funcptr 设置为 ep_ptable_queue_proc()到这里总算进入了重点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
    // 根据地址拿到参数 pt 指针下边的 epitem,也就是上边创建的那个 epitem
	struct epitem *epi = ep_item_from_epqueue(pt);
	struct eppoll_entry *pwq;
    // [1] 分配一个 eppoll_entry 结构体
	if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        // [2] 将 eppoll_entry->wait->func 指向 ep_poll_callback
		init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        // [3] 完成最终关联
		pwq->whead = whead; // whead 为 socket 等待队列头
		pwq->base = epi; // base 指向 epitem
		add_wait_queue(whead, &pwq->wait);
		...
	}...
}
  • [1]:首先分配一个 eppoll_entry 结构体,该结构体负责关联 epitem、事件就绪回调函数与 socket 等待队列:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    struct eppoll_entry {
        ...
        // 指向 epitem
        struct epitem *base;
        // 指向回调函数
        wait_queue_t wait;
        // 指向被监听的 socket 等待队列
        wait_queue_head_t *whead;
    };
  • [2]:调用 init_waitqueue_func_entry()eppoll_entry->wait->func 指向 ep_poll_callback()
    1
    2
    3
    4
    5
    6
    7
    
    static inline void init_waitqueue_func_entry(wait_queue_t *q,
                        wait_queue_func_t func)
    {
        q->flags = 0;
        q->private = NULL;
        q->func = func;
    }
    注意
    正常情况下,q->private 指向当前用户进程,当 socket 有数据到达时唤醒用户进程,但这里的 socket 是交给 epoll 管理的,因此 q->private 会被设置为 NULL。
  • [3]:完成最终关联。这里首先用 eppoll_entry->whead 缓存当前 socket 等待队列的头节点,然后将 eppoll_entry->base 指向传入的 epitem,最后调用 add_wait_queue()eppoll_entry->wait 移动到 socket 等待队列头节点,完成最终关联。

到这一步,相关的结构体之间的关系如下:

epitem 内核数据结构关系

3.2.3 插入红黑树

完成上述两个步骤后,将这个 epitem 插入红黑树,核心流程就完成了。红黑树使得 epoll 在查找效率、插入效率、内存开销等多个方面都更加均衡。具体的插入算法实现可以参考 《红黑树插入节点算法实现》

最终,这些内核数据结构的最终关系如下图所示:

epoll 内核数据结构关系

3.3 epoll_wait 等待事件就绪

epoll_wait 的逻辑非常简单:当调用它时,它会检查 eventpoll->rdllist 链表,查看其中是否有准备好的 socket,如果没有,它会创建一个等待项并将其加入到 eventpoll->wq 等待队列中,然后将自己阻塞。如下图所示:

epoll_wait 流程

其源码实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
    ...
	wait_queue_t wait;
    ...
fetch_events:
    // [1] 判断就绪队列上是否有事件就绪
	if (!ep_events_available(ep)) {
		// [2] 未就绪,则定义等待事件项并关联当前进程
		init_waitqueue_entry(&wait, current);
        // [3] 将等待事件项添加到 epoll 的等待队列上
        __add_wait_queue_exclusive(&ep->wq, &wait);
		for (;;) {
			...
            // [4] 让出 CPU,阻塞直到超时或触发软中断
			if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
				timed_out = 1;
    ...
    // [5] 判断是否存在就绪事件
    eavail = ep_events_available(ep);
    ...
    if (!res && eavail &&
        // [6] 向用户态发送就绪事件,就绪的事件会被转储到 events 数组
	    !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
		goto fetch_events;

	return res;
}
  • [1]:调用 ep_events_available() 判断就绪队列上是否存在已经就绪的 socket,这个队列上的数据是由上文注册的回调 ep_poll_callback() 插入的;
  • [2]:如果没有就绪事件,就调用 init_waitqueue_entry() 创建一个 epoll 等待队列项 wait_queue_t,同时把当前进程挂在 wait_queue_t->private 上;
  • [3]:将创建的 wait_queue_t 添加到 eventpoll->wq 上;
  • [4]:让出 CPU,阻塞直到超时或数据就绪发送中断;
  • [5]:判断是否存在就绪事件;
  • [6]:当前进程被唤醒,且存在就绪事件时,调用 ep_send_events 将绪的事件转储到用户态 events 数组。

    ep_send_events 方法内部会调用 __copy_to_user 将数据复制到用户空间。

3.4 事件就绪时的唤醒过程

在前边执行 epoll_ctl 监听事件时,内核为每个 socket 都添加了一个注册了 ep_poll_callback() 回调的等待队列项。在调用 epoll_wait 陷入阻塞前,又在 eventpoll 对象的等待队列中添加了指向当前多路复用线程的等待项。

各种 epoll 相关的队列
  • 在 socket 的等待队列中,其回调函数是 ep_poll_callback(),另外其 private 不需要唤醒进程,所以指向了 NULL;
  • eventpoll 的等待队列中,其回调函数是 default_wake_function(),其 private 指针指向等待事件就绪的用户进程,即控制 epoll 的多路复用进程。

当 socket 有数据到达时,内核会查找 socket 等待队列元素上的回调函数,并交给系统的软中断去调用,ep_poll_callback() 的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	...
    // [1] 获取 wait 节点关联的 epitem
	struct epitem *epi = ep_item_from_wait(wait);
    // [2] 获取 epitem 所属的 eventpoll
	struct eventpoll *ep = epi->ep;
    ...
    // [3] 将当前 epitem 添加到 eventpoll 的就绪队列
	if (!ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);
		ep_pm_stay_awake_rcu(epi);
	}
    // [4] 查看 eventpoll 的等待队列上是否有等待的进程
	if (waitqueue_active(&ep->wq))
		wake_up_locked(&ep->wq); // 如果有,则唤醒
	...
	return 1;
}
  • [1]:获取就绪的 wait 节点所关联的 epitem
  • [2]:获取 epitem 所属的 eventpoll
  • [3]:将 epitem 添加到 eventpoll 的就绪队列;
  • [4]:调用 default_wake_function() 唤醒调用 epoll_wait 的进程。

一旦该线程被唤醒,便会使用上文介绍的 ep_send_events() 函数,将就绪队列的数据转储到用户态的 events 数组中,以供用户态进程 直接 使用。

至此,整个 epoll 的运行逻辑就介绍完了。



欢迎关注我的公众号,第一时间获取文章更新:

微信公众号

相关内容