Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理

2024-05-14 03:48

本文主要是介绍Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理

在上篇博文分析服务端启动的过程中,我们遇到了如下的代码片段,

            if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new OneTimeTask() {@Overridepublic void run() {register0(promise);//分析}});}

以及

    private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});} 

这些代码片段中,都调用了NioEventLoop类的execute这个方法来提交任务。 本篇博文将以此函数作为切入点,来分析NioEventLoop 启动以及 EventLoop所负责的IO操作和task任务的处理思路。

NioEventLoop类的execute方法

由于NioEventLoop并没有实现此方法,因此,准确来说,应该是NioEventLoop类的超类SingleThreadEventExecutor中的方法。具体代码如下:

    @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();//判断当前线程是否为该NioEventLoop所关联的线程,如果是,则添加任务到任务队列中,如果不是,则先启动线程,然后添加任务到任务队列中去if (inEventLoop) {addTask(task);} else {startThread();addTask(task);//如果发现该线程已经关闭则移除任务并拒绝if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

该方法的主要逻辑如下:

1、判断入参数是否为null,如果为null,则抛空指针异常,如果不为null,则进行第2步。

2、通过调用inEventLoop()方法判断当前工作线程是否为该NioEventLoop所关联的线程,如果是,则调用addTask(task)方法将任务添加到任务队列中,如果不是,则进行第3步。

3、启动该NioEventLoop所关联的线程,然后添加任务到任务队列中去 。

4、如果发现该线程已经关闭则移除任务并拒绝

在看startThread()这个方法的具体代码之前,我觉得有必要介绍下EventLoop.execute第一次被调用的地方,因此第一次被调用的地方就是触发startThread()的调用,进而导致了EventLoop所对应的Java线程的启动。

基于这样一个问题,我们开始在服务端启动时开始跟踪代码来寻找。最快的方式是,在EventLoop.execute方法的 startThread();这一行代码打上一个断点,然后看下其调用链即可。

调用链截图如下:

看到了没有,是在服务器端启动的过程中,调用AbstractChannel#AbstractUnsafe.register方法中调用了execute方法,这是因为整个代码都是在主线程中运行的因此在下面的register方法中 eventLoop.inEventLoop() 就为 false, 于是进入到 else 分支, 在这个分支中调用了 eventLoop.execute。eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 因此调用的是 SingleThreadEventExecutor.execute,继而调用startThread方法完成NioEventLoop所对应的线程的启动。

        @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {//省略了部分检查代码AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new OneTimeTask() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {//省略了部分异常处理代码}}}

结论:当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动。

EventLoop的启动方法:startThread

上面找到了startThread调用的调用链哈,这里看下startThread()方法

 SingleThreadEventExecutor.javaprivate void startThread() {if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {delayedTaskQueue.add(new ScheduledFutureTask<Void>(this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));thread.start();}}}

上面startThread方法中的 STATE_UPDATER 是 SingleThreadEventExecutor 内部维护的一个属性, 它的作用是标识当前的 thread 的状态. 在初始的时候, STATE_UPDATER == ST_NOT_STARTED, 因此第一次调用 startThread() 方法时, 就会进入到 if 语句内, 进而调用到 thread.start()完成NioEventLoop所对应的 Java线程的启动.

以上就完成了NioEventLoop所对应的Java线程的启动。那么启动之后,该线程主要用于干什么呢?

这里先说结论:在 Netty 中, 一个 EventLoop 需要负责两个工作, 第一个是作为 IO 线程, 负责相应的 IO 操作; 第二个是作为任务线程, 执行 taskQueue 中的任务.

上面的分析中调用startThread()方法继而调用thread.start()来启动EventLoop 所对应的Java 线程。下面来线程启动具体做了什么哈,也就是看下如下的run方法主要干了什么哈,

        thread = threadFactory.newThread(new Runnable() {@Overridepublic void run() {//...SingleThreadEventExecutor.this.run();//调用了NioEventLoop类中的run方法,下面将主要分析//...});      

上面的源码本来很长很长,上面是精简版本,简单来说,这里我们只关注一行代码: SingleThreadEventExecutor.this.run(),这里就是调用了NioEventLoop的run方法,下面继续跟。

NioEventLoop.run()方法

NioEventLoop类中的run方法的代码如下:

    @Overrideprotected void run() {for (;;) {boolean oldWakenUp = wakenUp.getAndSet(false);try {// 当有任务时为了保证任务及时执行采用不阻塞的selectNow获取准备好I/O的连接 if (hasTasks()) {selectNow();} else {// 当无任务时采用阻塞等待的方式获取连接  select(oldWakenUp);if (wakenUp.get()) {selector.wakeup();}}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {processSelectedKeys();//分析runAllTasks();//分析} else {final long ioStartTime = System.nanoTime();processSelectedKeys();final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}if (isShuttingDown()) {closeAll();if (confirmShutdown()) {break

这篇关于Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/987663

相关文章

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

在Golang中实现定时任务的几种高效方法

《在Golang中实现定时任务的几种高效方法》本文将详细介绍在Golang中实现定时任务的几种高效方法,包括time包中的Ticker和Timer、第三方库cron的使用,以及基于channel和go... 目录背景介绍目的和范围预期读者文档结构概述术语表核心概念与联系故事引入核心概念解释核心概念之间的关系

MySQL追踪数据库表更新操作来源的全面指南

《MySQL追踪数据库表更新操作来源的全面指南》本文将以一个具体问题为例,如何监测哪个IP来源对数据库表statistics_test进行了UPDATE操作,文内探讨了多种方法,并提供了详细的代码... 目录引言1. 为什么需要监控数据库更新操作2. 方法1:启用数据库审计日志(1)mysql/mariad

springboot如何通过http动态操作xxl-job任务

《springboot如何通过http动态操作xxl-job任务》:本文主要介绍springboot如何通过http动态操作xxl-job任务的问题,具有很好的参考价值,希望对大家有所帮助,如有错... 目录springboot通过http动态操作xxl-job任务一、maven依赖二、配置文件三、xxl-

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,