Netty学习——源码篇5 EventLoop

2024-03-25 12:36
文章标签 源码 学习 eventloop netty

本文主要是介绍Netty学习——源码篇5 EventLoop,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 Reactor线程模型

        Reactor线程模型 中对Reactor的三种线程模型——单线程模型、多线程模型、主从多线程模型做了介绍,这里具体分析Reactor在Netty中的应用。

1.1单线程模型

单线程模型处理流程如下图:

        单线程模型,即Accept的处理和Handler的处理都在同一个线程中。这个模型的弊端是:当其中某个Handler阻塞时,会导致其他所有的Client的Handler都无法执行,并且更严重是,Handler的阻塞也会导致整个服务不能接收新的Client请求(因为Accept也被阻塞了)。 因为这个缺陷,所以单线程Reactor模型在Netty中的应用场景比较少。

1.2 多线程模型

        Netty中Reactor多线程模型的应用如如下图:

        1、设计一个专门的线程Accept,用于监听客户端的TCP连接请求。

        2、客户端的I/O操作都是由一个特定的NIO线程池负责。每个客户端连接都与一个特定的NIO线程绑定,因此在这个客户端连接中的所有I/O操作都是在同一个线程中完成的。

        3、客户端连接有很多,但是NIO线程数是比较少的,因此一个NIO线程可以同时绑定到多个客户端连接中。

1.3 主从多线程模型

        主从Reactor多线程模型在Netty中的应用,如下图

             一般情况下,Reactor的多线程模型已经适用于大部分业务场景。但如果服务端需要同时处理大量的客户端连接请求,或者需要再客户端连接时增加一些诸如权限的校验等操作,那么单个Accept就很有可能处理不过来,将会造成大量的客户端连接超时。主从Reactor多线程模型将服务端接收客户端的连接请求专门设计为一个独立的连接池。主从Reactor多线程模型和Reactor多线程模型很类似,只是在主动Reactor多线程模型的Accept线程池中获取数据,通过认证鉴权后进行派遣,再分配给Reactor线程池来处理客户端请求。

2 EventLoopGroup与Reactor关联

        不同的设置NioEventLoopGroup的方式对应了不同的Reactor线程模型。

        1、单线程模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup);

            首先实例化一个NioEventLoopGroup,接着调用bootstrap.group(bossGroup)设置服务端的EventLoopGroup。这里有个疑惑:在启动服务端的Netty程序时,需要设置bossGroup和workerGroup,为什么这里只设置了1个bossGroup?原因很简单,ServerBootstrap重写了group方法,代码如下:

@Overridepublic ServerBootstrap group(EventLoopGroup group) {return group(group, group);}

        因此,当传入一个group时,bossGroup和workerGroup就是同一个NioEventLoopGrouop,并且这个NioEventLoopGroup线程池数量只设置了1个线程,也就是说Netty中的Acceptor和后续的所有客户端连接的I/O操作都是在一个线程中处理的。那么对应到Reactor的线程模型中,这样设置NioEventLoopGroup,就相当于Reactor的单线程模式。

        2、多线程在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(16);ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup);

        从代码中可以看出,只需要将NioEventLoopGroup的参数设置大于1,就是Reactor多线程模型。

        3、主从Reactor模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap server = new ServerBootstrap();server.group(bossGroup,workGroup);

        bossGroup为主线程,而workerGroup中的线程数是CPU核数*2,因此对应到Reactor线程模型中,这样设置的NioevGroup就是主从Reactor多线程模型。

3 EventLoopGroup的实例化

        首先,来看一下EventLoopGroup的类结构图,如下图:

        然后,在通过时序图来了解一下EventLoopGroup初始化的基本过程,如下图:

 

          基本步骤如下:

        1、EventLoopGroup内部维护一个属性为EventExecutor的children的数组,其大小是nThreads,这样就初始化一个线程池。

        2、在实例化NioEventLoopGroup时,如果指定县城池大小,则nThreads就是指定的值,否则是CPU核数 * 2。

        3、在MultithreadEventExecutorGroup中调用newChild()抽象方法来初始化children数组。

        4、newChild()抽象方法实际上是在NioEventLoopGroup中实现的,由它返回一个NioEventLoop实例。

        5、初始化NioEventLoop对象并给属性赋值,具体赋值属性如下:

                (1)provider:就是在NioEventLoopGroup构造器中,调用SelectorProvider的provider()方法获取的SelectorProvider对象。

                (2)selector:就是在NioEventLoop构造器中,调用selector=provider.openSelector()方法获取的Selector对象。

4 执行任务者EventLoop

        NioEventLoop继承自SingleThreadEventLoop,而SingleThreadEventLoop又继承自SingleThreadEventExecutor。SingleThreadEventExecutor是Netty对本地线程的抽象,它内部有一个Thread属性,实际上就是存储了一个本地Java 线程。因此可以简单的认为,一个NioEventLoop对象其实就是一个和特定的线程进行绑定,并且在NioEventLoop声明周期内,其绑定的线程都不会再改变,NioEventLoop的类层次结构图如下:

        NioEventLoop的类层次结构比较复杂,只需要关注重点即可。首先看NioEventLoop的继承关系:NioEventLoop继承SingleThreadEventLoop, SingleThreadEventLoop继承SingleThreadEventExecutor,SingleThreadEventExecutor继承AbstractScheduledEventExecutor。

        在AbstractScheduledEventExecutor,Netty实现了NioEventLoop的Schedule功能,即通过调用一个NioEventLoop实例的schedule方法来运行一些定时任务。而在SingleThreadEventLoop中,又实现了任务队列的功能。通过它,可以调用一个NioEventLoop实例的execute()方法向任务队列中添加一个Task,并由NioEventLoop进行调度执行。

        通常来说,NioEventLoop负责执行两个任务:第一个任务是作为I/O线程,执行与Channel相关的I/O操作,包括调用Selector等待就绪的I/O事件、读写数据与数据处理等;第二个任务是作为任务队列,执行taskQueue中的人物,例如用户调用eventLoop.schedule提交的定时任务也是由这个线程执行的。

4.1 NioEventLoop的实例化过程

        先了解一下EventLoop实例化的运行时序图,如下:

        从上图可以看出,SingleThreadEventExecutor有一个名为thread的Thread类型属性,这个属性就是与SingleThreadEventExecutor关联的本地线程。来看thread是在哪里被赋值的,代码如下:

private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}

         前面分析过,SingleThreadEventExecutor启动时会调用doStartThread方法,然后调用executor.execute方法,将当前线程赋值给thread。在这个线程中所做的事情主要就是调用SingleThreadEventExecutor.this.run()方法,因为NioEventLoop实现了这个方法,所以根据多态性,其实调用的是NioEventLoop.run方法。

4.2 EventLoop与Channel关联

        在Netty中,每个Channel都有且仅有一个EventLoop与之关联,它们的关联过程如下图:

               

        从上图可以看到,当调用AbstractChannel.register()方法后,就完成了Channel和EventLoop的关联,register方法的具体实现如下:

        @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}

        在register方法中,会将一个EventLoop赋值给AbstractChannel内部的eventLoop属性,这句代码就完成了EventLoop与Channel的关联过程。

4.3 EventLoop 启动

        前面已经介绍NioEventLoop本身就是一个SingleThreadEventExecutor,因此NioEventLoop的启动,其实就是NioEventLoop所绑定的本地Java线程的启动。

        按照这个思路,只需要找到在哪里调用了SingleThreadExecutor中thread属性的start方法就可以知道在哪里启动这个线程了。前面分析过,其实thread.start()被封装在SingleThreadExecutor.startThread()方法中,代码如下:

    private void startThread() {if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}

        STATE_UPDATER是SingleThreadExecutor内部维护的一个属性,它的作用是标识当前的Thread的状态。在初始化的时候,STATE_UPDATER == ST_NOT_STARTED,因此第一次调用startThread方法时,就会进入if语句内,进而调用thread.start方法。而这个关键的startThread方法又是在哪调用的呢?用方法调用关系反向查找功能,就恢复阿贤,startTahread方法是在SingleThreadEventExecutor的execute方法中调用的,代码如下:

    @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

        既然如此,现在只需要找到第一次调用SingleThreadEventExecutor的execute方法的位置即可。前面在提到注册Channel的过程中,会在AbstractChannel的register方法中调用eventLoop.execute方法,在EventLoop中进行Channel注册代码的执行,AbstractChannel的register方法的关键代码如下:

        @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {//删除判断代码AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {//删除异常处理代码}}}

        很明显,从Boostrap的bind方法一路跟踪到AbstractChannel的register方法,整个代码都是在主线程中运行的,因此上面的eventLoop.inEventLoop()返回值为false,于是进入else分支,在这个分支中调用eventLoop.execute方法,而NioEventLoop没有实现execute方法,因此调用的是SingleThreadEventExecutor的execute方法,关键代码如下:

    @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

       由于 inEventLoop ==false,因此直行道else分支就调用startThread方法来启动SingleThreadEventExecutor内部关联的Java本地线程。用一句话总结:当EventLoop的execute方法第一次被调用时,会触发startThread方法的调用,进而启动EventLoop所对应的Java本地线程。

        完整的EventLoop启动时序图如下:

这篇关于Netty学习——源码篇5 EventLoop的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/845030

相关文章

Go学习记录之runtime包深入解析

《Go学习记录之runtime包深入解析》Go语言runtime包管理运行时环境,涵盖goroutine调度、内存分配、垃圾回收、类型信息等核心功能,:本文主要介绍Go学习记录之runtime包的... 目录前言:一、runtime包内容学习1、作用:① Goroutine和并发控制:② 垃圾回收:③ 栈和

Android学习总结之Java和kotlin区别超详细分析

《Android学习总结之Java和kotlin区别超详细分析》Java和Kotlin都是用于Android开发的编程语言,它们各自具有独特的特点和优势,:本文主要介绍Android学习总结之Ja... 目录一、空安全机制真题 1:Kotlin 如何解决 Java 的 NullPointerExceptio

8种快速易用的Python Matplotlib数据可视化方法汇总(附源码)

《8种快速易用的PythonMatplotlib数据可视化方法汇总(附源码)》你是否曾经面对一堆复杂的数据,却不知道如何让它们变得直观易懂?别慌,Python的Matplotlib库是你数据可视化的... 目录引言1. 折线图(Line Plot)——趋势分析2. 柱状图(Bar Chart)——对比分析3

重新对Java的类加载器的学习方式

《重新对Java的类加载器的学习方式》:本文主要介绍重新对Java的类加载器的学习方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍1.1、简介1.2、符号引用和直接引用1、符号引用2、直接引用3、符号转直接的过程2、加载流程3、类加载的分类3.1、显示

Android实现一键录屏功能(附源码)

《Android实现一键录屏功能(附源码)》在Android5.0及以上版本,系统提供了MediaProjectionAPI,允许应用在用户授权下录制屏幕内容并输出到视频文件,所以本文将基于此实现一个... 目录一、项目介绍二、相关技术与原理三、系统权限与用户授权四、项目架构与流程五、环境配置与依赖六、完整

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

Java学习手册之Filter和Listener使用方法

《Java学习手册之Filter和Listener使用方法》:本文主要介绍Java学习手册之Filter和Listener使用方法的相关资料,Filter是一种拦截器,可以在请求到达Servl... 目录一、Filter(过滤器)1. Filter 的工作原理2. Filter 的配置与使用二、Listen

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很