C#实现千万数据秒级导入的代码

2025-09-14 23:50

本文主要是介绍C#实现千万数据秒级导入的代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可...

前言

在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,这里我就给大家分享一下千万数据秒级导入怎么实现

一、数据存储

这里使用到的数据库是clickhouse,因为像mysql、sqlserver这类关系型数据库对于大数据的支持不怎么好所以这里我使用的是clickhouse,它是由俄罗斯的yindex公司开发的列式存储数据库。查询语句这些和mysql差不多上手容易学起来简单。需要注意的是clickhouse对数据的删除比较严格一般都不会去删除它里面的数据库,如果要删除数据,我们一般都是采用用一个字段标记数据是否被删除,然后过滤掉这些被删除的数据,clickhouse的四种引擎也可以去了解一下

二、处理逻辑

优化前导入耗时两分钟以上优化后导入耗时30秒以内

优化前代码处理逻辑

优化后导入一千万数据大约需要两分钟完成处理。处理逻辑是将数据循环装入List中,当数据量达到预设的chunkSize阈值(150万)时开始批量插入。值得注意的是,1000万数据需要循环处理7次。这里存在一个常见误区:认为提高150万的阈值能加快处理速度。实际上,处理速度慢的原因并非循环次数,而是由于单线程运行无法充分发挥ClickHouse的高并发优势。

private async Task InsertContentsAsync2(string filePath, BlackListCreateDto dto, ulong maxId, long BATchId)
 {
     Stopwatch stopwatch = new();
     var regex = RegexConst.Phone();
     var userId = CurrentUser.Id.ToString();
     var isTxt = filePath.Split(".").Last() == "txt";
     var chunkSize = 1500000;
     List<string> blackLists = new();
     injst offset = 0;
     int total = 0;
     stopwatch.Start();
     if (isTxt)
     {
         using StreamReader sr = new(filePath);
         string line;
         while ((line = await sr.ReadLineAsync()) != null)
         {
             if (regex.IsMatch(line))
             {
              China编程   blackLists.Add(line);
                 offset++;
                 total++;
             }
             if (offset >= chunkSize)
             {
                 await InsertDataAsync(blackLists);
                 offset = 0;
                 blackLists.Clear();
             }
         }

         if (offset >= 0)
         {
             await InsertDataAsync(blackLists);
             offset = 0;
             blackLists.Clear();
         }
     }
     else
     {

         var rows = await MiniExcel.QueryAsync(filePath);
         foreach (IEnumerable<dynamic> row in rows)
         {
             var pair = (IDictionary<string, object>)row;
             var cells = pair.Values;
             if (cells != null && cells.Any())
             {
                 foreach (var cell in cells)
                 {
                     if (cell is not null && cell is string str)
                     {
                         blackLists.Add(str);
                         offset++;
                         total++;
                     }

                 }
             }
             if (offset >= chunkSize)
             {
                 await InsertDataAsync(blackLists);
                 offset = 0;
                 blackLists.Clear();
             }
         }

         if (offset >= 0)
         {
             await InsertDataAsync(blackLists);
             offset = 0;
             blackLists.Clear();
         }
     }

     async Task InsertDataAsync(List<string> blackLists)
     {
         List<BlackList> list = blackLists.Select(it => CreateBlackList(dto, batchId, ++maxId, userId, it.ToString())).ToList();
         var result = await _clickHouseClientRepository.BulkInsertAsync(list, _clickHouseTables.Value.SMS_BLACK_LIST_TABLE);
         blackLists.Clear();
     }
     stopwatch.Stop();
     Logger.LogInformation("文件路径:{filePath},{total}条数据, 耗时:{Elapsed} ", filePath, total, stopwatch.Elapsed);
 }

优化后的代码

优化后的代码将单线程执行模式改为多消费者并行处理,采用生产者-消费者模式分离IO和计算操作。文件读取时使用最大缓冲区和顺序扫描:

await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);

关键优化点包括:

  • 使用异步文件读取(FileStream异步API)
  • 预分配列表内存(new List(chunkSize))
  • 采用ClickHouse批量并行提交而非单条插入
  • 设置chunkSize为10万条
  • 使用BoundedChannel进行管道容量控制

关于管道容量控制的作用:

  1. 防止内存溢出:限制管道中积压的未处理数据量,避免生产者过快导致内存问题
  2. 实现自动背压(Backpressure):管道满时生产者会自动阻塞或丢弃数据(取决于配置),直到消费者处理部分数据

选择10万而非150万的原因:

原因1:内存考量

  • 150万条数China编程据(假设每条20字节)单批次占用约30MB
  • 并行处理时多个批次会显著增加内存压力,可能触发GC影响性能
  • 10万条仅占用约2MB,对GC更友好,适合长期批处理

原因2:ClickHouse性能

  • BulkInsert在5万~10万批次时吞吐量最佳

原因3:并行处理效率

  • 10万条/批次能更好分配任务到多线程/核心
  • 150万条/批次可能导致任务分配不均

原因4:I/O优化

  • 大数据批次可能导致I/O阻塞,CPU空闲等待
  • 小批次(10万)能更好重叠I/O和计算,提升吞吐量
private async Task InsertContentsAsync(string filePath, BlackListCreateDto dto, ulong maxId, long batchId)
 {
     var stopwatch = Stopwatch.StartNew();
     var regex = RegexConst.Phone();
     var userId = CurrentUser.Id.ToString();
     var isTxt = filePath.EndsWith(".txt", StringComparison.OrdinalIgnoreCase);
     const int chunkSize = 100_000; 
     var totalProcessed = 0L;

     // 并行处理管道
     var processingChannel = Channel.CreateBounded<List<string>>(new BoundedChannelOptions(5)
     {
         SingleWriter = true,
         SingleReader = false,
         FullMode = BoundedChannelFullMode.Wait
     });

     // 启动并行消费者
     var processingTasks = Enumerable.Range(0, Environment.ProcessorCount)
         .Select(_ => Task.Run(async () =>
         {
             await foreach (var batch in processingChannel.Reader.ReadAllAsync())
             {
                 await ProcessBatchAsync(batch);
                 Interlocked.Add(ref totalProcessed, batch.CouChina编程nt);
             }
         })).ToArray();

     try
     {
         if (isTxt)
         {
             await ProcessTextFileAsync(filePath, regex, processingChannel.Writer, chunkSize);
         }
         else
         {
             await ProcessExcelFileAsync(filePath, processingChannel.Writer, chunkSize);
         }
     }
     finally
     {
         processingChannel.Writer.Complete();
         await Task.WhenAll(processingTasks);
         stopwatch.Stop();

         Logger.LogInformation("文件路径:{FilePath}, {Total}条数据, 耗时:{Elapsed}ms",
             filePath, totalProcessed, stopwatch.ElapsedMilliseconds);
     }

     async Task ProcessBatchAsync(List<string> batch)
     {
         var entities = batch
             .Select(phone => CreateBlackList(dto, batchId, ++maxId, userId, phone))
             .ToList();

         await _clickHouseClientRepository.BulkInsertAsync(
             entities,
             _clickHouseTables.Value.SMS_BLACK_LIST_TABLE);
     }
 }

 private async Task ProcessTextFileAsync(string filePath, Regex regex, ChannelWriter<List<string>> writer, int chunkSize)
 {
     await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);
     using var sr = new StreamReader(fs);

     var batch = new List<string>(chunkSize);
     string line;

     while ((line = await sr.ReadLineAsync()) != null)
     {
         if (regex.IsMatch(line))
         {
             batch.Add(line);

             if (batch.Count >= chunkSize)
             {
                 await writer.WriteAsync(batch);
                 batch = new List<string>(chunkSize);
             }
         编程}
     }

     if (batch.Count > 0)
     {
         await writer.WriteAsync(batch);
     }
 }

 private async Task ProcessExcelFileAsync(string filePath, ChannelWriter<List<string>> writer, int chunkSize)
 {
     await using var stream = File.Open(filePath, FileMode.Open, FileAccess.Read);
     using var reader = MiniExcel.QueryAsDataTable(stream);
     var batch = new List<string>(chunkSize);
     foreach (DataRow row in reader.Rows)
     {
         var cell = row[0].ToString();
         if (cell is string str && !string.IsNullOrWhiteSpace(str))
         {
             batch.Add(str);

             if (batch.Count >= chunkSize)
             {
                 await writer.WriteAsync(batch);
                 batch = new List<string>(chunkSize);
             }
         }
     }

     if (batch.Count > 0)
     {
         await writer.WriteAsync(batch);
     }
 }

总结

到此这篇关于C#实现千万数据秒级导入的代码的文章就介绍到这了,更多相关C#千万数据秒级导入内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于C#实现千万数据秒级导入的代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155923

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

一文解析C#中的StringSplitOptions枚举

《一文解析C#中的StringSplitOptions枚举》StringSplitOptions是C#中的一个枚举类型,用于控制string.Split()方法分割字符串时的行为,核心作用是处理分割后... 目录C#的StringSplitOptions枚举1.StringSplitOptions枚举的常用

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter