C#实现千万数据秒级导入的代码

2025-09-14 23:50

本文主要是介绍C#实现千万数据秒级导入的代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可...

前言

在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,这里我就给大家分享一下千万数据秒级导入怎么实现

一、数据存储

这里使用到的数据库是clickhouse,因为像mysql、sqlserver这类关系型数据库对于大数据的支持不怎么好所以这里我使用的是clickhouse,它是由俄罗斯的yindex公司开发的列式存储数据库。查询语句这些和mysql差不多上手容易学起来简单。需要注意的是clickhouse对数据的删除比较严格一般都不会去删除它里面的数据库,如果要删除数据,我们一般都是采用用一个字段标记数据是否被删除,然后过滤掉这些被删除的数据,clickhouse的四种引擎也可以去了解一下

二、处理逻辑

优化前导入耗时两分钟以上优化后导入耗时30秒以内

优化前代码处理逻辑

优化后导入一千万数据大约需要两分钟完成处理。处理逻辑是将数据循环装入List中,当数据量达到预设的chunkSize阈值(150万)时开始批量插入。值得注意的是,1000万数据需要循环处理7次。这里存在一个常见误区:认为提高150万的阈值能加快处理速度。实际上,处理速度慢的原因并非循环次数,而是由于单线程运行无法充分发挥ClickHouse的高并发优势。

private async Task InsertContentsAsync2(string filePath, BlackListCreateDto dto, ulong maxId, long BATchId)
 {
     Stopwatch stopwatch = new();
     var regex = RegexConst.Phone();
     var userId = CurrentUser.Id.ToString();
     var isTxt = filePath.Split(".").Last() == "txt";
     var chunkSize = 1500000;
     List<string> blackLists = new();
     injst offset = 0;
     int total = 0;
     stopwatch.Start();
     if (isTxt)
     {
         using StreamReader sr = new(filePath);
         string line;
         while ((line = await sr.ReadLineAsync()) != null)
         {
             if (regex.IsMatch(line))
             {
              China编程   blackLists.Add(line);
                 offset++;
                 total++;
             }
             if (offset >= chunkSize)
             {
                 await InsertDataAsync(blackLists);
                 offset = 0;
                 blackLists.Clear();
             }
         }

         if (offset >= 0)
         {
             await InsertDataAsync(blackLists);
             offset = 0;
             blackLists.Clear();
         }
     }
     else
     {

         var rows = await MiniExcel.QueryAsync(filePath);
         foreach (IEnumerable<dynamic> row in rows)
         {
             var pair = (IDictionary<string, object>)row;
             var cells = pair.Values;
             if (cells != null && cells.Any())
             {
                 foreach (var cell in cells)
                 {
                     if (cell is not null && cell is string str)
                     {
                         blackLists.Add(str);
                         offset++;
                         total++;
                     }

                 }
             }
             if (offset >= chunkSize)
             {
                 await InsertDataAsync(blackLists);
                 offset = 0;
                 blackLists.Clear();
             }
         }

         if (offset >= 0)
         {
             await InsertDataAsync(blackLists);
             offset = 0;
             blackLists.Clear();
         }
     }

     async Task InsertDataAsync(List<string> blackLists)
     {
         List<BlackList> list = blackLists.Select(it => CreateBlackList(dto, batchId, ++maxId, userId, it.ToString())).ToList();
         var result = await _clickHouseClientRepository.BulkInsertAsync(list, _clickHouseTables.Value.SMS_BLACK_LIST_TABLE);
         blackLists.Clear();
     }
     stopwatch.Stop();
     Logger.LogInformation("文件路径:{filePath},{total}条数据, 耗时:{Elapsed} ", filePath, total, stopwatch.Elapsed);
 }

优化后的代码

优化后的代码将单线程执行模式改为多消费者并行处理,采用生产者-消费者模式分离IO和计算操作。文件读取时使用最大缓冲区和顺序扫描:

await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);

关键优化点包括:

  • 使用异步文件读取(FileStream异步API)
  • 预分配列表内存(new List(chunkSize))
  • 采用ClickHouse批量并行提交而非单条插入
  • 设置chunkSize为10万条
  • 使用BoundedChannel进行管道容量控制

关于管道容量控制的作用:

  1. 防止内存溢出:限制管道中积压的未处理数据量,避免生产者过快导致内存问题
  2. 实现自动背压(Backpressure):管道满时生产者会自动阻塞或丢弃数据(取决于配置),直到消费者处理部分数据

选择10万而非150万的原因:

原因1:内存考量

  • 150万条数China编程据(假设每条20字节)单批次占用约30MB
  • 并行处理时多个批次会显著增加内存压力,可能触发GC影响性能
  • 10万条仅占用约2MB,对GC更友好,适合长期批处理

原因2:ClickHouse性能

  • BulkInsert在5万~10万批次时吞吐量最佳

原因3:并行处理效率

  • 10万条/批次能更好分配任务到多线程/核心
  • 150万条/批次可能导致任务分配不均

原因4:I/O优化

  • 大数据批次可能导致I/O阻塞,CPU空闲等待
  • 小批次(10万)能更好重叠I/O和计算,提升吞吐量
private async Task InsertContentsAsync(string filePath, BlackListCreateDto dto, ulong maxId, long batchId)
 {
     var stopwatch = Stopwatch.StartNew();
     var regex = RegexConst.Phone();
     var userId = CurrentUser.Id.ToString();
     var isTxt = filePath.EndsWith(".txt", StringComparison.OrdinalIgnoreCase);
     const int chunkSize = 100_000; 
     var totalProcessed = 0L;

     // 并行处理管道
     var processingChannel = Channel.CreateBounded<List<string>>(new BoundedChannelOptions(5)
     {
         SingleWriter = true,
         SingleReader = false,
         FullMode = BoundedChannelFullMode.Wait
     });

     // 启动并行消费者
     var processingTasks = Enumerable.Range(0, Environment.ProcessorCount)
         .Select(_ => Task.Run(async () =>
         {
             await foreach (var batch in processingChannel.Reader.ReadAllAsync())
             {
                 await ProcessBatchAsync(batch);
                 Interlocked.Add(ref totalProcessed, batch.CouChina编程nt);
             }
         })).ToArray();

     try
     {
         if (isTxt)
         {
             await ProcessTextFileAsync(filePath, regex, processingChannel.Writer, chunkSize);
         }
         else
         {
             await ProcessExcelFileAsync(filePath, processingChannel.Writer, chunkSize);
         }
     }
     finally
     {
         processingChannel.Writer.Complete();
         await Task.WhenAll(processingTasks);
         stopwatch.Stop();

         Logger.LogInformation("文件路径:{FilePath}, {Total}条数据, 耗时:{Elapsed}ms",
             filePath, totalProcessed, stopwatch.ElapsedMilliseconds);
     }

     async Task ProcessBatchAsync(List<string> batch)
     {
         var entities = batch
             .Select(phone => CreateBlackList(dto, batchId, ++maxId, userId, phone))
             .ToList();

         await _clickHouseClientRepository.BulkInsertAsync(
             entities,
             _clickHouseTables.Value.SMS_BLACK_LIST_TABLE);
     }
 }

 private async Task ProcessTextFileAsync(string filePath, Regex regex, ChannelWriter<List<string>> writer, int chunkSize)
 {
     await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);
     using var sr = new StreamReader(fs);

     var batch = new List<string>(chunkSize);
     string line;

     while ((line = await sr.ReadLineAsync()) != null)
     {
         if (regex.IsMatch(line))
         {
             batch.Add(line);

             if (batch.Count >= chunkSize)
             {
                 await writer.WriteAsync(batch);
                 batch = new List<string>(chunkSize);
             }
         编程}
     }

     if (batch.Count > 0)
     {
         await writer.WriteAsync(batch);
     }
 }

 private async Task ProcessExcelFileAsync(string filePath, ChannelWriter<List<string>> writer, int chunkSize)
 {
     await using var stream = File.Open(filePath, FileMode.Open, FileAccess.Read);
     using var reader = MiniExcel.QueryAsDataTable(stream);
     var batch = new List<string>(chunkSize);
     foreach (DataRow row in reader.Rows)
     {
         var cell = row[0].ToString();
         if (cell is string str && !string.IsNullOrWhiteSpace(str))
         {
             batch.Add(str);

             if (batch.Count >= chunkSize)
             {
                 await writer.WriteAsync(batch);
                 batch = new List<string>(chunkSize);
             }
         }
     }

     if (batch.Count > 0)
     {
         await writer.WriteAsync(batch);
     }
 }

总结

到此这篇关于C#实现千万数据秒级导入的代码的文章就介绍到这了,更多相关C#千万数据秒级导入内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于C#实现千万数据秒级导入的代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155923

相关文章

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

Nginx部署HTTP/3的实现步骤

《Nginx部署HTTP/3的实现步骤》本文介绍了在Nginx中部署HTTP/3的详细步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录前提条件第一步:安装必要的依赖库第二步:获取并构建 BoringSSL第三步:获取 Nginx

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

oracle 11g导入\导出(expdp impdp)之导入过程

《oracle11g导入导出(expdpimpdp)之导入过程》导出需使用SEC.DMP格式,无分号;建立expdir目录(E:/exp)并确保存在;导入在cmd下执行,需sys用户权限;若需修... 目录准备文件导入(impdp)1、建立directory2、导入语句 3、更改密码总结上一个环节,我们讲了

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja