本文主要是介绍使用Python实现大文件切片上传及断点续传的方法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《使用Python实现大文件切片上传及断点续传的方法》本文介绍了使用Python实现大文件切片上传及断点续传的方法,包括功能模块划分(获取上传文件接口状态、临时文件夹状态信息、切片上传、切片合并)、整...
概要
本文使用python实现大文件切片上传,并支持断点续传,将功能划分为:获取上传文件接口状态、获取临时文件夹状态信息、切片上传、切片合并四个功能模块。提供了详细的思路和功能源码。
整体架构流程
步骤示例: 1.前端收到用户的上传的文件后,会执行以下操作:
- 计算文件哈希值:通过特定算法,算出该文件的hash值。
- 向后端传参:将文件路径(包括文件名)、目标机器id、文件hash值、用户id,一并传递给后端判断文件状态接口get_file_upload_status。
后端收到前端传来的数据后,按如下逻辑进行处理: 检查文件是否已经存在:判断该文件是否存在,如果已存在,则返回前端让用户选择是否覆盖文件。 检查Redis中上传记录:如果不存在则进一步判断redis中是否有该文件正在上传的记录。
- 无上传记录:如果没有则使用redis String数据类型进行存储文件上传状态,key为"机器_id + 文件路径(例如118/tmp/123.jpg)",value为自制一个
{ 'hash_code': 文件hash值, 'person_id': 用户_id }
- 有上传记录:如果redis中存在正在上传的记录,则比对用户id是否为同一个人,是则可以断点续传,否则阻止其他人上传。
2.当文件状态通过后,前端会调用get_file_dir_info接口,并传入主机_id、用户_id、文件路径(包括文件名)。 get_file_dir_info接口主要有三个功能:
- 检查临时文件夹:判断用于存放切片的 “临时文件夹” 是否存在。临时文件夹的完整路径由 “文件路径 + : + 用户 id” 拼接而成,例如 “/tmp/123.jgp:12”。若该文件夹不存在,系统会自动创建它。创建完成后,对临时文件夹路径进行哈希计算,再借助 Redis 存储相关信息,以哈希值作为 key,临时文件夹的全路径作为 value,方便后续执行删除文件夹操作时快速定位。
- 检查切片上传记录临时文件:查看 “临时文件夹” 下的 “切片上传记录临时文件” 是否已经存在。如果文件存在,就读取其中的内容;要是不存在,就对其内容进行初始化。“临时记录文件”内容如下:
class FileObject(): def __init__(self,name,size,chunk,path=None,num=0): self.name = name #文件名 self.size = size #文件夹已上传大小 self.chunk = chunk #块数 self.patpythonh = path # 路径 self.num = num #文件夹下存在分块数,设计冗余了
- 放置合并脚本:在临时文件夹下放置 merge.py 脚本文件,为后续的文件处理流程提供支持。
最后遍历整个临时文件夹,获得一个文件已上传块数的列表(为了支持断点续传),返回给前端临时记录文件信息、临时文件夹的hash值、已上传的文件块列表。
3.前端将文件按照10M进行切片,并按照上述接口得到的信息(文件名、已上传的文件块列表),进行续传或直接传输file_chunk_upload,通过列表中的信息可跳过已上传的文件块,并可补上中间因为其他原因丢失的块。 4.上传完成后前端会再次调用get_file_dir_info接口,获得当前文件夹下已存在的文件块数量与列表,与自己分块进行对比。如果成功则调用合并文件接口。 5.前端发起合并请求时,会调用 bigfile_merge 接口,同时传入用户 id、文件块数、文件路径以及临时文件夹路径这些关键参数。 后端在接收到前端传来的数据后,展开一系列校验与操作:
- 参数校验:对传入的路径与文件块数进行准确性核查,如果正确则开始合并。
- 合并方式选择:合并功能主要分两种执行路径。一是检测目标机器是否支持运行 merge.py 脚本,若支持,便直接执行已传入的脚本完成合并;若不支持,则执行目标机器文件的操作方法来执行合并。
- 断点续合判断:开始合并前,要先确认需要合并的文件是否已经存在。若文件已存在,意味着这是一个断点文件,此时需读取临时记录文件中的 size 和 chunk 值,从而判断文件的实际大小以及已合并的块位置。一旦发现实际文件大小超过记录文件中的数值,就表明合并过程曾出现中断,此时要把文件指针调整到记录值对应的位置,接着从该记录值对应的块开启后续合并。
- 记录更新 :每成功完成一块文件的合并,就把新的 size 与 chunk 值写入临时记录文件。
- 收尾清理:合并完成后清除临时文件夹,并删除redis中该文件上传记录状态信息。
到此完成一次文件上传操作
技术细节
- API
获取上传文件状态接口
get_file_upload_status 查看需要上传的文件是否存在或断点续
class FileUploadStatus(APIView): def post(self, request, *args, **kwargs): #先判断目标地址下是否存在该文件 #如果存在则提示是否需要覆盖 try: forms = json.loads(request.body) host_id = forms['host_id'] full_path = forms['full_path'] hash_code = forms['hash_code'] person_id = forms['person_id'] except Exception as e: logger.error("前端参数解析出错",repr(e)) try: err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id) #使用paramiko模块获取目标机器连接,这里读者自行用其他方式都可以 if err: return Response({'code': -1, 'data': host_id, 'msg': "连接目标主机出错"}, status=status.HTTP_400_BAD_REQUEST) except Exception as e: logger.error("连接目标机器失败",repr(e)) try: #判断机器上文件是否存在 tmp_sftp._sftp.stat(full_path) return Response({'code': -1, 'data': full_path, 'msg': "文件已存在,通知是否覆盖"}, status=status.HTTP_400_BAD_REQUEST) except IOError: #其次判断是否存在在redis中,防止短时间内有相同的人传同一份文件,但前一个人并没有合并完成 #如果都没有则创建新key hashcode放入redis,并通知前端可以上传 try: r = get_redis() #获得redis连接实例,读者自行实现 except Exception as e: logger.error("redis获取失败",repr(e)) #例如:18115/app/home/202.jpg #获取redis中记录 data = r.get(str(host_id)+full_path) if not data: #将key和hashcode放入redis #加上person_id file_dict = {} file_dict['hash_code'] = hash_code file_dict['person_id'] = person_id r.set(str(host_id)+full_path, str(file_dict)) return Response({'code': 1, 'data': full_path, 'msg': "文件不存在,可以上传"}, status=status.HTTP_200_OK) else: #如果redis中有记录,host_id+路径能够拦截在目标机器传输同一份文件,但也会把同一个人想断点传输拦截 #所以在此key的value中保存带有person_id的临时文件夹路径信息 #使用person_id将同一个人放行 retrieved_data = r.get(str(host_id)+full_path) # 将获取到的数据转换为字典对象 try: retrieved_dict = eval(retrieved_data.decode()) except Exception as e: logger.error("redis获取目标文件hashcode解码失败",repr(e)) if person_id ==retrieved_dict['person_id']: return Response({'code': 1, 'data': full_path, 'msg': "断点续传判断通过"}, status=status.HTTP_200_OK) else: return Response({'code': 2, 'data': full_path, 'msg': "该文件正在上传,请勿多次操作"}, status=status.HTTP_200_OK)
获取临时文件夹状态信息接口
get_file_dir_info 获取临时文件夹状态信息接口
class GetFileDirInfo(APIView): depythonf post(self, request, *args, **kwargs): forms = json.loads(request.body) host_id = forms['host_id'] person_id = forms['person_id'] full_path = forms['full_path'] err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id)#使用paramiko模块获取目标机器连接,这里读者自行用其他方式都可以 if err: return Response({'code': -1, 'data': host_id, 'msg': "连接目标主机出错"}, status=status.HTTP_400_BAD_REQUEST) #分离路径名与文件名 #示例:/home/app 2-2.jpg file_path,file_name = os.path.split(full_path) if file_path[-1] != "/": file_path += "/" #分离文件名与文件扩展名 #示例:2-2 .jpg short_name,ephpxt = os.path.splitext(file_name) #构造存放临时文件的文件夹名 #示例:/home/app/2-2.jpg:12/ tmp_dir = full_path +":"+ str(person_id) #并将临时文件夹做hashcode 传入redis key为hashcode value为路径,后续删除文件夹操作多一层验证 tmp = hashlib.md5() tmp.update(tmp_dir.encode('utf-8')) hash_value = tmp.hexdigest() if tmp_dir[-1] != "/": tmp_dir += "/" try: #判断临时文件夹是否存在 tmp_sftp._sftp.stat(tmp_dir) except Exception: try: print('创建临时文件夹') tmp_sftp._sftp.mkdir(tmp_dir) r = get_redis() #获取redis连接 r.set(hash_value,tmp_dir) except Exception as e: logger.error("创建临时文件夹失败",e) tmp_sftp._sftp.close() return Response({'code': -1, 'data': "", 'msg': "创建临时文件夹失败"}, status=status.HTTP_400_BAD_REQUEST) #如果临时文件夹存在则在此文件夹下获取文件传输txt #初始化记录文件对象 fobject = FileObject(file_name,0,0,tmp_dir,0) #示例:/home/app/2-2.jpg:12/2-2.txt record_file = tmp_dir+short_name+".txt" try: #判断记录文件是否存在 tmp_sftp._sftp.stat(record_file) tmp_load_file = tmp_sftp._sftp.open(record_file,'r') #如果记录文件夹存在则读取其已保存上传文件大小和块数 for line in tmp_load_file.readlines(): title,value = line.replace('\n','').split(':') print(title + ":"+ value) if title=='name': fobject.name = value elif title=='size': fobject.size = int(value) elif title =='chunk': fobject.chunk = int(value) tmp_load_file.close() except IOError: print("记录文件不存在,创建新的记录文件,并初始化") store_f=tmp_sftp._sftp.open(record_file,'w+') store_f.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n', 'chunk:'+str(fobject.chunk)+'\n']) print("创建记录文件成功") store_f.close() serializer = RecordFileSerializer(fobject) data = serializer.data data['dir_hashcode'] = hash_value #上传一个支持异步合并的merge.py文件 command = 'command -v python' stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command) output = stdout.read().decode().strip() try: print('判断merge在不在') tmp_sftp._sftp.stat(tmp_dir + 'merge.py') except Exception as e: if output: print('目标机器已安装 Python') current_path = os.path.dirname(os.path.abspath(__file__)) if current_path[-1] != "/": current_path += "/" try: print('需要传输的地址',current_path+'merge.py') print('目标地址',tmp_dir + 'merge.py') js tmp_sftp._sftp.put(current_path+'merge.py',tmp_dir + 'merge.py') except Exception as e: logger.error("发送本地合并文件失败",e) tmp_sftp._sftp.close() return Response({'code': -1, 'data': "", 'msg': "发送本地合并文件失败"}, status=status.HTTP_400_BAD_REQUEST) #将merge文件发送 else: tmp_sftp._sftp.close() return Response({'code': -1, 'data': "", 'msg': "目标机器未安装python"}, status=status.HTTP_400_BAD_REQUEST) tmp_sftp._sftp.close() return Response({'code': 0, 'data': data, 'msg': "创建新记录文件"}, status=status.HTTP_200_OK) file_list = tmp_sftp._sftp.listdir(tmp_dir) #文件夹下有一个txt记录文件,总文件块数需要减一 fobject.num = len(file_list)-2 chunk_list =[] for file in file_list: parts = file.rsplit(":", 1) # 按冒号从右到左分割为最多两个部分 if len(parts) == 2: last_part = parts[1] # 获取最后一个部分 try: chunk_list.append(int(last_part)) except Exception as e: continue print("最后一个值:", last_part) else: print("未找到冒号,跳过") serializer = RecordFileSerializer(fobject) data = serializer.data data['dir_hashcode'] = hash_value data['chunk_list'] = chunk_list return Response({'code': 0, 'data': data, 'msg': "记录文件已存在"}, status=status.HTTP_200_OK)
切片上传功能
file_chunk_upload 切片上传功能
class FileChunkUploadHandler(): www.chinasem.cn def __init__(self, redis_host,request=None,file_info=None,chunk_num=0,people_id =0): self.request = request self.file_info = request.FILES.get('file_info') # forms = json.loads(request.body) #数据块信息 #第几个数据块 self.chunk_num = request.data.get('chunk_num') self.full_path = request.data.get('full_path') self.person_id = request.data.get('person_id') self.redis_host = redis_host def upload_file(self): print("%s号文件块开始传输",self.chunk_num) full_path = urllib.parse.unquote(self.full_path) file_path,file_name = os.path.split(full_path) host_id = self.request.query_params.get('host_id', None) err, tmp_sftp = RemoteBigFileUploadHandler.get_handle(host_id) #分离扩展名和文件名 #2-2 .jpg short_name,ext = os.path.splitext(file_name) if file_path[-1] != "/": file_path += "/" #存放临时文件的文件夹,在上一个获取临时文件夹状态接口就已创建,按照相同格式做一次校验。 #/home/app/2-2.jpg:12 tmp_dir = file_path+file_name+":"+ str(self.person_id) if tmp_dir[-1] != "/": tmp_dir += "/" #临时文件名 #/home/app/2-2.jpg:12/12-2-2:1 tmp_file_name = tmp_dir + str(self.person_id) + "-" + short_name + ":" + str(self.chunk_num) try: tmp_sftp._sftp.stat(tmp_dir) except IOError as e: logger.error("存储临时文件夹不存在",e) return 0,repr(e) try: tmp_sftp._sftp.stat(tmp_file_name) except IOError: try: #文件块上传存放 print('创建临时文件块',tmp_file_name) remote_file = tmp_sftp._sftp.open(tmp_file_name, mode="wb") my_bytes = self.file_info.read() remote_file.write(my_bytes) remote_file.close() except Exception as e: logger.error("上传文件块失败",e) return 0,repr(e) print("写入文件完成:",tmp_file_name) record_file = tmp_dir+short_name+".txt" #更新上传记录文件信息 fobject = FileObject(file_name,0,0) tmp_load_file = tmp_sftp._sftp.open(record_file,'r') for line in tmp_load_file.readlines(): title,value = line.replace('\n','').split(':') print(title + ":"+ value) if title=='name': fobject.name = value elif title=='size': fobject.size = int(value) elif title =='chunk': fobject.chunk = int(value) tmp_load_file.close() try: tmp_sftp._sftp.stat(record_file) load_file = tmp_sftp._sftp.open(record_file,'w+') load_file.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n', 'chunk:'+str(0)+'\n']) except Exception as e: logger.error(e) tmp_sftp.close() return 1,None
文件合并功能
bigfile_merge 文件合并与校验功能
class RemoteBigFileUploadHandler(): def __init__(self, redis_host,request=None, chat=False,tmp_path=None,chunk_num=0,person_id=0): self.request = request forms = json.loads(request.body) self.chat = chat #数据块大小 # self.chunk_size = forms['chunk_size'] self.tmp_path = forms['tmp_path'] #数据块数量 self.chunk_num = forms['chunk_num'] self.full_path = forms['full_path'] self.person_id = forms['person_id'] self.redis_host = redis_host if self.chat: self.current = 0 self.datetime = datetime.datetime.now() #数据进度聊天室 self.channel_layer = get_channel_layer() self.room = self.request.query_params.get('chat_room', None) self.total = self.request.query_params.get('total', None) if not self.room or not self.total: raise KeyError('params: chat_room and total is needed') def file_isavailable(self): tmp_path = self.tmp_path tmp_sftp = self.get_session() file_list = tmp_sftp._sftp.listdir(tmp_path) #文件夹下有一个txt记录文件,总文件块数需要减一 true_num = len(file_list)-2 if true_num!=self.chunk_num: tmp_sftp._sftp.close() return False else: tmp_sftp._sftp.close() return True def merge_file(self): #----------------------获得基础信息----------------------------------- host_id = self.request.query_params.get('host_id', None) full_path = urllib.parse.unquote(self.full_path) file_path,file_name = os.path.split(full_path) if file_path[-1] != "/": file_path += "/" #分离扩展名和文件名 short_name,ext = os.path.splitext(file_name) tmp_path = self.tmp_path tmp_sftp = self.get_session() file_list = tmp_sftp._sftp.listdir(tmp_path) try: tmp_sftp._sftp.stat(full_path) print("文件已存在") except IOError: print("文件不存在") print("创建新的文件") new_f = tmp_sftp._sftp.open(full_path,mode='a') new_f.write("") new_f.close() #判断merge.py 文件是否存在 if 'merge.py' in file_list: com_path = tmp_path + 'merge.py' command = f"nohup python {com_path} {self.person_id} {self.chunk_num} &" stdin, stdout, stderr = tmp_sftp._ssh.exec_command(command) merging = True while merging: if self.chat: # websocket回显进度,获得当前文件字节数 self.current = tmp_sftp._sftp.stat(full_path).st_size print('current',self.current) print('total',self.total) self.call_back() if self.current >= int(self.total): merging = False time.sleep(1) else : merging = False else: #创建或打开用户的存储文件 # ----------------开始合并操作------------------------------- # rb+ 追加覆盖的方式打开用户需要存储文件,指针初始位置为文件开头 remote_file = tmp_sftp._sftp.open(full_path,mode='rb+') #默认当前块数从1开始 i=1 #创建文件记录对象 fobject = FileObject(file_name,0,0) print('创建记录对象成功') # 断点续传记录文件 record_file = tmp_path+short_name+".txt" try : #如果记录文件存在,则存储文件大概率也存在 #如果两个文件打开出错 则走excpt重新创建 tmp_sftp._sftp.stat(record_file) upload_file = tmp_sftp._sftp.stat(full_path) #这里也是获取本地文件的大小 temp_size = upload_file.st_size print("记录文件已存在") print("当前文件真实数据大小:",temp_size) # 读取文件,获得记录的已上传块数,以及大小总块数 tmp_load_file = tmp_sftp._sftp.open(record_file,'r') for line in tmp_load_file.readlines(): title,value = line.replace('\n','').split(':') print(title + ":"+ value) if title=='name': fobject.name = value elif title=='size': fobject.size = int(value) elif title =='chunk': fobject.chunk = int(value) tmp_load_file.close() except IOError: print("记录文件不存在,创建新的记录文件,并初始化") temp_size = 0 store_f=tmp_sftp._sftp.open(record_file,'w+') store_f.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n', 'chunk:'+str(fobject.chunk)+'\n']) print("创建记录文件成功") store_f.close() # ----------------------------------开始保存数据-------------------------- print("真实文件数据大小",temp_size) print("记录文件数据大小",fobject.size) if temp_size>=fobject.size: #如果实际文件大于记录文件,则表示在合并时中断过 print("调整指针") remote_file.seek(fobject.size) i = fobject.chunk+1 tmp_file_name = str(self.person_id) + "-" + short_name + ":" try: while i<=self.chunk_num: if tmp_file_name + str(i) in file_list: print('true') #临时文件夹/临时文件名 file_path = tmp_path+tmp_file_name + str(i) with tmp_sftp._sftp.file(file_path,'rb') as input_file: print(file_path) file_content = input_file.read() remote_file.write(file_content) fobject.size += len(file_content) i = i+1 fobject.chunk += 1 if self.chat: # websocket回显进度,获得当前文件字节数 self.current = tmp_sftp._sftp.stat(full_path).st_size print('current',self.current) print('total',self.total) self.call_back() #将已经写入的块和大小记录 load_file = tmp_sftp._sftp.open(record_file,'w+') load_file.writelines(['name:'+fobject.name+'\n','size:'+str(fobject.size)+'\n', 'chunk:'+str(fobject.chunk)+'\n']) load_file.close() except Exception as e: logger.error("合并文件异常",e) #----------------清除临时文件与关闭资源操作---------- #删除临时文件夹下的所有文件 remote_file.close() try: for name in file_list: del_path = tmp_path + name tmp_sftp._sftp.remove(del_path) # 删除空临时文件夹 tmp_sftp._sftp.rmdir(tmp_path) except Exception as e: logger.error("删除文件文件异常",e) #关闭操作文件资源 tmp_sftp._sftp.close() #删除redis中该文件临时记录,一个是文件夹hashcode,用于验证与删除目标文件夹 #一个是host_id+路径,用于标记该路径下这个文件正在传输 r = self.redis_host data = r.get(str(host_id)+full_path) if data: r.delete(str(host_id)+full_path) tmp = hashlib.md5() tmp.update(tmp_path.encode('utf-8')) hash_value = tmp.hexdigest() r.delete(hash_value) print("文件合并结束") return 1,None
小结
本文提供了一个python大文件上传功能实现的思路,希望能够你带来一些思路和启发。
以上就是使用Python实现大文件切片上传及断点续传的方法的详细内容,更多关于Python大文件切片上传及断点续传的资料请关注China编程(www.chinasem.cn)其它相关文章!
这篇关于使用Python实现大文件切片上传及断点续传的方法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!