glib库异步队列和线程池代码分析

2024-01-17 06:48

本文主要是介绍glib库异步队列和线程池代码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文章主要讲了两部分内容:一是分析了异步队列的原理和实现,二是分析线程池的原理和实现。
在多线程程序的运行中,如果经常地创建和销毁执行过程相似而所用数据不同的线程,系统的效率,系统资源的利用率将会受到极大的影响。对于这一问题可用类似glib库中的线程池的解决办法。
  
我 们可以这样想像线程池的处理,当有新的数据要交给线程处理时,主程序/主线程 就从线程池中找到一个未被使用的线程处理这新来的数据,如果线程池中没有找到可用的空闲线程,就新创建一个线程来处理这个数据,并在处理完后不销毁它而是 把这个线程放到线程池中,以备后用。线程池的这个原理和内存管理中slab机制有异曲同工之妙!我想无论是线程池的这种处理方式还是slab机制,其本质 思想还是一致的。
近来做的项目,在框架中用到了多线程的异步队列,实现形式和glib中的异步队列极其相似,而glib线程池中的代码也用到了异步队列(名字用同步队列更合适),因此就先分析一下异步队列。
异 步队列的概念是这样的:所有的数据组织成队列,供多线程并发访问,而这些并发控制全部在异步队列里面实现,对外面只提供读写接口;当队列中的数据为空时, 如果是读线程访问异步队列,那么这一读线程就等待,直到有数据为止;写线程向队列放数据时,如果有线程在等待数据就唤醒等待线程。
异步队列主要代码剖析:
异步队列的数据结构如下:
struct _GAsyncQueue
{
GMutex *mutex; //互斥变量
GCond *cond; //等待条件
GQueue *queue; //数据队列
guint waiting_threads; //等待的读线程个数
gint32 ref_count;
};
g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
{
………//这些点代表一些省略的代码
//把数据放入队列
g_queue_push_head (queue->queue, data);
//现在队列已经有数据了,判断是否有读线程在等待数据,
//如果有就发送信号唤醒读线程
if (queue->waiting_threads > 0)
    g_cond_signal (queue->cond);
}
g_async_queue_push (GAsyncQueue* queue, gpointer data)
{
……..
g_mutex_lock (queue->mutex); //在访问临界区前先获得互斥变量
g_async_queue_push_unlocked (queue, data); //执行写数据操作
g_mutex_unlock (queue->mutex); //释放互斥变量,以使其它线程可以进入临界区
}
从以上的接口可看出,”…._ unlocked” 这样的接口就是异步队列这个对象已获得互斥变量的接口,glib中线程处理相关接口都有类似的命名规则,在接下来的代码分析中,如没有特别的需要就只看”…._ unlocked” 这样的接口。
// 读线程从异步队列中获取数据的接口
// try参数和时间参数在多线程同步/内核多进程的实现中是很常见的东西了,在这里就不再作特殊的解释了。
g_async_queue_pop_intern_unlocked (GAsyncQueue *queue,
                               gboolean     try,
                               GTimeVal    *end_time)
{
gpointer retval;
//判断是否有数据在队列中,如果没有就要执行if语句相应的睡眠等待,直到被写进程唤醒
if (!g_queue_peek_tail_link (queue->queue))
    {
      if (try)//如果try为真,则永远不睡眠
       return NULL;
     
    // 接下来是要让线程进行睡眠等待了,在等待之前先确保等待条件已创建
      if (!queue->cond)
       queue->cond = g_cond_new ();
      if (!end_time) // 等待无时间限制
        {
          queue->waiting_threads++; // 等待线程数加一
      // 这里为什么用循环?因为这是多线程的环境,有可能有多个读线程在等待
      // 当前线程被唤醒时,有可能数据队列中的数据又被别的线程读走了,所以
      // 当前线程就得继续睡眠等待
      // 注意:睡眠等待时会暂时放弃互斥锁,被唤醒时会重新获取互斥锁
       while (!g_queue_peek_tail_link (queue->queue))
            g_cond_wait (queue->cond, queue->mutex);
          queue->waiting_threads--; // 等待线程数减一
        }
      else
        {
          queue->waiting_threads++;
          while (!g_queue_peek_tail_link (queue->queue))
           if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
              break;
          queue->waiting_threads--;
          if (!g_queue_peek_tail_link (queue->queue))
           return NULL;
        }
    }
retval = g_queue_pop_tail (queue->queue);
g_assert (retval);
return retval;
}
/* 返回数据队列的长度,也即数据队列中的数据个数.
* 如果是负值表明是等待数据的线程个数,正数表示数据队列的数据个数
* g_async_queue_length == 0 表示是有 'n' 个数据和' n' 个等待线程在数据队列
* 这种特殊情况可能是在对数据队列加锁或调度时发生
*/
g_async_queue_length_unlocked (GAsyncQueue* queue)
{
g_return_val_if_fail (queue, 0);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
return queue->queue->length - queue->waiting_threads;
}
  
有 了前面的异步队列基础就可以分析线程池是怎么实现的了。在glib库中的线程池的实现和使用有两种方式:1. 单个线程池对象不共享方式;2. 多个线程池对象共享线程方式,也即把各个具体的线程池对象创建的把任务做完了的线程统一放在全局线程池中进行统一管理,各个具体的线程池对象要使用线程 时,可以先向全局线程池中取线程,如果全局线程池没有线程了具体的线程池对象就可自行创建线程。
       线程池的数据结构有两部分,一部分是在头文件中,另一部分在C文件中。这是C语言中常用的信息隐藏方法之一,把要暴露给用户的数据放在头文件中,而要隐藏的数据则放在C文件中。下面是线程池头文件中的数据结构:
typedef struct _GThreadPool     GThreadPool;
struct _GThreadPool
{
   // 具体处理数据的函数
   // 它的第一个参数为g_thread_pool_push进去的数据,也即要执行的任务
GFunc func;
gpointer user_data; // func的第二个参数
// 通过这个成员控制线程池对象创建的线程是否在全局线程池中共享,
// TRUE为不共享,FALSE为共享
gboolean exclusive;
};
C文件中线程池的数据结构:
typedef struct _GRealThreadPool GRealThreadPool;
struct _GRealThreadPool
{
GThreadPool pool; // 头文件已定义
GAsyncQueue* queue; // 异步数据队列
GCond* cond;
gint max_threads; // 线程池对象持有的线程数上限
gint num_threads;// 池程池对象当前持有的线程数
gboolean running;
gboolean immediate;
gboolean waiting;
GCompareDataFunc sort_func;
gpointer sort_user_data;
};
我们可以先来分析单个线程对象不共享的主要实现。在分析它的实现之前,可以先看看一个流程图
从上图可见当主线程有数据交给线程池处理时,只要调用异步队列相关的push接口,线程池中的任何一个线程都可以为这服务。根据以上的流程图看看单个线程对象不共享方式的主要实现代码,它的调用从创建线程池对象开始:
g_thread_pool_new---> g_thread_pool_start_thread---> g_thread_create(g_thread_pool_thread_proxy,pool,FALSE,&local_error)--->> g_thread_pool_wait_for_new_task(pool ---->  g_async_queue_pop_unlocked (pool->queue);
// max_threads为 -1 时表示线程池中的线程数无限制并且线程由动态生成
// max_threads为正整数时,线程池就会预先创建max_threads个线程
g_thread_pool_new (GFunc             func,
                 gpointer         user_data,
                 gint             max_threads,
                 gboolean         exclusive,
                 GError         **error)
{
GRealThreadPool *retval;
    ……………. //这些点代表一些省略的代码
retval = g_new (GRealThreadPool, 1);
retval->pool.func = func;
retval->pool.user_data = user_data;
retval->pool.exclusive = exclusive;
retval->queue = g_async_queue_new (); // 创建异步队列
retval->cond = NULL;
retval->max_threads = max_threads;
retval->num_threads = 0;
retval->running = TRUE;
    …………….
if (retval->pool.exclusive)
{
      g_async_queue_lock (retval->queue);
      while (retval->num_threads < retval->max_threads)
          {
             GError *local_error = NULL;
             g_thread_pool_start_thread (retval, &local_error);//起动新的线程
             …………….
       }
      g_async_queue_unlock (retval->queue);
}
return (GThreadPool*) retval;
}
g_thread_pool_start_thread (GRealThreadPool *pool,
                         GError          **error)
{
gboolean success = FALSE;
if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
    /* Enough threads are already running */
    return;
…………….
if (!success)
{
      GError *local_error = NULL;
      /* No thread was found, we have to start a new one */
      // 真正创建一个新的线程
      g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
      ……………….
}
pool->num_threads++;
}
g_thread_pool_thread_proxy (gpointer data)
{
GRealThreadPool *pool;
pool = data;
……………..
g_async_queue_lock (pool->queue);
while (TRUE)
{
      gpointer task;
      // 线程等待任务,也即等待数据,线程在等待就是处在线程池中的空闲线程
      task = g_thread_pool_wait_for_new_task (pool);
      // 如果线程被唤醒收到并数据就用此线程执行任务,否则继续循环等待
      // 注意:当任务做完时,继续循环又会调用上面的g_thread_pool_wait_for_new_task
      // 而进入等待状态,
if (task)
       {
             if (pool->running || !pool->immediate)
              {
                /* A task was received and the thread pool is active, so
              * execute the function.
              */
                g_async_queue_unlock (pool->queue);
                pool->pool.func (task, pool->pool.user_data);
                g_async_queue_lock (pool->queue);
           }
       }
      else
       {
            ………………
      }
}
return NULL;
}
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
{
gpointer task = NULL;
if (pool->running || (!pool->immediate &&
                     g_async_queue_length_unlocked (pool->queue) > 0))
{
      /* This thread pool is still active. */
      if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
       {
           …………..
       }
     else if (pool->pool.exclusive)
       {
           /* Exclusive threads stay attached to the pool. */
        // 调用异步队列的pop接口进入等待状态,到此一个线程的创建过程就完成了
           task = g_async_queue_pop_unlocked (pool->queue);
       }
      else
       {
           ………….
       }
}
else
{
     …………
}
return task;
}

现在可以结合流程图分析线程池中创建一个线程的一个情景:从函数g_thread_pool_new的while循环调用了 g_thread_pool_start_thread函数,在函数中直接调用g_thread_create创建线程,被创建的线程调用函数 g_thread_pool_wait_for_new_task循环等待任务的到来,函数 g_thread_pool_wait_for_new_task调用g_async_queue_pop_unlocked (pool->queue)真正进入等待。如此可知,最终新创建的线程是调用异步队列的pop接口进入等待状态的,这样一个线程的创建就大功告成 了。而函数g_thread_pool_new的while循环结束时就创建了max_threads个等待线程,也即这个新建的线程池对象有了 max_threads个线程以备使用。

       创建线程池、线程池中的线程是为了使用它,在线程池中取线程,叫线程干活的过程就很简单多了,这个调用过程:g_thread_pool_push--à g_thread_pool_queue_push_unlocked--à g_async_queue_push_unlocked。可见最终调用的是异步数据队列的push接口,把要处理的数据插入队列后它就会唤醒等待异步队列数据的等待线程。

g_thread_pool_push (GThreadPool *pool,
gpointer      data,
GError      **error)
{
……………
//
if (g_async_queue_length_unlocked (real->queue) >= 0)
/* No thread is waiting in the queue */
g_thread_pool_start_thread (real, error);
g_thread_pool_queue_push_unlocked (real, data);
g_async_queue_unlock (real->queue);
}
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
gpointer         data)
{
………….
g_async_queue_push_unlocked (pool->queue, data);
}

    总结:单个线程池对象不共享方式在管理多线程时是以线程池对象中的异步队列为中心,新创建的线程或做完任务的线程并不释放,让它调用异步队列的pop接口进入等待状态,而在使用唤醒线程池中的线程就是调用异步队列的push接口。

    以上对于理解线程池的实现已经足够,多个线程池对象共享线程方式和具体线程池的销毁的技巧,在这里就不讨论了。

这篇关于glib库异步队列和线程池代码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/615168

相关文章

JDK21对虚拟线程的几种用法实践指南

《JDK21对虚拟线程的几种用法实践指南》虚拟线程是Java中的一种轻量级线程,由JVM管理,特别适合于I/O密集型任务,:本文主要介绍JDK21对虚拟线程的几种用法,文中通过代码介绍的非常详细,... 目录一、参考官方文档二、什么是虚拟线程三、几种用法1、Thread.ofVirtual().start(

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

Java集合之Iterator迭代器实现代码解析

《Java集合之Iterator迭代器实现代码解析》迭代器Iterator是Java集合框架中的一个核心接口,位于java.util包下,它定义了一种标准的元素访问机制,为各种集合类型提供了一种统一的... 目录一、什么是Iterator二、Iterator的核心方法三、基本使用示例四、Iterator的工

Java 线程池+分布式实现代码

《Java线程池+分布式实现代码》在Java开发中,池通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,:本文主要介绍Java线程池+分布式实现代码,需要... 目录1. 线程池1.1 自定义线程池实现1.1.1 线程池核心1.1.2 代码示例1.2 总结流程2. J

Python爬虫HTTPS使用requests,httpx,aiohttp实战中的证书异步等问题

《Python爬虫HTTPS使用requests,httpx,aiohttp实战中的证书异步等问题》在爬虫工程里,“HTTPS”是绕不开的话题,HTTPS为传输加密提供保护,同时也给爬虫带来证书校验、... 目录一、核心问题与优先级检查(先问三件事)二、基础示例:requests 与证书处理三、高并发选型:

JS纯前端实现浏览器语音播报、朗读功能的完整代码

《JS纯前端实现浏览器语音播报、朗读功能的完整代码》在现代互联网的发展中,语音技术正逐渐成为改变用户体验的重要一环,下面:本文主要介绍JS纯前端实现浏览器语音播报、朗读功能的相关资料,文中通过代码... 目录一、朗读单条文本:① 语音自选参数,按钮控制语音:② 效果图:二、朗读多条文本:① 语音有默认值:②

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT