ThreadPoolExecutor的实现机制

2024-05-28 17:32

本文主要是介绍ThreadPoolExecutor的实现机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文地址:《ThreadPoolExecutor的实现机制》
1、什么是ThreadPoolExecutor
ThreadPoolExecutor是一个 ExecutorService一个具体实现,在实际项目中,主要使用ThreadPoolExecutor维护的线程队列中的任意一个空闲线程去执行每个提交任务。说的直白点就是在实际项目中,没有办法为每个提交的任务立马分配一个线程,所以在程序中维护一个数量一定的线程集合来应对提交的任务。ThreadPoolExecutor是对指定数量线程资源和待处理任务队列的维护。

2、为什么要使用ThreadPoolExecutor
ThreadPoolExecutor(线程池)主要有一些优势:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
3、ThreadPoolExecutor具体实现
ThreadPoolExecutor提交任务的大致流程图如下:

如上所示的流程图有一点需要说明,在添加任务到任务队列如果出现失败,会当成当前任务队列已满情况来处理,尝试创建新线程来执行任务。流程中的具体的实现代码如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn’t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在使用execute方法执行任务时,首先通过ctl获取当前的值,ctl是一个AtomicInteger对象,初始值为-536870912,通过workerCountOf确定当前线程池中的线程数。

线程数小于线程池的核心线程数,则执行创建线程并执行任务的流程(addWorker(command, true)),流程代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

在 C 值没有超过0时,workerCountOf(int C)函数返回的值为(C-(-RUNNING)),runStateOf(int C)函数的返回值为min(C,RUNNING),所以在第一个for循环中,程序不会进入if语句中,在第二个for循环中,由于当前函数传入的core为true,表明添加的线程为核心线程,程序值判断当前线程数小于核心线程数,ctl自增1,并跳出for循环开始创建工作线程。这里使用了ReentrantLock进行了加锁操作。在整个锁部分主要执行创建工作线程、添加工作线程到线程集合中、更新线程池当前线程数、工作线程启动执行任务,返回当前添加线程是否成功,成功直接返回退出execute方法。

当前线程数大于等于线程池核心线程数时,添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
isRunning方法判断当前入参是否小于0,条件显然时满足的,接着执行workQueue.offer(command)操作,将当前任务添加至线程池的任务队列中,后续的if条件都不满足,整个execute的调用只是将任务添加到线程池的任务队列中。

在任务队列已满的情况下添加任务队列, 添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
由于任务队列已经满了,所以添加任务到任务队列中失败,返回false,执行else if语句,调用addWorker(command, false)方法,执行相关流程,流程实现代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;

        // Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;

}
判断当前线程池的wc即线程数是否大于最大线程数,如果不大于则之间更新当前线程池的线程数量,然后创建新的工作线程,执行任务。

在线程池任务队列已满,并且线程数已经达到线程池的最大线程数时,再次往线程池提交任务,线程池会根据线程池设置的拒绝策略来处理任务,即addWorker方法返回false,在execute中执行reject(command)方法,具体代码如下:

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
直接使用初始化线程池设置的handler来处理任务。

工作线程如何从任务队列中获取任务,每个工作线程都会执行自己的run方法,具体的方法实现如下:

public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在代码中存在while (task != null || (task = getTask()) != null)语句,这里只有当task为空时,才会正常结束当前线程。在整个任务执行的过程中,存在连个hook函数,beforeExecute和afterExecute通过方法的重写可以在方法中对task信息进行预处理等。

4、 关于ThreadPoolExecutor的总结
在向线程池提交任务时,可能会触发创建新的工作线程。如果是在高并发的情况下会不会造成创建超过线程池规定的数量的线程。因为在代码中通过ctl获取值,在到ctl自增更新线程数量的过程可能会有其他的线程也执行到了这段代码,都会通过线程数量限制条件的判断,最终都会创建新线程执行任务,具体代码片段如下:
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;
//假设线程A和线程B同时执行这段代码,其中当前的线程池数量为最
//大线程数-1,但是线程A和线程B进行线程数量判断时都可以通过,
//随后都会进入新建工作线程执行任务的流程。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

        for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;

线程池并不能节省任务执行的时间,使用线程池只能节省线程的创建和回收时间。
如果项目中需要频繁创建任务来处理大量的短耗时任务,那么使用线程池是一个不错的选择。如果是非常耗时的任务,可能在初始化线程池的时候需要计算好线程池的核心线程数量以防止任务长时间得不到执行。

这篇关于ThreadPoolExecutor的实现机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1011218

相关文章

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

Java实现远程执行Shell指令

《Java实现远程执行Shell指令》文章介绍使用JSch在SpringBoot项目中实现远程Shell操作,涵盖环境配置、依赖引入及工具类编写,详解分号和双与号执行多指令的区别... 目录软硬件环境说明编写执行Shell指令的工具类总结jsch(Java Secure Channel)是SSH2的一个纯J