netty编程之实现断点续传(分片发送)功能

2024-08-30 21:04

本文主要是介绍netty编程之实现断点续传(分片发送)功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

写在前面

在我们使用各种网盘的时候,可以随时的暂停上传,然后继续上传,这其实就是断点续传的功能,本文就看下在netty中如何实现断点续传的功能。

1:核心点介绍

1.1:RandomAccessFile

RandomAccessFile类有一个seek方法,通过该方法可以从文件的指定位置开始读取内容,基于此,我们就可以实现从断点处继续上传的效果,其实也就是实现断点续传了。

1.1:client和server交互协议的封装

定义如下的类来封装交互协议:

public class FileTransferProtocol {private Integer transferType; //0请求传输文件、1文件传输指令、2文件传输数据private Object transferObj;   //数据对象;(0)FileDescInfo、(1)FileBurstInstruct、(2)FileBurstDatapublic Integer getTransferType() {return transferType;}public void setTransferType(Integer transferType) {this.transferType = transferType;}public Object getTransferObj() {return transferObj;}public void setTransferObj(Object transferObj) {this.transferObj = transferObj;}}

其中transferType有如下的值:

1:0请求传输文件 客户端请求开始上传文件,对应的信息封装类是FileDescInfo,描述了要上传的文件的名称大小等信息
2:1文件传输指令客户端和服务端共同使用,对应的信息封装类是FileBurstInstruct,通过抽象的指令值来标记当前传输处于哪个阶段
3:2文件传输数据用来封装具体要上传的数据,位置信息等

1.3:protostuff

数据传输的序列化方式采用protostuff,因为其在对象序列化上的性能表现还是比较优秀(序列化的速度以及序列化的大小),并且使用方式也比较简单。

2:正式编码

2.1:server

server main:

package com.dahuyou.netty.transferfile.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {//配置服务端NIO线程组private EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));private EventLoopGroup childGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture bing(int port) {ChannelFuture channelFuture = null;try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(new MyChannelInitializer());channelFuture = b.bind(port).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty server start done. {}");} else {System.out.println("netty server start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}public Channel getChannel() {return channel;}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.server;import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyServerHandler());}}

这里设置了基于protostuff的编解码器,以及消息处理的handler:

package com.dahuyou.netty.transferfile.server;import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.*;
import com.dahuyou.netty.transferfile.util.CacheUtil;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:有一客户端链接到本服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 0:FileDescInfo fileDescInfo = (FileDescInfo) fileTransferProtocol.getTransferObj();//断点续传信息,实际应用中需要将断点续传信息保存到数据库中FileBurstInstruct fileBurstInstructOld = CacheUtil.burstDataMap.get(fileDescInfo.getFileName());if (null != fileBurstInstructOld) {if (fileBurstInstructOld.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileDescInfo.getFileName());}//传输完成删除断点信息System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求[断点续传]。" + JSON.toJSONString(fileBurstInstructOld));ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstructOld));return;}//发送信息FileTransferProtocol sendFileTransferProtocol = MsgUtil.buildTransferInstruct(Constants.FileStatus.BEGIN, fileDescInfo.getFileUrl(), 0);ctx.writeAndFlush(sendFileTransferProtocol);System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件请求。" + JSON.toJSONString(fileDescInfo));break;case 2:FileBurstData fileBurstData = (FileBurstData) fileTransferProtocol.getTransferObj();FileBurstInstruct fileBurstInstruct = FileUtil.writeFile("E://", fileBurstData);//保存断点续传信息CacheUtil.burstDataMap.put(fileBurstData.getFileName(), fileBurstInstruct);ctx.writeAndFlush(MsgUtil.buildTransferInstruct(fileBurstInstruct));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端,接收客户端传输文件数据。" + JSON.toJSONString(fileBurstData));//传输完成删除断点信息if (fileBurstInstruct.getStatus() == Constants.FileStatus.COMPLETE) {CacheUtil.burstDataMap.remove(fileBurstData.getFileName());}break;default:break;}}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,分为如下几种情况:

0:根据是否是续传返回不同的消息,控制client上传的不同行为
2:如果是上传文件,则保存文件,完成当前文件内容的上传,并返回续传信息给client,client继续上传

2.2:client

client main:

package com.dahuyou.netty.transferfile.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {//配置服务端NIO线程组private EventLoopGroup workerGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture connect(String inetHost, int inetPort) {ChannelFuture channelFuture = null;try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new MyChannelInitializer());channelFuture = b.connect(inetHost, inetPort).syncUninterruptibly();this.channel = channelFuture.channel();} catch (Exception e) {e.printStackTrace();} finally {if (null != channelFuture && channelFuture.isSuccess()) {System.out.println("netty client start done. {}");} else {System.out.println("netty client start error. {}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();workerGroup.shutdownGracefully();}}

MyChannelInitializer:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.codec.ObjDecoder;
import com.dahuyou.netty.transferfile.codec.ObjEncoder;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//对象传输处理channel.pipeline().addLast(new ObjDecoder(FileTransferProtocol.class));channel.pipeline().addLast(new ObjEncoder(FileTransferProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new MyClientHandler());}}

同样设置了protostuff的编解码器,以及消息处理类:

package com.dahuyou.netty.transferfile.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import com.dahuyou.netty.transferfile.domain.Constants;
import com.dahuyou.netty.transferfile.domain.FileBurstData;
import com.dahuyou.netty.transferfile.domain.FileBurstInstruct;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.FileUtil;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyClientHandler extends ChannelInboundHandlerAdapter {/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("链接报告开始");/*System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());System.out.println("链接报告IP:" + channel.localAddress().getHostString());System.out.println("链接报告Port:" + channel.localAddress().getPort());System.out.println("链接报告完毕");*/}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("断开链接" + ctx.channel().localAddress().toString());super.channelInactive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//数据格式验证if (!(msg instanceof FileTransferProtocol)) return;FileTransferProtocol fileTransferProtocol = (FileTransferProtocol) msg;//0传输文件'请求'、1文件传输'指令'、2文件传输'数据'switch (fileTransferProtocol.getTransferType()) {case 1:FileBurstInstruct fileBurstInstruct = (FileBurstInstruct) fileTransferProtocol.getTransferObj();//Constants.FileStatus {0开始、1中间、2结尾、3完成}if (Constants.FileStatus.COMPLETE == fileBurstInstruct.getStatus()) {ctx.flush();ctx.close();System.exit(-1);return;}FileBurstData fileBurstData = FileUtil.readFile(fileBurstInstruct.getClientFileUrl(), fileBurstInstruct.getReadPosition());ctx.writeAndFlush(MsgUtil.buildTransferData(fileBurstData));System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 客户端传输文件信息。 FILE:" + fileBurstData.getFileName() + " SIZE(byte):" + (fileBurstData.getEndPos() - fileBurstData.getBeginPos()));break;default:break;}/**模拟传输过程中断,场景测试可以注释掉*System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " [主动断开链接,模拟断点续传]");ctx.flush();ctx.close();System.exit(-1);*/}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();System.out.println("异常信息:\r\n" + cause.getMessage());}}

主要看方法channelRead,处理文件传输,根据是首次上传还是续传,从要上传的文件中获取字节码数据写到server,其中,体现续传的代码为FileUtil.readFile:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式// 这里体现了断点续传的续哦!!!randomAccessFile.seek(readPosition);}}

randomAccessFile.seek(readPosition);这里跳一下子就体现了断点续传的续哦!!!

2.3:测试

server启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.server.NettyServer;public class NettyServerTest {public static void main(String[] args) {System.out.println("hi netty server");//启动服务new NettyServer().bing(7397);}}

client启动类:

package com.dahuyou.netty.transferfile.test;import com.dahuyou.netty.transferfile.client.NettyClient;
import com.dahuyou.netty.transferfile.domain.FileTransferProtocol;
import com.dahuyou.netty.transferfile.util.MsgUtil;
import io.netty.channel.ChannelFuture;
import java.io.File;public class NettyClientTest {public static void main(String[] args) {//启动客户端ChannelFuture channelFuture = new NettyClient().connect("127.0.0.1", 7397);//文件信息{文件大于1024kb方便测试断点续传}
//        File file = new File("C:\\Users\\fuzhengwei1\\Desktop\\测试传输文件.rar");File file = new File("D:\\xiaofuge_sourcecode\\interview-master\\dahuyou-study-netty\\transferfile\\src\\test\\java\\com\\dahuyou\\netty\\transferfile\\test\\测试传输文件.rar");FileTransferProtocol fileTransferProtocol = MsgUtil.buildRequestTransferFile(file.getAbsolutePath(), file.getName(), file.length());//发送信息;FILE:测试传输文件请求传输文件channelFuture.channel().writeAndFlush(fileTransferProtocol);}}

在client中首次启动发送请求上传文件的协议消息,发起文件上传的流程,我们测试的文件大小为1360字节,而首次上传文件的大小为1024字节,如下代码:

public class FileUtil {private static final int READ_BYTE_ONCE = 1024;public static FileBurstData readFile(String fileUrl, Integer readPosition) throws IOException {File file = new File(fileUrl);RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");//r: 只读模式 rw:读写模式randomAccessFile.seek(readPosition);
//        byte[] bytes = new byte[1024 * 100];byte[] bytes = new byte[READ_BYTE_ONCE];    
}

所以第一次上传后文件是打不开的如下:
在这里插入图片描述
再次上传后文件就可以正常打开了。
最后看下日志输出:
在这里插入图片描述
在这里插入图片描述

写在后面

参考文章列表

protostuff序列化方式学习 。

netty编程之使用protostuff作为数据传输载体 。

这篇关于netty编程之实现断点续传(分片发送)功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1121928

相关文章

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

浏览器插件cursor实现自动注册、续杯的详细过程

《浏览器插件cursor实现自动注册、续杯的详细过程》Cursor简易注册助手脚本通过自动化邮箱填写和验证码获取流程,大大简化了Cursor的注册过程,它不仅提高了注册效率,还通过友好的用户界面和详细... 目录前言功能概述使用方法安装脚本使用流程邮箱输入页面验证码页面实战演示技术实现核心功能实现1. 随机