Taskflow:限制最大并发度( Limit the Maximum Concurrency)

2024-03-31 21:36

本文主要是介绍Taskflow:限制最大并发度( Limit the Maximum Concurrency),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

定义信号量Semaphore

Taskflow提供了一个机制,tf::Semaphore,用于限制任务部分中的最大并发。您可以让任务在执行工作之前/之后获取/释放一个或多个信号量。一项任务可以获取和释放信号量,或者只是获取或只是释放它。tf::Semaphore对象以初始计数开始。只要该计数高于0,Task就可以获得信号量并完成其工作。如果计数为0或更少,试图获取信号量的Task将不会运行,而是会进入该信号量的等待列表。当另一个Task释放信号量时,它会重新安排该等待列表中的所有任务。

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(4); // 开启四个线程tf::Taskflow taskflow;std::vector<tf::Task> tasks {taskflow.emplace([](){ std::cout << "A" << std::endl; }),taskflow.emplace([](){ std::cout << "B" << std::endl; }),taskflow.emplace([](){ std::cout << "C" << std::endl; }),taskflow.emplace([](){ std::cout << "D" << std::endl; }),taskflow.emplace([](){ std::cout << "E" << std::endl; })};tf::Semaphore semaphore(1); // 创建一个信号量,初始值为1// 每个task在执行前尝试获取semaphore,在执行结束后释放for(auto& task : tasks) {task.acquire(semaphore);task.release(semaphore); }executor.run(taskflow).wait();
}

在这里插入图片描述
可以发现,在同一时刻,只会有一个任务被获取到semaphore并执行。

注意,用户有责任确保信号量在执行获取和释放信号量的任务时保持活力。executor和taskflow都不管理任何semaphore的生命周期。

信号量不仅能限制Task部分的最大并发性,而且限制Task不同部分的最大并发性。具体来说,您可以让一个Task获得信号量,并有另一个释放该信号量。以下示例使用信号量而不是使用显式依赖关系执行五对任务。

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(4); // 开启四个线程tf::Taskflow taskflow;tf::Semaphore semaphore(1); // 创建一个信号量,初始值为1int n = 5, counter = 0;for(int i = 0; i < n; i++) {tf::Task f = taskflow.emplace([&](){ counter++; }).name("from-" + std::to_string(i));tf::Task t = taskflow.emplace([&](){ counter--; }).name("to-" + std::to_string(i));f.precede(t);// 隐含偏序关系:f -> tf.acquire(semaphore);t.release(semaphore);}executor.run(taskflow).wait();taskflow.dump(std::cout);
}

在这里插入图片描述
同时,因为信号量的count为1,所以同一时刻仅有一个任务在执行,且顺序一定是from-x --> to-x;
在这里插入图片描述

定义 Critical Section

tf::CriticalSection是tf::Semaphore的包装, 当Task添加到Critical Section时,该Task获取并释放Critical Section内部的信号量。此方法tf::CriticalSection::add为添加到Critical Section的每个Task自动调用tf::Task::acquire和tf::Task::release

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(8);   // create an executor of 8 workerstf::Taskflow taskflow;// create a critical section of two workerstf::CriticalSection critical_section(2); tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });tf::Task D = taskflow.emplace([](){ std::cout << "D" << std::endl; });tf::Task E = taskflow.emplace([](){ std::cout << "E" << std::endl; });critical_section.add(A, B, C, D, E);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

tf::Semaphore 的一种重要应用是,有时因为任务的设计问题,任务间和任务间虽然没有显式的依赖关系,但是他们并不能在同一时刻执行(比如共享了临界区?!),如下图所示:

在这里插入图片描述

这三个Task均没有相互依赖关系,但是A和B之间同一时刻只能执行一个(谁先无所谓), 同样,A和C之间只能执行一个,而B和C没有竞争关系,可以并发执行,那么可以为这两个竞争关系各自设置一个信号量,来实现上述约束:

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor;tf::Taskflow taskflow;tf::Semaphore conflict_AB(1);tf::Semaphore conflict_AC(1);tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });// describe the conflict between A and BA.acquire(conflict_AB).release(conflict_AB);B.acquire(conflict_AB).release(conflict_AB);// describe the conflict between A and CA.acquire(conflict_AC).release(conflict_AC);C.acquire(conflict_AC).release(conflict_AC);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

可以看到,B和C并发执行了一段,但是A和B,A和C均不会并发执行。

在这里插入图片描述

当然,也可以使用tf::CriticalSection,简化上述逻辑:

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor;tf::Taskflow taskflow;tf::CriticalSection cs_AB(1); // tf::CriticalSection cs_AC(1);tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });// describe the conflict between A and Bcs_AB.add(A,B);cs_AC.add(A,C);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

这篇关于Taskflow:限制最大并发度( Limit the Maximum Concurrency)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/865404

相关文章

python多线程并发测试过程

《python多线程并发测试过程》:本文主要介绍python多线程并发测试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、并发与并行?二、同步与异步的概念?三、线程与进程的区别?需求1:多线程执行不同任务需求2:多线程执行相同任务总结一、并发与并行?1、

Nginx 413修改上传文件大小限制的方法详解

《Nginx413修改上传文件大小限制的方法详解》在使用Nginx作为Web服务器时,有时会遇到客户端尝试上传大文件时返回​​413RequestEntityTooLarge​​... 目录1. 理解 ​​413 Request Entity Too Large​​ 错误2. 修改 Nginx 配置2.1

Linux高并发场景下的网络参数调优实战指南

《Linux高并发场景下的网络参数调优实战指南》在高并发网络服务场景中,Linux内核的默认网络参数往往无法满足需求,导致性能瓶颈、连接超时甚至服务崩溃,本文基于真实案例分析,从参数解读、问题诊断到优... 目录一、问题背景:当并发连接遇上性能瓶颈1.1 案例环境1.2 初始参数分析二、深度诊断:连接状态与

Windows系统宽带限制如何解除?

《Windows系统宽带限制如何解除?》有不少用户反映电脑网速慢得情况,可能是宽带速度被限制的原因,只需解除限制即可,具体该如何操作呢?本文就跟大家一起来看看Windows系统解除网络限制的操作方法吧... 有不少用户反映电脑网速慢得情况,可能是宽带速度被限制的原因,只需解除限制即可,具体该如何操作呢?本文

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

SpringBoot项目中报错The field screenShot exceeds its maximum permitted size of 1048576 bytes.的问题及解决

《SpringBoot项目中报错ThefieldscreenShotexceedsitsmaximumpermittedsizeof1048576bytes.的问题及解决》这篇文章... 目录项目场景问题描述原因分析解决方案总结项目场景javascript提示:项目相关背景:项目场景:基于Spring

MySQL 中的 LIMIT 语句及基本用法

《MySQL中的LIMIT语句及基本用法》LIMIT语句用于限制查询返回的行数,常用于分页查询或取部分数据,提高查询效率,:本文主要介绍MySQL中的LIMIT语句,需要的朋友可以参考下... 目录mysql 中的 LIMIT 语句1. LIMIT 语法2. LIMIT 基本用法(1) 获取前 N 行数据(

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

浅谈mysql的sql_mode可能会限制你的查询

《浅谈mysql的sql_mode可能会限制你的查询》本文主要介绍了浅谈mysql的sql_mode可能会限制你的查询,这个问题主要说明的是,我们写的sql查询语句违背了聚合函数groupby的规则... 目录场景:问题描述原因分析:解决方案:第一种:修改后,只有当前生效,若是mysql服务重启,就会失效;

Python异步编程中asyncio.gather的并发控制详解

《Python异步编程中asyncio.gather的并发控制详解》在Python异步编程生态中,asyncio.gather是并发任务调度的核心工具,本文将通过实际场景和代码示例,展示如何结合信号量... 目录一、asyncio.gather的原始行为解析二、信号量控制法:给并发装上"节流阀"三、进阶控制