glib库异步队列和线程池代码分析

2024-01-17 06:48

本文主要是介绍glib库异步队列和线程池代码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文章主要讲了两部分内容:一是分析了异步队列的原理和实现,二是分析线程池的原理和实现。
在多线程程序的运行中,如果经常地创建和销毁执行过程相似而所用数据不同的线程,系统的效率,系统资源的利用率将会受到极大的影响。对于这一问题可用类似glib库中的线程池的解决办法。
  
我 们可以这样想像线程池的处理,当有新的数据要交给线程处理时,主程序/主线程 就从线程池中找到一个未被使用的线程处理这新来的数据,如果线程池中没有找到可用的空闲线程,就新创建一个线程来处理这个数据,并在处理完后不销毁它而是 把这个线程放到线程池中,以备后用。线程池的这个原理和内存管理中slab机制有异曲同工之妙!我想无论是线程池的这种处理方式还是slab机制,其本质 思想还是一致的。
近来做的项目,在框架中用到了多线程的异步队列,实现形式和glib中的异步队列极其相似,而glib线程池中的代码也用到了异步队列(名字用同步队列更合适),因此就先分析一下异步队列。
异 步队列的概念是这样的:所有的数据组织成队列,供多线程并发访问,而这些并发控制全部在异步队列里面实现,对外面只提供读写接口;当队列中的数据为空时, 如果是读线程访问异步队列,那么这一读线程就等待,直到有数据为止;写线程向队列放数据时,如果有线程在等待数据就唤醒等待线程。
异步队列主要代码剖析:
异步队列的数据结构如下:
struct _GAsyncQueue
{
GMutex *mutex; //互斥变量
GCond *cond; //等待条件
GQueue *queue; //数据队列
guint waiting_threads; //等待的读线程个数
gint32 ref_count;
};
g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
{
………//这些点代表一些省略的代码
//把数据放入队列
g_queue_push_head (queue->queue, data);
//现在队列已经有数据了,判断是否有读线程在等待数据,
//如果有就发送信号唤醒读线程
if (queue->waiting_threads > 0)
    g_cond_signal (queue->cond);
}
g_async_queue_push (GAsyncQueue* queue, gpointer data)
{
……..
g_mutex_lock (queue->mutex); //在访问临界区前先获得互斥变量
g_async_queue_push_unlocked (queue, data); //执行写数据操作
g_mutex_unlock (queue->mutex); //释放互斥变量,以使其它线程可以进入临界区
}
从以上的接口可看出,”…._ unlocked” 这样的接口就是异步队列这个对象已获得互斥变量的接口,glib中线程处理相关接口都有类似的命名规则,在接下来的代码分析中,如没有特别的需要就只看”…._ unlocked” 这样的接口。
// 读线程从异步队列中获取数据的接口
// try参数和时间参数在多线程同步/内核多进程的实现中是很常见的东西了,在这里就不再作特殊的解释了。
g_async_queue_pop_intern_unlocked (GAsyncQueue *queue,
                               gboolean     try,
                               GTimeVal    *end_time)
{
gpointer retval;
//判断是否有数据在队列中,如果没有就要执行if语句相应的睡眠等待,直到被写进程唤醒
if (!g_queue_peek_tail_link (queue->queue))
    {
      if (try)//如果try为真,则永远不睡眠
       return NULL;
     
    // 接下来是要让线程进行睡眠等待了,在等待之前先确保等待条件已创建
      if (!queue->cond)
       queue->cond = g_cond_new ();
      if (!end_time) // 等待无时间限制
        {
          queue->waiting_threads++; // 等待线程数加一
      // 这里为什么用循环?因为这是多线程的环境,有可能有多个读线程在等待
      // 当前线程被唤醒时,有可能数据队列中的数据又被别的线程读走了,所以
      // 当前线程就得继续睡眠等待
      // 注意:睡眠等待时会暂时放弃互斥锁,被唤醒时会重新获取互斥锁
       while (!g_queue_peek_tail_link (queue->queue))
            g_cond_wait (queue->cond, queue->mutex);
          queue->waiting_threads--; // 等待线程数减一
        }
      else
        {
          queue->waiting_threads++;
          while (!g_queue_peek_tail_link (queue->queue))
           if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
              break;
          queue->waiting_threads--;
          if (!g_queue_peek_tail_link (queue->queue))
           return NULL;
        }
    }
retval = g_queue_pop_tail (queue->queue);
g_assert (retval);
return retval;
}
/* 返回数据队列的长度,也即数据队列中的数据个数.
* 如果是负值表明是等待数据的线程个数,正数表示数据队列的数据个数
* g_async_queue_length == 0 表示是有 'n' 个数据和' n' 个等待线程在数据队列
* 这种特殊情况可能是在对数据队列加锁或调度时发生
*/
g_async_queue_length_unlocked (GAsyncQueue* queue)
{
g_return_val_if_fail (queue, 0);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
return queue->queue->length - queue->waiting_threads;
}
  
有 了前面的异步队列基础就可以分析线程池是怎么实现的了。在glib库中的线程池的实现和使用有两种方式:1. 单个线程池对象不共享方式;2. 多个线程池对象共享线程方式,也即把各个具体的线程池对象创建的把任务做完了的线程统一放在全局线程池中进行统一管理,各个具体的线程池对象要使用线程 时,可以先向全局线程池中取线程,如果全局线程池没有线程了具体的线程池对象就可自行创建线程。
       线程池的数据结构有两部分,一部分是在头文件中,另一部分在C文件中。这是C语言中常用的信息隐藏方法之一,把要暴露给用户的数据放在头文件中,而要隐藏的数据则放在C文件中。下面是线程池头文件中的数据结构:
typedef struct _GThreadPool     GThreadPool;
struct _GThreadPool
{
   // 具体处理数据的函数
   // 它的第一个参数为g_thread_pool_push进去的数据,也即要执行的任务
GFunc func;
gpointer user_data; // func的第二个参数
// 通过这个成员控制线程池对象创建的线程是否在全局线程池中共享,
// TRUE为不共享,FALSE为共享
gboolean exclusive;
};
C文件中线程池的数据结构:
typedef struct _GRealThreadPool GRealThreadPool;
struct _GRealThreadPool
{
GThreadPool pool; // 头文件已定义
GAsyncQueue* queue; // 异步数据队列
GCond* cond;
gint max_threads; // 线程池对象持有的线程数上限
gint num_threads;// 池程池对象当前持有的线程数
gboolean running;
gboolean immediate;
gboolean waiting;
GCompareDataFunc sort_func;
gpointer sort_user_data;
};
我们可以先来分析单个线程对象不共享的主要实现。在分析它的实现之前,可以先看看一个流程图
从上图可见当主线程有数据交给线程池处理时,只要调用异步队列相关的push接口,线程池中的任何一个线程都可以为这服务。根据以上的流程图看看单个线程对象不共享方式的主要实现代码,它的调用从创建线程池对象开始:
g_thread_pool_new---> g_thread_pool_start_thread---> g_thread_create(g_thread_pool_thread_proxy,pool,FALSE,&local_error)--->> g_thread_pool_wait_for_new_task(pool ---->  g_async_queue_pop_unlocked (pool->queue);
// max_threads为 -1 时表示线程池中的线程数无限制并且线程由动态生成
// max_threads为正整数时,线程池就会预先创建max_threads个线程
g_thread_pool_new (GFunc             func,
                 gpointer         user_data,
                 gint             max_threads,
                 gboolean         exclusive,
                 GError         **error)
{
GRealThreadPool *retval;
    ……………. //这些点代表一些省略的代码
retval = g_new (GRealThreadPool, 1);
retval->pool.func = func;
retval->pool.user_data = user_data;
retval->pool.exclusive = exclusive;
retval->queue = g_async_queue_new (); // 创建异步队列
retval->cond = NULL;
retval->max_threads = max_threads;
retval->num_threads = 0;
retval->running = TRUE;
    …………….
if (retval->pool.exclusive)
{
      g_async_queue_lock (retval->queue);
      while (retval->num_threads < retval->max_threads)
          {
             GError *local_error = NULL;
             g_thread_pool_start_thread (retval, &local_error);//起动新的线程
             …………….
       }
      g_async_queue_unlock (retval->queue);
}
return (GThreadPool*) retval;
}
g_thread_pool_start_thread (GRealThreadPool *pool,
                         GError          **error)
{
gboolean success = FALSE;
if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
    /* Enough threads are already running */
    return;
…………….
if (!success)
{
      GError *local_error = NULL;
      /* No thread was found, we have to start a new one */
      // 真正创建一个新的线程
      g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
      ……………….
}
pool->num_threads++;
}
g_thread_pool_thread_proxy (gpointer data)
{
GRealThreadPool *pool;
pool = data;
……………..
g_async_queue_lock (pool->queue);
while (TRUE)
{
      gpointer task;
      // 线程等待任务,也即等待数据,线程在等待就是处在线程池中的空闲线程
      task = g_thread_pool_wait_for_new_task (pool);
      // 如果线程被唤醒收到并数据就用此线程执行任务,否则继续循环等待
      // 注意:当任务做完时,继续循环又会调用上面的g_thread_pool_wait_for_new_task
      // 而进入等待状态,
if (task)
       {
             if (pool->running || !pool->immediate)
              {
                /* A task was received and the thread pool is active, so
              * execute the function.
              */
                g_async_queue_unlock (pool->queue);
                pool->pool.func (task, pool->pool.user_data);
                g_async_queue_lock (pool->queue);
           }
       }
      else
       {
            ………………
      }
}
return NULL;
}
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
{
gpointer task = NULL;
if (pool->running || (!pool->immediate &&
                     g_async_queue_length_unlocked (pool->queue) > 0))
{
      /* This thread pool is still active. */
      if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
       {
           …………..
       }
     else if (pool->pool.exclusive)
       {
           /* Exclusive threads stay attached to the pool. */
        // 调用异步队列的pop接口进入等待状态,到此一个线程的创建过程就完成了
           task = g_async_queue_pop_unlocked (pool->queue);
       }
      else
       {
           ………….
       }
}
else
{
     …………
}
return task;
}

现在可以结合流程图分析线程池中创建一个线程的一个情景:从函数g_thread_pool_new的while循环调用了 g_thread_pool_start_thread函数,在函数中直接调用g_thread_create创建线程,被创建的线程调用函数 g_thread_pool_wait_for_new_task循环等待任务的到来,函数 g_thread_pool_wait_for_new_task调用g_async_queue_pop_unlocked (pool->queue)真正进入等待。如此可知,最终新创建的线程是调用异步队列的pop接口进入等待状态的,这样一个线程的创建就大功告成 了。而函数g_thread_pool_new的while循环结束时就创建了max_threads个等待线程,也即这个新建的线程池对象有了 max_threads个线程以备使用。

       创建线程池、线程池中的线程是为了使用它,在线程池中取线程,叫线程干活的过程就很简单多了,这个调用过程:g_thread_pool_push--à g_thread_pool_queue_push_unlocked--à g_async_queue_push_unlocked。可见最终调用的是异步数据队列的push接口,把要处理的数据插入队列后它就会唤醒等待异步队列数据的等待线程。

g_thread_pool_push (GThreadPool *pool,
gpointer      data,
GError      **error)
{
……………
//
if (g_async_queue_length_unlocked (real->queue) >= 0)
/* No thread is waiting in the queue */
g_thread_pool_start_thread (real, error);
g_thread_pool_queue_push_unlocked (real, data);
g_async_queue_unlock (real->queue);
}
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
gpointer         data)
{
………….
g_async_queue_push_unlocked (pool->queue, data);
}

    总结:单个线程池对象不共享方式在管理多线程时是以线程池对象中的异步队列为中心,新创建的线程或做完任务的线程并不释放,让它调用异步队列的pop接口进入等待状态,而在使用唤醒线程池中的线程就是调用异步队列的push接口。

    以上对于理解线程池的实现已经足够,多个线程池对象共享线程方式和具体线程池的销毁的技巧,在这里就不讨论了。

这篇关于glib库异步队列和线程池代码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/615168

相关文章

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

Linux实现线程同步的多种方式汇总

《Linux实现线程同步的多种方式汇总》本文详细介绍了Linux下线程同步的多种方法,包括互斥锁、自旋锁、信号量以及它们的使用示例,通过这些同步机制,可以解决线程安全问题,防止资源竞争导致的错误,示例... 目录什么是线程同步?一、互斥锁(单人洗手间规则)适用场景:特点:二、条件变量(咖啡厅取餐系统)工作流

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

SpringBoot3中使用虚拟线程的完整步骤

《SpringBoot3中使用虚拟线程的完整步骤》在SpringBoot3中使用Java21+的虚拟线程(VirtualThreads)可以显著提升I/O密集型应用的并发能力,这篇文章为大家介绍了详细... 目录1. 环境准备2. 配置虚拟线程方式一:全局启用虚拟线程(Tomcat/Jetty)方式二:异步

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

深入解析 Java Future 类及代码示例

《深入解析JavaFuture类及代码示例》JavaFuture是java.util.concurrent包中用于表示异步计算结果的核心接口,下面给大家介绍JavaFuture类及实例代码,感兴... 目录一、Future 类概述二、核心工作机制代码示例执行流程2. 状态机模型3. 核心方法解析行为总结:三

python获取cmd环境变量值的实现代码

《python获取cmd环境变量值的实现代码》:本文主要介绍在Python中获取命令行(cmd)环境变量的值,可以使用标准库中的os模块,需要的朋友可以参考下... 前言全局说明在执行py过程中,总要使用到系统环境变量一、说明1.1 环境:Windows 11 家庭版 24H2 26100.4061

如何解决Druid线程池Cause:java.sql.SQLRecoverableException:IO错误:Socket read timed out的问题

《如何解决Druid线程池Cause:java.sql.SQLRecoverableException:IO错误:Socketreadtimedout的问题》:本文主要介绍解决Druid线程... 目录异常信息触发场景找到版本发布更新的说明从版本更新信息可以看到该默认逻辑已经去除总结异常信息触发场景复

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb