动态线程池可以这样实现,便于上线及时调整!

2024-02-13 11:20

本文主要是介绍动态线程池可以这样实现,便于上线及时调整!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在线程池日常实践中我们常常会遇到以下问题:

  • 代码中创建了一个线程池却不知道核心参数设置多少比较合适。

  • 参数设置好后,上线发现需要调整,改代码重启服务非常麻烦。

  • 线程池相对于开发人员来说是个黑箱,运行情况在出现问题 前很难被感知。

因此,动态可监控线程池一种针对以上痛点开发的线程池管理工具。

提供对 Spring 应用内线程池实例的全局管控、应用运行时动态变更线程池参数以及线程池数据采集和监控阈值报警。

已经实现的优秀开源动态线程池

hippo4j、dynamic-tp.....

实现思路

核心管理类

需要能实现对线程池的

  • 服务注册

  • 获取已经注册好的线程池

  • 以及对注册号线程池参数的刷新。

对于每一个线程池,我们使用一个线程池名字作为标识每个线程池的唯一ID。

伪代码实现

public class DtpRegistry {/*** 储存线程池*/private static final Map<String, Executor> EXECUTOR_MAP = new ConcurrentHashMap<>();/*** 获取线程池* @param executorName 线程池名字*/public static Executor getExecutor(String executorName) {return EXECUTOR_MAP.get(executorName);}/*** 线程池注册* @param executorName 线程池名字*/public static void registry(String executorName, Executor executor) {//注册EXECUTOR_MAP.put(executorName, executorWrapper);}/*** 刷新线程池参数* @param executorName 线程池名字* @param properties 线程池参数*/public static void refresh(String executorName, ThreadPoolProperties properties) {Executor executor = EXECUTOR_MAP.get(executorName)//刷新参数//.......}}
如何创建线程池?
STEP 1. 我们可以使用yml配置文件的方式配置一个线程池,将线程池实例的创建交由Spring容器。

相关配置

public class DtpProperties {private List<ThreadPoolProperties> executors;}public class ThreadPoolProperties {/*** 标识每个线程池的唯一名字*/private String poolName;private String poolType = "common";/*** 是否为守护线程*/private boolean isDaemon = false;/*** 以下都是核心参数*/private int corePoolSize = 1;private int maximumPoolSize = 1;private long keepAliveTime;private TimeUnit timeUnit = TimeUnit.SECONDS;private String queueType = "arrayBlockingQueue";private int queueSize = 5;private String threadFactoryPrefix = "-td-";private String RejectedExecutionHandler;
}

yml example:

spring:dtp:executors:# 线程池1- poolName: dtpExecutor1corePoolSize: 5maximumPoolSize: 10# 线程池2- poolName: dtpExecutor2corePoolSize: 2maximumPoolSize: 15
STEP 2 根据配置信息添加线程池的BeanDefinition

关键类

@Slf4j
public class DtpImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {private Environment environment;@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {log.info("注册");//绑定资源DtpProperties dtpProperties = new DtpProperties();ResourceBundlerUtil.bind(environment, dtpProperties);List<ThreadPoolProperties> executors = dtpProperties.getExecutors();if (Objects.isNull(executors)) {log.info("未检测本地到配置文件线程池");return;}//注册beanDefinitionexecutors.forEach((executorProp) -> {BeanUtil.registerIfAbsent(registry, executorProp);});}@Overridepublic void setEnvironment(Environment environment) {this.environment = environment;}
}/**** 工具类**/
public class BeanUtil{public static void registerIfAbsent(BeanDefinitionRegistry registry, ThreadPoolProperties executorProp) {register(registry, executorProp.getPoolName(), executorProp);}public static void register(BeanDefinitionRegistry registry, String beanName, ThreadPoolProperties executorProp) {Class<? extends Executor> executorType = ExecutorType.getClazz(executorProp.getPoolType());Object[] args = assembleArgs(executorProp);register(registry, beanName, executorType, args);}public static void register(BeanDefinitionRegistry registry, String beanName, Class<?> clazz, Object[] args) {BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);for (Object arg : args) {builder.addConstructorArgValue(arg);}registry.registerBeanDefinition(beanName, builder.getBeanDefinition());}private static Object[] assembleArgs(ThreadPoolProperties executorProp) {return new Object[]{executorProp.getCorePoolSize(),executorProp.getMaximumPoolSize(),executorProp.getKeepAliveTime(),executorProp.getTimeUnit(),QueueType.getInstance(executorProp.getQueueType(), executorProp.getQueueSize()),new NamedThreadFactory(executorProp.getPoolName() + executorProp.getThreadFactoryPrefix(),executorProp.isDaemon()),//先默认不做设置RejectPolicy.ABORT.getValue()};}
}

下面解释一下这个类的作用,environment实例中储存着spring启动时解析的yml配置,所以我们spring提供的Binder将配置绑定到我们前面定义的DtpProperties类中,方便后续使用。接下来的比较简单,就是将线程池的BeanDefinition注册到IOC容器中,让spring去帮我们实例化这个bean。

STEP 3. 将已经实例化的线程池注册到核心类 DtpRegistry 中

我们注册了 beanDefinition 后,spring会帮我们实例化出来, 在这之后我们可以根据需要将这个bean进行进一步的处理,spring也提供了很多机制让我们对bean的生命周期管理进行更多的扩展。对应到这里我们就是将实例化出来的线程池注册到核心类 DtpRegistry 中进行管理。

这里我们使用 BeanPostProcessor 进行处理。

@Slf4j
public class DtpBeanPostProcessor implements BeanPostProcessor {private DefaultListableBeanFactory beanFactory;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof DtpExecutor) {//直接纳入管理DtpRegistry.registry(beanName, (DtpExecutor) bean);}return bean;}
}

这里的逻辑很简单, 就是判断一下这个bean是不是线程池,是就统一管理起来。

STEP 4. 启用 BeanDefinitionRegistrar 和 BeanPostProcessor

在springboot程序中,只要加一个@MapperScan注解就能启用mybatis的功能,我们可以学习其在spring中的启用方式,自定义一个注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DtpImportSelector.class)
public @interface EnableDynamicThreadPool {
}

其中,比较关键的是@Import注解,spring会导入注解中的类DtpImportSelector

DtpImportSelector这个类实现了:

public class DtpImportSelector implements DeferredImportSelector {@Overridepublic String[] selectImports(AnnotationMetadata importingClassMetadata) {return new String[]{DtpImportBeanDefinitionRegistrar.class.getName(),DtpBeanPostProcessor.class.getName()};}
}

这样,只要我们再启动类或者配置类上加上@EnableDynamicThreadPool这个注解,spring就会将DtpImportBeanDefinitionRegistrarDtpBeanPostProcessor这两个类加入spring容器管理,从而实现我们的线程池的注册。

@SpringBootApplication
@EnableDynamicThreadPool
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
如何实现线程池配置的动态刷新

首先明确一点,对于线程池的实现类,例如:ThreadPoolExecutor等,都有提供核心参数对应的 set 方法,让我们实现参数修改。因此,在核心类DtpRegistry中的refresh方法,我们可以这样写:

public class DtpRegistry {/*** 储存线程池*/private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap<>();/*** 刷新线程池参数* @param executorName 线程池名字* @param properties 线程池参数*/public static void refresh(String executorName, ThreadPoolProperties properties) {ThreadPoolExecutor executor = EXECUTOR_MAP.get(executorName)//设置参数executor.setCorePoolSize(...);executor.setMaximumPoolSize(...);......}}

而这些新参数怎么来呢?我们可以引入Nacos、Apollo等配置中心,实现他们的监听器方法,在监听器方法里调用DtpRegistry的refresh方法刷新即可。

最后说一句(求关注!别白嫖!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!

这篇关于动态线程池可以这样实现,便于上线及时调整!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/705404

相关文章

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

Java实现远程执行Shell指令

《Java实现远程执行Shell指令》文章介绍使用JSch在SpringBoot项目中实现远程Shell操作,涵盖环境配置、依赖引入及工具类编写,详解分号和双与号执行多指令的区别... 目录软硬件环境说明编写执行Shell指令的工具类总结jsch(Java Secure Channel)是SSH2的一个纯J

使用Python实现Word文档的自动化对比方案

《使用Python实现Word文档的自动化对比方案》我们经常需要比较两个Word文档的版本差异,无论是合同修订、论文修改还是代码文档更新,人工比对不仅效率低下,还容易遗漏关键改动,下面通过一个实际案例... 目录引言一、使用python-docx库解析文档结构二、使用difflib进行差异比对三、高级对比方

深度解析Python中递归下降解析器的原理与实现

《深度解析Python中递归下降解析器的原理与实现》在编译器设计、配置文件处理和数据转换领域,递归下降解析器是最常用且最直观的解析技术,本文将详细介绍递归下降解析器的原理与实现,感兴趣的小伙伴可以跟随... 目录引言:解析器的核心价值一、递归下降解析器基础1.1 核心概念解析1.2 基本架构二、简单算术表达

QT Creator配置Kit的实现示例

《QTCreator配置Kit的实现示例》本文主要介绍了使用Qt5.12.12与VS2022时,因MSVC编译器版本不匹配及WindowsSDK缺失导致配置错误的问题解决,感兴趣的可以了解一下... 目录0、背景:qt5.12.12+vs2022一、症状:二、原因:(可以跳过,直奔后面的解决方法)三、解决方

Java中如何正确的停掉线程

《Java中如何正确的停掉线程》Java通过interrupt()通知线程停止而非强制,确保线程自主处理中断,避免数据损坏,线程池的shutdown()等待任务完成,shutdownNow()强制中断... 目录为什么不强制停止为什么 Java 不提供强制停止线程的能力呢?如何用interrupt停止线程s

MySQL中On duplicate key update的实现示例

《MySQL中Onduplicatekeyupdate的实现示例》ONDUPLICATEKEYUPDATE是一种MySQL的语法,它在插入新数据时,如果遇到唯一键冲突,则会执行更新操作,而不是抛... 目录1/ ON DUPLICATE KEY UPDATE的简介2/ ON DUPLICATE KEY UP

Python中Json和其他类型相互转换的实现示例

《Python中Json和其他类型相互转换的实现示例》本文介绍了在Python中使用json模块实现json数据与dict、object之间的高效转换,包括loads(),load(),dumps()... 项目中经常会用到json格式转为object对象、dict字典格式等。在此做个记录,方便后续用到该方

JWT + 拦截器实现无状态登录系统

《JWT+拦截器实现无状态登录系统》JWT(JSONWebToken)提供了一种无状态的解决方案:用户登录后,服务器返回一个Token,后续请求携带该Token即可完成身份验证,无需服务器存储会话... 目录✅ 引言 一、JWT 是什么? 二、技术选型 三、项目结构 四、核心代码实现4.1 添加依赖(pom