c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)

2024-03-20 12:20

本文主要是介绍c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)

  • 前言
  • [ThreadPool 项目地址](https://github.com/progschj/ThreadPool)
  • 项目源码:
  • 基本用法
  • 类成员变量
  • 类成员函数
    • 构造函数的签名
    • 创建线程
    • 线程默认的任务
    • 向任务队列中添加一个任务
    • 析构函数
  • 总结

前言

维基百科上对线程池的简要介绍:

线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

ThreadPool 项目地址

progschj/ThreadPool 是一个简易的基于 c++11 标准的线程池实现,采用了 Zlib license(相当宽松自由的开源协议,任意修改分发商用),截止当前时间点,已获得 7k+ stars。整个项目源码仅有一个头文件,代码行数不足一百行,早在多年前就已稳定不再更新。

项目源码:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>class ThreadPool {
public:ThreadPool(size_t);template<class F, class... Args>auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();
private:// need to keep track of threads so we can join themstd::vector< std::thread > workers;// the task queuestd::queue< std::function<void()> > tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for (size_t i = 0; i < threads; ++i)workers.emplace_back([this]{for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread& worker : workers)worker.join();
}#endif

基本用法

// create thread pool with 4 worker threads
ThreadPool pool(4);// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);// get result from future
std::cout << result.get() << std::endl;

类成员变量

std::vector< std::thread > workers;
std::queue< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
  • workers:存储线程池中 std::thread 的容器
  • tasks:任务队列
  • queue_mutex:任务队列的互斥锁
  • condition:任务队列的条件变量
  • stop:线程池是否停止的标志位

类成员函数

构造函数的签名

inline ThreadPool::ThreadPool(size_t threads): stop(false)
  • 构造函数传入一个 size_t 类型的参数,初始化线程池中线程的数量
  • 初始化列表将 stop 标志位初始化为 false

创建线程

for (size_t i = 0; i < threads; ++i)
{workers.emplace_back([this]{for (;;){//...}});
}
  • 使用 for 循环创建 threads 个线程,将线程加入 workers 容器
  • lambda 表达式用于创建线程,捕获 this,lambda 表达式中包含一个无限循环

线程默认的任务

std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();
}task();
  • 首先声明了一个 std::function<void()> 类型的变量 task
  • 在互斥锁保护任务队列后,调用 condition.wait() 等待任务队列非空或线程池停止,线程创建后,会在这里等待;如果 stop 标志位为 true 或者任务队列不为空,解除等待,继续往下执行
  • 如果标志位 stop 为 true,且任务队列为空,此任务将退出
  • 以上条件都通过后,将从任务队列中取出一个任务 task,移动到局部变量 task 中(吐槽下:距离 c++11 标准的发布已经过去了十几年,现在还不明白这一条的,就很难评价了)
  • 执行 task(),也就是上一步从队列头部取出的任务

向任务队列中添加一个任务

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;
  • enqueue 函数模板,用于任务的入队列
  • F&& f,这里预期是一个任意的 callable 对象
  • Args&&... args,一个可变模板参数,会在编译期展开参数包
  • 返回值是一个 std::future 类型的对象,用于获取任务的执行结果,std::future 的模板参数使用 std::result_of 萃取可调用对象的返回值类型
  • 注意,c++17 后 std::result_of 就已经是 deprecated,可以使用 std::invoke_result 类型萃取

继续往下看 enqueue 函数的实现:

using return_type = typename std::result_of<F(Args...)>::type;
  • 使用 std::result_of 类型萃取可调用对象的返回值类型,并使用 using 为其起个别名 reture_type
auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

这一段做了好几件事,一步一步拆解:

  • std::make_shared 构建一个 std::shared_ptr
  • std::packaged_task 模板是用于包装 callable 对象,使用了前面推导出的 return_type 类型来实例化模板
  • std::make_shared 需要调用实例类型的构造函数,而 std::packaged_task 的构造函数需要一个可调用对象,所以这里使用 std::bind 将可变模板参数绑定给 f(对 std::bind 不熟悉的建议先行查阅资料),std::forward 转发一下类型
  • 简单来说,以上只是构建一个 callable 对象的包装器
std::future<return_type> res = task->get_future();
{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
  • 从 task 中获取 std::future 对象
  • 使用大括号控制代码块,在这个代码块中上锁
  • 如果线程池已经停止,抛出异常
  • 否则正常执行,将 task 推入到队列尾部
  • 条件变量通知一个等待的线程,这个时候,构造函数中 condition.wait() 会被唤醒,以执行后面的代码块,即从队列头部取出一个任务并执行
  • 最后返回 std::future 对象

析构函数

遵循 RAII 原则,释放所有资源

{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)worker.join();
  • 上锁,将停止标志位置为 true
  • 通知所有等待的线程
  • 等待所有线程终止

总结

该项目仅是一个线程池的简易实现,对学习 c++11 标准的多线程及部分特性有一定帮助,如果想要更复杂的具有各种调度策略的线程池,还需进一步细化。

这篇关于c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/829434

相关文章

Spring Boot项目打包和运行的操作方法

《SpringBoot项目打包和运行的操作方法》SpringBoot应用内嵌了Web服务器,所以基于SpringBoot开发的web应用也可以独立运行,无须部署到其他Web服务器中,下面以打包dem... 目录一、打包为JAR包并运行1.打包为可执行的 JAR 包2.运行 JAR 包二、打包为WAR包并运行

基于Go语言实现Base62编码的三种方式以及对比分析

《基于Go语言实现Base62编码的三种方式以及对比分析》Base62编码是一种在字符编码中使用62个字符的编码方式,在计算机科学中,,Go语言是一种静态类型、编译型语言,它由Google开发并开源,... 目录一、标准库现状与解决方案1. 标准库对比表2. 解决方案完整实现代码(含边界处理)二、关键实现细

PostgreSQL 序列(Sequence) 与 Oracle 序列对比差异分析

《PostgreSQL序列(Sequence)与Oracle序列对比差异分析》PostgreSQL和Oracle都提供了序列(Sequence)功能,但在实现细节和使用方式上存在一些重要差异,... 目录PostgreSQL 序列(Sequence) 与 oracle 序列对比一 基本语法对比1.1 创建序

C++类和对象之初始化列表的使用方式

《C++类和对象之初始化列表的使用方式》:本文主要介绍C++类和对象之初始化列表的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录C++初始化列表详解:性能优化与正确实践什么是初始化列表?初始化列表的三大核心作用1. 性能优化:避免不必要的赋值操作2. 强

Nginx部署React项目时重定向循环问题的解决方案

《Nginx部署React项目时重定向循环问题的解决方案》Nginx在处理React项目请求时出现重定向循环,通常是由于`try_files`配置错误或`root`路径配置不当导致的,本文给大家详细介... 目录问题原因1. try_files 配置错误2. root 路径错误解决方法1. 检查 try_f

C++迭代器失效的避坑指南

《C++迭代器失效的避坑指南》在C++中,迭代器(iterator)是一种类似指针的对象,用于遍历STL容器(如vector、list、map等),迭代器失效是指在对容器进行某些操作后... 目录1. 什么是迭代器失效?2. 哪些操作会导致迭代器失效?2.1 vector 的插入操作(push_back,

解决Maven项目报错:failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.13.0的问题

《解决Maven项目报错:failedtoexecutegoalorg.apache.maven.plugins:maven-compiler-plugin:3.13.0的问题》这篇文章主要介... 目录Maven项目报错:failed to execute goal org.apache.maven.pl

Android实现一键录屏功能(附源码)

《Android实现一键录屏功能(附源码)》在Android5.0及以上版本,系统提供了MediaProjectionAPI,允许应用在用户授权下录制屏幕内容并输出到视频文件,所以本文将基于此实现一个... 目录一、项目介绍二、相关技术与原理三、系统权限与用户授权四、项目架构与流程五、环境配置与依赖六、完整

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

慢sql提前分析预警和动态sql替换-Mybatis-SQL

《慢sql提前分析预警和动态sql替换-Mybatis-SQL》为防止慢SQL问题而开发的MyBatis组件,该组件能够在开发、测试阶段自动分析SQL语句,并在出现慢SQL问题时通过Ducc配置实现动... 目录背景解决思路开源方案调研设计方案详细设计使用方法1、引入依赖jar包2、配置组件XML3、核心配