Intel TBB::Pipeline,按序处理数据

2023-12-06 10:38

本文主要是介绍Intel TBB::Pipeline,按序处理数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在上一篇文章(TBB:pipeline,软件流水线的威力)最后提出了几个问题,我们逐个来看看TBB::Pipeline是怎么解决的。

 

 

为什么Pipeline可以保证数据执行的顺序?既然TBB归根到底是通过多线程执行任务,为什么不会在读入先后两个字符串后,后读入的字符串先被下一个task处理?Pipeline里是不是有一个类似于FIFO 先进先出队列之类的东西?

 

 

之前曾经质疑过Pipeline的性能,甚至想自己用MultiThreading来模拟一个流水线,但很快就发现其中实现的难点。数据执行的顺序性就是其中之一。

 

假设以一个thread代表流水线上的一个节点,如果某节点是并发执行的,那么就需要2个以上的thread(A和B),上一节点处理完毕的顺序数据到底是先送给A还是B呢?处理完毕后后又该先将A还是B中的数据送到下一节点呢?即使可以人为的指定A和B之间的优先规则,由于thread本身被调度的不确定性,实际运行中还是有很多不可预知的困难。

 

流水线的一个显著特性就是保证每个数据均以相同的顺序流过每个节点。因此,TBB::Pipeline中的一个首要任务就是在节点被并发执行的同时,仍能够保证所处理的数据的次序而不需额外的处理代码。此外,在要求串行处理的节点,要保证即使排在前面的数据先被处理,即使排在后面的数据先到达。

 

Pipeline的中心思想就是以token来控制数据的处理顺序和流水线的深度。Pipeline::run函数中指定了token的最大值:

 

void pipeline::run( size_t max_number_of_live_tokens ) {}

 

 

每一个数据在进入Pipeline的时候都会按照先后顺序依次分配一个token,如line1处:

 

task* stage_task::execute() {

    __TBB_ASSERT( !my_at_start || !my_object, NULL );

    if( my_at_start ) {

        if( my_filter->is_serial() ) {

            if( (my_object = (*my_filter)(my_object)) ) {

                my_token = my_pipeline.token_counter++; //line1

                my_token_ready = true;

                ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );

                if( --my_pipeline.input_tokens>0 )

                    spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );

            } else {

                my_pipeline.end_of_input = true; //line2

                return NULL;

            }

...

}

 

如果当前流水线中的token全部用完了,那么暂时就不会处理新的数据,直到已进入Pipeline的数据被处理完毕有空闲的token(line2处)

 

仍然以TBB中的例子text_filter为例考虑,流水线为 MyInputFilter->MyTransformFilter->MyOutputFiler,MyInputFilter从磁盘上读取数据,MyTransformFilter转换成大写字母,MyOutputFilter将转换好的数据写入磁盘。因此,MyInputFilter节点和MyOutputFiler节点必须是串行执行,而MyTransformFilter可以并发执行。对于MyInputFilter读入的一串顺序数据,token依次为1->2->3,如何保证经过转换后数据也是以相同的顺序写入磁盘?

 

秘密在于TBB中的一个类tbb::internal::ordered_buffer,MyOutputFilter用它来保证按照token的顺序执行其队列中的数据,而不管数据进入队列的先后次序,换句话说,即使排在后面的数据token 2先被某个MyTransformFilter节点处理完毕送往MyOutputFilter,只要数据token 1没到达没被MyOutputFilter执行,数据2就不会在数据1之前先写入磁盘。每一个需要被串行处理的节点,都会有一个ordered_buffer类型的成员变量。

 

先看看ordered_buffer的定义:

 

//! A buffer of ordered items.

/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */

class ordered_buffer {

    typedef  Token  size_type;

 

    //! Array of deferred tasks that cannot yet start executing.

    /** Element is NULL if unused. */

    task** array; //数组,以顺序方式保存所有待处理的task

 

    //! Size of array

    /** Always 0 or a power of 2 */

    size_type array_size; //数组的尺寸

 

    //! Lowest token that can start executing.

    /** All prior Token have already been seen. */

    Token low_token; //当前正在处理的token,

 

    //! Serializes updates.

    spin_mutex array_mutex; //用于保护array并发访问的锁

};

 

仍然是在task* stage_task::execute() {

...

 if( ordered_buffer* input_buffer = my_filter->input_buffer ) {

            // The next filter must execute tokens in order.

            stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );

            clone.my_token = my_token;                       //token号

            clone.my_token_ready = my_token_ready;

            clone.my_object = my_object;                    //数据

            next = input_buffer->put_token(clone);//将task放入队列

        } else {

            /* A semi-hackish way to reexecute the same task object immediately without spawning.

               recycle_as_continuation marks the task for future execution,

               and then 'this' pointer is returned to bypass spawning. */

            recycle_as_continuation();

            next = this;

        }

    } else {

...

}

 

对于需要被串行处理的节点,使用ordered_buffer的put_token函数将相关的数据和task引用放入队列。put_token的实现是关键:

 

    template<typename StageTask>

    task* put_token( StageTask& putter ) {

        task* result = &putter;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            Token token = putter.next_token_number();

            if( token!=low_token ) {

                // Trying to put token that is beyond low_token.

                // Need to wait until low_token catches up before dispatching.

                result = NULL;

                __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );

                if( token-low_token>=array_size )

                    grow( token-low_token+1 );

                ITT_NOTIFY( sync_releasing, this );

                array[token&array_size-1] = &putter;

            }

        }

        return result;

    }

这个函数的实质是,首先取得下一个要处理的token,然后把待执行的task放到ordered_buffer的任务队列中的"合适位置",而low_token指向当前需要处理的token编号。

 

例如low_token=0,当前需要处理0号token,下一个token为1,因此task保存在array[1]处并处于阻塞状态,待0号token处理完毕后,low_token增加1,再从array数组中取出1号token对应的task进行处理。

 

Pipeline中是这样通知串行节点以处理好一条数据的:

还是在task* stage_task::execute() {

...

if( ordered_buffer* input_buffer = my_filter->input_buffer )

            input_buffer->note_done(my_token,*this);

...

}

 

看看note_done的实现会有一种大彻大悟的感觉!如果刚完成的token就是次序最优先的token(low_token),那取出下一个要执行的task,以spawn的方式让TBB的task scheduler来调度:

 

 

//! Note that processing of a token is finished.

    /** Fires up processing of the next token, if processing was deferred. */

    void note_done( Token token, task& spawner ) {

        task* wakee=NULL;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            if( token==low_token ) {

                // Wake the next task

                task*& item = array[++low_token & array_size-1];

                ITT_NOTIFY( sync_acquired, this );

                wakee = item;

                item = NULL;

            }

        }

        if( wakee ) {

            spawner.spawn(*wakee);

        }

    }

 

 

 

 

ordered_buffer是一个非常有趣的实现,相比于常见的用FIFO queue来实现线程间的数据传递,ordered_buffer可谓精巧。我们可以好好利用ordered_buffer的原理来进一步改进我们的代码。


关于作者:

softarts,曾任职于阿尔卡特-朗讯,Nokia,从事电信系统软件研发工作。研究兴趣:C++,多核计算,Linux


转载请注明出处

 

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/softarts/archive/2009/04/28/4134806.aspx

这篇关于Intel TBB::Pipeline,按序处理数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/461522

相关文章

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

Golang 日志处理和正则处理的操作方法

《Golang日志处理和正则处理的操作方法》:本文主要介绍Golang日志处理和正则处理的操作方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录1、logx日志处理1.1、logx简介1.2、日志初始化与配置1.3、常用方法1.4、配合defer

springboot加载不到nacos配置中心的配置问题处理

《springboot加载不到nacos配置中心的配置问题处理》:本文主要介绍springboot加载不到nacos配置中心的配置问题处理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录springboot加载不到nacos配置中心的配置两种可能Spring Boot 版本Nacos

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

C#代码实现解析WTGPS和BD数据

《C#代码实现解析WTGPS和BD数据》在现代的导航与定位应用中,准确解析GPS和北斗(BD)等卫星定位数据至关重要,本文将使用C#语言实现解析WTGPS和BD数据,需要的可以了解下... 目录一、代码结构概览1. 核心解析方法2. 位置信息解析3. 经纬度转换方法4. 日期和时间戳解析5. 辅助方法二、L

使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)

《使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)》字体设计和矢量图形处理是编程中一个有趣且实用的领域,通过Python的matplotlib库,我们可以轻松将字体轮廓... 目录背景知识字体轮廓的表示实现步骤1. 安装依赖库2. 准备数据3. 解析路径指令4. 绘制图形关键

解决mysql插入数据锁等待超时报错:Lock wait timeout exceeded;try restarting transaction

《解决mysql插入数据锁等待超时报错:Lockwaittimeoutexceeded;tryrestartingtransaction》:本文主要介绍解决mysql插入数据锁等待超时报... 目录报错信息解决办法1、数据库中执行如下sql2、再到 INNODB_TRX 事务表中查看总结报错信息Lock

使用C#删除Excel表格中的重复行数据的代码详解

《使用C#删除Excel表格中的重复行数据的代码详解》重复行是指在Excel表格中完全相同的多行数据,删除这些重复行至关重要,因为它们不仅会干扰数据分析,还可能导致错误的决策和结论,所以本文给大家介绍... 目录简介使用工具C# 删除Excel工作表中的重复行语法工作原理实现代码C# 删除指定Excel单元