netty框架tcp协议粘包半包序列化解决方案

2023-11-21 07:20

本文主要是介绍netty框架tcp协议粘包半包序列化解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

主要依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency>

粘包半包解决

粘包半包问题,这里使用LengthFieldBasedFrameDecoder来作为解决方案
在发送消息前,先约定用定长字节表示接下来数据的长度
LengthFieldBasedFrameDecoder()构造方法参数:

  1. maxFrameLength:数据帧最大长度
  2. lengthFieldoffset:长度字段偏移量
  3. lengthFieldLength:长度字段长度(字节)
  4. lengthAdjustment:长度字段后第几个字节是正文内容
  5. initialBytesToStrip:从帧首去除字节数

序列化(二进制序列化)

java序列化

注意事项:
序列化对象需要实现Serializable接口(java.io.Serializable)
代码:

public class MessageCodecJava extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();//获取内容ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();//长度,我们设定为4个字节out.writeInt(bytes.length);// 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();System.out.println(message);out.add(message);}
}

fastjson序列化

注意事项: 不需要实现Serializable接口(java.io.Serializable)
代码:

public class MessageCodecJson extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> list) throws Exception {ByteBuf out = ctx.alloc().buffer();//获取内容字节byte[] jsonBytes = JSON.toJSONString(msg, SerializerFeature.WriteClassName).getBytes(StandardCharsets.UTF_8);// 获取内容长度out.writeInt(jsonBytes.length);//  写入内容out.writeBytes(jsonBytes);list.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception {//获取内容长度int length = in.readInt();//将二进制反序列化为对象byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);Message msg = JSON.parseObject(new String(bytes, StandardCharsets.UTF_8),Message.class);System.out.println(msg);list.add(msg);}
}

粘包半包序列化解决举例:

客户端代码:

package com.haust.blog;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class MyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new MyClientInitializer());ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {group.shutdownGracefully();}}
}

服务器端代码:

package com.haust.blog;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class MyServer {public static void main(String[] args) {NioEventLoopGroup boosGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boosGroup,workerGroup).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer());ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();channelFuture.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {boosGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

POJO类
为提高编解码的扩展性,我们单独定义一个父类,然后让其他POJO类集成此父类

package com.haust.blog.pojo;import java.io.Serializable;public class Message implements Serializable {
}
package com.haust.blog.pojo;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class JsonBeanMessage extends Message {private int id;private String name;
}
package com.haust.blog.pojo;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class BinaryBeanMessage extends Message{private int id;private String name;private String sex;
}

客户端初始化类

package com.haust.blog;import com.haust.blog.handler.MyClientHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;public class MyClientInitializer extends ChannelInitializer<SocketChannel> {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,0));pipeline.addLast(new MessageCodecJson());pipeline.addLast(new MyClientHandler());}
}

服务器端初始化类

package com.haust.blog;import com.haust.blog.handler.MyServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;public class MyServerInitializer extends ChannelInitializer<SocketChannel> {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,0));
//        pipeline.addLast(new MessageCodecJava());pipeline.addLast(new MessageCodecJson());pipeline.addLast(new MyServerHandler());}
}

客户端处理类

package com.haust.blog.handler;import com.haust.blog.pojo.BinaryBeanMessage;
import com.haust.blog.pojo.JsonBeanMessage;
import com.haust.blog.pojo.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class MyClientHandler extends SimpleChannelInboundHandler<Message> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {System.out.println("客户端收到了服务器发来的数据:"+message);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
//        BinaryBeanMessage message = new BinaryBeanMessage(1, "张三", "男");JsonBeanMessage message = new JsonBeanMessage(1, "李四");ctx.writeAndFlush(message);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(cause.getMessage());ctx.close();}
}

服务器端处理类

package com.haust.blog.handler;import com.haust.blog.pojo.BinaryBeanMessage;
import com.haust.blog.pojo.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class MyServerHandler extends SimpleChannelInboundHandler<Message> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {System.out.println("服务器收到了客户端发来的数据:"+message);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {BinaryBeanMessage message = new BinaryBeanMessage(1, "张三", "男");ctx.writeAndFlush(message);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(cause.getMessage());ctx.close();}
}

java序列化类

package com.haust.blog;import com.haust.blog.pojo.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;@Slf4j
@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecJava extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();//获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();//长度out.writeInt(bytes.length);//写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();System.out.println(message);out.add(message);}
}

fastjson序列化类

package com.haust.blog;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.haust.blog.pojo.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.nio.charset.StandardCharsets;
import java.util.List;@ChannelHandler.Sharable
public class MessageCodecJson extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> list) throws Exception {ByteBuf out = ctx.alloc().buffer();//获取内容字节byte[] jsonBytes = JSON.toJSONString(msg, SerializerFeature.WriteClassName).getBytes(StandardCharsets.UTF_8);// 获取内容长度out.writeInt(jsonBytes.length);//  写入内容out.writeBytes(jsonBytes);list.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception {//获取内容长度int length = in.readInt();//将二进制反序列化为对象byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);Message msg = JSON.parseObject(new String(bytes, StandardCharsets.UTF_8),Message.class);System.out.println(msg);list.add(msg);}
}

包结构:
在这里插入图片描述
结果:
在这里插入图片描述
在这里插入图片描述

总结:

  1. 如何解决粘包半包?思路:自定义协议,在对内容进行编码时,将内容的长度也作为信息发送过去,使解码时可以确定信息边界正确解码,从而避免粘包半包。
  2. 在已经确定好编解码规则(自定义协议)的情况下,为何还会需要LengthFieldBasedFrameDecoder类解决粘包半包问题?当发送数据过大时,导致一组消息多次发送(一个完整的消息,经过发送多次才发送结束),若直接按照解码规则进行解码就会导致半包问题出现,若我们使用LengthFieldBasedFrameDecoder在解码前进行预处理,LengthFieldBasedFrameDecoder会等到接收到完整的消息之后才会传给自定义decoder进行解码,从而避免了半包问题等。

代码举例:

package com.haust.blog;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.haust.blog.pojo.JsonBeanMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;import java.nio.charset.StandardCharsets;public class test {public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 0),new MessageCodecJson());//encodeJsonBeanMessage json = new JsonBeanMessage(1, "张三");channel.writeOutbound(json);//decodeByteBuf buf = ByteBufAllocator.DEFAULT.buffer();byte[] jsonBytes = JSON.toJSONString(json, SerializerFeature.WriteClassName).getBytes(StandardCharsets.UTF_8);// 获取内容长度buf.writeInt(jsonBytes.length);buf.writeBytes(jsonBytes);ByteBuf s1 = buf.slice(0, 50);ByteBuf s2 = buf.slice(50, buf.readableBytes() - 50);s1.retain(); // 引用计数 2channel.writeInbound(s1);channel.writeInbound(s2);}
}

结果:传过来74B信息,第一次只传过来50B信息,此时LengthFieldBasedFrameDecoder进行拦截(若此时直接将50B信息传给解码器就会出现半包),直到另外24B信息到来(此时信息已经完整),LengthFieldBasedFrameDecoder才将完整消息传给decoder(自定义)进行解码操作

17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] REGISTERED
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] ACTIVE
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 74B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 46 7b 22 40 74 79 70 65 22 3a 22 63 6f |...F{"@type":"co|
|00000010| 6d 2e 68 61 75 73 74 2e 62 6c 6f 67 2e 70 6f 6a |m.haust.blog.poj|
|00000020| 6f 2e 4a 73 6f 6e 42 65 61 6e 4d 65 73 73 61 67 |o.JsonBeanMessag|
|00000030| 65 22 2c 22 69 64 22 3a 31 2c 22 6e 61 6d 65 22 |e","id":1,"name"|
|00000040| 3a 22 e5 bc a0 e4 b8 89 22 7d                   |:"......"}      |
+--------+-------------------------------------------------+----------------+
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 50B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 46 7b 22 40 74 79 70 65 22 3a 22 63 6f |...F{"@type":"co|
|00000010| 6d 2e 68 61 75 73 74 2e 62 6c 6f 67 2e 70 6f 6a |m.haust.blog.poj|
|00000020| 6f 2e 4a 73 6f 6e 42 65 61 6e 4d 65 73 73 61 67 |o.JsonBeanMessag|
|00000030| 65 22                                           |e"              |
+--------+-------------------------------------------------+----------------+
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 24B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 2c 22 69 64 22 3a 31 2c 22 6e 61 6d 65 22 3a 22 |,"id":1,"name":"|
|00000010| e5 bc a0 e4 b8 89 22 7d                         |......"}        |
+--------+-------------------------------------------------+----------------+
JsonBeanMessage(id=1, name=张三)
17:34:28 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETEProcess finished with exit code 0
  1. 自定义编码器加@ChannelHandler.Sharable注解时在这里插入图片描述
    需要extends MessageToMessageCodec<ByteBuf, Message>
    Message是当接收到Message及其子类时进入此handler进行处理

  2. 为什么需要定义父类Message,然后其他POJO类继承?提高了可扩展性,如果父类message实现了序列化接口,子类就无需再显示的实现序列化接口。其次在编解码(handler)处理时指定了泛型,那么泛型类型及其子类均可以进入handler进行处理。

这篇关于netty框架tcp协议粘包半包序列化解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/400758

相关文章

Java 线程安全与 volatile与单例模式问题及解决方案

《Java线程安全与volatile与单例模式问题及解决方案》文章主要讲解线程安全问题的五个成因(调度随机、变量修改、非原子操作、内存可见性、指令重排序)及解决方案,强调使用volatile关键字... 目录什么是线程安全线程安全问题的产生与解决方案线程的调度是随机的多个线程对同一个变量进行修改线程的修改操

Spring 框架之Springfox使用详解

《Spring框架之Springfox使用详解》Springfox是Spring框架的API文档工具,集成Swagger规范,自动生成文档并支持多语言/版本,模块化设计便于扩展,但存在版本兼容性、性... 目录核心功能工作原理模块化设计使用示例注意事项优缺点优点缺点总结适用场景建议总结Springfox 是

全面解析MySQL索引长度限制问题与解决方案

《全面解析MySQL索引长度限制问题与解决方案》MySQL对索引长度设限是为了保持高效的数据检索性能,这个限制不是MySQL的缺陷,而是数据库设计中的权衡结果,下面我们就来看看如何解决这一问题吧... 目录引言:为什么会有索引键长度问题?一、问题根源深度解析mysql索引长度限制原理实际场景示例二、五大解决

如何在Spring Boot项目中集成MQTT协议

《如何在SpringBoot项目中集成MQTT协议》本文介绍在SpringBoot中集成MQTT的步骤,包括安装Broker、添加EclipsePaho依赖、配置连接参数、实现消息发布订阅、测试接口... 目录1. 准备工作2. 引入依赖3. 配置MQTT连接4. 创建MQTT配置类5. 实现消息发布与订阅

使用Python进行GRPC和Dubbo协议的高级测试

《使用Python进行GRPC和Dubbo协议的高级测试》GRPC(GoogleRemoteProcedureCall)是一种高性能、开源的远程过程调用(RPC)框架,Dubbo是一种高性能的分布式服... 目录01 GRPC测试安装gRPC编写.proto文件实现服务02 Dubbo测试1. 安装Dubb

SpringSecurity显示用户账号已被锁定的原因及解决方案

《SpringSecurity显示用户账号已被锁定的原因及解决方案》SpringSecurity中用户账号被锁定问题源于UserDetails接口方法返回值错误,解决方案是修正isAccountNon... 目录SpringSecurity显示用户账号已被锁定的解决方案1.问题出现前的工作2.问题出现原因各

Python的端到端测试框架SeleniumBase使用解读

《Python的端到端测试框架SeleniumBase使用解读》:本文主要介绍Python的端到端测试框架SeleniumBase使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全... 目录SeleniumBase详细介绍及用法指南什么是 SeleniumBase?SeleniumBase

javax.net.ssl.SSLHandshakeException:异常原因及解决方案

《javax.net.ssl.SSLHandshakeException:异常原因及解决方案》javax.net.ssl.SSLHandshakeException是一个SSL握手异常,通常在建立SS... 目录报错原因在程序中绕过服务器的安全验证注意点最后多说一句报错原因一般出现这种问题是因为目标服务器

C++高效内存池实现减少动态分配开销的解决方案

《C++高效内存池实现减少动态分配开销的解决方案》C++动态内存分配存在系统调用开销、碎片化和锁竞争等性能问题,内存池通过预分配、分块管理和缓存复用解决这些问题,下面就来了解一下... 目录一、C++内存分配的性能挑战二、内存池技术的核心原理三、主流内存池实现:TCMalloc与Jemalloc1. TCM

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis