dubbo remoting 层分析

2023-10-20 15:18
文章标签 分析 dubbo remoting

本文主要是介绍dubbo remoting 层分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

remote 层关注 transport 和 exchange 这两个包即可. 因为这两个包下封装了通讯相关的内容.

先来补充点额外知识:

1.Endpoint 是端点的概念,我们可以看到,对其抽象,最主要的是一个 send 方法,用于发送数据. 但是我有一个疑问?为啥 Endpoint 不抽象出 receive ?

2.Endpoint 的 send 方法和 ChannelHandler 的 sent 方法的区别和联系?

刚刚我看了下 netty4 下面的 NettyServerHandler 和 NettyChannel,发现 ChannelHandler 中定义的 sent 方法、receive 方法和 Netty Channel 的实现有关,而 NettyChannel 中的 send 方法是和数据发送有关的.

换句话说:NettyChannel 中的 send 方法在发送的时候,会调用 ChannelHandler 中定义的对应方法进行处理.

其实上面问的 Endpoint 为啥不抽象出 receive?其实这个问题就很怪异. 因为 send 方法是一个主动的过程,我发送一个消息,主动的调用下就好,但是我收到一个消息,就不是主动调用下就可以完成的。或许有人会说,可以向 socket 编程那样阻塞住啊。但是仔细想想,netty 不是有 handler 可以在收到消息时进行处理吗?所以在 Endpoint 接口中抽象 receive 方法很鸡肋.

channel 是通讯的载体,信息通过 channel 进行发送和接收. channel 继承 Endpoint. 同时定义了和 attribute 相关的方法.

channelHandler 和 Netty 中的 channelHandler 相关联,是对 channel 的逻辑处理,同时该接口是一个可扩展接口.

client 继承 Endpoint,Channel,Resetable,IdleSensible 接口,客户端同样是一个端点,需要注意的是,客户端可以重连服务端.

server 继承 Endpoint,Resetable,IdleSensible 接口,可以返回所有连接到该服务端的所有客户端,同时可以根据 ip 地址获取具体的客户端.

Codec2 是编解码类

Decodeable 解码接口,用来判断消息是否可以被解码等

Dispatcher 接口用于调度,选择不同的 handler 来进行处理(将消息派发到线程池进行处理).

Transport 接口用于获取 Client 和 Server.

先去网上盗一张图.

下面我们按照上图进行分析:

AbstractPeer 实现 Endpoint 和 ChannelHandler. 主要是增加了对通道是否关闭的判断.

AbstractEndpoint 继承 AbstractPeer 类,增加了 codec2,超时时间,连接超时时间等属性.

AbstractServer 类继承 AbstractEndpoint 以及实现 Server 接口. 这在一方面印证了 Server 其实是一个端.

1.AbstractServer 中的 executor 是在哪个地方用到的了?首先我们看到在构造方法中看到其从 dataStore 中获取 executor. 那么是在哪里放进去的了?我们可以明确的从 WrappedChannelHandler 类的构造方法中看到这里对 executor 进行了赋值,如下:
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {componentKey = CONSUMER_SIDE;}DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

2.现在分析下这里的 executor 干了个啥事了?优雅关机.
优雅停机是啥意思了?核心业务在服务器中正在执行时中断,可能出现严重后果,所以需要等待服务执行完后关闭.
dubbo 是如何做的了?
1.收到 kill 9 进程退出信号,Spring 容器会出发容器销毁事件
2.privoder 端会取消服务元数据信息
3.consumer 端会收到最新地址列表(不包含准备停机的地址)
4.Dubbo 协议会发送 readonly 事件报文通知 consumer 服务不可用
5.服务端等待已经执行的任务结束并拒绝新任务执行

AbstractServer 最主要是实现了 send 和 close 方法. 同时值得注意的是该构造方法调用了 doOpen 方法打开一个服务端.

AbstractClient 继承 AbstractEndpoint 并实现 Client 接口,也就是说 AbstractClient 在语义上是一个端. 同时值得注意的是该构造方法调用了 doOpen 方法和 connect 方法,完成客户端的初始化工作. 我们需要注意下,AbstractClient 中的 wrapChannelHandler 对 handler 的语义进行了增强,增加了心跳和多消息处理 handler. AbstractClient 的 send 方法和 AbstractServer 的 send 方法在实现的时候是不同的,Server 端的 send 方法是遍历 channel,然后调用每一个 channel 进行发送. Client 端只是调用 channel.send 进行发送.

connect 方法加锁了,避免了多线程同时对一个 channel 调用 connect 方法. Client 主要实现 connect、disconnect、reconnect 及 close 的功能.

AbstractChannel 继承 AbstractPeer,实现 Channel 接口. Channel 在 dubbo 看来,也是一个 Endpoint. 是数据的载体,客户端和服务端通过 channel 发送和接收数据,需要注意的是:AbstractPeer 实现了 ChannelHandler 接口.

ChannelHandlerDelegate 作为装饰者模式的 component. 该接口只定义了一个 getHandler 方法.

AbstractChannelHandlerDelegate 装饰角色,实现的方法都是直接调用成员变量 handler 中的方法.

DecodeHandler 继承 AbstractChannelHandlerDelegate,重写了 handler 的 receive 方法. 在接收到消息的时候进行了判断,如果消息可解码,则直接调用 decode 方法,如果消息是 request 类型的,则对数据部分进行解码,如果消息是 response 类型的,则对结果进行解码. 解码完成后,向上进行传递,换句话说可以无限对 handler 的功能进行增强. 其实这个设计和责任链的设计是两种思路吧,一种是责任链的方式,一种是装饰者模式(任何被解码的消息都必须可以被解码,即:实现 Decodeable 接口).

MultiMessageHandler 同样实现 AbstractChannelHandlerDelegate 接口,但是可以处理 MultiMessage. 我们可以把 MultiMessage 看成是一个聚合的消息(ArrayList). 所以处理的时候也是简单粗暴,直接遍历,然后调用 received 方法.

WrappedChannelHandler 同样是装饰者角色,同样是直接调用成员变量 handler 中的方法. 但是值得注意的是:该类增加了线程池,它的子类都是关心那些消息需要交给线程池处理,那些消息直接交由 I/O 线程处理.

public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {componentKey = CONSUMER_SIDE;}DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

MessageOnlyChannelHandler 只有请求响应消息分发到线程池,其他连接断开、心跳等事件在 I/O 线程上执行. MessageOnlyChannelHandler 在重写 received 方法时,并没有 AllChannelHandler 中的那样处理,为什么了?因为该类抛出的异常,会在 HeaderExchangeHandler.caught 方法中进行统一处理.

ExecutionChannelHandler 只有请求消息派发到线程池(不含响应),响应、断开连接、心跳等事件在 I/O 线程上执行. 但是我们看到这里执行的时候,又和 MessageOnlyChannelHandler 中的处理方式不同?为什么了?因为 MessageOnlyChannelHandler 中不仅要处理 request,还要处理 response. 所以进行了统一的处理,但是 ExecutionChannelHandler 只需要在线程池中处理 request,所以就可以和 AllDispatcher 一样了(不必须).

AllChannelHandler 所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等事件. 关于使用 AllDispatcher 的时候,在早期版本 dubbo provider 线程池可能被 exhausted. 这是由于异常也需要使用线程池来进行处理,但是线程池还是满的,所以无法进一步处理,consumer 只能死等,直至超时. 如何规避了?我们看到在 dubbo2.7 中,received 方法收到消息后,如果线程池抛出了拒绝异常,且这是一个请求,那么将会收到服务端线程池被打满的消息.

direct 所有消息都不派发到线程池,直接在 I/O 线程上执行.

ConnectionOrderedChannelHandler 在 I/O 线程上,将连接、断开事件放入队列,有序逐个执行,其他事件派发到线程池. 需要说一下的是:该类有自己的线程池 connectionExecutor. 因为该类的 caught 方法同样将任务分给了线程池,所以其 received 方法,必须要进行判断处理,如下:

public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
if (message instanceof Request && t instanceof RejectedExecutionException) {
Request request = (Request) message;
if (request.isTwoWay()) {
String msg = “Server side(” + url.getIp() + “,” + url.getPort() + “) threadpool is exhausted ,detail msg:” + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}

Dispatcher 就不说了,直接 new 对应的 handler.

ChannelHandlers 类是对 Dispatcher 返回的 channelHandler 的功能进行了增强,增加了处理心跳和多消息的 handler.

AbstractCodec 实现 Codec2 接口,主要定义了检查数据长度方法、判断是客户端侧,还是服务端侧方法,获取序列化方式方法.

TransportCodec 继承 AbstractCodec,主要对 encode 方法、decode 方法进行了重写. 实现序列化和反序列化的功能.

CodecAdapter 将 Codec 适配为 Codec2.

ChannelDelegate 实现 Channel 接口,为 Channel 的装饰角色.

ClientDelegate 实现 Client 接口,为 Client 的装饰角色.

ServerDelegate 实现 Server 接口,为 Server 的装饰角色.

ChannelHandlerAdapter 实现 ChannelHandler 接口,实现的都是空方法,方便子类重写.

ChannelHandlerDispatcher 为 ChannelHandler 的调度器,该类有一个成员变量 channelHandlers,用于保存 channelHandlers. 在实现 connected、disconnected 等方法的时候,都会遍历这个属性列表,依次调用.

CodecSupport 编解码工具类,用于查询序列化方式.

参考:
① https://segmentfault.com/a/1190000017390253#articleHeader28

这篇关于dubbo remoting 层分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/247931

相关文章

Android 缓存日志Logcat导出与分析最佳实践

《Android缓存日志Logcat导出与分析最佳实践》本文全面介绍AndroidLogcat缓存日志的导出与分析方法,涵盖按进程、缓冲区类型及日志级别过滤,自动化工具使用,常见问题解决方案和最佳实... 目录android 缓存日志(Logcat)导出与分析全攻略为什么要导出缓存日志?按需过滤导出1. 按

Linux中的HTTPS协议原理分析

《Linux中的HTTPS协议原理分析》文章解释了HTTPS的必要性:HTTP明文传输易被篡改和劫持,HTTPS通过非对称加密协商对称密钥、CA证书认证和混合加密机制,有效防范中间人攻击,保障通信安全... 目录一、什么是加密和解密?二、为什么需要加密?三、常见的加密方式3.1 对称加密3.2非对称加密四、

MySQL中读写分离方案对比分析与选型建议

《MySQL中读写分离方案对比分析与选型建议》MySQL读写分离是提升数据库可用性和性能的常见手段,本文将围绕现实生产环境中常见的几种读写分离模式进行系统对比,希望对大家有所帮助... 目录一、问题背景介绍二、多种解决方案对比2.1 原生mysql主从复制2.2 Proxy层中间件:ProxySQL2.3

python使用Akshare与Streamlit实现股票估值分析教程(图文代码)

《python使用Akshare与Streamlit实现股票估值分析教程(图文代码)》入职测试中的一道题,要求:从Akshare下载某一个股票近十年的财务报表包括,资产负债表,利润表,现金流量表,保存... 目录一、前言二、核心知识点梳理1、Akshare数据获取2、Pandas数据处理3、Matplotl

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

Olingo分析和实践之OData框架核心组件初始化(关键步骤)

《Olingo分析和实践之OData框架核心组件初始化(关键步骤)》ODataSpringBootService通过初始化OData实例和服务元数据,构建框架核心能力与数据模型结构,实现序列化、URI... 目录概述第一步:OData实例创建1.1 OData.newInstance() 详细分析1.1.1