linux下I/O模型并发的epoll多进程池协程实现

2024-06-23 20:44

本文主要是介绍linux下I/O模型并发的epoll多进程池协程实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

方法1

主要思路:

  1. 定义了一个EventData结构体,用于存储事件相关的数据,如文件描述符、epoll 文件描述符、协程 ID 等。
  2. EchoDeal函数用于处理请求消息,并生成响应消息。
  3. handlerClient函数是协程的执行函数,用于处理客户端连接。它通过循环读取数据、解析请求、执行业务处理、发送响应等步骤,实现了对客户端请求的处理。
  4. handler函数是主函数,用于创建监听套接字、初始化 epoll、设置非阻塞模式、添加读事件等操作。然后进入一个循环,通过 epoll_wait 等待事件发生,并根据事件类型进行相应的处理,如接受新连接、处理客户端请求等。
  5. 在主函数中,通过 fork 创建多个子进程,每个子进程都执行handler函数,从而实现多进程并发处理。

示例代码: 

#include <arpa/inet.h>  // 包含网络地址转换相关的头文件
#include <assert.h>  // 包含断言相关的头文件
#include <fcntl.h>  // 包含文件控制相关的头文件
#include <netinet/in.h>  // 包含网络协议相关的头文件
#include <stdio.h>  // 包含标准输入输出相关的头文件
#include <stdlib.h>  // 包含标准库相关的头文件
#include <sys/epoll.h>  // 包含 epoll 相关的头文件
#include <sys/socket.h>  // 包含套接字相关的头文件
#include <unistd.h>  // 包含 Unix 标准相关的头文件#include <iostream>  // 包含 C++ 的输入输出流头文件#include "../coroutine.h"  // 包含自定义的协程相关头文件
#include "../epollctl.hpp"  // 包含自定义的 epoll 控制相关头文件struct EventData {  // 定义事件数据结构体EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数,初始化成员变量int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};  // 结构体定义结束void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }  // 处理请求并生成响应的函数void handlerClient(void *arg) {  // 处理客户端连接的函数EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);  // 清除事件delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");  // 打印错误信息releaseConn();  // 释放连接return;  // 函数返回}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("read failed");  // 打印读取失败的错误信息releaseConn();  // 释放连接return;  // 函数返回}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;  // 跳出循环}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;  // 数据包对象codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;  // 继续下一次循环}perror("write failed");  // 打印写入失败的错误信息releaseConn();  // 释放连接return;  // 函数返回}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}void handler(char *argv[]) {  // 主处理函数int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {  // 如果创建失败return;  // 函数返回}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {  // 如果创建失败perror("epoll_create failed");  // 打印错误信息return;  // 函数返回}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sock

方法2:

思路:

  1. main 函数首先检查命令行参数数量是否正确。
  2. 然后通过循环创建子进程。
  3. 子进程调用 handler 函数。
  4. 在 handler 函数中,创建监听套接字和 epoll 实例,设置套接字为非阻塞并添加可读事件,初始化协程调度器。
  5. 进入一个无限循环,通过 epoll_wait 等待事件发生。
  6. 根据事件的类型进行处理:
    • 如果是监听套接字的事件,处理新的连接。
    • 对于客户端连接的事件,如果是第一次事件则创建协程并唤醒,否则唤醒已有的协程。
  7. 协程中的 handlerClient 函数处理客户端的读写操作。
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>#include <iostream>#include "../coroutine.h"
#include "../epollctl.hpp"// 定义事件数据结构体
struct EventData {EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){};  // 构造函数int fd_{0};  // 文件描述符int epoll_fd_{0};  // epoll 文件描述符int cid_{MyCoroutine::INVALID_ROUTINE_ID};  // 协程 IDMyCoroutine::Schedule *schedule_{nullptr};  // 协程调度器指针
};// 处理请求并生成响应的函数
void EchoDeal(const std::string reqMessage, std::string &respMessage) { respMessage = reqMessage; }// 处理客户端的函数
void handlerClient(void *arg) {EventData *eventData = (EventData *)arg;  // 获取事件数据指针auto releaseConn = [&eventData]() {  // 定义释放连接的匿名函数EchoServer::ClearEvent(eventData->epoll_fd_, eventData->fd_);delete eventData;  // 释放内存};ssize_t ret = 0;  // 读取结果EchoServer::Codec codec;  // 编解码器对象std::string reqMessage;  // 请求消息std::string respMessage;  // 响应消息while (true) {  // 读操作循环uint8_t data[100];  // 数据缓冲区ret = read(eventData->fd_, data, 100);  // 尝试读取数据if (ret == 0) {  // 对端关闭连接perror("peer close connection");releaseConn();return;}if (ret < 0) {  // 读取错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 无数据可读MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("read failed");releaseConn();return;}codec.DeCode(data, ret);  // 解码数据if (codec.GetMessage(reqMessage)) {  // 获取完整请求break;}}// 执行到这里说明已经读取到一个完整的请求EchoDeal(reqMessage, respMessage);  // 处理请求生成响应EchoServer::Packet pkt;codec.EnCode(respMessage, pkt);  // 编码响应EchoServer::ModToWriteEvent(eventData->epoll_fd_, eventData->fd_, eventData);  // 切换为监听可写事件ssize_t sendLen = 0;  // 已发送长度while (sendLen!= pkt.Len()) {  // 写操作循环ret = write(eventData->fd_, pkt.Data() + sendLen, pkt.Len() - sendLen);  // 尝试写入if (ret < 0) {  // 写入错误if (EINTR == errno) continue;  // 被中断,继续尝试if (EAGAIN == errno or EWOULDBLOCK == errno) {  // 不可写MyCoroutine::CoroutineYield(*eventData->schedule_);  // 让出 CPUcontinue;}perror("write failed");releaseConn();return;}sendLen += ret;  // 更新已发送长度}releaseConn();  // 释放连接资源
}// 主处理函数
void handler(char *argv[]) {int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), true);  // 创建监听套接字if (sockFd < 0) {return;}epoll_event events[2048];  // epoll 事件数组int epollFd = epoll_create(1024);  // 创建 epoll 实例if (epollFd < 0) {perror("epoll_create failed");return;}EventData eventData(sockFd, epollFd);  // 创建事件数据对象EchoServer::SetNotBlock(sockFd);  // 设置套接字为非阻塞EchoServer::AddReadEvent(epollFd, sockFd, &eventData);  // 添加可读事件MyCoroutine::Schedule schedule;  // 协程调度器MyCoroutine::ScheduleInit(schedule, 10000);  // 初始化协程池int msec = -1;  // 超时时间while (true) {int num = epoll_wait(epollFd, events, 2048, msec);  // 等待 epoll 事件if (num < 0) {  // 等待失败perror("epoll_wait failed");continue;} else if (num == 0) {  // 超时无事件sleep(0);  // 让出 CPUmsec = -1;  // 下次超时时间设置为 -1continue;}msec = 0;  // 有事件,下次超时时间设置为 0for (int i = 0; i < num; i++) {  // 处理事件EventData *eventData = (EventData *)events[i].data.ptr;if (eventData->fd_ == sockFd) {  // 是监听套接字的事件EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {EventData *eventData = new EventData(clientFd, epollFd);EchoServer::SetNotBlock(clientFd);EchoServer::AddReadEvent(epollFd, clientFd, eventData);  // 处理新连接});continue;}if (eventData->cid_ == MyCoroutine::INVALID_ROUTINE_ID) {  // 第一次事件if (MyCoroutine::CoroutineCanCreate(schedule)) {  // 可以创建协程eventData->schedule_ = &schedule;eventData->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, eventData, 0);  // 创建协程MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒协程} else {std::cout << "MyCoroutine is full" << std::endl;}} else {MyCoroutine::CoroutineResumeById(schedule, eventData->cid_);  // 唤醒已有协程}}MyCoroutine::ScheduleTryReleaseMemory(schedule);  // 尝试释放协程内存}
}int main(int argc, char *argv[]) {if (argc!= 3) {  // 检查命令行参数数量std::cout << "invalid input" << std::endl;std::cout << "example:./EpollReactorProcessPoolCoroutine 0.0.0.0 1688" << std::endl;return -1;}for (int i = 0; i < EchoServer::GetNProcs(); i++) {  // 循环创建子进程pid_t pid = fork();  // 创建进程if (pid < 0) {  // 创建失败perror("fork failed");continue;}if (0 == pid) {  // 子进程handler(argv);  // 处理客户端请求exit(0);}}while (true) sleep(1);  // 父进程进入死循环return 0;
}

总的来说,程序通过多进程和协程的结合,实现了对客户端连接的处理和高效的 I/O 操作。

这篇关于linux下I/O模型并发的epoll多进程池协程实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088233

相关文章

Flutter实现文字镂空效果的详细步骤

《Flutter实现文字镂空效果的详细步骤》:本文主要介绍如何使用Flutter实现文字镂空效果,包括创建基础应用结构、实现自定义绘制器、构建UI界面以及实现颜色选择按钮等步骤,并详细解析了混合模... 目录引言实现原理开始实现步骤1:创建基础应用结构步骤2:创建主屏幕步骤3:实现自定义绘制器步骤4:构建U

SpringBoot中四种AOP实战应用场景及代码实现

《SpringBoot中四种AOP实战应用场景及代码实现》面向切面编程(AOP)是Spring框架的核心功能之一,它通过预编译和运行期动态代理实现程序功能的统一维护,在SpringBoot应用中,AO... 目录引言场景一:日志记录与性能监控业务需求实现方案使用示例扩展:MDC实现请求跟踪场景二:权限控制与

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

windows和Linux使用命令行计算文件的MD5值

《windows和Linux使用命令行计算文件的MD5值》在Windows和Linux系统中,您可以使用命令行(终端或命令提示符)来计算文件的MD5值,文章介绍了在Windows和Linux/macO... 目录在Windows上:在linux或MACOS上:总结在Windows上:可以使用certuti

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

Python实现微信自动锁定工具

《Python实现微信自动锁定工具》在数字化办公时代,微信已成为职场沟通的重要工具,但临时离开时忘记锁屏可能导致敏感信息泄露,下面我们就来看看如何使用Python打造一个微信自动锁定工具吧... 目录引言:当微信隐私遇到自动化守护效果展示核心功能全景图技术亮点深度解析1. 无操作检测引擎2. 微信路径智能获

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

Linux之systemV共享内存方式

《Linux之systemV共享内存方式》:本文主要介绍Linux之systemV共享内存方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、工作原理二、系统调用接口1、申请共享内存(一)key的获取(二)共享内存的申请2、将共享内存段连接到进程地址空间3、将

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B