netty编程之实现断点续传(分片发送)功能

2024-08-30 21:04

本文主要是介绍netty编程之实现断点续传(分片发送)功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

写在前面

在我们使用各种网盘的时候,可以随时的暂停上传,然后继续上传,这其实就是断点续传的功能,本文就看下在netty中如何实现断点续传的功能。

1:核心点介绍

1.1:RandomAccessFile

RandomAccessFile类有一个seek方法,通过该方法可以从文件的指定位置开始读取内容,基于此,我们就可以实现从断点处继续上传的效果,其实也就是实现断点续传了。

1.1:client和server交互协议的封装

定义如下的类来封装交互协议:

public class FileTransferProtocol {private Integer transferType; //0请求传输文件、1文件传输指令、2文件传输数据private Object transferObj;   //数据对象;(0)FileDescInfo、(1)FileBurstInstruct、(2)FileBurstDatapublic Integer getTransferType() {return transferType;}public void setTransferType(Integer transferType) {this.transferType = transferType;}public Object getTransferObj() {return transferObj;}public void setTransferObj(Object transferObj) {this.transferObj = transferObj;}}

其中transferType有如下的值:

1:0请求传输文件 客户端请求开始上传文件,对应的信息封装类是FileDescInfo,描述了要上传的文件的名称大小等信息
2:1文件传输指令客户端和服务端共同使用,对应的信息封装类是FileBurstInstruct,通过抽象的指令值来标记当前传输处于哪个阶段
3:2文件传输数据用来封装具体要上传的数据,位置信息等

1.3:protostuff

数据传输的序列化方式采用protostuff,因为其在对象序列化上的性能表现还是比较优秀(序列化的速度以及序列化的大小),并且使用方式也比较简单。

2:正式编码

2.1:server

server main:

package com.dahuyou.netty.transferfile.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {//配置服务端NIO线程组private EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));private EventLoopGroup childGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture bing(int port) {ChannelFuture channelFuture = null;try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(new MyChannelInitializer());channelFuture = b.bind(port).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty server start done. {}");} else {System.out.println("netty server start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}public Channel getChannel() {return channel;}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.server;import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyServerHandler());}}

这里设置了基于protostuff的编解码器,以及消息处理的handler:

package com.dahuyou.netty.transferfile.server;import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.*;
import com.dahuyou.netty.transferfile.util.CacheUtil;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 0:FileDescInfo fileDescInfo = (FileDescInfo) fileTransferProtocol.getTransferObj();//断点续传信息,实际应用中需要将断点续传信息保存到数据库中FileBurstInstruct fileBurstInstructOld = CacheUtil.burstDataMap.get(fileDescInfo.getFileName());if (null != fileBurstInstructOld) {if (fileBurstInstructOld.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileDescInfo.getFileName());}//传输完成删除断点信息System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求[断点续传]。" + JSON.toJSONString(fileBurstInstructOld));ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstructOld));return;}//发送信息FileTransferProtocol sendFileTransferProtocol = MsgUtil.buildTransferInstruct(Constants.FileStatus.BEGIN, fileDescInfo.getFileUrl(), 0);ctx.writeAndFlush(sendFileTransferProtocol);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求。" + JSON.toJSONString(fileDescInfo));break;case 2:FileBurstData fileBurstData = (FileBurstData) fileTransferProtocol.getTransferObj();FileBurstInstruct fileBurstInstruct = FileUtil.writeFile("E://", fileBurstData);//保存断点续传信息CacheUtil.burstDataMap.put(fileBurstData.getFileName(), fileBurstInstruct);ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstruct));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件数据。" + JSON.toJSONString(fileBurstData));//传输完成删除断点信息if (fileBurstInstruct.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileBurstData.getFileName());}break;default:break;}}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,分为如下几种情况:

0:根据是否是续传返回不同的消息,控制client上传的不同行为
2:如果是上传文件,则保存文件,完成当前文件内容的上传,并返回续传信息给client,client继续上传

2.2:client

client main:

package com.dahuyou.netty.transferfile.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {//配置服务端NIO线程组private EventLoopGroup workerGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture connect(String inetHost, int inetPort) {ChannelFuture channelFuture = null;try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new MyChannelInitializer());channelFuture = b.connect(inetHost, inetPort).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty client start done. {}");} else {System.out.println("netty client start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();workerGroup.shutdownGracefully();}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyClientHandler());}}

同样设置了protostuff的编解码器,以及消息处理类:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.Constants;
import com.dahuyou.netty.transferfile.domain.FileBurstData;
import com.dahuyou.netty.transferfile.domain.FileBurstInstruct;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyClientHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("断开链接" + ctx.channel().localAddress().toString());super.channelInactive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 1:FileBurstInstruct fileBurstInstruct = (FileBurstInstruct) fileTransferProtocol.getTransferObj();//Constants.FileStatus {0开始、1中间、2结尾、3完成}if (Constants.FileStatus.COMPLETE == fileBurstInstruct.getStatus()) {ctx.flush();ctx.close();System.exit(-1);return;}FileBurstData fileBurstData = FileUtil.readFile(fileBurstInstruct.getClientFileUrl(), fileBurstInstruct.getReadPosition());ctx.writeAndFlush(MsgUtil.buildTransferData(fileBurstData));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 客户端传输文件信息。 FILE:" + fileBurstData.getFileName() + " SIZE(byte):" + (fileBurstData.getEndPos() - fileBurstData.getBeginPos()));break;default:break;}/**模拟传输过程中断,场景测试可以注释掉*System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " [主动断开链接,模拟断点续传]");ctx.flush();ctx.close();System.exit(-1);*/}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,处理文件传输,根据是首次上传还是续传,从要上传的文件中获取字节码数据写到server,其中,体现续传的代码为FileUtil.readFile:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式// 这里体现了断点续传的续哦!!!randomAccessFile.seek(readPosition);}}

randomAccessFile.seek(readPosition);这里跳一下子就体现了断点续传的续哦!!!

2.3:测试

server启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.server.NettyServer;public class NettyServerTest {public static void main(String[] args) {System.out.println("hi netty server");//启动服务new NettyServer().bing(7397);}}

client启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.client.NettyClient;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import io.netty.channel.ChannelFuture;
import java.io.File;public class NettyClientTest {public static void main(String[] args) {//启动客户端ChannelFuture channelFuture = new NettyClient().connect("127.0.0.1", 7397);//文件信息{文件大于1024kb方便测试断点续传}
//        File file = new File("C:\\Users\\fuzhengwei1\\Desktop\\测试传输文件.rar");File file = new File("D:\\xiaofuge_sourcecode\\interview-master\\dahuyou-study-netty\\transferfile\\src\\test\\java\\com\\dahuyou\\netty\\transferfile\\test\\测试传输文件.rar");FileTransferProtocol fileTransferProtocol = MsgUtil.buildRequestTransferFile(file.getAbsolutePath(), file.getName(), file.length());//发送信息;FILE:测试传输文件请求传输文件channelFuture.channel().writeAndFlush(fileTransferProtocol);}}

在client中首次启动发送请求上传文件的协议消息,发起文件上传的流程,我们测试的文件大小为1360字节,而首次上传文件的大小为1024字节,如下代码:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式randomAccessFile.seek(readPosition);
//        byte[] bytes = new byte[1024 * 100];byte[] bytes = new byte[READ_BYTE_ONCE];    
}

所以第一次上传后文件是打不开的如下:
在这里插入图片描述
再次上传后文件就可以正常打开了。
最后看下日志输出:
在这里插入图片描述
在这里插入图片描述

写在后面

参考文章列表

protostuff序列化方式学习 。

netty编程之使用protostuff作为数据传输载体 。

这篇关于netty编程之实现断点续传(分片发送)功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1121928

相关文章

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

Nginx部署HTTP/3的实现步骤

《Nginx部署HTTP/3的实现步骤》本文介绍了在Nginx中部署HTTP/3的详细步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录前提条件第一步:安装必要的依赖库第二步:获取并构建 BoringSSL第三步:获取 Nginx

MySQL的JDBC编程详解

《MySQL的JDBC编程详解》:本文主要介绍MySQL的JDBC编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、前置知识1. 引入依赖2. 认识 url二、JDBC 操作流程1. JDBC 的写操作2. JDBC 的读操作总结前言本文介绍了mysq

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja