本文主要是介绍从内核看eventfd的实现(基于5.9.9),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前言:eventfd是一种进程/线程通信的机制,他类似信号,不过eventfd只是一种通知机制,无法承载数据(eventfd承载的数据是8个字节),他的好处是简单并且只消耗一个fd。
我们先看个例子感受一下。
#include <sys/eventfd.h>
#include <unistd.h>
#include <inttypes.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h> int main(int argc, char *argv[])
{int efd;uint64_t u;ssize_t s;// 创建一个eventfd实例efd = eventfd(0, 0);switch (fork()) {// 子进程case 0:u = 1;// 写端write(efd, &u, sizeof(uint64_t));exit(EXIT_SUCCESS);case -1: break;// 主进程default:// 睡一会,保证另一个进程写入sleep(2);// 读端s = read(efd, &u, sizeof(uint64_t));exit(EXIT_SUCCESS);}
}
我们看到例子比较简单,首先在主进程中创建一个eventfd实例,然后fork出子进程,这样主进程/子进程都指向该eventfd实例,因为文件描述符默认是被子进程继承的,架构如下。
下面我们从内核看一下eventfd的实现。
1 创建eventfd
SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)
{return do_eventfd(count, flags);
}SYSCALL_DEFINE1(eventfd, unsigned int, count)
{return do_eventfd(count, 0);
}
内核支持两个版本的eventfd函数,eventfd2是支持直接设置一些flags而不需要再额外调用其他函数。count是一个初始化值,一会我们会看到他的作用,接下来我们看do_eventfd。
static int do_eventfd(unsigned int count, int flags)
{struct eventfd_ctx *ctx;struct file *file;int fd;// 只支持三种flags (O_CLOEXEC | O_NONBLOCK | EFD_SEMAPHORE)if (flags & ~EFD_FLAGS_SET)return -EINVAL;// 分配一个eventfd_ctxctx = kmalloc(sizeof(*ctx), GFP_KERNEL);// 初始化kref_init(&ctx->kref);init_waitqueue_head(&ctx->wqh);// 初始值ctx->count = count;ctx->flags = flags;ctx->id = ida_simple_get(&eventfd_ida, 0, 0, GFP_KERNEL);flags &= EFD_SHARED_FCNTL_FLAGS;flags |= O_RDWR;// 获取可用的fdfd = get_unused_fd_flags(flags);// 分配一个file结构体,file和eventfd_fops(操作函数集)以及ctx关联起来file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx, flags);file->f_mode |= FMODE_NOWAIT;// 关联fd和filefd_install(fd, file);return fd;
err:eventfd_free_ctx(ctx);return fd;
}
do_eventfd主要是创建了一个eventfd_ctx结构体并初始化。我看看这个结构体。
struct eventfd_ctx {struct kref kref;wait_queue_head_t wqh;__u64 count;unsigned int flags;int id;
};
创建完结构体后,主要的逻辑是适配文件系统,首先申请了fd和file并关联起来,然后把file和eventfd_ctx关联起来,这样后续操作fd的时候,就可以通过fd找到file,从而找到对应的eventfd_ctx。另外还需要把操作函数集保存到file结构体中,这是VFS设计的要求。创建完之后,我们来看看写操作。
2 写eventfd
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos)
{ // 从file找到ctxstruct eventfd_ctx *ctx = file->private_data;ssize_t res;__u64 ucnt;if (copy_from_user(&ucnt, buf, sizeof(ucnt)))return -EFAULT;// 太大则报错if (ucnt == ULLONG_MAX)return -EINVAL;spin_lock_irq(&ctx->wqh.lock);res = -EAGAIN;// 还有空闲的大小可写if (ULLONG_MAX - ctx->count > ucnt)// 写入的字节数,write函数要求res = sizeof(ucnt);else if (!(file->f_flags & O_NONBLOCK)) {// 还没空闲大小可写并且是阻塞模式// 下面是阻塞进程的逻辑__add_wait_queue(&ctx->wqh, &wait);// 死循环for (res = 0;;) {set_current_state(TASK_INTERRUPTIBLE);// 直到有空闲大小可写if (ULLONG_MAX - ctx->count > ucnt) {res = sizeof(ucnt);break;}// 有信号处理则返回ERESTARTSYSif (signal_pending(current)) {res = -ERESTARTSYS;break;}spin_unlock_irq(&ctx->wqh.lock);// 进程调度,自己则进入阻塞schedule();spin_lock_irq(&ctx->wqh.lock);}// 条件满足,真正恢复运行__remove_wait_queue(&ctx->wqh, &wait);__set_current_state(TASK_RUNNING);}// 返回值大于0,则唤醒等待数据的进程if (likely(res > 0)) {// 累加到count,即当前的值ctx->count += ucnt;if (waitqueue_active(&ctx->wqh))wake_up_locked_poll(&ctx->wqh, EPOLLIN);}spin_unlock_irq(&ctx->wqh.lock);return res;
}
代码看起来很多,但是并不复杂,最核心的逻辑是把写入的值累加到当前的值中,然后通知等待者,剩下的就是条件不满足时的一些处理逻辑。接下来我们看读的逻辑。
3 读eventfd
static ssize_t eventfd_read(struct kiocb *iocb, struct iov_iter *to)
{struct file *file = iocb->ki_filp;struct eventfd_ctx *ctx = file->private_data;__u64 ucnt = 0;spin_lock_irq(&ctx->wqh.lock);// 没有值if (!ctx->count) {// 设置非阻塞则直接返回if ((file->f_flags & O_NONBLOCK) ||(iocb->ki_flags & IOCB_NOWAIT)) {spin_unlock_irq(&ctx->wqh.lock);return -EAGAIN;}// 否则进入阻塞逻辑__add_wait_queue(&ctx->wqh, &wait);// 死循环for (;;) {set_current_state(TASK_INTERRUPTIBLE);// 直到有值跳出if (ctx->count)break;/// 有信号则先返回if (signal_pending(current)) {__remove_wait_queue(&ctx->wqh, &wait);__set_current_state(TASK_RUNNING);spin_unlock_irq(&ctx->wqh.lock);return -ERESTARTSYS;}spin_unlock_irq(&ctx->wqh.lock);// 进程调度,自己则阻塞了schedule();spin_lock_irq(&ctx->wqh.lock);}// 灰度运行__remove_wait_queue(&ctx->wqh, &wait);__set_current_state(TASK_RUNNING);}// 读取数据eventfd_ctx_do_read(ctx, &ucnt);// 消费了数据,说明有空闲大小可写了,则唤醒等待者if (waitqueue_active(&ctx->wqh))wake_up_locked_poll(&ctx->wqh, EPOLLOUT);spin_unlock_irq(&ctx->wqh.lock);// 复制给调用方if (unlikely(copy_to_iter(&ucnt, sizeof(ucnt), to) != sizeof(ucnt)))return -EFAULT;return sizeof(ucnt);
}
读者和写者逻辑类似,我们主要看一下消费的逻辑。
static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt)
{*cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count;ctx->count -= *cnt;
}
当设置了EFD_SEMAPHORE标记的时候,消费一次count就会减去1,如果没有设置的话,会直接清0。
4 支持epoll机制
eventfd还有一个好处是支持epoll机制,即实现了poll钩子。我们看看具体实现。
static __poll_t eventfd_poll(struct file *file, poll_table *wait)
{struct eventfd_ctx *ctx = file->private_data;__poll_t events = 0;u64 count;count = READ_ONCE(ctx->count);// 大于0说明可消费,即可读if (count > 0)events |= EPOLLIN;// 等于ULLONG_MAX说明出错if (count == ULLONG_MAX)events |= EPOLLERR;// 小于ULLONG_MAX说明可写if (ULLONG_MAX - 1 > count)events |= EPOLLOUT;// 返回事件集合return events;
}
5 使用
最后我们看一下eventfd的使用,文章开头讲过,eventfd只是一种通知机制,无法承载过多数据,所以通常还需要另外维护一些数据结构,下面摘取一些Libuv的的代码,看看具体使用。
// 加锁uv_mutex_lock(&handle->cf_mutex);// 插入队列QUEUE_ADD(&handle->cf_events, events);// 解锁uv_mutex_unlock(&handle->cf_mutex);// 写eventfduv_async_send(handle->cf_cb);
我们再看一下uv_async_send的核心逻辑。
static const uint64_t val = 1;const void* buf = &val;ssize_t len = sizeof(val);int fd = loop->async_io_watcher.fd;write(fd, buf, len);
我们看到Libuv使用额外的队列维护了任务,并且通过互斥变量实现操作队列的逻辑,但是我们看到操作eventfd是不需要加锁的,因为内核已经帮我们处理了。
后记:我们看到eventfd的实现相对是比较简单的,多个进程/线程通过fd指向同一个file,然后file关联一个eventfd_ctx。多个进程/线程通过这个共同的eventfd_ctx实现通信。
这篇关于从内核看eventfd的实现(基于5.9.9)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!