异步线程traceId如何实现传递

2025-02-08 04:50

本文主要是介绍异步线程traceId如何实现传递,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《异步线程traceId如何实现传递》文章介绍了如何在异步请求中传递traceId,通过重写ThreadPoolTaskExecutor的方法和实现TaskDecorator接口来增强线程池,确保异步...

前言

在日常问题排查中,我们经常在ELK中根据traceId来查询请求的日志链路,在同步请求中,根据traceId一站到底,很爽,那如果是异步请求该如何处理呢?

项目中的异步请求都是结合线程池来开启异步线程,下面结合slf4j中的MDC和线程池来实现异步线程的traceId传递。

重写ThreadPoolTaskExecutor中方法

下面的工具类,分别在Callable和Runnable异步任务执行前通过MDC.setContextMap(context)设置请求映射上下文

import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

import Java.util.Map;
import java.util.concurrent.Callable;

/**
 * @desc: 定义MDC工具类,支持Runnable和Callable两种,目的就是为了把父线程的traceId设置给子线程
 */
public class MdcUtil {

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                // 清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

下面定义一个ThreadPoolMdcExecutor 类来继承ThreadPoolTaskExecutor 类,重写execute和submit方法

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 * @desc: 把当前的traceId透传到子线程特意加的实现。
 *   重点就是 MDC.getCopyOfContextMap(),此方法获取当js前线程(父线程)的traceId
 */
public class ThreadPoolMdcExecutor extends ThreadPoolTaskExecutor {
    @Override
    publpythonic void execute(Runnable task) {
        super.execute(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

下面定义线程池,就可以使用ThreadPoolMdcExecutor

    @Bean(name = "callBackExecutorConfig")
    public Executor callBackExecutorConfig() {
        ThreadPoolTaskExecutor executor = new ThreadPoolMdcExecutor();
        // 配置核心线程数
        executor.setCorePoolSize(10);
        // 配置最大线程数
        executor.setMaxPoolSize(20);
        // 配置队列大小
        executor.setQueueCapacity(200);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-Thread-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // abort:在调用executor执行的方法中抛出异常 RejectedExecutionException
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 执行初始化
        executor.initialize();
        return executor;
    }

定义好线程池之后,我们就可以使用callBackExecutorConfig线程池进行异步任务,避免异步线程中的traceId丢失。

线程池增强

上面是通过继承ThreadPoolTaskExecutor来,重写execute和submit方法,设置MDC.setContextMap(context)设置上下文,我们也可以通过实现TaskDecorator 接口来增强线程池

public class ContextTransferTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        Map<String, String> context = MDC.getCopyOfContextMap();
        RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                MDC.setContextMap(context);
                RequestContextHolder.setRequestAttributes(requestAttributes);
                runnable.run();
            } finally {
                MDC.clear();
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

接下来,定义线程池,对线程池进行增强

    @Bean(name = "callBackExecutorConfig")
    public Executor callBackExecutorConfig() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
        // 配置核心线程数
        executor.setCorePoolSize(10);
        // 配置最大线程数
        executor.setMaxPoolSize(20);
        // 配置队列大小
        executor.setQueueCapacity(200);
        // 配置线程池中的线程的名称前缀
        executor.setThreadNamePpythonrefix("async-Thread-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // abort:在调用executor执行的方法中抛出异常 RejectedExecutionException
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //线程池增强
        threadPoolTaskExecutor.setTaskDecorator(new ContextTransferTaskDecorator());
        // 执行初始化
        executor.initialize();
        return executphpor;
    python}

总结

上面两种方式其实本质都是通过Mdc来进行异步线程间的traceId同步,可以看下Mdc的源码,最终还是通过InheritableThreadLocal来实现子线程获取父线程信息

public class BasicMDCAdapter implements MDCAdapter {
    private InheritableThreadLocal<Map<String, String>> inheritableThreadLocal = 
        new InheritableThreadLocal<Map<String, String>>() {
        protected Map<String, String> childValue(Map<String, String> parentValue) {
            return parentValue == null ? null : new HashMap(parentValue);
        }
    };
    
    //省略若干
    ......
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持China编程(www.chinasem.cn)。

这篇关于异步线程traceId如何实现传递的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1153341

相关文章

Python对接支付宝支付之使用AliPay实现的详细操作指南

《Python对接支付宝支付之使用AliPay实现的详细操作指南》支付宝没有提供PythonSDK,但是强大的github就有提供python-alipay-sdk,封装里很多复杂操作,使用这个我们就... 目录一、引言二、准备工作2.1 支付宝开放平台入驻与应用创建2.2 密钥生成与配置2.3 安装ali

Spring Security 单点登录与自动登录机制的实现原理

《SpringSecurity单点登录与自动登录机制的实现原理》本文探讨SpringSecurity实现单点登录(SSO)与自动登录机制,涵盖JWT跨系统认证、RememberMe持久化Token... 目录一、核心概念解析1.1 单点登录(SSO)1.2 自动登录(Remember Me)二、代码分析三、

PyCharm中配置PyQt的实现步骤

《PyCharm中配置PyQt的实现步骤》PyCharm是JetBrains推出的一款强大的PythonIDE,结合PyQt可以进行pythion高效开发桌面GUI应用程序,本文就来介绍一下PyCha... 目录1. 安装China编程PyQt1.PyQt 核心组件2. 基础 PyQt 应用程序结构3. 使用 Q

Python实现批量提取BLF文件时间戳

《Python实现批量提取BLF文件时间戳》BLF(BinaryLoggingFormat)作为Vector公司推出的CAN总线数据记录格式,被广泛用于存储车辆通信数据,本文将使用Python轻松提取... 目录一、为什么需要批量处理 BLF 文件二、核心代码解析:从文件遍历到数据导出1. 环境准备与依赖库

linux下shell脚本启动jar包实现过程

《linux下shell脚本启动jar包实现过程》确保APP_NAME和LOG_FILE位于目录内,首次启动前需手动创建log文件夹,否则报错,此为个人经验,供参考,欢迎支持脚本之家... 目录linux下shell脚本启动jar包样例1样例2总结linux下shell脚本启动jar包样例1#!/bin

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到

Go语言并发之通知退出机制的实现

《Go语言并发之通知退出机制的实现》本文主要介绍了Go语言并发之通知退出机制的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1、通知退出机制1.1 进程/main函数退出1.2 通过channel退出1.3 通过cont

Python实现PDF按页分割的技术指南

《Python实现PDF按页分割的技术指南》PDF文件处理是日常工作中的常见需求,特别是当我们需要将大型PDF文档拆分为多个部分时,下面我们就来看看如何使用Python创建一个灵活的PDF分割工具吧... 目录需求分析技术方案工具选择安装依赖完整代码实现使用说明基本用法示例命令输出示例技术亮点实际应用场景扩

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

如何在Java Spring实现异步执行(详细篇)

《如何在JavaSpring实现异步执行(详细篇)》Spring框架通过@Async、Executor等实现异步执行,提升系统性能与响应速度,支持自定义线程池管理并发,本文给大家介绍如何在Sprin... 目录前言1. 使用 @Async 实现异步执行1.1 启用异步执行支持1.2 创建异步方法1.3 调用