协程(coroutine)应用实例:计时器过期事件响应

2024-06-16 22:08

本文主要是介绍协程(coroutine)应用实例:计时器过期事件响应,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

协程应用实例:计时器过期事件响应

    • 1. 计时调度中心
    • 2.基于协程的事件处理

  早期我曾把弄过War3 的WE编辑器,算是我编程的启蒙教育了。其事件响应系统在我心中一直印象深刻,特别是每个事件都可以用等待函数延迟执行,昨天我看到了协程,心血来潮便写了个简陋的计时器响应机制。

1. 计时调度中心

  计时调度中心采用linux时间轮式设计,网上资源很全面,不做过多叙述。
  详见: 基于Linux内核的时间轮算法设计实现【附代码】(https://cloud.tencent.com/developer/article/1553274)

2.基于协程的事件处理

  通常,在这种同时处理多个事件的情况下,一般是用 多线程? 执行事件响应函数(回调函数)的。
  考察实际应用方面,更多的是简短的事件响应,这样一来大部分的时间开销都用在创建/销毁线程对象上面了。
  当然为了解决这个问题,一种线程池技术被提出来。但仍然存在线程超过一定阈值时,大量时间被浪费在线程切换上所带来的问题。
  基于以上几点考虑,我决定用协程执行事件响应处理,同时也方面了等待函数的实现(不需要阻塞线程,仅仅只需要在一段时间后再度调度)。
  当然如果一个线程同时处理成千上万的协程压力过大,这时候平分到几个线程区执行就可以了。

协程返回值类型 promise_type,用于和协程交互

class co_call
{
public:class promise_type{public:using value_type = size_t;public:promise_type &get_return_object(){return *this;}auto initial_suspend(){return std::experimental::suspend_always{};}auto final_suspend(){// suspend it to save the coroutine handle return std::experimental::suspend_always{};}void yield_value(value_type _Value){_CurrentValue = _Value;}auto return_value(value_type _Value){_CurrentValue = _Value;return std::experimental::suspend_always{}; // dont suspend it}value_type operator *(void) const noexcept{return _CurrentValue;}public:value_type _CurrentValue;};using value_type = promise_type::value_type;
public:explicit co_call(promise_type &_Prom): _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),_Value(*_Prom){}co_call() = default;co_call(co_call const &) = delete;co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value){_Right._Coro = nullptr;_Right._Value = 0;}~co_call(){if (_Coro) {_Coro.destroy();}}
public:_NODISCARD value_type resume(){if (_Coro) {_Coro.resume();_Value = *_Coro.promise();if (_Coro.done() || (_Value == 0)){_Coro.destroy();_Coro = 0;_Value = 0;return _Value;}}return _Value;}public:co_call &operator=(co_call const &) = delete;co_call &operator=(co_call &&_Right){if (this != _STD addressof(_Right)) {_Coro = _Right._Coro;_Right._Coro = nullptr;_Value = _Right._Value;_Right._Value = 0;}return *this;}operator bool(void){return (_Coro != 0);}
private:::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;value_type _Value = 0;
};

计时器对象,保存有协程句柄,如果事件存在等待函数则先行挂起返回一个等待时间,在一段时候后再次被调度,否则直接返回0

class _XTimer
{
public:public:_XTimer():task(0),id(0),handle(){}_XTimer(int c,int i):task(c),id(i),handle(){}~_XTimer(){}public:co_call on_event(size_t now){std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;// 挂起一段时间,让出时间片,执行其他COROUTINE_COT_WAIT(task);std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " <<  std::endl;_COT_NORET();}public:int task;int expires;int id;co_call handle;
};

下面是全部代码


#include <iostream>
#include <list>
#include <cassert>
#include <algorithm>
#include <mutex>
#include <experimental/coroutine>
#include <experimental/resumable>
#include <experimental/generator>
#include <Windows.h>
#include <thread>
#include <future>using namespace std;#define _COT_WAIT(x) (co_yield (x))
#define _COT_NORET() co_return (0)class co_call
{
public:class promise_type{public:using value_type = size_t;public:promise_type &get_return_object(){return *this;}auto initial_suspend(){return std::experimental::suspend_always{};}auto final_suspend(){// suspend it to save the coroutine handle return std::experimental::suspend_always{};}void yield_value(value_type _Value){_CurrentValue = _Value;}auto return_value(value_type _Value){_CurrentValue = _Value;return std::experimental::suspend_always{}; // dont suspend it}value_type operator *(void) const noexcept{return _CurrentValue;}public:value_type _CurrentValue;};using value_type = promise_type::value_type;
public:explicit co_call(promise_type &_Prom): _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),_Value(*_Prom){}co_call() = default;co_call(co_call const &) = delete;co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value){_Right._Coro = nullptr;_Right._Value = 0;}~co_call(){if (_Coro) {_Coro.destroy();}}
public:_NODISCARD value_type resume(){if (_Coro) {_Coro.resume();_Value = *_Coro.promise();if (_Coro.done() || (_Value == 0)){_Coro.destroy();_Coro = 0;_Value = 0;return _Value;}}return _Value;}public:co_call &operator=(co_call const &) = delete;co_call &operator=(co_call &&_Right){if (this != _STD addressof(_Right)) {_Coro = _Right._Coro;_Right._Coro = nullptr;_Value = _Right._Value;_Right._Value = 0;}return *this;}operator bool(void){return (_Coro != 0);}
private:::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;value_type _Value = 0;
};class _XTimer
{
public:public:_XTimer():task(0),id(0),handle(){}_XTimer(int c,int i):task(c),id(i),handle(){}~_XTimer(){}public:co_call on_event(size_t now){std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;// 挂起一段时间,让出时间片,执行其他COROUTINE_COT_WAIT(task);std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " <<  std::endl;_COT_NORET();}public:int task;int expires;int id;co_call handle;
};class _XTimeDisp
{
public:using _Cont_ty = ::std::list<class _XTimer*>;static constexpr int _TVN_BITS = 4;static constexpr int _TVR_BITS = 6;static constexpr int _TVN_SIZE = 1 << _TVN_BITS;static constexpr int _TVR_SIZE = 1 << _TVR_BITS;static constexpr int _TVN_MASK = _TVN_SIZE - 1;static constexpr int _TVR_MASK = _TVR_SIZE - 1;public:_XTimeDisp():_tq_mutex(),tvec(),_sign_mutex(),_run_thread(),_sign(),_thread_state(0){}~_XTimeDisp(){}
public:static constexpr int _INDEX(int expires, int n){return ((expires >> (_TVR_BITS + n * _TVN_BITS)) & _TVN_MASK);}static constexpr int _OFFSET(int n){return (_TVR_SIZE + n*_TVN_SIZE);}
public:void add(_XTimer* timer){unsigned long long expires = timer->expires;unsigned long long index = expires - _Check_time;unsigned int _VecIndex = 0;if (index < _TVR_SIZE)		// tvec_1{tvec_1[expires & _TVR_MASK].push_back(timer);}else if (index < (1 << (_TVR_BITS + 1 * _TVN_BITS)))	// tvec_2{tvec_2[_INDEX(expires, 0)].push_back(timer);}else if (index <= 0)	// 异常处理,视为即将到期的计时器{tvec_1[_Check_time & _TVR_MASK].push_back(timer);}else {if (index > 0xFFFFFFFFUL){index = 0xFFFFFFFFUL;expires = index + _Check_time;}tvec_1[_INDEX(expires, 1)].push_back(timer);}}int cascade(int offset, int index){::std::unique_lock<::std::recursive_mutex>  _lock(_tq_mutex);_Cont_ty& list = tvec[offset + index];_Cont_ty empty;::std::swap(empty, list);for (auto it = empty.begin(); it != empty.end(); ++it){this->add(*it);}return index;}void tick(size_t _Now){::std::unique_lock<::std::recursive_mutex> _lock(_tq_mutex);while (_Check_time <= _Now){int index = _Check_time & _TVR_MASK;if (!index &&		// tv1!cascade(_OFFSET(0), _INDEX(_Check_time, 0)))	// tv2{cascade(_OFFSET(1), _INDEX(_Check_time, 1));	// tv3}++_Check_time;_Cont_ty& list = tvec[index];_Cont_ty empty;::std::swap(empty, list);for (auto it = empty.begin(); it != empty.end(); ++it){ // 如果有句柄说明处于挂起状态,继续执行if ((*it)->handle){auto ret = (*it)->handle.resume();if (ret != 0){(*it)->expires += ret;add(*it);}}else {// 视为第一次执行auto res = (*it)->on_event(_Now);auto ret = res.resume();if (ret != 0){(*it)->expires += ret;(*it)->handle = ::std::move(res);add(*it);}}}}}void run(){while (_thread_state == 1){tick(_Check_time);::std::unique_lock<::std::mutex> _nofity(_sign_mutex);_sign.wait(_nofity);}}void nofity(size_t now){_Check_time = now;_sign.notify_one();}void start(size_t time){if (_thread_state){return;}_Check_time = time;_thread_state = 1;_run_thread = ::std::thread(&_XTimeDisp::run, this);}void stop(){_thread_state = 0;_sign.notify_one();if (_run_thread.joinable()) {_run_thread.join();}}
public:size_t _Check_time;union{class{public:_Cont_ty tvec_1[_TVR_SIZE];_Cont_ty tvec_2[_TVN_SIZE];_Cont_ty tvec_3[_TVN_SIZE];};_Cont_ty tvec[_TVR_SIZE + 2 * _TVN_SIZE];};::std::recursive_mutex _tq_mutex;::std::mutex _sign_mutex;::std::condition_variable _sign;::std::thread _run_thread;int _thread_state;
};using timer = _XTimer;
using tdc = _XTimeDisp;void main()
{timer t1, t2,t3,t4;t1.id = 1;t1.expires = 0;t1.task = 300;t2.id = 2;t2.task = 100;t2.expires = 3;t3.id = 3;t3.task = 50;t3.expires = 3;t4.id = 4;t4.task = 30;t4.expires = 88;tdc tm;tm._Check_time = 0;tm.add(&t1);tm.add(&t2);tm.add(&t3);tm.add(&t4);tm.start(0);int tbegin = 0;while (1){::std::this_thread::sleep_for(::std::chrono::milliseconds(10));tm.nofity(tbegin);++tbegin;}system("pause");
}#undef _COT_WAIT
#undef _COT_NORET

运行截图
在这里插入图片描述
由于没有采取多线程Sleep阻塞,CPU利用自然是比较高的

这篇关于协程(coroutine)应用实例:计时器过期事件响应的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1067678

相关文章

CSS中的Static、Relative、Absolute、Fixed、Sticky的应用与详细对比

《CSS中的Static、Relative、Absolute、Fixed、Sticky的应用与详细对比》CSS中的position属性用于控制元素的定位方式,不同的定位方式会影响元素在页面中的布... css 中的 position 属性用于控制元素的定位方式,不同的定位方式会影响元素在页面中的布局和层叠关

SpringBoot3应用中集成和使用Spring Retry的实践记录

《SpringBoot3应用中集成和使用SpringRetry的实践记录》SpringRetry为SpringBoot3提供重试机制,支持注解和编程式两种方式,可配置重试策略与监听器,适用于临时性故... 目录1. 简介2. 环境准备3. 使用方式3.1 注解方式 基础使用自定义重试策略失败恢复机制注意事项

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部

Redis过期删除机制与内存淘汰策略的解析指南

《Redis过期删除机制与内存淘汰策略的解析指南》在使用Redis构建缓存系统时,很多开发者只设置了EXPIRE但却忽略了背后Redis的过期删除机制与内存淘汰策略,下面小编就来和大家详细介绍一下... 目录1、简述2、Redis http://www.chinasem.cn的过期删除策略(Key Expir

Spring组件实例化扩展点之InstantiationAwareBeanPostProcessor使用场景解析

《Spring组件实例化扩展点之InstantiationAwareBeanPostProcessor使用场景解析》InstantiationAwareBeanPostProcessor是Spring... 目录一、什么是InstantiationAwareBeanPostProcessor?二、核心方法解

java String.join()方法实例详解

《javaString.join()方法实例详解》String.join()是Java提供的一个实用方法,用于将多个字符串按照指定的分隔符连接成一个字符串,这一方法是Java8中引入的,极大地简化了... 目录bVARxMJava String.join() 方法详解1. 方法定义2. 基本用法2.1 拼接

Python使用Tkinter打造一个完整的桌面应用

《Python使用Tkinter打造一个完整的桌面应用》在Python生态中,Tkinter就像一把瑞士军刀,它没有花哨的特效,却能快速搭建出实用的图形界面,作为Python自带的标准库,无需安装即可... 目录一、界面搭建:像搭积木一样组合控件二、菜单系统:给应用装上“控制中枢”三、事件驱动:让界面“活”

如何确定哪些软件是Mac系统自带的? Mac系统内置应用查看技巧

《如何确定哪些软件是Mac系统自带的?Mac系统内置应用查看技巧》如何确定哪些软件是Mac系统自带的?mac系统中有很多自带的应用,想要看看哪些是系统自带,该怎么查看呢?下面我们就来看看Mac系统内... 在MAC电脑上,可以使用以下方法来确定哪些软件是系统自带的:1.应用程序文件夹打开应用程序文件夹

Linux lvm实例之如何创建一个专用于MySQL数据存储的LVM卷组

《Linuxlvm实例之如何创建一个专用于MySQL数据存储的LVM卷组》:本文主要介绍使用Linux创建一个专用于MySQL数据存储的LVM卷组的实例,具有很好的参考价值,希望对大家有所帮助,... 目录在Centos 7上创建卷China编程组并配置mysql数据目录1. 检查现有磁盘2. 创建物理卷3. 创

Python Flask 库及应用场景

《PythonFlask库及应用场景》Flask是Python生态中​轻量级且高度灵活的Web开发框架,基于WerkzeugWSGI工具库和Jinja2模板引擎构建,下面给大家介绍PythonFl... 目录一、Flask 库简介二、核心组件与架构三、常用函数与核心操作 ​1. 基础应用搭建​2. 路由与参