本文主要是介绍SpringBoot+RustFS 实现文件切片极速上传的实例代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考...
本文将手把手教你如何通过 SpringBoot 和 RustFS 构建高性能文件切片上传系统,解决大文件传输的痛点,实现秒传、断点续传和分片上传等高级功能。
一、为什么选择 RustFS + SpringBoot?
在传统文件上传方案中,大文件传输面临诸多挑战:网络传输不稳定、服务器内存溢出、上传失败需重新传输等。而 RustFS 作为一款基于 Rust 语言开发的高性能分布式对象存储系统,具有以下突出优势:
- 高性能:充分利用 Rust 的内存安全和高并发特性,响应速度极快
- 分布式架构:可扩展且具备容错能力,适用于海量数据存储
- AWS S3 兼容性:支持标准 S3 API,可与现有生态无缝集成
- 可视化管理:内置功能丰富的 Web 控制台,管理更方便
- 开源友好:采用 Apache 2.0 协议,鼓励社区贡献
结合 SpringBoot 的快速开发特性,我们可以轻松构建企业级文件上传解决方案。
二、环境准备与部署
2.1 安装 RustFS
使用 docker 快速部署 RustFS:
# docker-compose.yml version: '3.8' services: rustfs: image: registry.cn-shanghai.aliyuncs.com/study-03/rustfs:latest container_name: rustfs ports: - "9000:9000" # 管理控制台 - "9090:9090" # API服务端口 volumes: - ./data:/data environment: - RUSTFS_ROOT_USER=admin - RUSTFS_ROOT_PASSWORD=admin123 restart: unless-stopped
运行 docker-compose up -d
即可启动服务。访问 http://localhost:9000
使用 admin/admin123 登录管理控制台。
2.2 SpringBoowww.chinasem.cnt 项目配置
在 pom.XML
中添加必要依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>io.minio</groupId> <artifactId>minio</artifactId> <version>8.5.2</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.11.0</version> </dependency> </dependencies>
配置 application.yml:
rustfs: endpoint: http://localhost:9090 Access-key: admin secret-key: admin123 bucket-name: mybucket
三、核心代码实现
3.1 配置 RustFS 客户端
@Configuration @ConfigurationProperties(prefix = "rustfs") public class RustFSConfig { private String endpoint; private String accessKey; private String secretKey; private String bucketName; @Bean public MinioClient rustFSClient() { return MinioClient.builder() .endpoint(endpoint) .credentials(accessKey, secretKey) .build(); } }
3.2 文件切片上传服务
@Service @Slf4j public class FileUploadService { @Autowired private MinioClient rustFSClient; @Value("${rustfs.bucket-name}") private String bucketName; /** * 初始化分片上传 */ public String initUpload(String fileName, String fileMd5) { String uploadId = UUID.randomUUID().toString(); // 检查文件是否已存在(秒传实现) if (checkFileExists(fileMd5)) { throw new RuntimeException("文件已存在,可直接秒传"); } // 存储上传记录到Redis或数据库 redisTemplate.opsForValue().set("upload:" + fileMd5, uploadId); return uploadId; } /** * 上传文件分片 */ public void uploadChunk(MultipartFile chunk, String fileMd5, int chunkIndex, int totalChunks) { try { // 生成分片唯一名称 String chunkName = fileMd5 + "_chunk_" + chunkIndex; // 上传分片到RustFS rustFSClient.putObject( PutObjectArgs.builder() .bucket(bucketName) .object(chunkName) .stream(chunk.getInputStream(), chunk.getSize(), -1) .build() ); // 记录已上传分片 redisTemplate.opsForSet().add("chunks:" + fileMd5, chunkIndex); } catch (Exception e) { log.error("分片上传失败", e); throw new RuntimeException("分片上传失败"); } } /** * 合并文件分片 */ public void mergeChunks(String fileMd5, String fileName, int totalChunks) { try { // 创建临时文件 Path tempFile = Files.createTempFile("merge_", ".tmp"); try (FileOutputStream fos = new FileOutputStream(tempFile.toFile())) { // 按顺序下载并合并所有分片 for (int i = 0; i < totalChunks; i++) { String chunkName = fileMd5 + "_chunk_" + i; try (InputStream is = rustFSClient.getObject( GetObjectArgs.builder() .bucket(bucketName) .object(chunkName) .build() )) { IOUtils.copy(is, fos); } // 删除已合并的分片 rustFSClient.removeObject( RemoveObjectArgs.builder() .bucket(bucketName) .object(chunkName) .build() ); } } // 上传最终文件 rustFSClient.uploadObject( UploadObjectArgs.builder() .bucket(bucketName) .object(fileName) .filename(tempFile.toString()) .build() ); // 清理临时文件 Files.deleteIfExists(tempFile); // 更新文件记录 saveFileRecord(fileMd5, fileName); } catch (Exception e) { log.error("分片合并失败", e); throw new RuntimeException("分片合并失败"); } } /** * 检查文件是否存在(秒传功能) */ private boolean checkFileExists(String fileMd5) { // 查询数据库或Redis检查文件是否已存在 return redisTemplate.hasKey("file:" + fileMd5); } }
3.3 控制器实现
@RestController @RequestMappi编程ng("/api/upload") @Slf4j public class FileUploadController { @Autowired private FileUploadService fileUploadService; /** * 初始化上传 */ @PostMapping("/init") public ResponseEntity<?> initUpload(@RequestParam String fileName, @RequestParam String fileMd5) { try { String uploadId = fileUploadService.initUpload(fileName, fileMd5); return ResponseEntity.ok(Map.of("uploadId", uploadId)); } catch (RuntimeException e) { return ResponseEntity.ok(Map.of("exists", true)); // 文www.chinasem.cn件已存在 } } /** * 上传分片 */ @PostMapping("/chunk") public ResponseEntity<String> uploadChunk(@RequestParam MultipartFile chunk, @RequestParam String fileMd5, @RequestParam int chunkIndex, @RequestParam int totalChunks) { fileUploadService.uploadChunk(chunk, fileMd5, chunkIndex, totalChunks); return ResponseEntity.ok("分片上传成功"); } /** * 合并分片 */ @PostMapping("/merge") public ResponseEntity<String> mergeChunks(@RequestParam String fileMd5, @RequestParam String fileName, @RequestParam int totalChunks) { fileUploadService.mergeChunks(fileMd5, fileName, totalChunks); return ResponseEntity.ok("文件合并成功China编程"); } /** * 获取已上传分片列表(断点续传) */ @GetMapping("/chunks/{fileMd5}") public ResponseEntity<List<Integer>> getUploadedChunks(@PathVariable String fileMd5) { Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5); List<Integer> chunks = uploaded.stream() .map(obj -> Integer.parseInt(obj.toString())) .collect(Collectors.toList()); return ResponseEntity.ok(chunks); } }
四、前端实现关键代码
4.1 文件切片处理
class FileUploader { constructor() { this.chunkSize = 5 * 1024 * 1024; // 5MB分片大小 this.concurrentLimit = 3; // 并发上传数 } // 计算文件MD5(秒传功能) async calculateFileMD5(file) { return new Promise((resolve) => { const reader = new FileReader(); const spark = new SparkMD5.ArrayBuffer(); reader.onload = e => { spark.append(e.target.result); resolve(spark.end()); }; reader.readAsArrayBuffer(file); }); } // 切片上传 async uploadFile(file) { // 计算文件MD5 const fileMd5 = await this.calculateFileMD5(file); // 初始化上传 const initResponse = await fetch('/api/upload/init', { method: 'POST', body: jsON.stringify({ fileName: file.name, fileMd5: fileMd5 }), headers: { 'Content-Type': 'application/json' } }); const initResult = await initResponse.json(); // 如果文件已存在,直接返回 if (initResult.exists) { alert('文件已存在,秒传成功!'); return; } // 获取已上传分片(断点续传) const uploadedChunks = await this.getUploadedChunks(fileMd5); // 计算分片信息 const totalChunks = Math.ceil(file.size / this.chunkSize); const uploadPromises = []; for (let i = 0; i < totalChunks; i++) { // 跳过已上传的分片 if (uploadedChunks.includes(i)) { continue; } const start = i * this.chunkSize; const end = Math.min(file.size, start + this.chunkSize); const chunk = file.slice(start, end); // 控制并发数 if (uploadPromises.length >= this.concurrentLimit) { await Promise.race(uploadPromises); } const uploadPromise = this.uploadChunk(chunk, fileMd5, i, totalChunks) .finally(() => { const index = uploadPromises.indexOf(uploadPromise); if (index > -1) { uploadPromises.splice(index, 1); } }); uploadPromises.push(uploadPromise); yAfVZ } // 等待所有分片上传完成 await Promise.all(uploadPromises); // 合并分片 await this.mergeChunks(fileMd5, file.name, totalChunks); } // 上传单个分片 async uploadChunk(chunk, fileMd5, chunkIndex, totalChunks) { const formData = new FormData(); formData.append('chunk', chunk); formData.append('fileMd5', fileMd5); formData.append('chunkIndex', chunkIndex); formData.append('totalChunks', totalChunks); const response = await fetch('/api/upload/chunk', { method: 'POST', body: formData }); if (!response.ok) { throw new Error(`分片上传失败: ${response.statusText}`); } } }
五、高级功能与优化
5.1 断点续传实现
通过记录已上传的分片信息,实现上传中断后从中断处继续上传:
@Service public class UploadProgressService { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 获取已上传分片列表 */ public List<Integer> getUploadedChunks(String fileMd5) { Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5); return uploaded.stream() .map(obj -> Integer.parseInt(obj.toString())) .collect(Collectors.toList()); } /** * 清理上传记录 */ public void clearUploadRecord(String fileMd5) { redisTemplate.delete("chunks:" + fileMd5); redisTemplate.delete("upload:" + fileMd5); } }
5.2 分片验证与安全
确保分片传输的完整性和安全性:
/** * 分片验证服务 */ @Service public class ChunkValidationService { /** * 验证分片哈希 */ public boolean validateChunkHash(MultipartFile chunk, String expectedHash) { try { String actualHash = HMACUtils.hmacSha256Hex("secret-key", chunk.getBytes()); return actualHash.equals(expectedHash); } catch (IOException e) { return false; } } /** * 验证分片顺序 */ public boolean validateChunkOrder(String fileMd5, int chunkIndex) { // 获取已上传分片 Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5); List<Integer> chunks = uploaded.stream() .map(obj -> Integer.parseInt(obj.toString())) .sorted() .collect(Collectors.toList()); // 检查分片是否按顺序上传 return chunks.isEmpty() || chunkIndex == chunks.size(); } }
六、部署与性能优化
6.1 系统级优化建议
分片大小选择
- 内网环境:10MB-20MB
- 移动网络:1MB-5MB
- 广域网:500KB-1MB
并发控制
# 应用配置 spring: servlet: multipart: max-file-size: 10MB max-request-size: 100MB
定时清理策略
@Scheduled(fixedRate = 24 * 60 * 60 * 1000) // 每日清理 public void cleanTempFiles() { // 删除超过24小时的临时分片 redisTemplate.keys("chunks:*").forEach(key -> { if (redisTemplate.getExpire(key) < 0) { redisTemplate.delete(key); } }); }
6.2 监控与告警
集成 Prometheus 监控上传性能:
# 监控指标配置 management: endpoints: web: exposure: include: health,metrics,prometheus metrics: tags: application: ${spring.application.name}
七、总结
通过 SpringBoot 和 RustFS 的组合,我们实现了一个高性能的文件切片上传系统,具备以下优势:
- 高性能:利用 RustFS 的高并发特性和分片并行上传,大幅提升传输速度
- 可靠性:断点续传机制确保上传中断后从中断处继续,避免重复劳动
- 智能优化:秒传功能避免重复文件上传,节省带宽和存储空间
- 易于扩展:分布式架构支持水平扩展,适应不同规模的应用场景
这种方案特别适用于:
- 视频平台的大文件上传
- 企业级文档管理系统
- 云存储和备份服务
- AI 训练数据集上传
到此这篇关于SpringBoot+RustFS 实现文件切片极速上传的实例代码的文章就介绍到这了,更多相关SpringBoot RustFS 文件切片上传内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!
这篇关于SpringBoot+RustFS 实现文件切片极速上传的实例代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!