基于多反应堆的高并发服务器【C/C++/Reactor】(中)处理任务队列中的任务

2024-01-03 15:12

本文主要是介绍基于多反应堆的高并发服务器【C/C++/Reactor】(中)处理任务队列中的任务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 一、处理任务队列中的任务

(1)EventLoop启动 EventLoop初始化和启动

// 启动反应堆模型
int eventLoopRun(struct EventLoop* evLoop) {assert(evLoop != NULL);// 取出事件分发和检测模型struct Dispatcher* dispatcher = evLoop->dispatcher;// 比较线程ID是否正常if(evLoop->threadID != pthread_self()) {return -1;}// 循环进行事件处理while(!evLoop->isQuit) {dispatcher->dispatch(evLoop,2); // 超时时长 2s// 已续写eventLoopProcessTask(evLoop);}return 0;   
}

多加一句eventLoopProcessTask(evLoop); 理由在后文提到!

(2)添加任务到任务队列中(在EventLoop的任务队列中添加新任务)

// 添加任务到任务队列
int eventLoopAddTask(struct EventLoop* evLoop,struct Channel* channel,int type) {// 加锁,保护共享资源pthread_mutex_lock(&evLoop->mutex);// 创建新节点,后添加到任务队列中去struct ChannelElement* node = (struct ChannelElement*)malloc(sizeof(struct ChannelElement));node->channel = channel;node->type = type;node->next = NULL;// 链表为空if(evLoop->head == NULL) {evLoop->head = evLoop->tail = node;}else {evLoop->tail->next = node; // 添加evLoop->tail = node; // 后移}pthread_mutex_unlock(&evLoop->mutex);if(evLoop->threadID == pthread_self()) {// 当前子线程eventLoopProcessTask(evLoop);}else{// 主线程 -- 告诉子线程处理任务队列中的任务// 1.子线程在工作 2.子线程被阻塞了:select、poll、epolltaskWakeup(evLoop);}return 0;
}

小细节:假设说添加任务的是主线程,那么程序就会执行taskWakeup这个函数,主线程执行这个函数对于子线程来说两种情况

第一种情况,它正在干活,对于子线程没有影响,充其量就是它检测的那个集合里边多出来了一个被激活的文件描述符。

第二种情况,如果说此时子线程select、poll、或epoll_wait阻塞了,调用taskWakeup函数可以解除其阻塞。如果解除阻塞了,我们希望子线程干什么事情呢?

  1. 具体流程:因为主线程是在子线程的任务队列里添加了一个任务 先调用taskWakeup函数解除阻塞,让子线程解除阻塞是需要让它去处理任务。接着eventLoopRun函数中调用eventLoopProcessTask(evLoop);
  2. 详解流程:因为这个反应堆模型只要开始运行(eventLoopRun)就会不停的调用dispatch函数,这个dispatch是一个函数指针,底层指向的是poll模型的poll函数,select模型的select函数,epoll模型的epoll_wait函数。如果当前的子线程正在被刚才的提到的这三个函数里边的其中一个阻塞着,此时正好被主线程唤醒了(调用taskWakeup函数)。需要在循环进行事件处理中添加一句eventLoopProcessTask(evLoop);

taskWakeup函数的调用和影响

  1. 主线程调用taskWakeup函数时,子线程可能正在被select、poll、或epoll_wait阻塞
  2. 主线程在子线程的任务队列中添加任务,因此需要让子线程解除阻塞并处理队列中的任务

总结关于任务队列的处理有两个路径:

  1. 第一个路径:子线程往任务队列里边添加一个任务,比如说修改文件描述符里边的事件,肯定是子线程自己修改自己检测的文件描述符的事件,修改完了之后,子线程就直接调用这个函数去处理任务队列里边的任务。
  2. 第二个路径:主线程在子线程的任务队列里边添加了一个任务,主线程是处理不了的,并且主线程现在也不知道子线程是在工作还是在阻塞,所以主线程就默认子线程现在正在阻塞,因此主线程就调用了一个唤醒函数(taskWakeup)来解除阻塞,调用这个函数保证子线程肯定是在运行的,子线程eventLoopRun函数dispatch函数调用位置解除了阻塞,然后调用eventLoopProcessTask(evLoop);
// 启动反应堆模型
int eventLoopRun(struct EventLoop* evLoop) {...// 循环进行事件处理while(!evLoop->isQuit) {dispatcher->dispatch(evLoop,2); // 超时时长 2s// 已续写eventLoopProcessTask(evLoop);}return 0;   
}// 添加任务到任务队列
int eventLoopAddTask(struct EventLoop* evLoop,struct Channel* channel,int type) {...if(evLoop->threadID == pthread_self()) {// 当前子线程eventLoopProcessTask(evLoop);}else{// 主线程 -- 告诉子线程处理任务队列中的任务// 1.子线程在工作 2.子线程被阻塞了:select、poll、epolltaskWakeup(evLoop);}return 0;
}
  • EventLoop.h
// 处理任务队列中的任务
int eventLoopProcessTask(struct EventLoop* evLoop);// 处理dispatcher中的任务
int eventLoopAdd(struct EventLoop* evLoop,struct Channel* channel);
int eventLoopRemove(struct EventLoop* evLoop,struct Channel* channel);
int eventLoopModify(struct EventLoop* evLoop,struct Channel* channel);
  • EventLoop.c

eventLoopProcessTask函数的作用:它处理队列中的任务,需要遍历链表并根据type进行对应处理。

  1. 在加锁和解锁函数之间遍历链表
  2. 需要遍历链表,处理节点,并在处理完后从列表中删除节点
  3. 如果当前节点被处理完,需要移动head指针以处理下一个节点
  4. 需要定义temp指针来保存head指针指向的地址,并在head后移后释放temp指针指向的节点
// 处理任务队列中的任务
int eventLoopProcessTask(struct EventLoop* evLoop) {pthread_mutex_lock(&evLoop->mutex);// 取出头节点struct ChannelElement* head = evLoop->head;while (head!=NULL) {struct Channel* channel = head->channel;if(head->type == ADD) {// 添加eventLoopAdd(evLoop,channel);}else if(head->type == DELETE) {// 删除eventLoopRemove(evLoop,channel);}else if(head->type == MODIFY) {// 修改eventLoopModify(evLoop,channel);}struct ChannelElement* tmp = head;head = head->next;// 释放节点free(tmp);}evLoop->head = evLoop->tail = NULL;pthread_mutex_unlock(&evLoop->mutex);return 0;
}

注意事项

  1. 在写程序时,每个功能应对应一个任务函数,以提高程序逻辑性和可维护性
  2. 在处理任务队列时,需要注意线程安全和资源管理问题
// 处理dispatcher中的任务
int eventLoopAdd(struct EventLoop* evLoop,struct Channel* channel) {int fd = channel->fd;struct ChannelMap* channelMap = evLoop->channelMap;if(fd >= channelMap->size) {// 没有足够的空间存储键值对 fd->channel ==> 扩容if(!makeMapRoom(channelMap,fd,sizeof(struct Channel*))) {return -1;}}// 找到fd对应的数组元素位置,并存储if(channelMap->list[fd] == NULL) {channelMap->list[fd] = channel;evLoop->dispatcher->add(channel,evLoop);} return 0;
}int eventLoopRemove(struct EventLoop* evLoop,struct Channel* channel) {int fd = channel->fd;struct ChannelMap* channelMap = evLoop->channelMap;if(fd >= channelMap->size) {return -1;}int ret = evLoop->dispatcher->remove(channel,evLoop);return ret;
}int eventLoopModify(struct EventLoop* evLoop,struct Channel* channel) {int fd = channel->fd;struct ChannelMap* channelMap = evLoop->channelMap;if(fd >= channelMap->size || channelMap->list[fd] == NULL) {return -1;}int ret = evLoop->dispatcher->modify(channel,evLoop);return ret;
}

总结

  1. 讲解了任务队列处理函数的作用和实现细节,以及task wake up函数的调用和影响
  2. 强调了将功能分散到单独函数中的重要性,以提高程序的可读性和可维护性

这篇关于基于多反应堆的高并发服务器【C/C++/Reactor】(中)处理任务队列中的任务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/566122

相关文章

从入门到精通C++11 <chrono> 库特性

《从入门到精通C++11<chrono>库特性》chrono库是C++11中一个非常强大和实用的库,它为时间处理提供了丰富的功能和类型安全的接口,通过本文的介绍,我们了解了chrono库的基本概念... 目录一、引言1.1 为什么需要<chrono>库1.2<chrono>库的基本概念二、时间段(Durat

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Visual Studio 2022 编译C++20代码的图文步骤

《VisualStudio2022编译C++20代码的图文步骤》在VisualStudio中启用C++20import功能,需设置语言标准为ISOC++20,开启扫描源查找模块依赖及实验性标... 默认创建Visual Studio桌面控制台项目代码包含C++20的import方法。右键项目的属性:

mysql中的服务器架构详解

《mysql中的服务器架构详解》:本文主要介绍mysql中的服务器架构,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、mysql服务器架构解释3、总结1、背景简单理解一下mysqphpl的服务器架构。2、mysjsql服务器架构解释mysql的架

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

在Golang中实现定时任务的几种高效方法

《在Golang中实现定时任务的几种高效方法》本文将详细介绍在Golang中实现定时任务的几种高效方法,包括time包中的Ticker和Timer、第三方库cron的使用,以及基于channel和go... 目录背景介绍目的和范围预期读者文档结构概述术语表核心概念与联系故事引入核心概念解释核心概念之间的关系

C++11委托构造函数和继承构造函数的实现

《C++11委托构造函数和继承构造函数的实现》C++引入了委托构造函数和继承构造函数这两个重要的特性,本文主要介绍了C++11委托构造函数和继承构造函数的实现,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、委托构造函数1.1 委托构造函数的定义与作用1.2 委托构造函数的语法1.3 委托构造函

C++11作用域枚举(Scoped Enums)的实现示例

《C++11作用域枚举(ScopedEnums)的实现示例》枚举类型是一种非常实用的工具,C++11标准引入了作用域枚举,也称为强类型枚举,本文主要介绍了C++11作用域枚举(ScopedEnums... 目录一、引言二、传统枚举类型的局限性2.1 命名空间污染2.2 整型提升问题2.3 类型转换问题三、C