浅谈C#之ConcurrentQueue

2024-09-06 10:44

本文主要是介绍浅谈C#之ConcurrentQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、基本介绍

ConcurrentQueue<T> 是一个线程安全的队列,它允许多个线程同时对队列进行操作而不会相互干扰。它是 System.Collections.Concurrent 命名空间下的一个类,提供了基本的队列操作,如 Enqueue(入队)、TryDequeue(尝试出队)、TryPeek(尝试查看队首元素)等,并且是线程安全的。

二、关键特性

线程安全:不需要额外的同步机制,就可以在多线程环境中安全地使用。

无锁:内部使用原子操作来保证线程安全,通常比使用锁有更好的性能。

阻塞操作:虽然 ConcurrentQueue<T> 本身不提供阻塞操作,但可以与其他同步原语(如 SemaphoreSlim 或 CancellationToken)结合使用来实现阻塞行为

三、简单示例

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();CancellationTokenSource cts = new CancellationTokenSource();// 生产者线程Task producer = Task.Run(() =>{for (int i = 0; i < 10; i++){queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100); // 模拟工作}}, cts.Token);// 消费者线程Task consumer = Task.Run(() =>{while (!cts.Token.IsCancellationRequested){if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");}else{Thread.Yield(); // 让出 CPU 时间片}}}, cts.Token);// 等待一段时间,然后取消任务Thread.Sleep(1500);cts.Cancel();Task.WaitAll(producer, consumer);}
}

四、完整示例

1.与 BlockingCollection<T> 结合使用

BlockingCollection<T> 是一个线程安全的集合,提供了数据结构和同步原语的组合,可以与 ConcurrentQueue<T> 结合使用来实现生产者-消费者模式。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){BlockingCollection<int> blockingCollection = new BlockingCollection<int>(new ConcurrentQueue<int>(), 10);Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){blockingCollection.Add(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}blockingCollection.CompleteAdding();});Task consumer = Task.Run(() =>{foreach (var item in blockingCollection.GetConsumingEnumerable()){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}});Task.WaitAll(producer, consumer);}
}

2. 使用 CancellationToken 实现优雅的取消

ConcurrentQueue<T> 可以与 CancellationToken 结合使用,以实现任务的优雅取消。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();CancellationTokenSource cts = new CancellationTokenSource();Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){if (cts.Token.IsCancellationRequested){Console.WriteLine("Cancellation requested");return;}queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}}, cts.Token);Task consumer = Task.Run(() =>{while (!cts.Token.IsCancellationRequested){if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");}else{Thread.Yield();}}}, cts.Token);Thread.Sleep(1500);cts.Cancel();Task.WaitAll(producer, consumer);}
}

与 SemaphoreSlim 实现并发控制

SemaphoreSlim 可以与 ConcurrentQueue<T> 结合使用,以控制同时访问资源的线程数量。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();SemaphoreSlim semaphore = new SemaphoreSlim(3);Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){semaphore.Wait();queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);semaphore.Release();}});Task consumer = Task.Run(() =>{while (!queue.IsEmpty){semaphore.Wait();if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}semaphore.Release();}});Task.WaitAll(producer, consumer);}
}

 使用 IProducerConsumerCollection<T> 接口

ConcurrentQueue<T> 实现了 IProducerConsumerCollection<T> 接口,这使得它可以与任何需要这种接口的 API 一起使用。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){IProducerConsumerCollection<int> collection = new ConcurrentQueue<int>();Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){collection.TryAdd(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}});Task consumer = Task.Run(() =>{while (collection.TryTake(out int item)){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}});Task.WaitAll(producer, consumer);}
}

这篇关于浅谈C#之ConcurrentQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1141798

相关文章

一文解析C#中的StringSplitOptions枚举

《一文解析C#中的StringSplitOptions枚举》StringSplitOptions是C#中的一个枚举类型,用于控制string.Split()方法分割字符串时的行为,核心作用是处理分割后... 目录C#的StringSplitOptions枚举1.StringSplitOptions枚举的常用

C#自动化实现检测并删除PDF文件中的空白页面

《C#自动化实现检测并删除PDF文件中的空白页面》PDF文档在日常工作和生活中扮演着重要的角色,本文将深入探讨如何使用C#编程语言,结合强大的PDF处理库,自动化地检测并删除PDF文件中的空白页面,感... 目录理解PDF空白页的定义与挑战引入Spire.PDF for .NET库核心实现:检测并删除空白页

C#利用Free Spire.XLS for .NET复制Excel工作表

《C#利用FreeSpire.XLSfor.NET复制Excel工作表》在日常的.NET开发中,我们经常需要操作Excel文件,本文将详细介绍C#如何使用FreeSpire.XLSfor.NET... 目录1. 环境准备2. 核心功能3. android示例代码3.1 在同一工作簿内复制工作表3.2 在不同

C#中通过Response.Headers设置自定义参数的代码示例

《C#中通过Response.Headers设置自定义参数的代码示例》:本文主要介绍C#中通过Response.Headers设置自定义响应头的方法,涵盖基础添加、安全校验、生产实践及调试技巧,强... 目录一、基础设置方法1. 直接添加自定义头2. 批量设置模式二、高级配置技巧1. 安全校验机制2. 类型

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

C#实现高性能拍照与水印添加功能完整方案

《C#实现高性能拍照与水印添加功能完整方案》在工业检测、质量追溯等应用场景中,经常需要对产品进行拍照并添加相关信息水印,本文将详细介绍如何使用C#实现一个高性能的拍照和水印添加功能,包含完整的代码实现... 目录1. 概述2. 功能架构设计3. 核心代码实现python3.1 主拍照方法3.2 安全HBIT

C#实现SHP文件读取与地图显示的完整教程

《C#实现SHP文件读取与地图显示的完整教程》在地理信息系统(GIS)开发中,SHP文件是一种常见的矢量数据格式,本文将详细介绍如何使用C#读取SHP文件并实现地图显示功能,包括坐标转换、图形渲染、平... 目录概述功能特点核心代码解析1. 文件读取与初始化2. 坐标转换3. 图形绘制4. 地图交互功能缩放

C#使用SendMessage实现进程间通信的示例代码

《C#使用SendMessage实现进程间通信的示例代码》在软件开发中,进程间通信(IPC)是关键技术之一,C#通过调用WindowsAPI的SendMessage函数实现这一功能,本文将通过实例介绍... 目录第一章:SendMessage的底层原理揭秘第二章:构建跨进程通信桥梁2.1 定义通信协议2.2

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1