SpringBoot+RustFS 实现文件切片极速上传的实例代码

本文主要是介绍SpringBoot+RustFS 实现文件切片极速上传的实例代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考...

本文将手把手教你如何通过 SpringBoot 和 RustFS 构建高性能文件切片上传系统,解决大文件传输的痛点,实现秒传、断点续传和分片上传等高级功能。

一、为什么选择 RustFS + SpringBoot?

在传统文件上传方案中,大文件传输面临诸多挑战:网络传输不稳定、服务器内存溢出、上传失败需重新传输等。而 ​RustFS​ 作为一款基于 Rust 语言开发的高性能分布式对象存储系统,具有以下突出优势:

  • 高性能​:充分利用 Rust 的内存安全和高并发特性,响应速度极快
  • 分布式架构​:可扩展且具备容错能力,适用于海量数据存储
  • AWS S3 兼容性​:支持标准 S3 API,可与现有生态无缝集成
  • 可视化管理​:内置功能丰富的 Web 控制台,管理更方便
  • 开源友好​:采用 Apache 2.0 协议,鼓励社区贡献

结合 SpringBoot 的快速开发特性,我们可以轻松构建企业级文件上传解决方案。

二、环境准备与部署

2.1 安装 RustFS

使用 docker 快速部署 RustFS:

# docker-compose.yml
version: '3.8'
services:
  rustfs:
    image: registry.cn-shanghai.aliyuncs.com/study-03/rustfs:latest
    container_name: rustfs
    ports:
      - "9000:9000"  # 管理控制台
      - "9090:9090"  # API服务端口
    volumes:
      - ./data:/data
    environment:
      - RUSTFS_ROOT_USER=admin
      - RUSTFS_ROOT_PASSWORD=admin123
    restart: unless-stopped

运行 docker-compose up -d即可启动服务。访问 http://localhost:9000使用 admin/admin123 登录管理控制台。

2.2 SpringBoowww.chinasem.cnt 项目配置

pom.XML中添加必要依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.minio</groupId>
        <artifactId>minio</artifactId>
        <version>8.5.2</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.11.0</version>
    </dependency>
</dependencies>

配置 application.yml:

rustfs:
  endpoint: http://localhost:9090
  Access-key: admin
  secret-key: admin123
  bucket-name: mybucket

三、核心代码实现

3.1 配置 RustFS 客户端

@Configuration
@ConfigurationProperties(prefix = "rustfs")
public class RustFSConfig {
    private String endpoint;
    private String accessKey;
    private String secretKey;
    private String bucketName;

    @Bean
    public MinioClient rustFSClient() {
        return MinioClient.builder()
                .endpoint(endpoint)
                .credentials(accessKey, secretKey)
                .build();
    }
}

3.2 文件切片上传服务

@Service
@Slf4j
public class FileUploadService {
    @Autowired
    private MinioClient rustFSClient;
    
    @Value("${rustfs.bucket-name}")
    private String bucketName;
    
    /**
     * 初始化分片上传
     */
    public String initUpload(String fileName, String fileMd5) {
        String uploadId = UUID.randomUUID().toString();
        // 检查文件是否已存在(秒传实现)
        if (checkFileExists(fileMd5)) {
            throw new RuntimeException("文件已存在,可直接秒传");
        }
        // 存储上传记录到Redis数据库
        redisTemplate.opsForValue().set("upload:" + fileMd5, uploadId);
        return uploadId;
    }
    
    /**
     * 上传文件分片
     */
    public void uploadChunk(MultipartFile chunk, String fileMd5, 
                           int chunkIndex, int totalChunks) {
        try {
            // 生成分片唯一名称
            String chunkName = fileMd5 + "_chunk_" + chunkIndex;
            
            // 上传分片到RustFS
            rustFSClient.putObject(
                PutObjectArgs.builder()
                    .bucket(bucketName)
                    .object(chunkName)
                    .stream(chunk.getInputStream(), chunk.getSize(), -1)
                    .build()
            );
            
            // 记录已上传分片
            redisTemplate.opsForSet().add("chunks:" + fileMd5, chunkIndex);
            
        } catch (Exception e) {
            log.error("分片上传失败", e);
            throw new RuntimeException("分片上传失败");
        }
    }
    
    /**
     * 合并文件分片
     */
    public void mergeChunks(String fileMd5, String fileName, int totalChunks) {
        try {
            // 创建临时文件
            Path tempFile = Files.createTempFile("merge_", ".tmp");
            
            try (FileOutputStream fos = new FileOutputStream(tempFile.toFile())) {
                // 按顺序下载并合并所有分片
                for (int i = 0; i < totalChunks; i++) {
                    String chunkName = fileMd5 + "_chunk_" + i;
                    
                    try (InputStream is = rustFSClient.getObject(
                        GetObjectArgs.builder()
                            .bucket(bucketName)
                            .object(chunkName)
                            .build()
                    )) {
                        IOUtils.copy(is, fos);
                    }
                    
                    // 删除已合并的分片
                    rustFSClient.removeObject(
                        RemoveObjectArgs.builder()
                            .bucket(bucketName)
                            .object(chunkName)
                            .build()
                    );
                }
            }
            
            // 上传最终文件
            rustFSClient.uploadObject(
                UploadObjectArgs.builder()
                    .bucket(bucketName)
                    .object(fileName)
                    .filename(tempFile.toString())
                    .build()
            );
            
            // 清理临时文件
            Files.deleteIfExists(tempFile);
            
            // 更新文件记录
            saveFileRecord(fileMd5, fileName);
            
        } catch (Exception e) {
            log.error("分片合并失败", e);
            throw new RuntimeException("分片合并失败");
        }
    }
    
    /**
     * 检查文件是否存在(秒传功能)
     */
    private boolean checkFileExists(String fileMd5) {
        // 查询数据库或Redis检查文件是否已存在
        return redisTemplate.hasKey("file:" + fileMd5);
    }
}

3.3 控制器实现

@RestController
@RequestMappi编程ng("/api/upload")
@Slf4j
public class FileUploadController {
    @Autowired
    private FileUploadService fileUploadService;
    
    /**
     * 初始化上传
     */
    @PostMapping("/init")
    public ResponseEntity<?> initUpload(@RequestParam String fileName,
                                      @RequestParam String fileMd5) {
        try {
            String uploadId = fileUploadService.initUpload(fileName, fileMd5);
            return ResponseEntity.ok(Map.of("uploadId", uploadId));
        } catch (RuntimeException e) {
            return ResponseEntity.ok(Map.of("exists", true)); // 文www.chinasem.cn件已存在
        }
    }
    
    /**
     * 上传分片
     */
    @PostMapping("/chunk")
    public ResponseEntity<String> uploadChunk(@RequestParam MultipartFile chunk,
                                             @RequestParam String fileMd5,
                                             @RequestParam int chunkIndex,
                                             @RequestParam int totalChunks) {
        fileUploadService.uploadChunk(chunk, fileMd5, chunkIndex, totalChunks);
        return ResponseEntity.ok("分片上传成功");
    }
    
    /**
     * 合并分片
     */
    @PostMapping("/merge")
    public ResponseEntity<String> mergeChunks(@RequestParam String fileMd5,
                                             @RequestParam String fileName,
                                             @RequestParam int totalChunks) {
        fileUploadService.mergeChunks(fileMd5, fileName, totalChunks);
        return ResponseEntity.ok("文件合并成功China编程");
    }
    
    /**
     * 获取已上传分片列表(断点续传)
     */
    @GetMapping("/chunks/{fileMd5}")
    public ResponseEntity<List<Integer>> getUploadedChunks(@PathVariable String fileMd5) {
        Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5);
        List<Integer> chunks = uploaded.stream()
                .map(obj -> Integer.parseInt(obj.toString()))
                .collect(Collectors.toList());
        return ResponseEntity.ok(chunks);
    }
}

四、前端实现关键代码

4.1 文件切片处理

class FileUploader {
    constructor() {
        this.chunkSize = 5 * 1024 * 1024; // 5MB分片大小
        this.concurrentLimit = 3; // 并发上传数
    }
    
    // 计算文件MD5(秒传功能)
    async calculateFileMD5(file) {
        return new Promise((resolve) => {
            const reader = new FileReader();
            const spark = new SparkMD5.ArrayBuffer();
            
            reader.onload = e => {
                spark.append(e.target.result);
                resolve(spark.end());
            };
            
            reader.readAsArrayBuffer(file);
        });
    }
    
    // 切片上传
    async uploadFile(file) {
        // 计算文件MD5
        const fileMd5 = await this.calculateFileMD5(file);
        
        // 初始化上传
        const initResponse = await fetch('/api/upload/init', {
            method: 'POST',
            body: jsON.stringify({
                fileName: file.name,
                fileMd5: fileMd5
            }),
            headers: {
                'Content-Type': 'application/json'
            }
        });
        
        const initResult = await initResponse.json();
        
        // 如果文件已存在,直接返回
        if (initResult.exists) {
            alert('文件已存在,秒传成功!');
            return;
        }
        
        // 获取已上传分片(断点续传)
        const uploadedChunks = await this.getUploadedChunks(fileMd5);
        
        // 计算分片信息
        const totalChunks = Math.ceil(file.size / this.chunkSize);
        const uploadPromises = [];
        
        for (let i = 0; i < totalChunks; i++) {
            // 跳过已上传的分片
            if (uploadedChunks.includes(i)) {
                continue;
            }
            
            const start = i * this.chunkSize;
            const end = Math.min(file.size, start + this.chunkSize);
            const chunk = file.slice(start, end);
            
            // 控制并发数
            if (uploadPromises.length >= this.concurrentLimit) {
                await Promise.race(uploadPromises);
            }
            
            const uploadPromise = this.uploadChunk(chunk, fileMd5, i, totalChunks)
                .finally(() => {
                    const index = uploadPromises.indexOf(uploadPromise);
                    if (index > -1) {
                        uploadPromises.splice(index, 1);
                    }
                });
            
            uploadPromises.push(uploadPromise);
      yAfVZ  }
        
        // 等待所有分片上传完成
        await Promise.all(uploadPromises);
        
        // 合并分片
        await this.mergeChunks(fileMd5, file.name, totalChunks);
    }
    
    // 上传单个分片
    async uploadChunk(chunk, fileMd5, chunkIndex, totalChunks) {
        const formData = new FormData();
        formData.append('chunk', chunk);
        formData.append('fileMd5', fileMd5);
        formData.append('chunkIndex', chunkIndex);
        formData.append('totalChunks', totalChunks);
        
        const response = await fetch('/api/upload/chunk', {
            method: 'POST',
            body: formData
        });
        
        if (!response.ok) {
            throw new Error(`分片上传失败: ${response.statusText}`);
        }
    }
}

五、高级功能与优化

5.1 断点续传实现

通过记录已上传的分片信息,实现上传中断后从中断处继续上传:

@Service
public class UploadProgressService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 获取已上传分片列表
     */
    public List<Integer> getUploadedChunks(String fileMd5) {
        Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5);
        return uploaded.stream()
                .map(obj -> Integer.parseInt(obj.toString()))
                .collect(Collectors.toList());
    }
    
    /**
     * 清理上传记录
     */
    public void clearUploadRecord(String fileMd5) {
        redisTemplate.delete("chunks:" + fileMd5);
        redisTemplate.delete("upload:" + fileMd5);
    }
}

5.2 分片验证与安全

确保分片传输的完整性和安全性:

/**
 * 分片验证服务
 */
@Service
public class ChunkValidationService {
    
    /**
     * 验证分片哈希
     */
    public boolean validateChunkHash(MultipartFile chunk, String expectedHash) {
        try {
            String actualHash = HMACUtils.hmacSha256Hex("secret-key", 
                chunk.getBytes());
            return actualHash.equals(expectedHash);
        } catch (IOException e) {
            return false;
        }
    }
    
    /**
     * 验证分片顺序
     */
    public boolean validateChunkOrder(String fileMd5, int chunkIndex) {
        // 获取已上传分片
        Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5);
        List<Integer> chunks = uploaded.stream()
                .map(obj -> Integer.parseInt(obj.toString()))
                .sorted()
                .collect(Collectors.toList());
        
        // 检查分片是否按顺序上传
        return chunks.isEmpty() || chunkIndex == chunks.size();
    }
}

六、部署与性能优化

6.1 系统级优化建议

分片大小选择

  • 内网环境:10MB-20MB
  • 移动网络:1MB-5MB
  • 广域网:500KB-1MB

并发控制

# 应用配置
spring:
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 100MB

定时清理策略

@Scheduled(fixedRate = 24 * 60 * 60 * 1000) // 每日清理
public void cleanTempFiles() {
    // 删除超过24小时的临时分片
    redisTemplate.keys("chunks:*").forEach(key -> {
        if (redisTemplate.getExpire(key) < 0) {
            redisTemplate.delete(key);
        }
    });
}

6.2 监控与告警

集成 Prometheus 监控上传性能:

# 监控指标配置
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    tags:
      application: ${spring.application.name}

七、总结

通过 SpringBoot 和 RustFS 的组合,我们实现了一个高性能的文件切片上传系统,具备以下优势:

  • 高性能​:利用 RustFS 的高并发特性和分片并行上传,大幅提升传输速度
  • 可靠性​:断点续传机制确保上传中断后从中断处继续,避免重复劳动
  • 智能优化​:秒传功能避免重复文件上传,节省带宽和存储空间
  • 易于扩展​:分布式架构支持水平扩展,适应不同规模的应用场景

这种方案特别适用于:

  • 视频平台的大文件上传
  • 企业级文档管理系统
  • 云存储和备份服务
  • AI 训练数据集上传

到此这篇关于SpringBoot+RustFS 实现文件切片极速上传的实例代码的文章就介绍到这了,更多相关SpringBoot RustFS 文件切片上传内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于SpringBoot+RustFS 实现文件切片极速上传的实例代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155918

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三