从内核看epoll的实现(基于5.9.9)

2024-03-27 21:08
文章标签 实现 内核 5.9 epoll

本文主要是介绍从内核看epoll的实现(基于5.9.9),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言:epoll是现代服务器的基石,也是高效处理大量请求的利器,从设计上来看,epoll的设计思想也是非常优秀的,本文介绍epoll的实现,从中我们不仅看到epoll的实现原理和机制,同时也能领略到其中优秀的设计思想。

epoll的使用非常简单,主要是几个API,下面我们一个个分析。

1 epoll_create

epoll_create是创建epoll实例的API,对使用方来说,epoll是一个黑盒子,我们通过操作系统提供的API,拿到一个实例(黑盒子)之后,就可以往里面注册我们想要监听的fd和事件,条件满足的时候,epoll就会通知我们,下面我们看看epoll_create的实现。

SYSCALL_DEFINE1(epoll_create1, int, flags)
{return do_epoll_create(flags);
}SYSCALL_DEFINE1(epoll_create, int, size)
{if (size <= 0)return -EINVAL;return do_epoll_create(0);
}

我们看到epoll_create有两个版本,其中epoll_create1多支持了flags参数,比如设置非阻塞模式,两个API具体的区别不大。接下来我们看do_epoll_create。

static int do_epoll_create(int flags)
{int error, fd;struct eventpoll *ep = NULL;struct file *file;// 只支持CLOEXECif (flags & ~EPOLL_CLOEXEC)return -EINVAL;// 分配一个eventpollerror = ep_alloc(&ep);// 获取一个空闲文件描述符fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));// 获取一个file,并且关联eventpoll_fops和上下文epfile = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));// ep和file关联起来,上面是file和ep关联ep->file = file;// 关联fd和filefd_install(fd, file);return fd;
}

我们看到do_epoll_create的实现非常简单,主要是创建了一个eventpoll结构体,eventpoll结构体比较复杂。下面列出核心的字段。

struct eventpoll {struct mutex mtx;// 阻塞在该epoll的进程队列wait_queue_head_t wq;// 当epoll被另一个epoll监听时需要使用poll_wait记录阻塞在该epoll的队列wait_queue_head_t poll_wait;// 就绪队列struct list_head rdllist;rwlock_t lock;// 红黑树根节点struct rb_root_cached rbr;// 记录epitem的单链表struct epitem *ovflist;struct wakeup_source *ws;// 创建该epoll的用户信息struct user_struct *user;// epoll对应的filestruct file *file;
};

创建了一个eventpoll结构体后,接着申请了一个file和fd,并且把file和eventpoll关联起来,主要的作用是调用方后续可以通过fd操作eventpoll,架构如下。

2 epoll_ctl

epoll_ctl是操作epoll的总入口,也是非常复杂的开始,但是简单来说就是增删改的接口。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,struct epoll_event __user *, event)
{struct epoll_event epds;// 判断是否需要复制数据,如果是删除则不需要,根据fd删除就行if (ep_op_has_event(op) &&copy_from_user(&epds, event, sizeof(struct epoll_event)))return -EFAULT;return do_epoll_ctl(epfd, op, fd, &epds, false);
}

epoll_ctl是对do_epoll_ctl的封装。

// 操作epoll
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,bool nonblock)
{int error;int full_check = 0;struct fd f, tf;struct eventpoll *ep;struct epitem *epi;struct eventpoll *tep = NULL;error = -EBADF;// 根据fd找到对应的数据结构f = fdget(epfd);// 获取被操作的文件描述符的数据结构tf = fdget(fd);error = -EPERM;// 资源有没有实现poll接口,使用epoll监听的资源需要实现poll钩子if (!file_can_poll(tf.file))goto error_tgt_fput;error = -EINVAL;// 保证被操作的fd不是自己,并且自己是epollif (f.file == tf.file || !is_file_epoll(f.file))goto error_tgt_fput;// 根据fd找到epoll数据结构ep = f.file->private_data;// 加锁epoll_mutex_lock(&ep->mtx, 0, nonblock);// 判断fd是否已经存在epoll的红黑树中epi = ep_find(ep, tf.file, fd);error = -EINVAL;switch (op) {// 新增case EPOLL_CTL_ADD:// 之前没有则可以新增,否则报错if (!epi) {epds->events |= EPOLLERR | EPOLLHUP;// 插入epollerror = ep_insert(ep, epds, tf.file, fd, full_check);} elseerror = -EEXIST;break;// 删除case EPOLL_CTL_DEL:// 存在则删除,否则报错if (epi)error = ep_remove(ep, epi);elseerror = -ENOENT;break;// 修改case EPOLL_CTL_MOD:// 存在则修改,否则报错if (epi) {if (!(epi->event.events & EPOLLEXCLUSIVE)) {epds->events |= EPOLLERR | EPOLLHUP;error = ep_modify(ep, epi, epds);}} elseerror = -ENOENT;break;}return error;
}

我们看到do_epoll_ctl主要首先通过两个fd拿到对应的epoll和资源,然后做了一些校验,接着根据操作类型做进一步处理,操作类型有增删改,我们只需要分析插入就行,这是epoll核心。

static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,struct file *tfile, int fd, int full_check)
{int error, pwake = 0;__poll_t revents;long user_watches;struct epitem *epi;struct ep_pqueue epq;lockdep_assert_irqs_enabled();// 监听的文件描述符个数user_watches = atomic_long_read(&ep->user->epoll_watches);// 超了if (unlikely(user_watches >= max_user_watches))return -ENOSPC;// 分配一个epitemif (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))return -ENOMEM;// 初始化INIT_LIST_HEAD(&epi->rdllink);INIT_LIST_HEAD(&epi->fllink);INIT_LIST_HEAD(&epi->pwqlist);// 所属的epollepi->ep = ep;// 保存fd和fileep_set_ffd(&epi->ffd, tfile, fd);// 记录订阅事件epi->event = *event;epi->nwait = 0;epi->next = EP_UNACTIVE_PTR;spin_lock(&tfile->f_lock);// 把epi插入所属file的队列list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);spin_unlock(&tfile->f_lock);// 插入红黑树ep_rbtree_insert(ep, epi);error = -EINVAL;// 关联对应的epitemepq.epi = epi;// 初始化ep_pqueueinit_poll_funcptr(&epq.pt, ep_ptable_queue_proc);// 判断是否有事件触发了revents = ep_item_poll(epi, &epq.pt, 1);error = -ENOMEM;write_lock_irq(&ep->lock);// 事件触发了,并且还没有加入就绪队列则加入if (revents && !ep_is_linked(epi)) {list_add_tail(&epi->rdllink, &ep->rdllist);// 等待队列非空则唤醒阻塞在该epoll的队列if (waitqueue_active(&ep->wq))wake_up(&ep->wq);// 一个epoll被另一个监听,唤醒主epollif (waitqueue_active(&ep->poll_wait))pwake++;}// 一个epoll被另一个监听,唤醒主epollif (pwake)ep_poll_safewake(ep, NULL);write_unlock_irq(&ep->lock);// 监听数加一atomic_long_inc(&ep->user->epoll_watches);return 0;
}

插入操作的逻辑分为以下几个部分
1 分配一个epitem表示一个被epoll监听的项,插入红黑树。
2 判断当前被监听的fd订阅的事件是否触发了,即注册的时候,事件就触发了,是则插入就绪队列。
3 初始化并注册节点到资源对应的队列中。
1,2的逻辑是很自然的,执行完后的架构如下

我们重点来分析3,3也是epoll最核心的设计,也就是资源满足条件的时候是如何通知epoll的,核心代码如下。

struct ep_pqueue epq;
// 关联对应的epitem
epq.epi = epi;
// 把函数ep_ptable_queue_proc保存到epq.pt
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 判断是否有事件触发了
revents = ep_item_poll(epi, &epq.pt, 1);

我们看到上面代码初始化了一个ep_pqueue结构体,重点是把epitem关联到了ep_pqueue结构体中,后面会看到它的作用。我们看看ep_pqueue结构体的定义。

typedef struct poll_table_struct {// 函数指针poll_queue_proc _qproc;// unsigned__poll_t _key;
} poll_table;struct ep_pqueue {poll_table pt;struct epitem *epi;
};

上面代码执行完之后架构如下。

初始化完后接着看ep_item_poll函数。

static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,int depth)
{struct eventpoll *ep;bool locked;pt->_key = epi->event.events;// 不是epoll,则执行钩子函数pollif (!is_file_epoll(epi->ffd.file))return vfs_poll(epi->ffd.file, pt) & epi->event.events;
}static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{if (unlikely(!file->f_op->poll))return DEFAULT_POLLMASK;return file->f_op->poll(file, pt);
}

ep_item_poll的逻辑是主要是执行poll钩子函数。epoll是一种机制,支持epoll的其他模块,需要实现poll钩子函数。下面以eventfd为例。

static __poll_t eventfd_poll(struct file *file, poll_table *wait)
{struct eventfd_ctx *ctx = file->private_data;__poll_t events = 0;u64 count;/*核心逻辑,wqh是wait_queue_head_t结构体,即管理一个队列的结构体struct wait_queue_head {spinlock_t		lock;struct list_head	head;};*/poll_wait(file, &ctx->wqh, wait);// 判断当前触发的事件count = READ_ONCE(ctx->count);if (count > 0)events |= EPOLLIN;if (count == ULLONG_MAX)events |= EPOLLERR;if (ULLONG_MAX - 1 > count)events |= EPOLLOUT;return events;
}

eventfd的poll函数为eventfd_poll。eventfd_poll会判断当前触发的事件,如果恰好是调用方订阅的事件,则直接插入就绪队列。我们主要看poll_wait的逻辑,这是非常核心的逻辑。

/*file和p参数是被监听fd对应的数据结构wait_address是某个模块定义的数据结构,用于记录当前等待资源事件触发的节点
*/
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{if (p && p->_qproc && wait_address)p->_qproc(filp, wait_address, p);
}

poll_wait简单地调用_qproc函数。如果我们还有印象的话,可能会记得这个函数是ep_ptable_queue_proc。

// 具体的资源方调用
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,poll_table *pt)
{	// 获取pt关联的epitemstruct epitem *epi = ep_item_from_epqueue(pt);// 分配一个eppoll_entrystruct eppoll_entry *pwq;if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {// 初始化pwq,记录ep_poll_callback函数init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);pwq->whead = whead;// 关联的epitempwq->base = epi;// pwq插入whead队列,whead由具体资源提供,比如文件,管道,资源满足条件时会pwd对应的回调// 插入EPOLLEXCLUSIVE解决惊群if (epi->event.events & EPOLLEXCLUSIVE)add_wait_queue_exclusive(whead, &pwq->wait);elseadd_wait_queue(whead, &pwq->wait);// 插入关联的epi队列list_add_tail(&pwq->llink, &epi->pwqlist);epi->nwait++;} else {/* We have to signal that an error occurred */epi->nwait = -1;}
}

ep_ptable_queue_proc申请了一个eppoll_entry结构体,定义如下。

struct wait_queue_entry {unsigned int		flags;void			*private;wait_queue_func_t	func;struct list_head	entry;
};struct eppoll_entry {// 插入所属epitem节点的队列struct list_head llink;// 关联的epitemstruct epitem *base;// 插入资源等待队列的节点wait_queue_entry_t wait;// 指向资源等待队列的头指针所在结构体wait_queue_head_t *whead;
};

ep_ptable_queue_proc申请了eppoll_entry结构体并初始化后,插入资具体功能模块定义的队列中,架构如下。

我们看到调用方往epoll注册了fd和事件,epoll并没有自己去实现检测的逻辑,而是同样地注册一个节点到对应的底层资源,等待它的通知。

3 epoll_wait

注册完fd和事件后,我们就会执行epoll_wait等待事件的触发,虽然有时候我们epoll_wait的时候,事件已经触发了,但是很多情况下,事件往往是异步触发的,比如我们发送一个网络请求,等待响应的时候,下面我们来分析实现。

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,int, maxevents, int, timeout)
{return do_epoll_wait(epfd, events, maxevents, timeout);
}

epoll_wait是对do_epoll_wait的封装。

static int do_epoll_wait(int epfd, struct epoll_event __user *events,int maxevents, int timeout)
{int error;struct fd f;struct eventpoll *ep;/* 校验 */if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)return -EINVAL;//  通过fd拿到底层的数据结构f = fdget(epfd);error = -EINVAL;// 判断是不是epoll实例if (!is_file_epoll(f.file))goto error_fput;// 取得epoll的核心结构体ep = f.file->private_data;error = ep_poll(ep, events, maxevents, timeout);
}

do_epoll_wait逻辑也不多,主要是拿到epoll实例,继续看ep_poll。

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,int maxevents, long timeout)
{int res = 0, eavail, timed_out = 0;u64 slack = 0;wait_queue_entry_t wait;ktime_t expires, *to = NULL;// 设置了阻塞时间if (timeout > 0) {struct timespec64 end_time = ep_set_mstimeout(timeout);slack = select_estimate_accuracy(&end_time);to = &expires;*to = timespec64_to_ktime(end_time);} else if (timeout == 0) {// 0说明不阻塞timed_out = 1;write_lock_irq(&ep->lock);// 是否有就绪事件eavail = ep_events_available(ep);write_unlock_irq(&ep->lock);// 直接返回goto send_events;}fetch_events:// 是否有就绪事件eavail = ep_events_available(ep);// 有则通知用户if (eavail)goto send_events;do {/*初始化wait,保存上下文 => 当前进程,即当前进程插入epoll等待队列#define init_wait(wait)								\do {									\(wait)->private = current;					\(wait)->func = autoremove_wake_function;			\INIT_LIST_HEAD(&(wait)->entry);					\(wait)->flags = 0;						\} while (0)*/init_wait(&wait);write_lock_irq(&ep->lock);__set_current_state(TASK_INTERRUPTIBLE);// 是否有就绪队列eavail = ep_events_available(ep);// 没有但是当前有信号需要处理则返回EINTR,否则把当前进程加入队列if (!eavail) {if (signal_pending(current))res = -EINTR;else__add_wait_queue_exclusive(&ep->wq, &wait);}write_unlock_irq(&ep->lock);// 报错或者有就绪事件则breakif (eavail || res)break;// 阻塞当前进程if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {timed_out = 1;break;}eavail = 1;} while (0);// 进程继续执行__set_current_state(TASK_RUNNING);// 当前进程还在队列(阻塞队列)则移除,因为进程被唤醒了if (!list_empty_careful(&wait.entry)) {write_lock_irq(&ep->lock);__remove_wait_queue(&ep->wq, &wait);write_unlock_irq(&ep->lock);}send_events:// 没有报错并且有就绪事件,通知用户if (!res && eavail &&!(res = ep_send_events(ep, events, maxevents)) && !timed_out)goto fetch_events;return res;
}

接下来我们看看ep_send_events的逻辑。

static int ep_send_events(struct eventpoll *ep,struct epoll_event __user *events, int maxevents)
{struct ep_send_events_data esed;// 定义保存触发的事件的结构体esed.maxevents = maxevents;esed.events = events;ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);return esed.res;
}

继续看ep_scan_ready_list。

static __poll_t ep_scan_ready_list(struct eventpoll *ep,__poll_t (*sproc)(struct eventpoll *,struct list_head *, void *),void *priv, int depth, bool ep_locked)
{__poll_t res;struct epitem *epi, *nepi;LIST_HEAD(txlist);// 把就绪队列移到txlistlist_splice_init(&ep->rdllist, &txlist);// 执行传进来的函数ep_send_events_procres = (*sproc)(ep, &txlist, priv);/*把剩下的移到就绪队列,ep_read_events_proc里面会移除txlist列表的节点,但是可能因为达到阈值,没有处理完。见ep_read_events_proc里面的esed->res >= esed->maxevents逻辑*/list_splice(&txlist, &ep->rdllist);return res;
}static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,void *priv)
{struct ep_send_events_data *esed = priv;__poll_t revents;struct epitem *epi, *tmp;struct epoll_event __user *uevent = esed->events;struct wakeup_source *ws;poll_table pt;init_poll_funcptr(&pt, NULL);esed->res = 0;// 遍历就绪队列list_for_each_entry_safe(epi, tmp, head, rdllink) {if (esed->res >= esed->maxevents)break;// 移出就绪队列list_del_init(&epi->rdllink);// 触发的事件revents = ep_item_poll(epi, &pt, 1);// 写入调用方传入的结构体,返回0说明成功if (__put_user(revents, &uevent->events) ||__put_user(epi->event.data, &uevent->data)) {// 失败则插入队列中list_add(&epi->rdllink, head);return 0;}// 处理个数加一esed->res++;uevent++;// 设置了EPOLLONESHOT则清除订阅的事件if (epi->event.events & EPOLLONESHOT)epi->event.events &= EP_PRIVATE_BITS;// 没有设置水平触发则重新插入,下次epoll_wait继续触发,边缘触发模式则只会触发一次else if (!(epi->event.events & EPOLLET)) {list_add_tail(&epi->rdllink, &ep->rdllist);}}return 0;
}

ep_send_events_proc主要是把触发的事件复制给调用方,并且根据工作模式和设置的属性对该次事件做进一步处理。至此,epoll的核心逻辑貌似分析完了,但是我们似乎遗留了一个重要的地方,那就是就绪队列的节点是谁又是什么时候插入的呢?

4 事件就绪

我们接着看资源有事件触发的时候是如何通知epoll的。这里以eventfd的eventfd_write为例,即写入的时候。

static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,loff_t *ppos)
{struct eventfd_ctx *ctx = file->private_data;ssize_t res;__u64 ucnt;spin_lock_irq(&ctx->wqh.lock);res = -EAGAIN;// 可写if (ULLONG_MAX - ctx->count > ucnt)res = sizeof(ucnt);// 写成功if (likely(res > 0)) {ctx->count += ucnt;// 队列非空,即操作epoll时,epoll注册的节点if (waitqueue_active(&ctx->wqh))// ”唤醒“它,有数据可写wake_up_locked_poll(&ctx->wqh, EPOLLIN);}spin_unlock_irq(&ctx->wqh.lock);return res;
}

eventfd_write写入数据后,会通知等待该资源的节点,我们看看wake_up_locked_poll。

#define wake_up_locked_poll(x, m)						\__wake_up_locked_key((x), TASK_NORMAL, poll_to_key(m))void __wake_up_locked_key(struct wait_queue_head *wq_head, unsigned int mode, void *key)
{__wake_up_common(wq_head, mode, 1, 0, key, NULL);
}static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,int nr_exclusive, int wake_flags, void *key,wait_queue_entry_t *bookmark)
{wait_queue_entry_t *curr, *next;int cnt = 0;// 遍历队列list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {unsigned flags = curr->flags;int ret;// 执行回调ret = curr->func(curr, mode, wake_flags, key);if (ret < 0)break;// 设置了WQ_FLAG_EXCLUSIVE则只会回调一个,nr_exclusive是1if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)break;}return nr_exclusive;
}

如果我们有印象,这里执行的回调函数是ep_poll_callback

// 条件满足时,具体资源回调
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{// 拿到wait关联的epitemstruct epitem *epi = ep_item_from_wait(wait);// 再拿到epitem关联的epollstruct eventpoll *ep = epi->ep;// 触发的事件__poll_t pollflags = key_to_poll(key);unsigned long flags;int ewake = 0;// 还没插入队列if (!ep_is_linked(epi)) {// 插入就绪队列if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))ep_pm_stay_awake_rcu(epi);}// 唤醒阻塞到epoll的进程队列if (waitqueue_active(&ep->wq)) {// “唤醒“阻塞在epoll的进程wake_up(&ep->wq);}
}

ep_poll_callback会调用wake_up唤醒阻塞到epoll的进程,我们看看wake_up。

#define wake_up(x)			__wake_up(x, TASK_NORMAL, 1, NULL)void __wake_up(struct wait_queue_head *wq_head, unsigned int mode,int nr_exclusive, void *key)
{__wake_up_common_lock(wq_head, mode, nr_exclusive, 0, key);
}static void __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode,int nr_exclusive, int wake_flags, void *key)
{unsigned long flags;wait_queue_entry_t bookmark;bookmark.flags = 0;bookmark.private = NULL;bookmark.func = NULL;INIT_LIST_HEAD(&bookmark.entry);// 这里只会执行一次do {spin_lock_irqsave(&wq_head->lock, flags);nr_exclusive = __wake_up_common(wq_head, mode, nr_exclusive,wake_flags, key, &bookmark);spin_unlock_irqrestore(&wq_head->lock, flags);} while (bookmark.flags & WQ_FLAG_BOOKMARK);
}

核心逻辑是__wake_up_common。__wake_up_common函数的代码刚才已经贴过,但是因为这个函数的逻辑很重要,这里再简单贴一下。

static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,int nr_exclusive, int wake_flags, void *key,wait_queue_entry_t *bookmark)
{wait_queue_entry_t *curr, *next;int cnt = 0;// 头指针所在结构体curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);// 遍历队列list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {unsigned flags = curr->flags;int ret;// 执行回调ret = curr->func(curr, mode, wake_flags, key);// 设置了WQ_FLAG_EXCLUSIVE则只执行一次,即只唤醒一个进程,解决惊群问题if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)break;}return nr_exclusive;
}

那么这里的回调又是什么呢?如果我还记得init_wait函数的话,就会知道,init_wait函数只在epoll_wait的时候执行的,用于记录阻塞于epoll的进程队列。

/*初始化wait,保存上下文 => 当前进程,即当前进程插入epoll等待队列#define init_wait(wait)								\do {									\(wait)->private = current;					\(wait)->func = autoremove_wake_function;			\INIT_LIST_HEAD(&(wait)->entry);					\(wait)->flags = 0;						\} while (0)*/
init_wait(&wait);

我们看到函数是autoremove_wake_function。

int autoremove_wake_function(struct wait_queue_entry *wq_entry, unsigned mode, int sync, void *key)
{int ret = default_wake_function(wq_entry, mode, sync, key);if (ret)list_del_init_careful(&wq_entry->entry);return ret;
}int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags,void *key)
{WARN_ON_ONCE(IS_ENABLED(CONFIG_SCHED_DEBUG) && wake_flags & ~WF_SYNC);// curr->private为进程pcb即task_structreturn try_to_wake_up(curr->private, mode, wake_flags);
}static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{ttwu_runnable(p, wake_flags)}static int ttwu_runnable(struct task_struct *p, int wake_flags)
{ttwu_do_wakeup(rq, p, wake_flags, &rf);
}static void ttwu_do_wakeup(struct rq *rq, struct task_struct *p, int wake_flags,struct rq_flags *rf)
{p->state = TASK_RUNNING;
}

autoremove_wake_function流程很长,最终设置进程为就绪状态。

5 监听epoll

监听epoll,这不仅是个非常有意思的功能,同时是一个很有意思的思想。把epoll本身抽象为一种资源。但是这种场景貌似还没有见过。下面我们看一下实现。epoll可以被epoll监听,也可以被poll(早期的io复用机制)监听。不过这种场景貌似很少,有epoll,为什么还会用poll呢?我能想到的场景就是业务代码早期用poll实现的,后期有了epoll,又不能改旧代码,所以就用poll来监听epoll,anyway,我们大概先了解一下实现,被epoll和poll监听是两种不同的情况,虽然代码是一样的,我们分来看。

5.1 poll监听epoll

要被poll监听,就需要实现poll钩子,我们从epoll实现的poll钩子ep_eventpoll_poll开始分析。

static __poll_t ep_eventpoll_poll(struct file *file, poll_table *wait)
{// 被监听的epollstruct eventpoll *ep = file->private_data;int depth = 0;// 熟悉的操作poll_wait(file, &ep->poll_wait, wait);// 判断当前有没有事件触发return ep_scan_ready_list(ep, ep_read_events_proc,&depth, depth, false);
}

poll_wait我们已经分析过了就不再分析,ep_scan_ready_list我们也分析过了,主要逻辑是在里面执行函数,ep_read_events_proc,我们看一下ep_read_events_proc是如何判断被监听的epoll中是否有事件触发的。

static __poll_t ep_read_events_proc(struct eventpoll *ep, struct list_head *head,void *priv)
{struct epitem *epi, *tmp;poll_table pt;int depth = *(int *)priv;init_poll_funcptr(&pt, NULL);depth++;// 遍历就绪队列list_for_each_entry_safe(epi, tmp, head, rdllink) {// ep_item_poll判断epitem中实现有事件触发if (ep_item_poll(epi, &pt, depth)) {return EPOLLIN | EPOLLRDNORM;} }return 0;
}

5.2 epoll监听epoll

epoll监听epoll和epoll监听一般的fd是一样的,区别在于插入的时候,poll逻辑的实现。具体逻辑在ep_item_poll。

static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,int depth)
{struct eventpoll *ep;bool locked;pt->_key = epi->event.events;// 不是epoll,则执行钩子函数pollif (!is_file_epoll(epi->ffd.file))return vfs_poll(epi->ffd.file, pt) & epi->event.events;// 在epoll里监听另一个epoll,即epitem的fd是另一个epoll对应的fd// 是epoll则首先取得原始epoll的核心数据结构eventpollep = epi->ffd.file->private_data;// 执行pt中的函数poll_wait(epi->ffd.file, &ep->poll_wait, pt);locked = pt && (pt->_qproc == ep_ptable_queue_proc);// 判断是否有事件触发return ep_scan_ready_list(epi->ffd.file->private_data,ep_read_events_proc, &depth, depth,locked) & epi->event.events;
}

我们看到ep_item_poll中做了一个判断,被poll的是epoll还是非epoll,是epoll的时候则进入另一种逻辑,不过操作和一般fd的情况是一样的,区别只是操作的具体数据结构。这里的逻辑看起来似乎可以放到ep_eventpoll_poll里,但是内核开发者没有这样做。这部分就先不深入分析,因为我们主要是要理解epoll的基础原理。

6 实现支持epoll机制的模块

最后我们实现一个简单的支持epoll机制的模块。该模块实现了一种通知机制,逻辑非常简单,如果值为0则可写,非0则可读,并通过这个条件约束进程的状态。实现进程的简单通信,具体可参考eventfd机制。

struct demo_context {wait_queue_head_t head;unsigned int count;
};static ssize_t demo_read(struct kiocb *iocb, struct iov_iter *to)
{struct file *file = iocb->ki_filp;struct demo_context *ctx = file->private_data;spin_lock_irq(&ctx->head.lock);for (;;) {if (ctx->count == 0) {spin_unlock_irq(&ctx->head.lock);// 阻塞} else {break;}spin_lock_irq(&ctx->head.lock);}unsigned int ucnt = ctx->count;ctx->count = 0;if (waitqueue_active(&ctx->wqh))wake_up_locked_poll(&ctx->wqh, EPOLLOUT);spin_unlock_irq(&ctx->head.lock);if (unlikely(copy_to_iter(&ucnt, sizeof(ucnt), to) != sizeof(ucnt)))return -EFAULT;return sizeof(ucnt);
}static __poll_t demo_poll(struct file *file, poll_table *wait)
{struct demo_context *ctx = file->private_data;__poll_t events = 0;unsigned int count;poll_wait(file, &ctx->head, wait);count = READ_ONCE(ctx->count);if (count == 0)events |= EPOLLOUT;elseevents |= EPOLLIN;return events;
}static ssize_t demo_write(struct file *file, const char __user *buf, size_t count,loff_t *ppos)
{struct demo_context *ctx = file->private_data;ssize_t res;unsigned int ucnt;if (copy_from_user(&ucnt, buf, sizeof(ucnt)))return -EFAULT;spin_lock_irq(&ctx->head.lock);for (;;) {if (ctx->count != 0) {spin_unlock_irq(&ctx->head.lock);// 阻塞} else {break;}spin_lock_irq(&ctx->head.lock);}ctx->count = ucnt;if (waitqueue_active(&ctx->wqh))wake_up_locked_poll(&ctx->wqh, EPOLLIN);spin_unlock_irq(&ctx->head.lock);return res;
}static const struct file_operations demo_fops = {.poll		= demo_poll,.read_iter	= demo_read,.write		= demo_write,
};static int do_demo(unsigned int count)
{struct demo_context *ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);int fd = get_unused_fd_flags(flags);struct file *file = anon_inode_getfile("[demo]", &demo_fops, ctx, flags);;init_waitqueue_head(&ctx->head);ctx->count = count;fd_install(fd, file);return fd;
}SYSCALL_DEFINE1(demo, unsigned int, count)
{return do_demo(count);
}

至此,真的分析完了,epoll的代码一共2522行,但是还涉及了操作系统中的很多代码,是非常复杂的模块,epoll不做具体的处理逻辑,他只是提供一种机制,遵循这种机制的资源(实现poll钩子),都可以被监听。我们看到epoll的代码不仅复杂,而且关系非常绕,在epoll中,有几个概念我们需要了解。
1 进程
2 资源(比如网络、管道、eventfd)
3 epoll
4 epitem(管理一个被监听的项)
我们看看他们的关系。

后记:epoll作为一种机制,作用远远超过了它的代码量,存量和以后新增的模块都可以使用这种机制。比如管道、TCP、新增的eventfd等等。从中我们也看到了epoll本身的一些知识,比如他为什么高效、水平触发和边缘触发、epoll本身如何解决惊群现象。也看到了在简单的API使用下是如此复杂的实现。另外更有意思的是epoll也支持监听另外一个epoll,因为epoll也可以被当作一种资源。最后,本文不是epoll的全部,因为涉及的细节实在太多,感兴趣的同学可以自己研究一下,网上也有很多优秀的文章。

这篇关于从内核看epoll的实现(基于5.9.9)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/853384

相关文章

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

Python实现微信自动锁定工具

《Python实现微信自动锁定工具》在数字化办公时代,微信已成为职场沟通的重要工具,但临时离开时忘记锁屏可能导致敏感信息泄露,下面我们就来看看如何使用Python打造一个微信自动锁定工具吧... 目录引言:当微信隐私遇到自动化守护效果展示核心功能全景图技术亮点深度解析1. 无操作检测引擎2. 微信路径智能获

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

Python位移操作和位运算的实现示例

《Python位移操作和位运算的实现示例》本文主要介绍了Python位移操作和位运算的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 位移操作1.1 左移操作 (<<)1.2 右移操作 (>>)注意事项:2. 位运算2.1

如何在 Spring Boot 中实现 FreeMarker 模板

《如何在SpringBoot中实现FreeMarker模板》FreeMarker是一种功能强大、轻量级的模板引擎,用于在Java应用中生成动态文本输出(如HTML、XML、邮件内容等),本文... 目录什么是 FreeMarker 模板?在 Spring Boot 中实现 FreeMarker 模板1. 环

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

Spring Security自定义身份认证的实现方法

《SpringSecurity自定义身份认证的实现方法》:本文主要介绍SpringSecurity自定义身份认证的实现方法,下面对SpringSecurity的这三种自定义身份认证进行详细讲解,... 目录1.内存身份认证(1)创建配置类(2)验证内存身份认证2.JDBC身份认证(1)数据准备 (2)配置依

利用python实现对excel文件进行加密

《利用python实现对excel文件进行加密》由于文件内容的私密性,需要对Excel文件进行加密,保护文件以免给第三方看到,本文将以Python语言为例,和大家讲讲如何对Excel文件进行加密,感兴... 目录前言方法一:使用pywin32库(仅限Windows)方法二:使用msoffcrypto-too