异步线程traceId如何实现传递

2025-02-08 04:50

本文主要是介绍异步线程traceId如何实现传递,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《异步线程traceId如何实现传递》文章介绍了如何在异步请求中传递traceId,通过重写ThreadPoolTaskExecutor的方法和实现TaskDecorator接口来增强线程池,确保异步...

前言

在日常问题排查中,我们经常在ELK中根据traceId来查询请求的日志链路,在同步请求中,根据traceId一站到底,很爽,那如果是异步请求该如何处理呢?

项目中的异步请求都是结合线程池来开启异步线程,下面结合slf4j中的MDC和线程池来实现异步线程的traceId传递。

重写ThreadPoolTaskExecutor中方法

下面的工具类,分别在Callable和Runnable异步任务执行前通过MDC.setContextMap(context)设置请求映射上下文

import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

import Java.util.Map;
import java.util.concurrent.Callable;

/**
 * @desc: 定义MDC工具类,支持Runnable和Callable两种,目的就是为了把父线程的traceId设置给子线程
 */
public class MdcUtil {

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                // 清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

下面定义一个ThreadPoolMdcExecutor 类来继承ThreadPoolTaskExecutor 类,重写execute和submit方法

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 * @desc: 把当前的traceId透传到子线程特意加的实现。
 *   重点就是 MDC.getCopyOfContextMap(),此方法获取当js前线程(父线程)的traceId
 */
public class ThreadPoolMdcExecutor extends ThreadPoolTaskExecutor {
    @Override
    publpythonic void execute(Runnable task) {
        super.execute(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

下面定义线程池,就可以使用ThreadPoolMdcExecutor

    @Bean(name = "callBackExecutorConfig")
    public Executor callBackExecutorConfig() {
        ThreadPoolTaskExecutor executor = new ThreadPoolMdcExecutor();
        // 配置核心线程数
        executor.setCorePoolSize(10);
        // 配置最大线程数
        executor.setMaxPoolSize(20);
        // 配置队列大小
        executor.setQueueCapacity(200);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-Thread-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // abort:在调用executor执行的方法中抛出异常 RejectedExecutionException
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 执行初始化
        executor.initialize();
        return executor;
    }

定义好线程池之后,我们就可以使用callBackExecutorConfig线程池进行异步任务,避免异步线程中的traceId丢失。

线程池增强

上面是通过继承ThreadPoolTaskExecutor来,重写execute和submit方法,设置MDC.setContextMap(context)设置上下文,我们也可以通过实现TaskDecorator 接口来增强线程池

public class ContextTransferTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        Map<String, String> context = MDC.getCopyOfContextMap();
        RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                MDC.setContextMap(context);
                RequestContextHolder.setRequestAttributes(requestAttributes);
                runnable.run();
            } finally {
                MDC.clear();
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

接下来,定义线程池,对线程池进行增强

    @Bean(name = "callBackExecutorConfig")
    public Executor callBackExecutorConfig() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
        // 配置核心线程数
        executor.setCorePoolSize(10);
        // 配置最大线程数
        executor.setMaxPoolSize(20);
        // 配置队列大小
        executor.setQueueCapacity(200);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePpythonrefix("async-Thread-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // abort:在调用executor执行的方法中抛出异常 RejectedExecutionException
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //线程池增强
        threadPoolTaskExecutor.setTaskDecorator(new ContextTransferTaskDecorator());
        // 执行初始化
        executor.initialize();
        return executphpor;
    python}

总结

上面两种方式其实本质都是通过Mdc来进行异步线程间的traceId同步,可以看下Mdc的源码,最终还是通过InheritableThreadLocal来实现子线程获取父线程信息

public class BasicMDCAdapter implements MDCAdapter {
    private InheritableThreadLocal<Map<String, String>> inheritableThreadLocal = 
        new InheritableThreadLocal<Map<String, String>>() {
        protected Map<String, String> childValue(Map<String, String> parentValue) {
            return parentValue == null ? null : new HashMap(parentValue);
        }
    };
    
    //省略若干
    ......
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持China编程(www.chinasem.cn)。

这篇关于异步线程traceId如何实现传递的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1153341

相关文章

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

Nginx部署HTTP/3的实现步骤

《Nginx部署HTTP/3的实现步骤》本文介绍了在Nginx中部署HTTP/3的详细步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录前提条件第一步:安装必要的依赖库第二步:获取并构建 BoringSSL第三步:获取 Nginx

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、