epoll
前言
epoll是当今高性能服务器的基石,通过下面这张libevent的benchmark图,我们可以看到,这个基准测试衡量服务一百个活动连接(将写入链接到新连接)直到发生一千次写入和读取所需的时间,通过对比select、poll,可以看出epoll在处理多个连接时的显著优势。
本文主要分为三个部分
- 预备知识
- epoll系统调用流程简介
- epoll内核源码分析
我觉得理解epoll的关键在于理解epoll_ctl流程中怎么把等待事件 放在socket的等待队列中的,在epoll内核源码分析这一部分,我也是花了不少功夫才理清这个流程。
本文基于5.7版本的linux内核源码进行分析
本文使用的绘图工具是draw.io
本文用到的演示代码放在了blog_code
预备知识
在正式分析epoll之前,有一些预备知识需要学习记录,包括
- file operations
- 等待队列
- 红黑树 对于这些知识的介绍不是本文主题,网上也有不少的学习资料,这里我主要关注这些知识和epoll的联系。
file operations
1
2
3
4
5
6
7
struct file_operations {
loff_t (*llseek) (struct file *, loff_t, int);
ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);
ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
unsigned int (*poll) (struct file *, struct poll_table_struct *);
} __randomize_layout;
以上是file operations的部分成员展示,在linux中万物皆文件,file operations定义了文件操作的接口,而具体的文件驱动程序则要实现这些接口。不是所有文件类型都支持select/poll,一个原因就是这些文件没有实现file_operation的poll。poll函数有两个作用
- 将当前线程加入到等待队列,等待唤醒
- 返回此时发生的事件,POLLIN,POLLOUT等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
struct hello_device
{
char data[128];
int len;
wait_queue_head_t rq, wq;
struct semaphore sem;
struct cdev cdev;
} hello_device;
struct file_operations hello_fops = {
.read = hello_read,
.write = hello_write,
.poll = hello_poll
};
static unsigned int hello_poll(struct file *filp, poll_table *wait)
{
unsigned int mask = 0;
struct hello_device *dev = filp->private_data;
printk("")
poll_wait(filp, &dev->rq, wait);
poll_wait(filp, &dev->wq, wait);
down(&dev->sem);
if (dev->len > 0)
{
mask |= POLLIN | POLLRDNORM; /*标示数据可获得*/
}
if (dev->len != 128)
{
mask |= POLLOUT | POLLWRNORM; /*标示数据可写入*/
}
up(&dev->sem);
return mask;
}
以上是一个字符设备驱动代码的一部分,在其实现的poll中,可读、可写的pollIN、pollOUT标志就是根据字符设备buff中是否 有数据来返回的。
等待队列
等待队列将进程(task_struct)加入设备的等待队列,当有事件发生,唤醒进程。
这里提前剧透下,在epoll中,会涉及到两个等待队列,一个是epollevent的,用于阻塞epollwait的进程,另外一个则是socket对象上的,用于在软中断将数据放到socket接收队列之后,通知到epoll
红黑树
在epoll中红黑树被用来存储用户的监视的sock fd,这使得epoll在O(logn)的复杂度下完成查找、插入、删除的操作。这里红黑树的节点是epitem,是epoll中重要的数据结构,后面会进行详细介绍。epitem使用文件指针的值进行比较操作,只是红黑树要求有这么个大小比较操作而已,比较方法没什么实际含义。
IO多路复用及select/poll
在日常生活中,这样的模型其实随处可见,想象这样一个场景:我们在海底捞吃火锅。尽管海底捞以热情的服务著称,但是他也不能做到给每一桌客人都配备一个服务员。服务员在几个桌子之间辗转腾挪,这就是一种多路复用。
回到网络编程的世界中,IO多路复用变成了这样子。
多路:存在多个需要被服务(监听事件)的socket fd
复用:复用一个thread 同时为多个 fd 提供处理服务
epoll简单分析
epoll_create创建一个struct eventpoll内核对象,并把它加入到进程的已打开文件列表
epoll_ctl(ADD为例)创建一个红黑树节点epitem,添加到sock的等待队列中,这一部分流程中我觉得最绕 的就是将epoll模块的回调注册进sock的等待队列。
epoll_wait观察 eventpoll->rdllist 链表里有没有数据即可。 有数据就返回,没有数据就创建一个等待队列项,将其添加到 eventpoll 的等待队列上,然后阻塞自己
epoll内核源码解读
linux-5.7-rc4/include/linux/eventpoll.h
linux-5.7-rc4/fs/eventpoll.c
linux-5.7-rc4/include/uapi/linux/eventpoll.h
epoll主要数据结构介绍
eventpoll表示一个epoll实例,这个实例也是被file的private data所指向的对象。当 调用epoll_create的时候,我们创建这么一个实例。 位于linux-5.7-rc4/fs/eventpoll.c
1
2
3
4
5
6
7
8
9
10
11
12
struct eventpoll {
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;
/* List of ready file descriptors */
struct list_head rdllist;
struct file *file;
};
这里我只将主要成员列出来了,锁等其他成员没列出来。
- wq:等待的进程队列
- rbr:红黑树,除了根之外每个节点都是epitem
- rdllist:事件就绪的socket fd
- eventpoll的匿名文件,file里面的private data指向eventpoll
epitem是一个我们感兴趣的sock fd实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct epitem {
union {
/* RB tree node links this structure to the eventpoll RB tree */
struct rb_node rbn;
/* Used to free the struct epitem */
struct rcu_head rcu;
};
/* List header used to link this structure to the eventpoll ready list */
struct list_head rdllink;
/* The file descriptor information this item refers to */
struct epoll_filefd ffd;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The structure that describe the interested events and the source fd */
struct epoll_event event;
};
- union:红黑树节点
- rdllink:链接到eventpoll里面的rdllist的指针
- ffd:感兴趣(监听)的fd
- pwqlist:等待事件回调队列。当数据进入网卡,底层中断执行 ep_poll_callback。
- event:用户关注的事件,可读/可写等
epoll模块初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int __init eventpoll_init(void)
{
...
/* Allocates slab cache used to allocate "struct epitem" items */
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN|SLAB_PANIC|SLAB_ACCOUNT, NULL);
/* Allocates slab cache used to allocate "struct eppoll_entry" */
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC|SLAB_ACCOUNT, NULL);
return 0;
}
fs_initcall(eventpoll_init);
epoll模块的初始化函数使用eventpoll_init进行了初始化,这里的fs_initcall是一个用于初始化内核功能模块的宏,作用和我们写内核模块 用到的module_init差不多,本文不对这个宏做讨论,更详细的解释可见fs_initcall
在以上初始化过程中,使用了kmem_cache_create提前开辟了epi和pwq的内存池,这在后续的操作中可以节省malloc的时间,提高效率。
epoll_create
1
2
3
4
5
6
7
8
9
10
11
12
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
return do_epoll_create(flags);
}
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return do_epoll_create(0);
}
以上是在对应于用户空间epoll_create的系统调用,不管是epoll_create还是epoll_create,最后都是调用的do_epoll_create。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/*
* Open an eventpoll file descriptor.
*/
static int do_epoll_create(int flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file);
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
- 给eventpoll分配内存并初始化
- 使用get_unused_fd_flags获取一个未使用文件描述符
- anon_inode_getfile分配一个新的文件结构对象(struct file *),是个匿名文件,将fs与file结构绑定,并将private data指向eventpoll。
- eventpoll的file指针指向这个匿名文件
- 把fd和匿名文件file绑定
epoll_ctl
这里先贴一张流程中是怎么把ep_poll_callback作为sock等待队列回调函数的,我们可以看到 这个流程绕了一大圈,就是为了能在sock_poll执行epoll自己模块的ep_ptable_queue_proc,而ep_ptable_queue_proc 里面我们才将ep_poll_callback挂进了等待队列里。
我们先来看下用户空间的epoll_ctl
1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epfd: epoll 实例的文件描述符,通过 epoll_create 或 epoll_create1 获得。
op: 控制操作,可以是以下三者之一:
- EPOLL_CTL_ADD: 将文件描述符 fd 添加到 epoll 实例中。
- EPOLL_CTL_MOD: 修改文件描述符 fd 在 epoll 实例中的事件关注方式。
- EPOLL_CTL_DEL: 从 epoll 实例中删除文件描述符 fd。
fd: 要进行操作的文件描述符
event: 指向一个 struct epoll_event 结构的指针,定义了关注的事件类型和相关的数据
接下来我们以EPOLL_CTL_ADD为例,对epoll_ctl的内核源码进行分析
1
2
3
4
5
6
7
8
9
10
11
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct epoll_event epds;
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
return -EFAULT;
return do_epoll_ctl(epfd, op, fd, &epds, false);
}
将事件从用户态拷贝到内核态之后,调用do_epoll_ctl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct eventpoll *tep = NULL;
error = -EBADF;
f = fdget(epfd);
if (!f.file)
goto error_return;
/* Get the "struct file *" for the target file */
tf = fdget(fd);
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, epds, tf.file, fd, full_check);
} else
error = -EEXIST;
if (full_check)
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, epds);
}
} else
error = -ENOENT;
break;
}
return error;
}
do_epoll_ctl中主要做这些事
- 合法性检查,例如是否是epoll文件,监听文件是否支持poll等,这种代码太多了,所以在上面删去了
- 调用ep_insert
可以看出来,实际的操作应该都在调用ep_insert中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
* Must be called with "mtx" held.
*/
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, pwake = 0;
__poll_t revents;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
/* Initialize the poll table using the queue callback */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
revents = ep_item_poll(epi, &epq.pt, 1);
/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
error = -ENOMEM;
if (epi->nwait < 0)
goto error_unregister;
/* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
ep_rbtree_insert(ep, epi);
/* We have to drop the new item inside our item list to keep track of it */
write_lock_irq(&ep->lock);
/* record NAPI ID of new item if present */
ep_set_busy_poll_napi_id(epi);
/* If the file is already "ready" we drop it inside the ready list */
if (revents && !ep_is_linked(epi)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
write_unlock_irq(&ep->lock);
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(ep, NULL);
return 0;
}
初始化了一个ep_pqueue结构体,重点是把epitem关联到了ep_pqueue结构体中
1
2
3
4
5
6
7
8
9
10
11
typedef struct poll_table_struct {
// 函数指针
poll_queue_proc _qproc;
// unsigned
__poll_t _key;
} poll_table;
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
ep_pqueue结构体定义如上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
int depth)
{
struct eventpoll *ep;
bool locked;
pt->_key = epi->event.events;
if (!is_file_epoll(epi->ffd.file))
return vfs_poll(epi->ffd.file, pt) & epi->event.events;
// 监听epoll才会走到这
ep = epi->ffd.file->private_data;
poll_wait(epi->ffd.file, &ep->poll_wait, pt);
locked = pt && (pt->_qproc == ep_ptable_queue_proc);
return ep_scan_ready_list(epi->ffd.file->private_data,
ep_read_events_proc, &depth, depth,
locked) & epi->event.events;
}
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{
if (unlikely(!file->f_op->poll))
return DEFAULT_POLLMASK;
return file->f_op->poll(file, pt);
}
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait) {
__poll_t mask;
struct sock *sk = sock->sk;
const struct tcp_sock *tp = tcp_sk(sk);
int state;
/* 添加等待队列和关联事件回调函数 ep_poll_callback
*(只有 epoll_ctl EPOLL_CTL_ADD 的情况下,才会添加等待事件,否则 wait == NULL)*/
sock_poll_wait(file, sock, wait);
// 检查 fd 是否有事件发生。
state = inet_sk_state_load(sk);
if (state == TCP_LISTEN)
return inet_csk_listen_poll(sk);
...
}
// socket.h
static inline void sock_poll_wait(struct file *filp, struct socket *sock, poll_table *p) {
// ep_insert 调用 ep_item_poll 才会插入等待事件。
if (!poll_does_not_wait(p)) {
poll_wait(filp, &sock->wq.wait, p);
...
}
}
// poll.h
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) {
if (p && p->_qproc && wait_address)
// _qproc ---> ep_ptable_queue_proc
p->_qproc(filp, wait_address, p);
}
理解ep_item_poll这个函数对于我们理解epoll有很大帮助,这里详细展开讲讲。
这里可以看到对于epoll的嵌套处理,is_file_epoll用于判断当前的epi是不是针对epoll 我们这里关心监听socket的情况。调用vfs->poll其实就是sock_poll,这里我们假设就是tcp_poll,在tcp_poll中,继续调用了poll_wait,然后是p->_qproc,注意这里的_qproc就是我们之前在init_poll_funcptr 填的ep_ptable_queue_proc。
1
2
3
4
5
6
7
8
9
10
11
12
13
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct eppoll_entry *pwq;
f (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
//初始化回调方法
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
//将ep_poll_callback放入socket的等待队列whead(注意不是epoll的等待队列)
add_wait_queue(whead, &pwq->wait);
}
}
ep_ptable_queue_proc则会将ep_poll_callback放到sock的等待队列中去
- init_poll_funcptr:设置epq的回调函数为ep_ptable_queue_proc,当调用epoll_wait时会调用该回调函数
- ep_ptable_queue_proc:该函数内部所做的主要工作,就是把epitem对应fd的事件到来时的回调函数设置为ep_poll_callback。
- ep_poll_callback:主要工作就是把就绪的fd放到就绪链表rdllist上,然后唤醒epoll_wait的调用者,被唤醒的进程再把rdllist上就绪的fd的events拷贝给用户进程,
epoll_wait
epoll_wait所做的事情很简单,观察rdllist链表里有没有数据即可。有数据就返回,没有数据就创建一个等待队列项,将其添加到 eventpoll 的等待队列上,然后阻塞自己。
1
2
3
4
5
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
return do_epoll_wait(epfd, events, maxevents, timeout);
}
调用do_epoll_wait
1
2
3
4
5
6
7
8
9
10
11
12
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
//合法性校验,省略
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}
调用ep_poll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
u64 slack = 0;
bool waiter = false;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
lockdep_assert_irqs_enabled();
fetch_events:
if (!ep_events_available(ep))
ep_busy_loop(ep, timed_out);
eavail = ep_events_available(ep);
if (eavail)
goto send_events;
/*
* Busy poll timed out. Drop NAPI ID for now, we can add
* it back in when we have moved a socket with a valid NAPI
* ID onto the ready list.
*/
ep_reset_busy_poll_napi_id(ep);
/*
* We don't have any available event to return to the caller. We need
* to sleep here, and we will be woken by ep_poll_callback() when events
* become available.
*/
if (!waiter) {
waiter = true;
init_waitqueue_entry(&wait, current);
write_lock_irq(&ep->lock);
__add_wait_queue_exclusive(&ep->wq, &wait);
write_unlock_irq(&ep->lock);
}
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
/*
* Always short-circuit for fatal signals to allow
* threads to make a timely exit without the chance of
* finding more events available and fetching
* repeatedly.
*/
if (fatal_signal_pending(current)) {
res = -EINTR;
break;
}
eavail = ep_events_available(ep);
if (eavail)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
timed_out = 1;
break;
}
}
__set_current_state(TASK_RUNNING);
send_events:
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
if (waiter) {
write_lock_irq(&ep->lock);
__remove_wait_queue(&ep->wq, &wait);
write_unlock_irq(&ep->lock);
}
return res;
}
1
2
3
4
static inline int ep_events_available(struct eventpoll *ep)
{
return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
}
ep_events_available用于判断就绪链表中是否有可处理的事件
有数据的话我们可以自然可以去拿了,但这种情况体现不出阻塞epoll等待唤醒的情况, 所以对于这里的send_event我们不考虑
把当前进程添加到epoll的wait_queue中。关于linux网络收包时的sock回调细节,可以参考linux网络收包
最后看一下ep_poll_callback函数,这是我们给sock注册的回调函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
__poll_t pollflags = key_to_poll(key);
unsigned long flags;
int ewake = 0;
read_lock_irqsave(&ep->lock, flags);
ep_set_busy_poll_napi_id(epi);
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!(pollflags & POLLFREE)) {
switch (pollflags & EPOLLINOUT_BITS) {
case EPOLLIN:
if (epi->event.events & EPOLLIN)
ewake = 1;
break;
case EPOLLOUT:
if (epi->event.events & EPOLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
wake_up(&ep->wq);
}
if (waitqueue_active(&ep->poll_wait))
pwake++;
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(ep, epi);
return ewake;
}
这里主要做的事情就是去唤醒等待队列,于是返回到epoll_wait执行未完成的部分,也就是ep_send_events
1
2
3
4
5
6
7
8
9
10
11
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}
ep_send_events主要调用ep_scan_ready_list
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static __poll_t ep_scan_ready_list(struct eventpoll *ep,
__poll_t (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv, int depth, bool ep_locked)
{
__poll_t res;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
lockdep_assert_irqs_enabled();
res = (*sproc)(ep, &txlist, priv);
...
}
ep_scan_ready_list主要调用ep_send_events_proc,也就是上面代码里的函数指针
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
__poll_t revents;
struct epitem *epi, *tmp;
struct epoll_event __user *uevent = esed->events;
struct wakeup_source *ws;
poll_table pt;
init_poll_funcptr(&pt, NULL);
esed->res = 0;
lockdep_assert_held(&ep->mtx);
list_for_each_entry_safe(epi, tmp, head, rdllink) {
if (esed->res >= esed->maxevents)
break;
ws = ep_wakeup_source(epi);
if (ws) {
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}
list_del_init(&epi->rdllink);
revents = ep_item_poll(epi, &pt, 1);
if (!revents)
continue;
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}
esed->res++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
return 0;
}
这里主要就是遍历rdllink使用put_user将事件拷贝到用户空间,边缘触发,水平触发的区别在于边缘触发不会讲epitem重新加回ready list,水平触发的话,这个事件还是会加进就绪队列里。
结语
在阅读epoll源码的过程中,我收获了许多,尤其是在理解epoll_insert这一部分流程,我对等待队列,linux内核的面向对象设计,软中断处理网络数据包等方面知识都有了更加深刻的了解。
最后,我要深深感谢孟老师在网络程序设计这门课程上对于专业知识的讲授。在这门课程内容从网络前端到linux内核调试都有介绍,让我对网络程序设计知识方面的广度大大拓展了。