Taskflow:限制最大并发度( Limit the Maximum Concurrency)

2024-03-31 21:36

本文主要是介绍Taskflow:限制最大并发度( Limit the Maximum Concurrency),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

定义信号量Semaphore

Taskflow提供了一个机制,tf::Semaphore,用于限制任务部分中的最大并发。您可以让任务在执行工作之前/之后获取/释放一个或多个信号量。一项任务可以获取和释放信号量,或者只是获取或只是释放它。tf::Semaphore对象以初始计数开始。只要该计数高于0,Task就可以获得信号量并完成其工作。如果计数为0或更少,试图获取信号量的Task将不会运行,而是会进入该信号量的等待列表。当另一个Task释放信号量时,它会重新安排该等待列表中的所有任务。

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(4); // 开启四个线程tf::Taskflow taskflow;std::vector<tf::Task> tasks {taskflow.emplace([](){ std::cout << "A" << std::endl; }),taskflow.emplace([](){ std::cout << "B" << std::endl; }),taskflow.emplace([](){ std::cout << "C" << std::endl; }),taskflow.emplace([](){ std::cout << "D" << std::endl; }),taskflow.emplace([](){ std::cout << "E" << std::endl; })};tf::Semaphore semaphore(1); // 创建一个信号量,初始值为1// 每个task在执行前尝试获取semaphore,在执行结束后释放for(auto& task : tasks) {task.acquire(semaphore);task.release(semaphore); }executor.run(taskflow).wait();
}

在这里插入图片描述
可以发现,在同一时刻,只会有一个任务被获取到semaphore并执行。

注意,用户有责任确保信号量在执行获取和释放信号量的任务时保持活力。executor和taskflow都不管理任何semaphore的生命周期。

信号量不仅能限制Task部分的最大并发性,而且限制Task不同部分的最大并发性。具体来说,您可以让一个Task获得信号量,并有另一个释放该信号量。以下示例使用信号量而不是使用显式依赖关系执行五对任务。

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(4); // 开启四个线程tf::Taskflow taskflow;tf::Semaphore semaphore(1); // 创建一个信号量,初始值为1int n = 5, counter = 0;for(int i = 0; i < n; i++) {tf::Task f = taskflow.emplace([&](){ counter++; }).name("from-" + std::to_string(i));tf::Task t = taskflow.emplace([&](){ counter--; }).name("to-" + std::to_string(i));f.precede(t);// 隐含偏序关系:f -> tf.acquire(semaphore);t.release(semaphore);}executor.run(taskflow).wait();taskflow.dump(std::cout);
}

在这里插入图片描述
同时,因为信号量的count为1,所以同一时刻仅有一个任务在执行,且顺序一定是from-x --> to-x;
在这里插入图片描述

定义 Critical Section

tf::CriticalSection是tf::Semaphore的包装, 当Task添加到Critical Section时,该Task获取并释放Critical Section内部的信号量。此方法tf::CriticalSection::add为添加到Critical Section的每个Task自动调用tf::Task::acquire和tf::Task::release

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor(8);   // create an executor of 8 workerstf::Taskflow taskflow;// create a critical section of two workerstf::CriticalSection critical_section(2); tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });tf::Task D = taskflow.emplace([](){ std::cout << "D" << std::endl; });tf::Task E = taskflow.emplace([](){ std::cout << "E" << std::endl; });critical_section.add(A, B, C, D, E);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

tf::Semaphore 的一种重要应用是,有时因为任务的设计问题,任务间和任务间虽然没有显式的依赖关系,但是他们并不能在同一时刻执行(比如共享了临界区?!),如下图所示:

在这里插入图片描述

这三个Task均没有相互依赖关系,但是A和B之间同一时刻只能执行一个(谁先无所谓), 同样,A和C之间只能执行一个,而B和C没有竞争关系,可以并发执行,那么可以为这两个竞争关系各自设置一个信号量,来实现上述约束:

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor;tf::Taskflow taskflow;tf::Semaphore conflict_AB(1);tf::Semaphore conflict_AC(1);tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });// describe the conflict between A and BA.acquire(conflict_AB).release(conflict_AB);B.acquire(conflict_AB).release(conflict_AB);// describe the conflict between A and CA.acquire(conflict_AC).release(conflict_AC);C.acquire(conflict_AC).release(conflict_AC);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

可以看到,B和C并发执行了一段,但是A和B,A和C均不会并发执行。

在这里插入图片描述

当然,也可以使用tf::CriticalSection,简化上述逻辑:

#include <taskflow/taskflow.hpp>
int main() {tf::Executor executor;tf::Taskflow taskflow;tf::CriticalSection cs_AB(1); // tf::CriticalSection cs_AC(1);tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; });tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; });tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; });// describe the conflict between A and Bcs_AB.add(A,B);cs_AC.add(A,C);executor.run(taskflow).wait();taskflow.dump(std::cout);
}

这篇关于Taskflow:限制最大并发度( Limit the Maximum Concurrency)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/865404

相关文章

Java JUC并发集合详解之线程安全容器完全攻略

《JavaJUC并发集合详解之线程安全容器完全攻略》Java通过java.util.concurrent(JUC)包提供了一整套线程安全的并发容器,它们不仅是简单的同步包装,更是基于精妙并发算法构建... 目录一、为什么需要JUC并发集合?二、核心并发集合分类与详解三、选型指南:如何选择合适的并发容器?在多

Java 结构化并发Structured Concurrency实践举例

《Java结构化并发StructuredConcurrency实践举例》Java21结构化并发通过作用域和任务句柄统一管理并发生命周期,解决线程泄漏与任务追踪问题,提升代码安全性和可观测性,其核心... 目录一、结构化并发的核心概念与设计目标二、结构化并发的核心组件(一)作用域(Scopes)(二)任务句柄

k8s容器放开锁内存限制问题

《k8s容器放开锁内存限制问题》nccl-test容器运行mpirun时因NCCL_BUFFSIZE过大导致OOM,需通过修改docker服务配置文件,将LimitMEMLOCK设为infinity并... 目录问题问题确认放开容器max locked memory限制总结参考:https://Access

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

Spring Security 前后端分离场景下的会话并发管理

《SpringSecurity前后端分离场景下的会话并发管理》本文介绍了在前后端分离架构下实现SpringSecurity会话并发管理的问题,传统Web开发中只需简单配置sessionManage... 目录背景分析传统 web 开发中的 sessionManagement 入口ConcurrentSess

基于Python实现数字限制在指定范围内的五种方式

《基于Python实现数字限制在指定范围内的五种方式》在编程中,数字范围限制是常见需求,无论是游戏开发中的角色属性值、金融计算中的利率调整,还是传感器数据处理中的异常值过滤,都需要将数字控制在合理范围... 目录引言一、基础条件判断法二、数学运算巧解法三、装饰器模式法四、自定义类封装法五、NumPy数组处理

MySQL中处理数据的并发一致性的实现示例

《MySQL中处理数据的并发一致性的实现示例》在MySQL中处理数据的并发一致性是确保多个用户或应用程序同时访问和修改数据库时,不会导致数据冲突、数据丢失或数据不一致,MySQL通过事务和锁机制来管理... 目录一、事务(Transactions)1. 事务控制语句二、锁(Locks)1. 锁类型2. 锁粒

深入解析Java NIO在高并发场景下的性能优化实践指南

《深入解析JavaNIO在高并发场景下的性能优化实践指南》随着互联网业务不断演进,对高并发、低延时网络服务的需求日益增长,本文将深入解析JavaNIO在高并发场景下的性能优化方法,希望对大家有所帮助... 目录简介一、技术背景与应用场景二、核心原理深入分析2.1 Selector多路复用2.2 Buffer

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到

Go语言并发之通知退出机制的实现

《Go语言并发之通知退出机制的实现》本文主要介绍了Go语言并发之通知退出机制的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1、通知退出机制1.1 进程/main函数退出1.2 通过channel退出1.3 通过cont