Netty是如何解决TCP的粘包和拆包问题的?

2023-12-27 18:50

本文主要是介绍Netty是如何解决TCP的粘包和拆包问题的?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

TCP是个“流协议”,所谓流,就是没有界限的一连串数据,没有界限。TCP底层不了解业务数据的含义,它会根据TCP缓冲区的实际情况进行包的划分,所以业务上认为,一个完整的包可能被TCP拆分为多个包进行发送,也可能把多个小包封装成一个大的数据包进行发送,这就是所谓的TCP粘包和拆包问题。

Netty如何解决TCP的粘包和拆包问题

  • 1. TCP粘包/拆包介绍
    • 1.1 什么是TCP粘包/拆包
    • 1.2 TCP为什么会粘包/拆包
    • 1.3 如何解决
  • 2. TCP粘包/拆包问题演示
    • 2.1 服务端代码
    • 2.2 客户端代码
    • 2.3 运行结果
  • 3. Netty解决TCP粘包/拆包问题示例
    • 3.1 服务端代码
    • 3.2 客户端代码
    • 3.3 运行结果
    • 3.4 工作原理

1. TCP粘包/拆包介绍

1.1 什么是TCP粘包/拆包

为了更加清楚的介绍这个问题,我们使用一个实例来演示。

假设客户端向服务端发送两个数据包:第一个内容为 abc;第二个内容为edf。
服务端接受一个数据并做相应的业务处理。为了简单起见,服务端将受到的内容添加星号*之后打印。

那么服务端输出结果将会出现下面四种情况:

服务端输出结果结论
abc* def*正常接受,没有发生粘包/拆包
abcdef*异常接受,发生粘包
abc* d* ef*异常接受,发生拆包
ab* cdef*异常接受,发生粘包和拆包

通过这个例子我们可以很好的了解TCP的粘包/拆包,本质上就是多次发送的数据之间发生重叠,或者单次发送的数据会发送拆分。

那为什么会发生这种情况呢?

1.2 TCP为什么会粘包/拆包

我们知道,TCP是以一种流的方式来进行网络转播的,当TCP三次握手简历通信后,客户端服务端之间就建立了一种通讯管道,我们可以想象成自来水管道,流出来的水是连城一片的,是没有分界线的。TCP底层并不了解上层的业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分。

所以对于我们应用层而言。我们直观是发送一个个连续完整TCP数据包的,而在底层就可能会出现将一个完整的TCP拆分成多个包发送或者将多个包封装成一个大的数据包发送。

具体来说,发生这种情况主要有以下三个原因:

  • 应用程序写入的字节大小大于套接口发送缓冲区大小;
  • 进行MSS大小的TCP分段;
  • 以太网帧的payload大于MTU进行IP分片。

1.3 如何解决

主流的协议解决方案可以归纳如下:

  • 消息定长,例如每个报文的大小固定为20个字节,如果不够,空位补空格;
  • 在包尾增加回车换行符进行切割;
  • 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段;
  • 更复杂的应用层协议。

2. TCP粘包/拆包问题演示

下面我们将基于Netty来演示一下TCP粘包/拆包问题出现的场景以及结果。我们针对服务端和客户端分别进行介绍。

2.1 服务端代码

package netty.frame.tcpzjb.error;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/*** created by LMR on 2020/5/20*/
public class TimeServer {public static void main(String[] args) throws Exception{int port = 8080;new TimeServer().bind(port);}public void bind(int port) throws Exception{//配置服务端NIO线程组NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());//绑定端口,同步等待成功ChannelFuture f = serverBootstrap.bind(port).sync();//等待服务端监听端口关闭f.channel().closeFuture().sync();}finally {//优雅退出,释放资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new TimeServerHandler());}}
}

TimeServer是服务端的启动类,该类与博客中的代码相同,在这里就不进行介绍,下面主要介绍其中实现服务端处理的TimeServerHandler类。

package netty.frame.tcpzjb.error;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/*** created by LMR on 2020/5/20*/
public class TimeServerHandler extends ChannelInboundHandlerAdapter {private int counter;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {//将消息转为bufByteBuf buf=(ByteBuf) msg;//创建一个buf长度的数组byte [] requestbyte=new byte[buf.readableBytes()];buf.readBytes(requestbyte);String request=new String(requestbyte,"utf-8").substring(0, requestbyte.length - System.getProperty("line.separator").length());System.out.println("The time server receive order : " + request+ " ; the counter is : " + ++counter);String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(request) ? newjava.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";currentTime = currentTime + System.getProperty("line.separator");//响应给客户端ByteBuf resBuf= Unpooled.copiedBuffer(currentTime.getBytes());ctx.writeAndFlush(resBuf);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

在TimeServerHandler中我们可以看到,服务端每读到一条消息,就计数一次,然后大宋消息给客户端。理论上来说,服务端受到的消息总数应该和客户端发送的数目相同,并且请求消息删除回车换行符后就不会换行。下面看看客户端的实现代码。

2.2 客户端代码

在这里我们就不再给出,客户端启动类代码,具体嗲吗可以见上一篇博客。下面给出客户端处理类的代码。

package netty.frame.tcpzjb.error;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/*** created by LMR on 2020/5/20*/
public class TimeClientHandler extends ChannelInboundHandlerAdapter {private int counter;private byte[] req;public TimeClientHandler(){req = "QUERY TIME ORDER".getBytes();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf message = null;for (int i = 0; i < 100; i++){message = Unpooled.buffer(req.length);message.writeBytes(req);ctx.writeAndFlush(message);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("Now is : " + body + " ; the counter is :" + ++counter);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

在上述代码中,当客户端与服务端建立连接之后,会向服务端发送100条消息,并且每次都会刷新,保证每条消息都会被写入channel中,理论上服务端会受到100条请求查询事件的请求消息。当客户端接收到服务器端的响应消息之后,就会打印一次计数器,理论上来说客户端会打印100次时间。下面我们来看看实际的运行结果。

2.3 运行结果

服务端运行结果:
在这里插入图片描述
在这里插入图片描述
客户端运行结果:
在这里插入图片描述
在这里插入图片描述
从上述结果可以看出,服务端收到了61条消息,但没有一条是查询时间的指令。客户端实际上也只接受到了3条从服务端返回的指令。

正是由于我们的程序没有考虑TCP的粘包/拆包问题,才会导致实际结果与理论上不符合。那么下面我们给出正确的代码。

3. Netty解决TCP粘包/拆包问题示例

为了解决TCP粘包/拆包问题,Netty提供了多种编码器,在本节中我们将利用Netty中的LineBasedFrameDecoder和StringDecoder来解决上面出现的问题,我们同样分服务端和客户端介绍。

3.1 服务端代码

package netty.frame.tcpzjb.right;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;/*** created by LMR on 2020/5/20*/
public class TimeServer {public static void main(String[] args) throws Exception{int port = 8080;new TimeServer().bind(port);}public void bind(int port) throws Exception{//配置服务端NIO线程组NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());//绑定端口,同步等待成功ChannelFuture f = serverBootstrap.bind(port).sync();//等待服务端监听端口关闭f.channel().closeFuture().sync();}finally {//优雅退出,释放资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new TimeServerHandler());}}
}

与上面代码不同点在于,我们在ChildChannelHandler初始化过程中,往SocketChannel的pipeline中添加了LineBasedFrameDecoder和StringDecoder两种编码器。

package netty.frame.tcpzjb.right;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/*** created by LMR on 2020/5/20*/
public class TimeServerHandler extends ChannelInboundHandlerAdapter {private int counter;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {String body = (String) msg;System.out.println("The time server receive order : " + body+ " ; the counter is : " + ++counter);String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? newjava.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";currentTime = currentTime + System.getProperty("line.separator");//响应给客户端ByteBuf resBuf= Unpooled.copiedBuffer(currentTime.getBytes());ctx.writeAndFlush(resBuf);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

而TimeServerHandler代码相对于上一节中的代码更为简洁,不需要对消息进行编码。这是因为我们设置了字符串解码器,不需要在进行手动解码,我们收到的消息就是字符串。

3.2 客户端代码

package netty.frame.tcpzjb.right;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;/*** created by LMR on 2020/5/20*/
public class TimeClient {public static void main(String[] args) throws Exception{int port = 8080;new TimeClient().connect("127.0.0.1", port);}public void connect(String host, int port) throws Exception{NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new TimeClientHandler());}});//发起异步连接操作ChannelFuture f = bootstrap.connect(host, port).sync();//等待客户端连接关闭f.channel().closeFuture().sync();}finally {group.shutdownGracefully();}}
}

同样TimeClient与之前的代码相比也仅仅增加了LineBasedFrameDecoder和StringDecoder两种解码器。

package netty.frame.tcpzjb.right;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.logging.Logger;/*** created by LMR on 2020/5/20*/
public class TimeClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());private int counter;private byte[] req;public TimeClientHandler(){req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf message = null;for (int i = 0; i < 100; i++){message = Unpooled.buffer(req.length);message.writeBytes(req);ctx.writeAndFlush(message);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String body = (String) msg;System.out.println("Now is : " + body + " ; the counter is :" + ++counter);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {logger.warning("Unexpected exception from downstream : " + cause.getMessage());ctx.close();}
}

在TimeClientHandler中,我们同样不需要对服务端返回的消息进行解码,相比于之前的代码更加简洁。
下面我们看看运行结果。

3.3 运行结果

服务端结果:
在这里插入图片描述
客户端结果:
在这里插入图片描述
从结果上看来,没有发生TCP的粘包和拆包问题,那为什么会出现这样的情况呢,接下来我们简单解释下原因。

3.4 工作原理

LineBasedFrameDecoder的工作原理是它一次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”, 如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有出现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Handler。LineBasedFrameDecoder + StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。

如果发送的消息不是以换行符结束的,该怎么办呢?或者没有回车换行符,靠消息头中的长度字段来分包怎么办?是不是需要自己写半包解码器?

答案是否定的,Netty提供了多种支持TCP粘包/拆包的解码器用来满足用户的不同诉求。在后续博客中我将继续介绍Netty提供的几种解码器的使用。



写在后面的话:

本文是我阅读《Netty权威指南》的笔记,代码是基于该书中的用例实现的,由于Netty版本不一致,我对代码中的部分进行了修改。

参考博客:
https://www.jb51.net/article/165349.htm
https://blog.csdn.net/qq_33227649/article/details/78319613

如果喜欢的话希望点赞收藏,关注我,将不间断更新博客。

希望热爱技术的小伙伴私聊,一起学习进步

来自于热爱编程的小白

这篇关于Netty是如何解决TCP的粘包和拆包问题的?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/544036

相关文章

线上Java OOM问题定位与解决方案超详细解析

《线上JavaOOM问题定位与解决方案超详细解析》OOM是JVM抛出的错误,表示内存分配失败,:本文主要介绍线上JavaOOM问题定位与解决方案的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一、OOM问题核心认知1.1 OOM定义与技术定位1.2 OOM常见类型及技术特征二、OOM问题定位工具

C++右移运算符的一个小坑及解决

《C++右移运算符的一个小坑及解决》文章指出右移运算符处理负数时左侧补1导致死循环,与除法行为不同,强调需注意补码机制以正确统计二进制1的个数... 目录我遇到了这么一个www.chinasem.cn函数由此可以看到也很好理解总结我遇到了这么一个函数template<typename T>unsigned

Vue3绑定props默认值问题

《Vue3绑定props默认值问题》使用Vue3的defineProps配合TypeScript的interface定义props类型,并通过withDefaults设置默认值,使组件能安全访问传入的... 目录前言步骤步骤1:使用 defineProps 定义 Props步骤2:设置默认值总结前言使用T

504 Gateway Timeout网关超时的根源及完美解决方法

《504GatewayTimeout网关超时的根源及完美解决方法》在日常开发和运维过程中,504GatewayTimeout错误是常见的网络问题之一,尤其是在使用反向代理(如Nginx)或... 目录引言为什么会出现 504 错误?1. 探索 504 Gateway Timeout 错误的根源 1.1 后端

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

解决升级JDK报错:module java.base does not“opens java.lang.reflect“to unnamed module问题

《解决升级JDK报错:modulejava.basedoesnot“opensjava.lang.reflect“tounnamedmodule问题》SpringBoot启动错误源于Jav... 目录问题描述原因分析解决方案总结问题描述启动sprintboot时报以下错误原因分析编程异js常是由Ja

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”

MySQL 表空却 ibd 文件过大的问题及解决方法

《MySQL表空却ibd文件过大的问题及解决方法》本文给大家介绍MySQL表空却ibd文件过大的问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录一、问题背景:表空却 “吃满” 磁盘的怪事二、问题复现:一步步编程还原异常场景1. 准备测试源表与数据

解决Nginx启动报错Job for nginx.service failed because the control process exited with error code问题

《解决Nginx启动报错Jobfornginx.servicefailedbecausethecontrolprocessexitedwitherrorcode问题》Nginx启... 目录一、报错如下二、解决原因三、解决方式总结一、报错如下Job for nginx.service failed bec

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁