阐述Dubbo服务提供方的解码原理

2024-05-09 03:36

本文主要是介绍阐述Dubbo服务提供方的解码原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 解码概述

在Dubbo服务消费方(客户端)和服务提供方(服务端)进行网络通信时,服务提供方会通过socket把需要发送的内容序列化为二进制流后发出。接着二进制流通过网络流向服务提供方。服务提供方接收到该请求后会解析该请求包,对接收到的数据进行反序列化后对请求进行处理。

服务提供方接收到请求后解析请求包(即Dubbo协议的内容)的过程即服务提供方的解码过程。

2 解码原理解析

2.1 解码入口

服务提供方解析请求包最终是借助于 InternalDecoder 的 decode 方法来实现的。具体实现代码如下所示。

protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {ChannelBuffer message = new NettyBackedChannelBuffer(input);NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);// decode object.do {int saveReaderIndex = message.readerIndex();Object msg = codec.decode(channel, message);if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {message.readerIndex(saveReaderIndex);break;} else {//is it possible to go here ?if (saveReaderIndex == message.readerIndex()) {throw new IOException("Decode without read data.");}if (msg != null) {out.add(msg);}}} while (message.readable());
}

2.2 解码实现细节

在 InternalDecoder 的 decode 方法内部再调用了具体解码实现来进行解码。如 ExchangeCodec 的 decode 方法,实现如下所示。

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {// 获取dubbo协议帧的字节数int readable = buffer.readableBytes();// 将dubbo协议头读取到数组headerbyte[] header = new byte[Math.min(readable, HEADER_LENGTH)];buffer.readBytes(header);// 解析dubbo协议帧数据部分return decode(channel, buffer, readable, header);
}protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {// check magic number.if (readable > 0 && header[0] != MAGIC_HIGH|| readable > 1 && header[1] != MAGIC_LOW) {int length = header.length;if (header.length < readable) {header = Bytes.copyOf(header, readable);buffer.readBytes(header, length, readable - length);}for (int i = 1; i < header.length - 1; i++) {if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {buffer.readerIndex(buffer.readerIndex() - header.length + i);header = Bytes.copyOf(header, i);break;}}return super.decode(channel, buffer, readable, header);}// check length.// (1)dubbo协议帧实际字节数小于协议头长度,说明该协议帧不是一个完整的协议帧if (readable < HEADER_LENGTH) {return DecodeResult.NEED_MORE_INPUT;}// get data length.int len = Bytes.bytes2int(header, 12);// When receiving response, how to exceed the length, then directly construct a response to the client.// see more detail from https://github.com/apache/dubbo/issues/7021.Object obj = finishRespWhenOverPayload(channel, len, header);if (null != obj) {return obj;}// dubbo协议帧理论上的字节数int tt = len + HEADER_LENGTH;// (2)dubbo协议帧实际字节数小于理论上的字节数,说明该协议帧不是一个完整的协议帧if (readable < tt) {return DecodeResult.NEED_MORE_INPUT;}// limit input stream.ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);try {return decodeBody(channel, is, header);} finally {if (is.available() > 0) {try {if (logger.isWarnEnabled()) {logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", "Skip input stream " + is.available());}StreamUtils.skipUnusedStream(is);} catch (IOException e) {logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", e.getMessage(), e);}}}
}

其中将dubbo协议头读取到header数组的实现如下所示。

    @Overridepublic void readBytes(byte[] dst) {readBytes(dst, 0, dst.length);}

对dubbo协议body进行解析则是调用了 decodeBody 方法,实现如下所示。

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// get request id.long id = Bytes.bytes2long(header, 4);if ((flag & FLAG_REQUEST) == 0) {// decode response.Response res = new Response(id);if ((flag & FLAG_EVENT) != 0) {res.setEvent(true);}// get status.byte status = header[3];res.setStatus(status);try {if (status == Response.OK) {Object data;if (res.isEvent()) {byte[] eventPayload = CodecSupport.getPayload(is);if (CodecSupport.isHeartBeat(eventPayload, proto)) {// heart beat response data is always null;data = null;} else {data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);}} else {data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));}res.setResult(data);} else {res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());}} catch (Throwable t) {res.setStatus(Response.CLIENT_ERROR);res.setErrorMessage(StringUtils.toString(t));}return res;} else {// decode request.Request req;try {Object data;if ((flag & FLAG_EVENT) != 0) {byte[] eventPayload = CodecSupport.getPayload(is);if (CodecSupport.isHeartBeat(eventPayload, proto)) {// heart beat response data is always null;req = new HeartBeatRequest(id);((HeartBeatRequest) req).setProto(proto);data = null;} else {req = new Request(id);data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload);}req.setEvent(true);} else {req = new Request(id);data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));}req.setData(data);} catch (Throwable t) {// bad requestreq = new Request(id);req.setBroken(true);req.setData(t);}req.setVersion(Version.getProtocolVersion());req.setTwoWay((flag & FLAG_TWOWAY) != 0);return req;}
}

这篇关于阐述Dubbo服务提供方的解码原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/972274

相关文章

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁

Python中的filter() 函数的工作原理及应用技巧

《Python中的filter()函数的工作原理及应用技巧》Python的filter()函数用于筛选序列元素,返回迭代器,适合函数式编程,相比列表推导式,内存更优,尤其适用于大数据集,结合lamb... 目录前言一、基本概念基本语法二、使用方式1. 使用 lambda 函数2. 使用普通函数3. 使用 N

MyBatis-Plus 与 Spring Boot 集成原理实战示例

《MyBatis-Plus与SpringBoot集成原理实战示例》MyBatis-Plus通过自动配置与核心组件集成SpringBoot实现零配置,提供分页、逻辑删除等插件化功能,增强MyBa... 目录 一、MyBATis-Plus 简介 二、集成方式(Spring Boot)1. 引入依赖 三、核心机制

Java 中编码与解码的具体实现方法

《Java中编码与解码的具体实现方法》在Java中,字符编码与解码是处理数据的重要组成部分,正确的编码和解码可以确保字符数据在存储、传输、读取时不会出现乱码,本文将详细介绍Java中字符编码与解码的... 目录Java 中编码与解码的实现详解1. 什么是字符编码与解码?1.1 字符编码(Encoding)1

redis和redission分布式锁原理及区别说明

《redis和redission分布式锁原理及区别说明》文章对比了synchronized、乐观锁、Redis分布式锁及Redission锁的原理与区别,指出在集群环境下synchronized失效,... 目录Redis和redission分布式锁原理及区别1、有的同伴想到了synchronized关键字

解决若依微服务框架启动报错的问题

《解决若依微服务框架启动报错的问题》Invalidboundstatement错误通常由MyBatis映射文件未正确加载或Nacos配置未读取导致,需检查XML的namespace与方法ID是否匹配,... 目录ruoyi-system模块报错报错详情nacos文件目录总结ruoyi-systnGLNYpe

Linux中的HTTPS协议原理分析

《Linux中的HTTPS协议原理分析》文章解释了HTTPS的必要性:HTTP明文传输易被篡改和劫持,HTTPS通过非对称加密协商对称密钥、CA证书认证和混合加密机制,有效防范中间人攻击,保障通信安全... 目录一、什么是加密和解密?二、为什么需要加密?三、常见的加密方式3.1 对称加密3.2非对称加密四、

setsid 命令工作原理和使用案例介绍

《setsid命令工作原理和使用案例介绍》setsid命令在Linux中创建独立会话,使进程脱离终端运行,适用于守护进程和后台任务,通过重定向输出和确保权限,可有效管理长时间运行的进程,本文给大家介... 目录setsid 命令介绍和使用案例基本介绍基本语法主要特点命令参数使用案例1. 在后台运行命令2.

Nginx进行平滑升级的实战指南(不中断服务版本更新)

《Nginx进行平滑升级的实战指南(不中断服务版本更新)》Nginx的平滑升级(也称为热升级)是一种在不停止服务的情况下更新Nginx版本或添加模块的方法,这种升级方式确保了服务的高可用性,避免了因升... 目录一.下载并编译新版Nginx1.下载解压2.编译二.替换可执行文件,并平滑升级1.替换可执行文件

Spring Security 单点登录与自动登录机制的实现原理

《SpringSecurity单点登录与自动登录机制的实现原理》本文探讨SpringSecurity实现单点登录(SSO)与自动登录机制,涵盖JWT跨系统认证、RememberMe持久化Token... 目录一、核心概念解析1.1 单点登录(SSO)1.2 自动登录(Remember Me)二、代码分析三、