netty编程之实现断点续传(分片发送)功能

2024-08-30 21:04

本文主要是介绍netty编程之实现断点续传(分片发送)功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

写在前面

在我们使用各种网盘的时候,可以随时的暂停上传,然后继续上传,这其实就是断点续传的功能,本文就看下在netty中如何实现断点续传的功能。

1:核心点介绍

1.1:RandomAccessFile

RandomAccessFile类有一个seek方法,通过该方法可以从文件的指定位置开始读取内容,基于此,我们就可以实现从断点处继续上传的效果,其实也就是实现断点续传了。

1.1:client和server交互协议的封装

定义如下的类来封装交互协议:

public class FileTransferProtocol {private Integer transferType; //0请求传输文件、1文件传输指令、2文件传输数据private Object transferObj;   //数据对象;(0)FileDescInfo、(1)FileBurstInstruct、(2)FileBurstDatapublic Integer getTransferType() {return transferType;}public void setTransferType(Integer transferType) {this.transferType = transferType;}public Object getTransferObj() {return transferObj;}public void setTransferObj(Object transferObj) {this.transferObj = transferObj;}}

其中transferType有如下的值:

1:0请求传输文件 客户端请求开始上传文件,对应的信息封装类是FileDescInfo,描述了要上传的文件的名称大小等信息
2:1文件传输指令客户端和服务端共同使用,对应的信息封装类是FileBurstInstruct,通过抽象的指令值来标记当前传输处于哪个阶段
3:2文件传输数据用来封装具体要上传的数据,位置信息等

1.3:protostuff

数据传输的序列化方式采用protostuff,因为其在对象序列化上的性能表现还是比较优秀(序列化的速度以及序列化的大小),并且使用方式也比较简单。

2:正式编码

2.1:server

server main:

package com.dahuyou.netty.transferfile.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {//配置服务端NIO线程组private EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));private EventLoopGroup childGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture bing(int port) {ChannelFuture channelFuture = null;try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(new MyChannelInitializer());channelFuture = b.bind(port).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty server start done. {}");} else {System.out.println("netty server start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}public Channel getChannel() {return channel;}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.server;import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyServerHandler());}}

这里设置了基于protostuff的编解码器,以及消息处理的handler:

package com.dahuyou.netty.transferfile.server;import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.*;
import com.dahuyou.netty.transferfile.util.CacheUtil;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 0:FileDescInfo fileDescInfo = (FileDescInfo) fileTransferProtocol.getTransferObj();//断点续传信息,实际应用中需要将断点续传信息保存到数据库中FileBurstInstruct fileBurstInstructOld = CacheUtil.burstDataMap.get(fileDescInfo.getFileName());if (null != fileBurstInstructOld) {if (fileBurstInstructOld.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileDescInfo.getFileName());}//传输完成删除断点信息System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求[断点续传]。" + JSON.toJSONString(fileBurstInstructOld));ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstructOld));return;}//发送信息FileTransferProtocol sendFileTransferProtocol = MsgUtil.buildTransferInstruct(Constants.FileStatus.BEGIN, fileDescInfo.getFileUrl(), 0);ctx.writeAndFlush(sendFileTransferProtocol);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求。" + JSON.toJSONString(fileDescInfo));break;case 2:FileBurstData fileBurstData = (FileBurstData) fileTransferProtocol.getTransferObj();FileBurstInstruct fileBurstInstruct = FileUtil.writeFile("E://", fileBurstData);//保存断点续传信息CacheUtil.burstDataMap.put(fileBurstData.getFileName(), fileBurstInstruct);ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstruct));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件数据。" + JSON.toJSONString(fileBurstData));//传输完成删除断点信息if (fileBurstInstruct.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileBurstData.getFileName());}break;default:break;}}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,分为如下几种情况:

0:根据是否是续传返回不同的消息,控制client上传的不同行为
2:如果是上传文件,则保存文件,完成当前文件内容的上传,并返回续传信息给client,client继续上传

2.2:client

client main:

package com.dahuyou.netty.transferfile.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {//配置服务端NIO线程组private EventLoopGroup workerGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture connect(String inetHost, int inetPort) {ChannelFuture channelFuture = null;try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new MyChannelInitializer());channelFuture = b.connect(inetHost, inetPort).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty client start done. {}");} else {System.out.println("netty client start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();workerGroup.shutdownGracefully();}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyClientHandler());}}

同样设置了protostuff的编解码器,以及消息处理类:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.Constants;
import com.dahuyou.netty.transferfile.domain.FileBurstData;
import com.dahuyou.netty.transferfile.domain.FileBurstInstruct;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyClientHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("断开链接" + ctx.channel().localAddress().toString());super.channelInactive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 1:FileBurstInstruct fileBurstInstruct = (FileBurstInstruct) fileTransferProtocol.getTransferObj();//Constants.FileStatus {0开始、1中间、2结尾、3完成}if (Constants.FileStatus.COMPLETE == fileBurstInstruct.getStatus()) {ctx.flush();ctx.close();System.exit(-1);return;}FileBurstData fileBurstData = FileUtil.readFile(fileBurstInstruct.getClientFileUrl(), fileBurstInstruct.getReadPosition());ctx.writeAndFlush(MsgUtil.buildTransferData(fileBurstData));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 客户端传输文件信息。 FILE:" + fileBurstData.getFileName() + " SIZE(byte):" + (fileBurstData.getEndPos() - fileBurstData.getBeginPos()));break;default:break;}/**模拟传输过程中断,场景测试可以注释掉*System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " [主动断开链接,模拟断点续传]");ctx.flush();ctx.close();System.exit(-1);*/}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,处理文件传输,根据是首次上传还是续传,从要上传的文件中获取字节码数据写到server,其中,体现续传的代码为FileUtil.readFile:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式// 这里体现了断点续传的续哦!!!randomAccessFile.seek(readPosition);}}

randomAccessFile.seek(readPosition);这里跳一下子就体现了断点续传的续哦!!!

2.3:测试

server启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.server.NettyServer;public class NettyServerTest {public static void main(String[] args) {System.out.println("hi netty server");//启动服务new NettyServer().bing(7397);}}

client启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.client.NettyClient;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import io.netty.channel.ChannelFuture;
import java.io.File;public class NettyClientTest {public static void main(String[] args) {//启动客户端ChannelFuture channelFuture = new NettyClient().connect("127.0.0.1", 7397);//文件信息{文件大于1024kb方便测试断点续传}
//        File file = new File("C:\\Users\\fuzhengwei1\\Desktop\\测试传输文件.rar");File file = new File("D:\\xiaofuge_sourcecode\\interview-master\\dahuyou-study-netty\\transferfile\\src\\test\\java\\com\\dahuyou\\netty\\transferfile\\test\\测试传输文件.rar");FileTransferProtocol fileTransferProtocol = MsgUtil.buildRequestTransferFile(file.getAbsolutePath(), file.getName(), file.length());//发送信息;FILE:测试传输文件请求传输文件channelFuture.channel().writeAndFlush(fileTransferProtocol);}}

在client中首次启动发送请求上传文件的协议消息,发起文件上传的流程,我们测试的文件大小为1360字节,而首次上传文件的大小为1024字节,如下代码:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式randomAccessFile.seek(readPosition);
//        byte[] bytes = new byte[1024 * 100];byte[] bytes = new byte[READ_BYTE_ONCE];    
}

所以第一次上传后文件是打不开的如下:
在这里插入图片描述
再次上传后文件就可以正常打开了。
最后看下日志输出:
在这里插入图片描述
在这里插入图片描述

写在后面

参考文章列表

protostuff序列化方式学习 。

netty编程之使用protostuff作为数据传输载体 。

这篇关于netty编程之实现断点续传(分片发送)功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1121928

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S