本文主要是介绍C#实现千万数据秒级导入的代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可...
前言
在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,这里我就给大家分享一下千万数据秒级导入怎么实现
一、数据存储
这里使用到的数据库是clickhouse,因为像mysql、sqlserver这类关系型数据库对于大数据的支持不怎么好所以这里我使用的是clickhouse,它是由俄罗斯的yindex公司开发的列式存储数据库。查询语句这些和mysql差不多上手容易学起来简单。需要注意的是clickhouse对数据的删除比较严格一般都不会去删除它里面的数据库,如果要删除数据,我们一般都是采用用一个字段标记数据是否被删除,然后过滤掉这些被删除的数据,clickhouse的四种引擎也可以去了解一下
二、处理逻辑
优化前导入耗时两分钟以上优化后导入耗时30秒以内
优化前代码处理逻辑
优化后导入一千万数据大约需要两分钟完成处理。处理逻辑是将数据循环装入List中,当数据量达到预设的chunkSize阈值(150万)时开始批量插入。值得注意的是,1000万数据需要循环处理7次。这里存在一个常见误区:认为提高150万的阈值能加快处理速度。实际上,处理速度慢的原因并非循环次数,而是由于单线程运行无法充分发挥ClickHouse的高并发优势。
private async Task InsertContentsAsync2(string filePath, BlackListCreateDto dto, ulong maxId, long BATchId) { Stopwatch stopwatch = new(); var regex = RegexConst.Phone(); var userId = CurrentUser.Id.ToString(); var isTxt = filePath.Split(".").Last() == "txt"; var chunkSize = 1500000; List<string> blackLists = new(); injst offset = 0; int total = 0; stopwatch.Start(); if (isTxt) { using StreamReader sr = new(filePath); string line; while ((line = await sr.ReadLineAsync()) != null) { if (regex.IsMatch(line)) { China编程 blackLists.Add(line); offset++; total++; } if (offset >= chunkSize) { await InsertDataAsync(blackLists); offset = 0; blackLists.Clear(); } } if (offset >= 0) { await InsertDataAsync(blackLists); offset = 0; blackLists.Clear(); } } else { var rows = await MiniExcel.QueryAsync(filePath); foreach (IEnumerable<dynamic> row in rows) { var pair = (IDictionary<string, object>)row; var cells = pair.Values; if (cells != null && cells.Any()) { foreach (var cell in cells) { if (cell is not null && cell is string str) { blackLists.Add(str); offset++; total++; } } } if (offset >= chunkSize) { await InsertDataAsync(blackLists); offset = 0; blackLists.Clear(); } } if (offset >= 0) { await InsertDataAsync(blackLists); offset = 0; blackLists.Clear(); } } async Task InsertDataAsync(List<string> blackLists) { List<BlackList> list = blackLists.Select(it => CreateBlackList(dto, batchId, ++maxId, userId, it.ToString())).ToList(); var result = await _clickHouseClientRepository.BulkInsertAsync(list, _clickHouseTables.Value.SMS_BLACK_LIST_TABLE); blackLists.Clear(); } stopwatch.Stop(); Logger.LogInformation("文件路径:{filePath},{total}条数据, 耗时:{Elapsed} ", filePath, total, stopwatch.Elapsed); }
优化后的代码
优化后的代码将单线程执行模式改为多消费者并行处理,采用生产者-消费者模式分离IO和计算操作。文件读取时使用最大缓冲区和顺序扫描:
await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan);
关键优化点包括:
- 使用异步文件读取(FileStream异步API)
- 预分配列表内存(new List(chunkSize))
- 采用ClickHouse批量并行提交而非单条插入
- 设置chunkSize为10万条
- 使用BoundedChannel进行管道容量控制
关于管道容量控制的作用:
- 防止内存溢出:限制管道中积压的未处理数据量,避免生产者过快导致内存问题
- 实现自动背压(Backpressure):管道满时生产者会自动阻塞或丢弃数据(取决于配置),直到消费者处理部分数据
选择10万而非150万的原因:
原因1:内存考量
- 150万条数China编程据(假设每条20字节)单批次占用约30MB
- 并行处理时多个批次会显著增加内存压力,可能触发GC影响性能
- 10万条仅占用约2MB,对GC更友好,适合长期批处理
原因2:ClickHouse性能
- BulkInsert在5万~10万批次时吞吐量最佳
原因3:并行处理效率
- 10万条/批次能更好分配任务到多线程/核心
- 150万条/批次可能导致任务分配不均
原因4:I/O优化
- 大数据批次可能导致I/O阻塞,CPU空闲等待
- 小批次(10万)能更好重叠I/O和计算,提升吞吐量
private async Task InsertContentsAsync(string filePath, BlackListCreateDto dto, ulong maxId, long batchId) { var stopwatch = Stopwatch.StartNew(); var regex = RegexConst.Phone(); var userId = CurrentUser.Id.ToString(); var isTxt = filePath.EndsWith(".txt", StringComparison.OrdinalIgnoreCase); const int chunkSize = 100_000; var totalProcessed = 0L; // 并行处理管道 var processingChannel = Channel.CreateBounded<List<string>>(new BoundedChannelOptions(5) { SingleWriter = true, SingleReader = false, FullMode = BoundedChannelFullMode.Wait }); // 启动并行消费者 var processingTasks = Enumerable.Range(0, Environment.ProcessorCount) .Select(_ => Task.Run(async () => { await foreach (var batch in processingChannel.Reader.ReadAllAsync()) { await ProcessBatchAsync(batch); Interlocked.Add(ref totalProcessed, batch.CouChina编程nt); } })).ToArray(); try { if (isTxt) { await ProcessTextFileAsync(filePath, regex, processingChannel.Writer, chunkSize); } else { await ProcessExcelFileAsync(filePath, processingChannel.Writer, chunkSize); } } finally { processingChannel.Writer.Complete(); await Task.WhenAll(processingTasks); stopwatch.Stop(); Logger.LogInformation("文件路径:{FilePath}, {Total}条数据, 耗时:{Elapsed}ms", filePath, totalProcessed, stopwatch.ElapsedMilliseconds); } async Task ProcessBatchAsync(List<string> batch) { var entities = batch .Select(phone => CreateBlackList(dto, batchId, ++maxId, userId, phone)) .ToList(); await _clickHouseClientRepository.BulkInsertAsync( entities, _clickHouseTables.Value.SMS_BLACK_LIST_TABLE); } } private async Task ProcessTextFileAsync(string filePath, Regex regex, ChannelWriter<List<string>> writer, int chunkSize) { await using var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, FileOptions.SequentialScan); using var sr = new StreamReader(fs); var batch = new List<string>(chunkSize); string line; while ((line = await sr.ReadLineAsync()) != null) { if (regex.IsMatch(line)) { batch.Add(line); if (batch.Count >= chunkSize) { await writer.WriteAsync(batch); batch = new List<string>(chunkSize); } 编程} } if (batch.Count > 0) { await writer.WriteAsync(batch); } } private async Task ProcessExcelFileAsync(string filePath, ChannelWriter<List<string>> writer, int chunkSize) { await using var stream = File.Open(filePath, FileMode.Open, FileAccess.Read); using var reader = MiniExcel.QueryAsDataTable(stream); var batch = new List<string>(chunkSize); foreach (DataRow row in reader.Rows) { var cell = row[0].ToString(); if (cell is string str && !string.IsNullOrWhiteSpace(str)) { batch.Add(str); if (batch.Count >= chunkSize) { await writer.WriteAsync(batch); batch = new List<string>(chunkSize); } } } if (batch.Count > 0) { await writer.WriteAsync(batch); } }
总结
到此这篇关于C#实现千万数据秒级导入的代码的文章就介绍到这了,更多相关C#千万数据秒级导入内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于C#实现千万数据秒级导入的代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!