glib库异步队列和线程池代码分析

2024-01-17 06:48

本文主要是介绍glib库异步队列和线程池代码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文章主要讲了两部分内容:一是分析了异步队列的原理和实现,二是分析线程池的原理和实现。
在多线程程序的运行中,如果经常地创建和销毁执行过程相似而所用数据不同的线程,系统的效率,系统资源的利用率将会受到极大的影响。对于这一问题可用类似glib库中的线程池的解决办法。
  
我 们可以这样想像线程池的处理,当有新的数据要交给线程处理时,主程序/主线程 就从线程池中找到一个未被使用的线程处理这新来的数据,如果线程池中没有找到可用的空闲线程,就新创建一个线程来处理这个数据,并在处理完后不销毁它而是 把这个线程放到线程池中,以备后用。线程池的这个原理和内存管理中slab机制有异曲同工之妙!我想无论是线程池的这种处理方式还是slab机制,其本质 思想还是一致的。
近来做的项目,在框架中用到了多线程的异步队列,实现形式和glib中的异步队列极其相似,而glib线程池中的代码也用到了异步队列(名字用同步队列更合适),因此就先分析一下异步队列。
异 步队列的概念是这样的:所有的数据组织成队列,供多线程并发访问,而这些并发控制全部在异步队列里面实现,对外面只提供读写接口;当队列中的数据为空时, 如果是读线程访问异步队列,那么这一读线程就等待,直到有数据为止;写线程向队列放数据时,如果有线程在等待数据就唤醒等待线程。
异步队列主要代码剖析:
异步队列的数据结构如下:
struct _GAsyncQueue
{
GMutex *mutex; //互斥变量
GCond *cond; //等待条件
GQueue *queue; //数据队列
guint waiting_threads; //等待的读线程个数
gint32 ref_count;
};
g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
{
………//这些点代表一些省略的代码
//把数据放入队列
g_queue_push_head (queue->queue, data);
//现在队列已经有数据了,判断是否有读线程在等待数据,
//如果有就发送信号唤醒读线程
if (queue->waiting_threads > 0)
    g_cond_signal (queue->cond);
}
g_async_queue_push (GAsyncQueue* queue, gpointer data)
{
……..
g_mutex_lock (queue->mutex); //在访问临界区前先获得互斥变量
g_async_queue_push_unlocked (queue, data); //执行写数据操作
g_mutex_unlock (queue->mutex); //释放互斥变量,以使其它线程可以进入临界区
}
从以上的接口可看出,”…._ unlocked” 这样的接口就是异步队列这个对象已获得互斥变量的接口,glib中线程处理相关接口都有类似的命名规则,在接下来的代码分析中,如没有特别的需要就只看”…._ unlocked” 这样的接口。
// 读线程从异步队列中获取数据的接口
// try参数和时间参数在多线程同步/内核多进程的实现中是很常见的东西了,在这里就不再作特殊的解释了。
g_async_queue_pop_intern_unlocked (GAsyncQueue *queue,
                               gboolean     try,
                               GTimeVal    *end_time)
{
gpointer retval;
//判断是否有数据在队列中,如果没有就要执行if语句相应的睡眠等待,直到被写进程唤醒
if (!g_queue_peek_tail_link (queue->queue))
    {
      if (try)//如果try为真,则永远不睡眠
       return NULL;
     
    // 接下来是要让线程进行睡眠等待了,在等待之前先确保等待条件已创建
      if (!queue->cond)
       queue->cond = g_cond_new ();
      if (!end_time) // 等待无时间限制
        {
          queue->waiting_threads++; // 等待线程数加一
      // 这里为什么用循环?因为这是多线程的环境,有可能有多个读线程在等待
      // 当前线程被唤醒时,有可能数据队列中的数据又被别的线程读走了,所以
      // 当前线程就得继续睡眠等待
      // 注意:睡眠等待时会暂时放弃互斥锁,被唤醒时会重新获取互斥锁
       while (!g_queue_peek_tail_link (queue->queue))
            g_cond_wait (queue->cond, queue->mutex);
          queue->waiting_threads--; // 等待线程数减一
        }
      else
        {
          queue->waiting_threads++;
          while (!g_queue_peek_tail_link (queue->queue))
           if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
              break;
          queue->waiting_threads--;
          if (!g_queue_peek_tail_link (queue->queue))
           return NULL;
        }
    }
retval = g_queue_pop_tail (queue->queue);
g_assert (retval);
return retval;
}
/* 返回数据队列的长度,也即数据队列中的数据个数.
* 如果是负值表明是等待数据的线程个数,正数表示数据队列的数据个数
* g_async_queue_length == 0 表示是有 'n' 个数据和' n' 个等待线程在数据队列
* 这种特殊情况可能是在对数据队列加锁或调度时发生
*/
g_async_queue_length_unlocked (GAsyncQueue* queue)
{
g_return_val_if_fail (queue, 0);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
return queue->queue->length - queue->waiting_threads;
}
  
有 了前面的异步队列基础就可以分析线程池是怎么实现的了。在glib库中的线程池的实现和使用有两种方式:1. 单个线程池对象不共享方式;2. 多个线程池对象共享线程方式,也即把各个具体的线程池对象创建的把任务做完了的线程统一放在全局线程池中进行统一管理,各个具体的线程池对象要使用线程 时,可以先向全局线程池中取线程,如果全局线程池没有线程了具体的线程池对象就可自行创建线程。
       线程池的数据结构有两部分,一部分是在头文件中,另一部分在C文件中。这是C语言中常用的信息隐藏方法之一,把要暴露给用户的数据放在头文件中,而要隐藏的数据则放在C文件中。下面是线程池头文件中的数据结构:
typedef struct _GThreadPool     GThreadPool;
struct _GThreadPool
{
   // 具体处理数据的函数
   // 它的第一个参数为g_thread_pool_push进去的数据,也即要执行的任务
GFunc func;
gpointer user_data; // func的第二个参数
// 通过这个成员控制线程池对象创建的线程是否在全局线程池中共享,
// TRUE为不共享,FALSE为共享
gboolean exclusive;
};
C文件中线程池的数据结构:
typedef struct _GRealThreadPool GRealThreadPool;
struct _GRealThreadPool
{
GThreadPool pool; // 头文件已定义
GAsyncQueue* queue; // 异步数据队列
GCond* cond;
gint max_threads; // 线程池对象持有的线程数上限
gint num_threads;// 池程池对象当前持有的线程数
gboolean running;
gboolean immediate;
gboolean waiting;
GCompareDataFunc sort_func;
gpointer sort_user_data;
};
我们可以先来分析单个线程对象不共享的主要实现。在分析它的实现之前,可以先看看一个流程图
从上图可见当主线程有数据交给线程池处理时,只要调用异步队列相关的push接口,线程池中的任何一个线程都可以为这服务。根据以上的流程图看看单个线程对象不共享方式的主要实现代码,它的调用从创建线程池对象开始:
g_thread_pool_new---> g_thread_pool_start_thread---> g_thread_create(g_thread_pool_thread_proxy,pool,FALSE,&local_error)--->> g_thread_pool_wait_for_new_task(pool ---->  g_async_queue_pop_unlocked (pool->queue);
// max_threads为 -1 时表示线程池中的线程数无限制并且线程由动态生成
// max_threads为正整数时,线程池就会预先创建max_threads个线程
g_thread_pool_new (GFunc             func,
                 gpointer         user_data,
                 gint             max_threads,
                 gboolean         exclusive,
                 GError         **error)
{
GRealThreadPool *retval;
    ……………. //这些点代表一些省略的代码
retval = g_new (GRealThreadPool, 1);
retval->pool.func = func;
retval->pool.user_data = user_data;
retval->pool.exclusive = exclusive;
retval->queue = g_async_queue_new (); // 创建异步队列
retval->cond = NULL;
retval->max_threads = max_threads;
retval->num_threads = 0;
retval->running = TRUE;
    …………….
if (retval->pool.exclusive)
{
      g_async_queue_lock (retval->queue);
      while (retval->num_threads < retval->max_threads)
          {
             GError *local_error = NULL;
             g_thread_pool_start_thread (retval, &local_error);//起动新的线程
             …………….
       }
      g_async_queue_unlock (retval->queue);
}
return (GThreadPool*) retval;
}
g_thread_pool_start_thread (GRealThreadPool *pool,
                         GError          **error)
{
gboolean success = FALSE;
if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
    /* Enough threads are already running */
    return;
…………….
if (!success)
{
      GError *local_error = NULL;
      /* No thread was found, we have to start a new one */
      // 真正创建一个新的线程
      g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
      ……………….
}
pool->num_threads++;
}
g_thread_pool_thread_proxy (gpointer data)
{
GRealThreadPool *pool;
pool = data;
……………..
g_async_queue_lock (pool->queue);
while (TRUE)
{
      gpointer task;
      // 线程等待任务,也即等待数据,线程在等待就是处在线程池中的空闲线程
      task = g_thread_pool_wait_for_new_task (pool);
      // 如果线程被唤醒收到并数据就用此线程执行任务,否则继续循环等待
      // 注意:当任务做完时,继续循环又会调用上面的g_thread_pool_wait_for_new_task
      // 而进入等待状态,
if (task)
       {
             if (pool->running || !pool->immediate)
              {
                /* A task was received and the thread pool is active, so
              * execute the function.
              */
                g_async_queue_unlock (pool->queue);
                pool->pool.func (task, pool->pool.user_data);
                g_async_queue_lock (pool->queue);
           }
       }
      else
       {
            ………………
      }
}
return NULL;
}
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
{
gpointer task = NULL;
if (pool->running || (!pool->immediate &&
                     g_async_queue_length_unlocked (pool->queue) > 0))
{
      /* This thread pool is still active. */
      if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
       {
           …………..
       }
     else if (pool->pool.exclusive)
       {
           /* Exclusive threads stay attached to the pool. */
        // 调用异步队列的pop接口进入等待状态,到此一个线程的创建过程就完成了
           task = g_async_queue_pop_unlocked (pool->queue);
       }
      else
       {
           ………….
       }
}
else
{
     …………
}
return task;
}

现在可以结合流程图分析线程池中创建一个线程的一个情景:从函数g_thread_pool_new的while循环调用了 g_thread_pool_start_thread函数,在函数中直接调用g_thread_create创建线程,被创建的线程调用函数 g_thread_pool_wait_for_new_task循环等待任务的到来,函数 g_thread_pool_wait_for_new_task调用g_async_queue_pop_unlocked (pool->queue)真正进入等待。如此可知,最终新创建的线程是调用异步队列的pop接口进入等待状态的,这样一个线程的创建就大功告成 了。而函数g_thread_pool_new的while循环结束时就创建了max_threads个等待线程,也即这个新建的线程池对象有了 max_threads个线程以备使用。

       创建线程池、线程池中的线程是为了使用它,在线程池中取线程,叫线程干活的过程就很简单多了,这个调用过程:g_thread_pool_push--à g_thread_pool_queue_push_unlocked--à g_async_queue_push_unlocked。可见最终调用的是异步数据队列的push接口,把要处理的数据插入队列后它就会唤醒等待异步队列数据的等待线程。

g_thread_pool_push (GThreadPool *pool,
gpointer      data,
GError      **error)
{
……………
//
if (g_async_queue_length_unlocked (real->queue) >= 0)
/* No thread is waiting in the queue */
g_thread_pool_start_thread (real, error);
g_thread_pool_queue_push_unlocked (real, data);
g_async_queue_unlock (real->queue);
}
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
gpointer         data)
{
………….
g_async_queue_push_unlocked (pool->queue, data);
}

    总结:单个线程池对象不共享方式在管理多线程时是以线程池对象中的异步队列为中心,新创建的线程或做完任务的线程并不释放,让它调用异步队列的pop接口进入等待状态,而在使用唤醒线程池中的线程就是调用异步队列的push接口。

    以上对于理解线程池的实现已经足够,多个线程池对象共享线程方式和具体线程池的销毁的技巧,在这里就不讨论了。

这篇关于glib库异步队列和线程池代码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/615168

相关文章

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

如何在Java Spring实现异步执行(详细篇)

《如何在JavaSpring实现异步执行(详细篇)》Spring框架通过@Async、Executor等实现异步执行,提升系统性能与响应速度,支持自定义线程池管理并发,本文给大家介绍如何在Sprin... 目录前言1. 使用 @Async 实现异步执行1.1 启用异步执行支持1.2 创建异步方法1.3 调用

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

Olingo分析和实践之OData框架核心组件初始化(关键步骤)

《Olingo分析和实践之OData框架核心组件初始化(关键步骤)》ODataSpringBootService通过初始化OData实例和服务元数据,构建框架核心能力与数据模型结构,实现序列化、URI... 目录概述第一步:OData实例创建1.1 OData.newInstance() 详细分析1.1.1

Java中的xxl-job调度器线程池工作机制

《Java中的xxl-job调度器线程池工作机制》xxl-job通过快慢线程池分离短时与长时任务,动态降级超时任务至慢池,结合异步触发和资源隔离机制,提升高频调度的性能与稳定性,支撑高并发场景下的可靠... 目录⚙️ 一、调度器线程池的核心设计 二、线程池的工作流程 三、线程池配置参数与优化 四、总结:线程

Olingo分析和实践之ODataImpl详细分析(重要方法详解)

《Olingo分析和实践之ODataImpl详细分析(重要方法详解)》ODataImpl.java是ApacheOlingoOData框架的核心工厂类,负责创建序列化器、反序列化器和处理器等组件,... 目录概述主要职责类结构与继承关系核心功能分析1. 序列化器管理2. 反序列化器管理3. 处理器管理重要方

WinForm跨线程访问UI及UI卡死的解决方案

《WinForm跨线程访问UI及UI卡死的解决方案》在WinForm开发过程中,跨线程访问UI控件和界面卡死是常见的技术难题,由于Windows窗体应用程序的UI控件默认只能在主线程(UI线程)上操作... 目录前言正文案例1:直接线程操作(无UI访问)案例2:BeginInvoke访问UI(错误用法)案例

Python实现MQTT通信的示例代码

《Python实现MQTT通信的示例代码》本文主要介绍了Python实现MQTT通信的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 安装paho-mqtt库‌2. 搭建MQTT代理服务器(Broker)‌‌3. pytho

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java