一步一步写线程之八线程池的完善之一线程管理

2024-03-24 02:44

本文主要是介绍一步一步写线程之八线程池的完善之一线程管理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、介绍

在前面实现了一个很粗陋的线程池,那么它的运行没有动态控制,相关线程池的动态伸缩也没有控制,这一切,在实际的应用中,是应该存在的。本篇文章就将相关的实现方法和源码分析一下。

二、线程池内线程的管理

线程池中的线程管理,一般有以下几种:
1、动态控制线程的启停
这意味着线程在启动时,不应用直接启动,而是根据实际情况来启动。
2、线程池的容量管理
线程的创建数量其实和实际场景中的硬件资源和OS系统强相关,所以线程池中的线程创建多少,需要开发者一开始设置一个处置的上限。达到既可以利用多线程的优势,又不能脱离实际资源以及浪费大量的时间在线程的上下文切换上。
3、获取线程ID
这个相对来说比较简单,STL中提供了相关的接口函数。
4、动态扩容和收缩
这个相对来说实现容易管理难。比如扩容和收缩的机制,如何才能更好的适应实际情况。

三、例程

先看一下代码,主要看一下与上一篇的不同之处:

//common.h#ifndef __COMMON_H__
#define __COMMON_H__#include <functional>using CallBackMsg = std::function<void(int *, int)>;
using Task = std::function<void(int)>;class NoCopy {
protected:NoCopy() = default;~NoCopy() = default;public:NoCopy(const NoCopy &) = delete;NoCopy &operator=(const NoCopy &) = delete;NoCopy(NoCopy &&) = delete;NoCopy &operator=(NoCopy &&) = delete;
};#endif // __COMMON_H__
//ThreadCondition.h
#include <condition_variable>
#include <mutex>#include "common.h"class ThreadCondition : private NoCopy {
public:ThreadCondition() {}~ThreadCondition() {}public:inline bool Wait(int timeOut) {signaled_ = false;std::unique_lock<std::mutex> lock(this->lockMutex_);if (this->cvLock_.wait_for(lock, std::chrono::milliseconds(timeOut)) == std::cv_status::timeout) {return false;}return true;}inline void Wait() {signaled_ = false;std::unique_lock<std::mutex> lock(this->lockMutex_);while (!signaled_) {this->cvLock_.wait(lock);}}inline void Signal() {std::unique_lock<std::mutex> lock(this->lockMutex_);signaled_ = true;// pthread_cond_broadcast(&_cond);this->cvLock_.notify_one();}void SetSignal(bool quit = false) noexcept {//设置退出循环标志if (quit) {this->quit_ = true;}//唤醒线程this->Signal();}private:bool signaled_ = false;std::mutex lockMutex_;std::condition_variable cvLock_;bool quit_ = false;
};//ThreadPool.h
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__#include "TaskQueue.h"
#include "ThreadCondition.h"
#include "common.h"
#include <atomic>
#include <memory>
#include <thread>
#include <vector>class ThreadWorker;class ThreadPool {
public:explicit ThreadPool();~ThreadPool();public:void InitThreadPool(int threadCount, bool initThreadStatus, CallBackMsg cb);void AddTask(Task t);std::tuple<bool, Task> GetTask();void Extend(int num = 3);void Contact(int num = 3);public:void Wait();void SetSignal();void SetSignal(std::thread::id threadid);void SetMaxThreadCount(int maxCount);std::vector<std::thread::id> GetThreadID();public:static std::shared_ptr<ThreadPool> Get();private:void Destory();bool getThreadRunStatus(std::thread::id id);private:std::vector<std::shared_ptr<ThreadWorker>> pVecThreadWorker_;CallBackMsg funcCallBack_ = nullptr;std::atomic<int> curId_ = 0;TaskQueue<Task> taskQueue_;std::shared_ptr<ThreadCondition> pThreadCon_ = nullptr;bool initThreadStatus_ = false;int threadCount_ = 0;int maxThreadCount_ = 30;std::vector<std::thread::id> idVec_;
};
#endif // __THREADPOOL_H__//ThreadPool.cpp
#include "ThreadPool.h"
#include "ThreadWorker.h"
#include <iostream>ThreadPool::ThreadPool() {}
ThreadPool::~ThreadPool() { this->Destory(); }
void ThreadPool::InitThreadPool(int threadCount, bool initThreadStatus, CallBackMsg cb) {this->funcCallBack_ = cb;if (threadCount > this->maxThreadCount_) {threadCount = this->maxThreadCount_;}this->threadCount_ = threadCount;this->initThreadStatus_ = initThreadStatus;for (int num = 0; num < threadCount; num++) {auto workerThread = std::make_shared<ThreadWorker>();workerThread->InitThread(initThreadStatus, cb);this->pVecThreadWorker_.emplace_back(workerThread);}this->pThreadCon_ = std::make_shared<ThreadCondition>();
}
void ThreadPool::Extend(int num) {for (int num = 0; num < num; num++) {auto workerThread = std::make_shared<ThreadWorker>();workerThread->InitThread(initThreadStatus_, this->funcCallBack_);this->pVecThreadWorker_.emplace_back(workerThread);}
}
void ThreadPool::Contact(int num) {int index = this->threadCount_ - 1;for (int id = 0; id < num; id++) {//auto th = this->pVecThreadWorker_[index--];th->Quit();this->pVecThreadWorker_.pop_back();}
}
void ThreadPool::AddTask(Task t) {this->taskQueue_.Push(t);this->SetSignal();std::cerr << "add task and signal" << std::endl;
}
void ThreadPool::Destory() {for (auto &au : this->pVecThreadWorker_) {au->Join();}
}
std::tuple<bool, Task> ThreadPool::GetTask() { return this->taskQueue_.PopFront(); }
std::shared_ptr<ThreadPool> ThreadPool::Get() {static auto threadPool = std::make_shared<ThreadPool>();return threadPool;
}void ThreadPool::Wait() { this->pThreadCon_->Wait(); }
void ThreadPool::SetSignal() { this->pThreadCon_->Signal(); }
// Wake up specified thread
void ThreadPool::SetSignal(std::thread::id threadid) {for (auto &pWorker : this->pVecThreadWorker_) {if (pWorker->GetCurThreadID() == threadid) {pWorker->Start();}}
}
void ThreadPool::SetMaxThreadCount(int maxCount) { this->maxThreadCount_ = maxCount; }
bool ThreadPool::getThreadRunStatus(std::thread::id id) {for (auto &worker : this->pVecThreadWorker_) {if (id == worker->GetCurThreadID()) {return worker->getCurRunStatus();}}return false;
}
std::vector<std::thread::id> ThreadPool::GetThreadID() {for (auto &worker : this->pVecThreadWorker_) {this->idVec_.emplace_back(worker->GetCurThreadID().value());}return this->idVec_;
}//ThreadWorker.h
#ifndef __THREADWORKER_H__
#define __THREADWORKER_H__#include "ThreadCondition.h"
#include "common.h"
#include <atomic>
#include <memory>
#include <optional>
#include <thread>class ThreadWorker {
public:ThreadWorker();~ThreadWorker() = default;public:void InitThread(bool initStatus, CallBackMsg cb);void Start();void SetSignal();void Run();void Join();void Quit();bool getCurRunStatus();std::optional<std::thread::id> GetCurThreadID() { return this->curThreadId_; }private:std::shared_ptr<std::thread> pWorkerThread_ = nullptr;std::atomic<bool> status_ = false;bool runStatus_ = false;std::thread::id curThreadId_;// std::shared_ptr<ThreadCondition> pThreadCon_ = nullptr;CallBackMsg cbm_;
};#endif // __THREADWORKER_H__
//ThreadWorker.cpp
#include "ThreadWorker.h"
#include "ThreadPool.h"
#include <iostream>ThreadWorker::ThreadWorker() {}void ThreadWorker::InitThread(bool initStatus, CallBackMsg cb) {this->cbm_ = cb;this->pWorkerThread_ = std::make_shared<std::thread>(&ThreadWorker::Run, this);if (nullptr != this->pWorkerThread_) {this->curThreadId_ = this->pWorkerThread_->get_id();}// this->pThreadCon_ = std::make_shared<ThreadCondition>();
}
void ThreadWorker::Start() {}
// void ThreadWorker::SetSignal() { this->pThreadCon_->Signal(); }
void ThreadWorker::Run() {int data[10] = {0};auto tPool = ThreadPool::Get();while (!status_) {this->runStatus_ = false;// this->pThreadCon_->Wait(); // Handling false wake-upstd::cerr << "cur thread id is:" << this->curThreadId_ << std::endl;ThreadPool::Get()->Wait();this->runStatus_ = true;std::cerr << "cur run thread id is:" << this->curThreadId_ << std::endl;auto [bRet, t] = tPool->GetTask();if (bRet) {t(10);data[0] = 10;data[1] = 11;data[2] = 12;cbm_(data, 3);} else {std::this_thread::sleep_for(std::chrono::milliseconds(100));if (this->cbm_ != nullptr) {cbm_(data, 0);}}std::this_thread::yield();}
}void ThreadWorker::Join() {if (this->pWorkerThread_ != nullptr && this->pWorkerThread_->joinable()) {this->pWorkerThread_->join();}
}void ThreadWorker::Quit() {this->status_ = true;// trigger conditional//   this->pThreadCon_.Signal();
}
bool ThreadWorker::getCurRunStatus() { return this->runStatus_; }

上面的代码有一些需要商榷的地方,比如现在是把条件变量的触发放到线程池的类中,所有线程都使用这一个,这样会不会引起类似惊群的现象?在线程池收缩时,线程退出时,是设置好线程状态好,再noticeAll,还是一个个的触发退出好。所以那块目前把代码注释掉了。如果使用触发所有,还需要在条件变量的类中增加noticeAll的相关API接口。还有如果收缩时退出线程正在运行怎么处理?这些都需要不断的完善。所以很多东西大家看别人的代码时,其实是理解不全面很多东西的,可自己亲自撸一把代码,思路就清晰很多。

四、分析

在上面的实现中,其实仍然还可以优化,比如将ThreadConditon类封装到每个ThreadWorker中,这样就可以精确控制每个线程的启停(Google的一个开源的框架中就是使用了类似的机制),这也为实现LF线程池的模型提供基础准备;另外,线程的退出机制需要完善,如何退出指定线程?是不是需要增加当前线程动态数量(多少运行,多少等待以及整体多少数量等待)的获取API,当前线程运行状态与收缩时的动态匹配等等。
其实上面的实现的也没有多么复杂,即使如此,线程池中的线程并行化控制仍然有很多没有解决,比如线程与CPU核心的绑定,线程的分组,线程的负载平衡(即防止线程的局部化应用,有的线程反复工作,有的线程发生饥饿现象)、条件变量的模板化封装等等。这个在后续逐渐完善,大家不要着急。

五、总结

在前面的文章发出去后,有兄弟留言说这个太简陋,对实际应用没啥作用。其实这个系列的文章重点不是一开始就推出一个完善的线程池供大家使用,更多的给初学写线程池的人一个设计编写的实操过程。正如前面反复说的,一个好的线程池一定是适应一种或少数几种场景的而不是普适于所有场景。所以重要是让大家有一个理解网上开源的线程池是如何写出来的过程。只要明白了设计和实现线程池的过程,那么这个线程池如何写好,就看个人的水平了。否则的话,直接从网上下载一个线程池用就可以了。可这样,又怎么能够针对实际应用的不同的场景进行完善呢?肯定还是要吃透别人的代码。而这个系列文章就是让大家在看别人的代码时,比较轻松的搞定相关的框架流程。
本篇的线程管理的完善,其实就是可以当成一种逐步对线程池完善的开始,如何管理线程池里的线程,甚至精确管理每一个线程,都可以根据情况来定。
罗马不是一天建成的,慢慢来,每天进步一点点!

这篇关于一步一步写线程之八线程池的完善之一线程管理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/840250

相关文章

Javaee多线程之进程和线程之间的区别和联系(最新整理)

《Javaee多线程之进程和线程之间的区别和联系(最新整理)》进程是资源分配单位,线程是调度执行单位,共享资源更高效,创建线程五种方式:继承Thread、Runnable接口、匿名类、lambda,r... 目录进程和线程进程线程进程和线程的区别创建线程的五种写法继承Thread,重写run实现Runnab

SpringBoot线程池配置使用示例详解

《SpringBoot线程池配置使用示例详解》SpringBoot集成@Async注解,支持线程池参数配置(核心数、队列容量、拒绝策略等)及生命周期管理,结合监控与任务装饰器,提升异步处理效率与系统... 目录一、核心特性二、添加依赖三、参数详解四、配置线程池五、应用实践代码说明拒绝策略(Rejected

Knife4j+Axios+Redis前后端分离架构下的 API 管理与会话方案(最新推荐)

《Knife4j+Axios+Redis前后端分离架构下的API管理与会话方案(最新推荐)》本文主要介绍了Swagger与Knife4j的配置要点、前后端对接方法以及分布式Session实现原理,... 目录一、Swagger 与 Knife4j 的深度理解及配置要点Knife4j 配置关键要点1.Spri

Java 线程安全与 volatile与单例模式问题及解决方案

《Java线程安全与volatile与单例模式问题及解决方案》文章主要讲解线程安全问题的五个成因(调度随机、变量修改、非原子操作、内存可见性、指令重排序)及解决方案,强调使用volatile关键字... 目录什么是线程安全线程安全问题的产生与解决方案线程的调度是随机的多个线程对同一个变量进行修改线程的修改操

使用jenv工具管理多个JDK版本的方法步骤

《使用jenv工具管理多个JDK版本的方法步骤》jenv是一个开源的Java环境管理工具,旨在帮助开发者在同一台机器上轻松管理和切换多个Java版本,:本文主要介绍使用jenv工具管理多个JD... 目录一、jenv到底是干啥的?二、jenv的核心功能(一)管理多个Java版本(二)支持插件扩展(三)环境隔

Java中实现线程的创建和启动的方法

《Java中实现线程的创建和启动的方法》在Java中,实现线程的创建和启动是两个不同但紧密相关的概念,理解为什么要启动线程(调用start()方法)而非直接调用run()方法,是掌握多线程编程的关键,... 目录1. 线程的生命周期2. start() vs run() 的本质区别3. 为什么必须通过 st

Linux实现线程同步的多种方式汇总

《Linux实现线程同步的多种方式汇总》本文详细介绍了Linux下线程同步的多种方法,包括互斥锁、自旋锁、信号量以及它们的使用示例,通过这些同步机制,可以解决线程安全问题,防止资源竞争导致的错误,示例... 目录什么是线程同步?一、互斥锁(单人洗手间规则)适用场景:特点:二、条件变量(咖啡厅取餐系统)工作流

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

SpringBoot3中使用虚拟线程的完整步骤

《SpringBoot3中使用虚拟线程的完整步骤》在SpringBoot3中使用Java21+的虚拟线程(VirtualThreads)可以显著提升I/O密集型应用的并发能力,这篇文章为大家介绍了详细... 目录1. 环境准备2. 配置虚拟线程方式一:全局启用虚拟线程(Tomcat/Jetty)方式二:异步

如何解决Druid线程池Cause:java.sql.SQLRecoverableException:IO错误:Socket read timed out的问题

《如何解决Druid线程池Cause:java.sql.SQLRecoverableException:IO错误:Socketreadtimedout的问题》:本文主要介绍解决Druid线程... 目录异常信息触发场景找到版本发布更新的说明从版本更新信息可以看到该默认逻辑已经去除总结异常信息触发场景复