多线程无锁循环队列

2024-03-11 16:48
文章标签 多线程 队列 循环 无锁

本文主要是介绍多线程无锁循环队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

转自:https://blog.csdn.net/weixin_40825228/article/details/80783860

1、Lamport提出的无锁SPSC队列。

在其论文【论文】中证明了,在遵守【顺序一致性】内存模型的计算机中,单生产者单消费者(SPSC)先进先出队列中的锁是可以去除的,从而得到了一个无锁队列,并第一次给出了并发无锁先进先出(Concurrent Lock-free FIFO,CLF)队列的实现。通过去除队列中的锁,生产者、消费者可以并发的访问队列,从而提高了系统的并发执行度。

一种实现:

 
  1. #define DEFAULT_SIZE 30

  2. template<typename T>

  3. class CircuBuffer

  4. {

  5. public:

  6. CircuBuffer(unsigned size):

  7. size_(size)

  8. {

  9. writeIndex_=0;

  10. readIndex_=0;

  11. buffer_=new T[size_];

  12. }

  13.  
  14. CircuBuffer():

  15. size_(DEFAULT_SIZE)

  16. {

  17. writeIndex_=0;

  18. readIndex_=0;

  19. buffer_=new T[size_];

  20. }

  21.  
  22. ~CircuBuffer()

  23. {

  24. delete [] buffer_;

  25. }

  26.  
  27. unsigned getReadIndex() const

  28. {

  29. return readIndex_;

  30. }

  31.  
  32. unsigned getWriteIndex() const

  33. {

  34. return writeIndex_;

  35. }

  36.  
  37. bool isEmpty()

  38. {

  39. return writeIndex_==readIndex_;

  40. }

  41.  
  42. bool isFull()

  43. {

  44. return (readIndex_+1)%size_==writeIndex_;

  45. }

  46.  
  47. bool pushAnEle(T element)

  48. {

  49. //队列不为满

  50. if(!isFull())

  51. {

  52. buffer_[writeIndex_]=element;

  53. writeIndex_=(writeIndex_+1)%size_;

  54. return true;

  55. }

  56. else

  57. {

  58. return false;

  59. }

  60.  
  61. };

  62.  
  63. T* getAnEle()

  64. {

  65. //队列不为空

  66. if (!isEmpty())

  67. {

  68. T* temp=buffer_+readIndex_;

  69. readIndex_=(readIndex_+1)%size_;

  70. return temp;

  71. }

  72. else

  73. {

  74. return nullptr;

  75. }

  76. }

  77. private:

  78.  
  79. //写指针

  80. unsigned writeIndex_;

  81. //读指针

  82. unsigned readIndex_;

  83. //环形队列首地址

  84. T* buffer_;

  85. //环形队列的尺寸

  86. unsigned size_;

  87. };

往SPSC型的队列中放入数据的生产者改变【写指针】,消费者改变【读指针】。然而,生产者放入数据至队列中需要判满,这就需要读取【读指针】;同理消费者从队列中取数据,需要判空,这也需要读取【写指针】。若读指针与写指针在同一缓存行,而读写线程分别在不同的核上,这在多核平台上会产生严重的缓存颠簸。所谓的缓存颠簸是运行于多个核上的线程同时修改位于某个缓存行中的不同位置的数据时,导致该缓存行频繁地在多个核上被写无效的现象,这种现象会极大地损害系统的性能。

2、对于Lamport提出的无锁SPSC队列的改进型——FastForward队列。

第一节阐明了Lamport提出的无锁队列存在以下两个缺点:<1>由于生产者与消费者需要使用共享变量头指针与尾指针来同步信息,保存有头指针与尾指针的cacheline会频繁的分别被生产者与消费者修改,产生缓存颠簸,伤害系统的性能;<2>Lamport的CLF队列不能运行在支持弱内存一致性模型的机器中。

【改进1】针对上述去缺点,Join Giacomoni等人提出了针对缓存友好的CLF队列【缓存友好的CLF队列】。

针对缺点2的改进的CLF队列的一种实现:

 
  1. #ifndef SPSC_QUEUE_H

  2. #define SPSC_QUEUE_H

  3. #include <stdint.h>

  4. #include <string.h>

  5. #define ELE_ZERO 0

  6.  
  7. class BaseSPSCLockFreeQueue

  8. {

  9.  
  10. enum QueueState

  11. {

  12. FULL=-1,

  13. SUCCESS,

  14. EMPTY

  15. };

  16.  
  17. public:

  18. BaseSPSCLockFreeQueue(int* dataArr,uint32_t maxSize)

  19. {

  20. head_=0;

  21. tail_=0;

  22. maxQueueSize_=maxSize;

  23. dataArray_=dataArr;

  24. memset(dataArray_,ELE_ZERO,maxSize);

  25. }

  26.  
  27. ~BaseSPSCLockFreeQueue()

  28. {

  29. }

  30.  
  31. //注意 data!=0

  32. QueueState pushData2Queue(int& dataIn)

  33. {

  34. if(dataArray_[head_]!=ELE_ZERO)

  35. return FULL;

  36.  
  37. dataArray_[head_]=dataIn;

  38. head_=(head_+1)/maxQueueSize_;

  39. return SUCCESS;

  40. }

  41.  
  42. QueueState getDataFromQueue(int& dataOut)

  43. {

  44. if(dataArray_[tail_]==ELE_ZERO)

  45. return EMPTY;

  46.  
  47. dataOut=dataArray_[tail_];

  48. tail_=(tail_+1)/maxQueueSize_;

  49. return SUCCESS;

  50. }

  51.  
  52. private:

  53. uint32_t head_;

  54. uint32_t tail_;

  55. uint32_t maxQueueSize_;

  56. int* dataArray_;

  57. };

  58.  
  59. #endif // SPSC_QUEUE_H

上述针对缺点2的改进队列。在弱内存一致型的机器上也适用。情形一:假定生产者还未将数据放入但已将写指针往后移了一格,且初始时写指针与读指针在同一位置,由于消费者有判据,因此不会出现错误。情形二:假定消费者还未取走新的数据,但读指针已经后移了一格,由于写指针也有判据,因此也不会出现错误。

【改进2】针对存在的缓存行颠簸的问题。主要消除两个方面的缓存行颠簸:1、写指针与读指针错误的缓存行共享;2、数据的错误共享。

针对缺点1的改进的CLF队列的一种实现:

 
  1. #ifndef MCRINGBUFFER_H

  2. #define MCRINGBUFFER_H

  3.  
  4. #include <stdint.h>

  5. #include <assert.h>

  6. //x86-64 计算机中为 8*8=64Bytes

  7. #define CacheLineLength 8

  8. #define ELE_ZERO 0

  9. class MCRingBuffer

  10. {

  11.  
  12. public:

  13. MCRingBuffer(int* array,size_t maxSize):

  14. maxSize_(maxSize)

  15. {

  16. head_=0;

  17. tail_=0;

  18. count_=0;

  19. buffer_=array;

  20.  
  21. for(size_t i=0;i<maxSize;++i)

  22. buffer_[i]=ELE_ZERO;

  23. }

  24.  
  25. ~MCRingBuffer()

  26. {

  27. delete buffer_;

  28. }

  29.  
  30. bool pushData(int& dataIn)

  31. {

  32. tempArray[count_++]=dataIn;

  33. if(count_==CacheLineLength*2)

  34. {

  35. for(int i=0;i<CacheLineLength*2;++i)

  36. {

  37. if(buffer_[head_]!=ELE_ZERO)

  38. {

  39. count_=0;

  40. return false;

  41. }

  42. else

  43. {

  44. buffer_[head_]=tempArray[i];

  45. }

  46. head_=(head_+1)/maxSize_;

  47. }

  48. count_=0;

  49. }

  50. return true;

  51. }

  52.  
  53. bool getData(int& dataOut)

  54. {

  55. if(buffer_[tail_]==ELE_ZERO)

  56. return false;

  57.  
  58. dataOut=buffer_[tail_];

  59. buffer_[tail_]=ELE_ZERO;

  60. tail_=(tail_+1)/maxSize_;

  61. return true;

  62. }

  63.  
  64. private:

  65.  
  66. //读指针

  67. volatile unsigned long tail_;

  68. long tailPadding_[CacheLineLength-1];

  69. //写指针

  70. volatile unsigned long head_;

  71. long headPadding_[CacheLineLength-1];

  72.  
  73. size_t maxSize_;

  74. int* buffer_;

  75.  
  76. unsigned short count_;

  77. int tempArray[CacheLineLength*2];

  78.  
  79.  
  80. };

  81.  
  82. #endif // MCRINGBUFFER_H

4、实验对比。Lamport与FastForward队列性能对比。

这篇关于多线程无锁循环队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/798419

相关文章

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

python多线程并发测试过程

《python多线程并发测试过程》:本文主要介绍python多线程并发测试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、并发与并行?二、同步与异步的概念?三、线程与进程的区别?需求1:多线程执行不同任务需求2:多线程执行相同任务总结一、并发与并行?1、

golang实现延迟队列(delay queue)的两种实现

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录1 延迟队列:邮件提醒、订单自动取消2 实现2.1 simplChina编程e简单版:go自带的time

Python多进程、多线程、协程典型示例解析(最新推荐)

《Python多进程、多线程、协程典型示例解析(最新推荐)》:本文主要介绍Python多进程、多线程、协程典型示例解析(最新推荐),本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定... 目录一、multiprocessing(多进程)1. 模块简介2. 案例详解:并行计算平方和3. 实现逻

Nginx部署React项目时重定向循环问题的解决方案

《Nginx部署React项目时重定向循环问题的解决方案》Nginx在处理React项目请求时出现重定向循环,通常是由于`try_files`配置错误或`root`路径配置不当导致的,本文给大家详细介... 目录问题原因1. try_files 配置错误2. root 路径错误解决方法1. 检查 try_f

Spring三级缓存解决循环依赖的解析过程

《Spring三级缓存解决循环依赖的解析过程》:本文主要介绍Spring三级缓存解决循环依赖的解析过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、循环依赖场景二、三级缓存定义三、解决流程(以ServiceA和ServiceB为例)四、关键机制详解五、设计约

Spring 中的循环引用问题解决方法

《Spring中的循环引用问题解决方法》:本文主要介绍Spring中的循环引用问题解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录什么是循环引用?循环依赖三级缓存解决循环依赖二级缓存三级缓存本章来聊聊Spring 中的循环引用问题该如何解决。这里聊

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组