网络编程 io_uring

2024-03-02 11:28
文章标签 编程 网络 io uring

本文主要是介绍网络编程 io_uring,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

io_uring

1、概述

io_uring是Linux(内核版本在5.1以后)在2019年加入到内核中的一种新型的异步I/O模型;

io_uring使用共享内存,解决高IOPS场景中的用户态和内核态的切换过程,减少系统调用;用户可以直接向共享内存提交要发起的I/O操作,内核线程可以直接获取共享内存中的I/O操作,并进行相应的读写操作;io_uring是一种proactor模式的网络架构;

  • Reactor 是非阻塞同步网络模式,感知的是就绪可读写事件。在每次感知到有事件发生(比如可读就绪事件)后,就需要应用进程主动调用 read 方法来完成数据的读取,也就是要应用进程主动将 socket 接收缓存中的数据读到应用进程内存中,这个过程是同步的,读取完数据后应用进程才能处理数据。

  • Proactor 是异步网络模式, 感知的是已完成的读写事件。在发起异步读写请求时,需要传入数据缓冲区的地址(用来存放结果数据)等信息,这样系统内核才可以自动帮我们把数据的读写工作完成,这里的读写工作全程由操作系统来做,并不需要像 Reactor 那样还需要应用进程主动发起 read/write 来读写数据,操作系统完成读写工作后,就会通知应用进程直接处理数据。

优点
  • 避免了提交I/O事件和完成事件中存在的内存拷贝(使用共享内存)

  • 减少的了I/O任务提交和完成事件任务是的系统调用过程

  • 采取无锁队列,减少了锁资源的竞争

主要内存结构
  • 提交队列(Submission Queue,SQ)连续的内存空间,环形队列,存放将要执行的I/O操作数据
  • 完成队列(Completion Queue, CQ)连续的内存空间,环形队列,存放执行完成I/O操作后的返回结果
  • 提交队列项数组提(Submission Queue Entry,SQE):方便通过环形缓冲区提交内存请求
2、主要接口

io_uring提供三个用户态的系统调用接口

  1. io_uring_setup:初始化一个新的io_uring对象,一个SQ和一个CQ,通过使用共享内存进行数据操作
  2. io_uring_register:注册用于异步I/O的文件或用户缓冲区(buffers)
  3. io_uring_enter:提交I/O任务,等待I/O完成

在这里插入图片描述

SQ和CQ保存的都是SQEs数据的索引,不是真正的请求,真实是请求保存在SQE数组中,在提交请求时可以批量提交一组SQE数值上不连续的请求;

SQ、CQ、SQE中的内存区域都是有内核进行分配的,用户初始化会返回对应的fd,通过fd进行mmap和内核共享内存空间;

3、第三方库

liburing通过对io_uring进行分装,提供了一个简单的API,通过一下命令可以安装该动态库

git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install
sudo ldconfig #更新动态库连接缓存
4、主要使用流程
1. io_uring初始化

io_uring通过io_uring_setup函数初始化,在liburing库中,通过io_uring_queue_init_params函数进行初始化,创建sumbmit队列和complete队列,以及SQE内存数组;

//io_uring实现异步的方式
struct io_uring_params pragma;
memset(&pragma, 0, sizeof(pragma));
struct io_uring ring;
// 初始化io_uring 创建submit队列和complite队列
io_uring_queue_init_params(1024, &ring, &pragma);
2. io_uring 提交(注册)到SQ环形队列

io_uring通过io_uring_register函数提交(注册)到用于异步I/O的缓冲区中,在liburing中通过io_uring_prep_accept函数对io_uring_refister进行封装使用;

// 获取ringbuffer的头
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
// 注册一个I/O事件
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
3. io_uring_enter 提交I/O

io_uring中通过io_uring_enter函数来提交I/O,并等待事件的完成;在liburing中通过io_uring_submit来提交SQE的读写请求,io_uring_wait_cqe来等待I/O的处理结果,io_uring_peek_batch_cqe来获取CQ中的处理结果;

 // 提交worker中执行
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
//等待complete队列中的结果
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
// 获取CQ环形队列中的处理结果
int count = io_uring_peek_batch_cqe(&ring, cqes, 128);
5、实现

io_uring_server.c

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <netinet/in.h>enum event_type {EVENT_ACCEPT,EVENT_READ,EVENT_WRITE
};typedef struct connect_info{int conn_fd;int event;
}connect_info_t;struct conn_info {int fd;int event;
};int init_server(unsigned short port) 
{   int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0) {perror("socket");return -1;}struct sockaddr_in serveraddr;;serveraddr.sin_family = AF_INET;serveraddr.sin_port = htons(port);serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {perror("bind error");return -1;}int opt = 1;if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {perror("setsockopt");return -1;}listen(sockfd, 10);return sockfd; 
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, int len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);connect_info_t accept_info = {sockfd, EVENT_READ};io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event recv----\n");return 0;
}int set_event_send(struct io_uring *ring, int sockfd, const void *buf, int len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);connect_info_t accept_info = {sockfd, EVENT_WRITE};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event send----\n");return 0;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *clientaddr,socklen_t *addrlen, int flags) {// 获取sqestruct io_uring_sqe *sqe = io_uring_get_sqe(ring);// 初始化accept_infoconnect_info_t accept_info = {sockfd, EVENT_ACCEPT};// 准备accept操作io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);// 设置用户数据memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event accept\n");return 0;
}int main(int argc, char *argv[])
{// 初始化服务器unsigned short port = 9999;// 初始化服务器int socketfd = init_server(port);if (socketfd < 0)return -1;//io_uring实现异步的方式struct io_uring_params pragma;// 初始化io_uring 创建submit队列和complite队列memset(&pragma, 0, sizeof(pragma));struct io_uring ring;io_uring_queue_init_params(1024, &ring, &pragma);struct sockaddr_in clientaddr;socklen_t addrlen = sizeof(struct sockaddr);// 提交到submit队列中set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);char buffer[1024] = {0};while (1){// 提交worker中执行io_uring_submit(&ring);printf("complete\n");struct io_uring_cqe *cqe;//等待complete队列中的结果io_uring_wait_cqe(&ring, &cqe);printf("complete end\n");struct io_uring_cqe *cqes[128];int count = io_uring_peek_batch_cqe(&ring, cqes, 128);for (int i = 0; i < count; i++){struct io_uring_cqe *entries = cqes[i];connect_info_t result;//struct conn_info result;memcpy(&result, &entries->user_data, sizeof(connect_info_t));if (result.event == EVENT_ACCEPT) {// 设置读事件set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);printf("accept success\n");int conn_fd = entries->res;printf("conn_fd = %d  res = %d\n", conn_fd, entries->res);// 设置读事件set_event_recv(&ring, conn_fd, buffer, 1024,0);}else if (result.event == EVENT_READ){int ret = entries->res;printf("set_event_recv ret: %d, %s\n", ret, buffer);if (ret == 0){close(result.conn_fd);continue;}else if (ret > 0){// 设置写事件set_event_send(&ring, result.conn_fd, buffer, ret,0);}printf("read success\n");}else if (result.event == EVENT_WRITE){int ret = entries->res;set_event_recv(&ring, result.conn_fd, buffer, 1024,0);printf("write success\n");}}io_uring_cq_advance(&ring, count);}return 0;
}

io_uring_test.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>#include <sys/socket.h>
#include <arpa/inet.h>#define TIMESUB_MS(tv1, tv2)  (((tv2).tv_sec - (tv1).tv_sec) * 1000 + ((tv2).tv_usec - (tv1).tv_usec) / 1000)
#define TEST_MESSAGE   "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048typedef struct test_conttext
{char server_ip[16];int server_port;int thread_num;int connection_num;int request_num;int fail_num;
} test_conttext_t;int send_recv_tcp(int sockfd)
{char wbuffer[WBUFFER_LENGTH];char rbuffer[RBUFFER_LENGTH];memset(wbuffer, 0, sizeof(wbuffer));memset(rbuffer, 0, sizeof(rbuffer));for (int i = 0; i < 8; i++){strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);}int res = send(sockfd, wbuffer, strlen(wbuffer), 0);if (res <= 0){return -1;}res = recv(sockfd, rbuffer, sizeof(rbuffer), 0);if (res <= 0){return -1;}if (strcmp(rbuffer, wbuffer) != 0){printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);return -1;}return 0;
}int connect_tcpserver(char *ip, int port)
{int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){perror("socket");return -1;}struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(port);server_addr.sin_addr.s_addr = inet_addr(ip);if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0){perror("connect");close(sockfd);return -1;}return sockfd;
}static void *test_qps(void *arg)
{test_conttext_t *ctx = (test_conttext_t *)arg;int sockfd = connect_tcpserver(ctx->server_ip, ctx->server_port);if (sockfd < 0){printf("connect server failed\n");return NULL;}int conut = ctx->request_num / ctx->connection_num;int indx = 0;int res;while (indx++ < conut){res = send_recv_tcp(sockfd);if (res < 0){printf("send_recv_tcp failed\n");ctx->fail_num++;continue;}}return NULL;
}int main(int argc, char *argv[])
{int i;printf("----%d\n", argc);// for (i = 1; i < argc; i++)//     printf("%s\n", argv[i]);test_conttext_t ctx = {0};int opt;while ((opt = getopt(argc, argv, "s:p:t:c:n:")) != -1){switch (opt){case 's':strcpy(ctx.server_ip, optarg);printf("-s: %s\n", optarg);break;case 'p':ctx.server_port = atoi(optarg);printf("-p: %s\n", optarg);break;case 't':ctx.thread_num = atoi(optarg);printf("-t: %s\n", optarg);break;case 'c':ctx.connection_num = atoi(optarg);printf("-c: %s\n", optarg);break;case 'n':ctx.request_num = atoi(optarg);printf("-n: %s\n", optarg);break;default:return EXIT_FAILURE;}}pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * ctx.thread_num);struct timeval start, end;gettimeofday(&start, NULL);for (i = 0; i < ctx.thread_num; i++){printf("thread %d pthread_create\n", i);pthread_create(&threads[i], NULL, test_qps, &ctx);}for (i = 0; i < ctx.thread_num; i++){pthread_join(threads[i], NULL);printf("thread %d finished\n", i);}gettimeofday(&end, NULL);int time_used = TIMESUB_MS(start, end);printf("success :%d, failed:%d,  time used: %d , qps %d\n", ctx.request_num-ctx.fail_num, ctx.fail_num, time_used, ctx.request_num * 1000 / time_used);free(threads);return EXIT_SUCCESS;
}

这篇关于网络编程 io_uring的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/765965

相关文章

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

如何解决Druid线程池Cause:java.sql.SQLRecoverableException:IO错误:Socket read timed out的问题

《如何解决Druid线程池Cause:java.sql.SQLRecoverableException:IO错误:Socketreadtimedout的问题》:本文主要介绍解决Druid线程... 目录异常信息触发场景找到版本发布更新的说明从版本更新信息可以看到该默认逻辑已经去除总结异常信息触发场景复

Linux网络配置之网桥和虚拟网络的配置指南

《Linux网络配置之网桥和虚拟网络的配置指南》这篇文章主要为大家详细介绍了Linux中配置网桥和虚拟网络的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 一、网桥的配置在linux系统中配置一个新的网桥主要涉及以下几个步骤:1.为yum仓库做准备,安装组件epel-re

python如何下载网络文件到本地指定文件夹

《python如何下载网络文件到本地指定文件夹》这篇文章主要为大家详细介绍了python如何实现下载网络文件到本地指定文件夹,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下...  在python中下载文件到本地指定文件夹可以通过以下步骤实现,使用requests库处理HTTP请求,并结合o

Python文件操作与IO流的使用方式

《Python文件操作与IO流的使用方式》:本文主要介绍Python文件操作与IO流的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、python文件操作基础1. 打开文件2. 关闭文件二、文件读写操作1.www.chinasem.cn 读取文件2. 写

Linux高并发场景下的网络参数调优实战指南

《Linux高并发场景下的网络参数调优实战指南》在高并发网络服务场景中,Linux内核的默认网络参数往往无法满足需求,导致性能瓶颈、连接超时甚至服务崩溃,本文基于真实案例分析,从参数解读、问题诊断到优... 目录一、问题背景:当并发连接遇上性能瓶颈1.1 案例环境1.2 初始参数分析二、深度诊断:连接状态与

Python 异步编程 asyncio简介及基本用法

《Python异步编程asyncio简介及基本用法》asyncio是Python的一个库,用于编写并发代码,使用协程、任务和Futures来处理I/O密集型和高延迟操作,本文给大家介绍Python... 目录1、asyncio是什么IO密集型任务特征2、怎么用1、基本用法2、关键字 async1、async

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义