基于NioEventLoop线程夯住问题了解线程池工作流程

2023-11-23 16:48

本文主要是介绍基于NioEventLoop线程夯住问题了解线程池工作流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题现象

近期我们使用NioEventLoop出现一个奇怪的现象,在消息密集的情况下,服务端处理会断断续续的,一会性能看着又没问题,一会又会阻塞很久再处理消息。经过不断的摸索排查发现是线程池使用不当导致的,我们不妨来看看这个问题的代码。

代码演示

在演示代码之前,我们不妨先来了解一下这个需求,如下图,客户端和服务端建立连接之后,会向该通道不断发送消息。然后服务端收到消息,会将消息提交到业务线程池中异步处理。

先来看看客户端的代码,就是一套标准的模板代码,设置好对应参数以及业务处理器之后,直接向服务端的9999端口发起连接。

public class Client9 {static final int MSG_SIZE = 256;public void connect() throws Exception {EventLoopGroup group = new NioEventLoopGroup(8);Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {//业务处理器ch.pipeline().addLast(new ClientHandler());}});ChannelFuture f = b.connect("127.0.0.1", 9999).sync();f.channel().closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {group.shutdownGracefully();}});}public static void main(String[] args) throws Exception {new Client9().connect();}
}

再来看看客户端的业务处理器,代码逻辑也很简单,和服务端建立了连接之后,创建一个线程,无限循环,每隔1毫秒发送消息给服务端。

public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {new Thread(() -> {//无限循环,每隔一毫秒发送一次消息while (true) {ByteBuf firstMessage = Unpooled.buffer(Client9.MSG_SIZE);for (int i = 0; i < firstMessage.capacity(); i++) {firstMessage.writeByte((byte) i);}ctx.writeAndFlush(firstMessage);try {TimeUnit.MILLISECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();}}}).start();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

了解了客户端代码之后,我们再来看看服务端的代码,首先自然是启动类,就是典型的一套启动类模板,配置声明主从reactor,设置参数,以及配置业务处理器。最终阻塞监听等待连接。

public class Server9 {public static void main(String[] args) throws Exception {//声明主从reactorEventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {//追加业务处理器ChannelPipeline p = ch.pipeline();p.addLast(new ServerHandler());}});//监听9999端口ChannelFuture f = b.bind(9999).sync();f.channel().closeFuture().addListener((ChannelFutureListener) future -> {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();});}
}

而业务处理器的代码如下,可以看到笔者在服务端收到消息之后将消息提交到业务线程池中,该线程池我们假设已经达到服务器可以达到的最大值了,而拒绝策略我们也配置为当任务处理不过来时,用当前调用线程池的线程处理当前任务。

注意笔者下面的if判断有一个逻辑判断Thread.currentThread() == ctx.channel().eventLoop(),这个就是笔者为了重现该问题而特地加的一个判断,读者现在留意到即可,我们会在后文详述原因。

public class ServerHandler extends ChannelInboundHandlerAdapter {static AtomicInteger sum = new AtomicInteger(0);static ExecutorService executorService = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());public void channelRead(ChannelHandlerContext ctx, Object msg) {//通过原子类记录收到的第几个消息SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");String date = simpleDateFormat.format(new Date());System.out.println("--> Server receive client message : " + sum.incrementAndGet()+"time: "+date);//将消息提交到业务线程池中处理executorService.execute(() -> {ByteBuf req = (ByteBuf) msg;//其它业务逻辑处理,访问数据库if (sum.get() % 100 == 0 || (Thread.currentThread() == ctx.channel().eventLoop()))try {//访问数据库,模拟偶现的数据库慢,同步阻塞15秒TimeUnit.SECONDS.sleep(15);} catch (Exception e) {e.printStackTrace();}//转发消息,此处代码省略,转发成功之后返回响应给终端ctx.writeAndFlush(req);});}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}

自此我们的代码都编写完成,我们不妨将服务端和客户端代码都启动。通过控制台可以发现,1毫秒发送的消息,会时不时的卡15s才能继续处理消息。

在这里插入图片描述

排查过程

这类问题我们用jvisualvm看看GC情况是否正常,看看是不是频繁的Full GC导致整个进程处于STW状态导致消息任务阻塞。

监控结果如下,很明显GC没有问题,我们只能看看CPU使用情况。

在这里插入图片描述

很明显的CPU使用情况也是正常,没有什么奇奇怪怪的任务导致使用率飙升。

在这里插入图片描述

所以我们只能看看线程使用情况了,果然,我们发现NioEventLoop居然长时间的处于休眠状态。

在这里插入图片描述

所以我们用jps定位Java进程id后键入jstack查看线程使用情况

jstack -l 17892

自此我们终于找到了线程长期休眠的原因,从下面的堆栈我们可以看出,正是任务量巨大,导致业务线程池无法及时处理消息,最终业务线程池走到了拒绝策略,这就使得业务线程池一直走到CallerRunsPolicy,也就是说业务线程池忙不过来的时候会将任务交由NioEventLoop执行。而一个连接只会有一个NioEventLoop的线程执行,使得原本非常忙碌的NioEventLoop还得分神处理一下我们业务线程池的任务。

在这里插入图片描述

为了验证这一点,我们不妨在业务线程池中打印线程名:

//将消息提交到业务线程池中处理executorService.execute(() -> {System.out.println(" executorService execute thread name: "+Thread.currentThread().getName());ByteBuf req = (ByteBuf) msg;//其它业务逻辑处理,访问数据库if (sum.get() % 100 == 0 || (Thread.currentThread() == ctx.channel().eventLoop()))try {//访问数据库,模拟偶现的数据库慢,同步阻塞15秒TimeUnit.SECONDS.sleep(15);} catch (Exception e) {e.printStackTrace();}//转发消息,此处代码省略,转发成功之后返回响应给终端ctx.writeAndFlush(req);});

最终我们可以看到,线程池中的任务都被nioEventLoopGroup这个线程执行,所以这也是笔者为什么在模拟问题时在if中增加 (Thread.currentThread() == ctx.channel().eventLoop())的原因,就是为了模仿那些耗时的业务被nioEventLoopGroup的线程执行的情况,例如:一个耗时需要15s的任务刚刚好因为拒绝策略被nioEventLoopGroup执行,那么Netty服务端的消息处理自然就会阻塞,出现本文所说的问题。

在这里插入图片描述

解决方案

从上文的分析中我们可以得出下面这样一个结果,所以解决该问题的方式又两种:

  1. 调整业务线程池大小。
  2. 调整拒绝策略。

在这里插入图片描述

由于我们当前的线程池大小已经假设为最大值了,所以如果我们能够保证消息幂等,我们建议将拒绝策略改为直接丢弃。

static ExecutorService executorService = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.DiscardOldestPolicy());

自此之后我们再查看控制台输出和NioEventLoop线程状态,发现运行都没有阻塞,那些实在无法处理的消息都被丢弃了。
在这里插入图片描述

反思总结

自此我们对于本次的事件总结出以下几点要求和建议:

  1. 耗时操作不要用NioEventLoop,尤其是本次这种高并发且拒绝策略配置为用执行线程接收忙碌任务的方式。
  2. 服务端收不到消息时,建议从CPU、GC、线程等角度分析问题。
  3. 建议使用两个NioEventLoop构成主从Reactor模式。

参考

Java性能调优 6步实现项目性能升级

这篇关于基于NioEventLoop线程夯住问题了解线程池工作流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/419338

相关文章

线上Java OOM问题定位与解决方案超详细解析

《线上JavaOOM问题定位与解决方案超详细解析》OOM是JVM抛出的错误,表示内存分配失败,:本文主要介绍线上JavaOOM问题定位与解决方案的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一、OOM问题核心认知1.1 OOM定义与技术定位1.2 OOM常见类型及技术特征二、OOM问题定位工具

通过Docker容器部署Python环境的全流程

《通过Docker容器部署Python环境的全流程》在现代化开发流程中,Docker因其轻量化、环境隔离和跨平台一致性的特性,已成为部署Python应用的标准工具,本文将详细演示如何通过Docker容... 目录引言一、docker与python的协同优势二、核心步骤详解三、进阶配置技巧四、生产环境最佳实践

MyBatis分页查询实战案例完整流程

《MyBatis分页查询实战案例完整流程》MyBatis是一个强大的Java持久层框架,支持自定义SQL和高级映射,本案例以员工工资信息管理为例,详细讲解如何在IDEA中使用MyBatis结合Page... 目录1. MyBATis框架简介2. 分页查询原理与应用场景2.1 分页查询的基本原理2.1.1 分

Vue3绑定props默认值问题

《Vue3绑定props默认值问题》使用Vue3的defineProps配合TypeScript的interface定义props类型,并通过withDefaults设置默认值,使组件能安全访问传入的... 目录前言步骤步骤1:使用 defineProps 定义 Props步骤2:设置默认值总结前言使用T

深入浅出Spring中的@Autowired自动注入的工作原理及实践应用

《深入浅出Spring中的@Autowired自动注入的工作原理及实践应用》在Spring框架的学习旅程中,@Autowired无疑是一个高频出现却又让初学者头疼的注解,它看似简单,却蕴含着Sprin... 目录深入浅出Spring中的@Autowired:自动注入的奥秘什么是依赖注入?@Autowired

Java中如何正确的停掉线程

《Java中如何正确的停掉线程》Java通过interrupt()通知线程停止而非强制,确保线程自主处理中断,避免数据损坏,线程池的shutdown()等待任务完成,shutdownNow()强制中断... 目录为什么不强制停止为什么 Java 不提供强制停止线程的能力呢?如何用interrupt停止线程s

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

redis-sentinel基础概念及部署流程

《redis-sentinel基础概念及部署流程》RedisSentinel是Redis的高可用解决方案,通过监控主从节点、自动故障转移、通知机制及配置提供,实现集群故障恢复与服务持续可用,核心组件包... 目录一. 引言二. 核心功能三. 核心组件四. 故障转移流程五. 服务部署六. sentinel部署

解决升级JDK报错:module java.base does not“opens java.lang.reflect“to unnamed module问题

《解决升级JDK报错:modulejava.basedoesnot“opensjava.lang.reflect“tounnamedmodule问题》SpringBoot启动错误源于Jav... 目录问题描述原因分析解决方案总结问题描述启动sprintboot时报以下错误原因分析编程异js常是由Ja

SpringBoot集成XXL-JOB实现任务管理全流程

《SpringBoot集成XXL-JOB实现任务管理全流程》XXL-JOB是一款轻量级分布式任务调度平台,功能丰富、界面简洁、易于扩展,本文介绍如何通过SpringBoot项目,使用RestTempl... 目录一、前言二、项目结构简述三、Maven 依赖四、Controller 代码详解五、Service