Intel TBB::Pipeline,按序处理数据

2023-12-06 10:38

本文主要是介绍Intel TBB::Pipeline,按序处理数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在上一篇文章(TBB:pipeline,软件流水线的威力)最后提出了几个问题,我们逐个来看看TBB::Pipeline是怎么解决的。

 

 

为什么Pipeline可以保证数据执行的顺序?既然TBB归根到底是通过多线程执行任务,为什么不会在读入先后两个字符串后,后读入的字符串先被下一个task处理?Pipeline里是不是有一个类似于FIFO 先进先出队列之类的东西?

 

 

之前曾经质疑过Pipeline的性能,甚至想自己用MultiThreading来模拟一个流水线,但很快就发现其中实现的难点。数据执行的顺序性就是其中之一。

 

假设以一个thread代表流水线上的一个节点,如果某节点是并发执行的,那么就需要2个以上的thread(A和B),上一节点处理完毕的顺序数据到底是先送给A还是B呢?处理完毕后后又该先将A还是B中的数据送到下一节点呢?即使可以人为的指定A和B之间的优先规则,由于thread本身被调度的不确定性,实际运行中还是有很多不可预知的困难。

 

流水线的一个显著特性就是保证每个数据均以相同的顺序流过每个节点。因此,TBB::Pipeline中的一个首要任务就是在节点被并发执行的同时,仍能够保证所处理的数据的次序而不需额外的处理代码。此外,在要求串行处理的节点,要保证即使排在前面的数据先被处理,即使排在后面的数据先到达。

 

Pipeline的中心思想就是以token来控制数据的处理顺序和流水线的深度。Pipeline::run函数中指定了token的最大值:

 

void pipeline::run( size_t max_number_of_live_tokens ) {}

 

 

每一个数据在进入Pipeline的时候都会按照先后顺序依次分配一个token,如line1处:

 

task* stage_task::execute() {

    __TBB_ASSERT( !my_at_start || !my_object, NULL );

    if( my_at_start ) {

        if( my_filter->is_serial() ) {

            if( (my_object = (*my_filter)(my_object)) ) {

                my_token = my_pipeline.token_counter++; //line1

                my_token_ready = true;

                ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );

                if( --my_pipeline.input_tokens>0 )

                    spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );

            } else {

                my_pipeline.end_of_input = true; //line2

                return NULL;

            }

...

}

 

如果当前流水线中的token全部用完了,那么暂时就不会处理新的数据,直到已进入Pipeline的数据被处理完毕有空闲的token(line2处)

 

仍然以TBB中的例子text_filter为例考虑,流水线为 MyInputFilter->MyTransformFilter->MyOutputFiler,MyInputFilter从磁盘上读取数据,MyTransformFilter转换成大写字母,MyOutputFilter将转换好的数据写入磁盘。因此,MyInputFilter节点和MyOutputFiler节点必须是串行执行,而MyTransformFilter可以并发执行。对于MyInputFilter读入的一串顺序数据,token依次为1->2->3,如何保证经过转换后数据也是以相同的顺序写入磁盘?

 

秘密在于TBB中的一个类tbb::internal::ordered_buffer,MyOutputFilter用它来保证按照token的顺序执行其队列中的数据,而不管数据进入队列的先后次序,换句话说,即使排在后面的数据token 2先被某个MyTransformFilter节点处理完毕送往MyOutputFilter,只要数据token 1没到达没被MyOutputFilter执行,数据2就不会在数据1之前先写入磁盘。每一个需要被串行处理的节点,都会有一个ordered_buffer类型的成员变量。

 

先看看ordered_buffer的定义:

 

//! A buffer of ordered items.

/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */

class ordered_buffer {

    typedef  Token  size_type;

 

    //! Array of deferred tasks that cannot yet start executing.

    /** Element is NULL if unused. */

    task** array; //数组,以顺序方式保存所有待处理的task

 

    //! Size of array

    /** Always 0 or a power of 2 */

    size_type array_size; //数组的尺寸

 

    //! Lowest token that can start executing.

    /** All prior Token have already been seen. */

    Token low_token; //当前正在处理的token,

 

    //! Serializes updates.

    spin_mutex array_mutex; //用于保护array并发访问的锁

};

 

仍然是在task* stage_task::execute() {

...

 if( ordered_buffer* input_buffer = my_filter->input_buffer ) {

            // The next filter must execute tokens in order.

            stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );

            clone.my_token = my_token;                       //token号

            clone.my_token_ready = my_token_ready;

            clone.my_object = my_object;                    //数据

            next = input_buffer->put_token(clone);//将task放入队列

        } else {

            /* A semi-hackish way to reexecute the same task object immediately without spawning.

               recycle_as_continuation marks the task for future execution,

               and then 'this' pointer is returned to bypass spawning. */

            recycle_as_continuation();

            next = this;

        }

    } else {

...

}

 

对于需要被串行处理的节点,使用ordered_buffer的put_token函数将相关的数据和task引用放入队列。put_token的实现是关键:

 

    template<typename StageTask>

    task* put_token( StageTask& putter ) {

        task* result = &putter;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            Token token = putter.next_token_number();

            if( token!=low_token ) {

                // Trying to put token that is beyond low_token.

                // Need to wait until low_token catches up before dispatching.

                result = NULL;

                __TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );

                if( token-low_token>=array_size )

                    grow( token-low_token+1 );

                ITT_NOTIFY( sync_releasing, this );

                array[token&array_size-1] = &putter;

            }

        }

        return result;

    }

这个函数的实质是,首先取得下一个要处理的token,然后把待执行的task放到ordered_buffer的任务队列中的"合适位置",而low_token指向当前需要处理的token编号。

 

例如low_token=0,当前需要处理0号token,下一个token为1,因此task保存在array[1]处并处于阻塞状态,待0号token处理完毕后,low_token增加1,再从array数组中取出1号token对应的task进行处理。

 

Pipeline中是这样通知串行节点以处理好一条数据的:

还是在task* stage_task::execute() {

...

if( ordered_buffer* input_buffer = my_filter->input_buffer )

            input_buffer->note_done(my_token,*this);

...

}

 

看看note_done的实现会有一种大彻大悟的感觉!如果刚完成的token就是次序最优先的token(low_token),那取出下一个要执行的task,以spawn的方式让TBB的task scheduler来调度:

 

 

//! Note that processing of a token is finished.

    /** Fires up processing of the next token, if processing was deferred. */

    void note_done( Token token, task& spawner ) {

        task* wakee=NULL;

        {

            spin_mutex::scoped_lock lock( array_mutex );

            if( token==low_token ) {

                // Wake the next task

                task*& item = array[++low_token & array_size-1];

                ITT_NOTIFY( sync_acquired, this );

                wakee = item;

                item = NULL;

            }

        }

        if( wakee ) {

            spawner.spawn(*wakee);

        }

    }

 

 

 

 

ordered_buffer是一个非常有趣的实现,相比于常见的用FIFO queue来实现线程间的数据传递,ordered_buffer可谓精巧。我们可以好好利用ordered_buffer的原理来进一步改进我们的代码。


关于作者:

softarts,曾任职于阿尔卡特-朗讯,Nokia,从事电信系统软件研发工作。研究兴趣:C++,多核计算,Linux


转载请注明出处

 

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/softarts/archive/2009/04/28/4134806.aspx

这篇关于Intel TBB::Pipeline,按序处理数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/461522

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Python进行JSON和Excel文件转换处理指南

《Python进行JSON和Excel文件转换处理指南》在数据交换与系统集成中,JSON与Excel是两种极为常见的数据格式,本文将介绍如何使用Python实现将JSON转换为格式化的Excel文件,... 目录将 jsON 导入为格式化 Excel将 Excel 导出为结构化 JSON处理嵌套 JSON:

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

Java堆转储文件之1.6G大文件处理完整指南

《Java堆转储文件之1.6G大文件处理完整指南》堆转储文件是优化、分析内存消耗的重要工具,:本文主要介绍Java堆转储文件之1.6G大文件处理的相关资料,文中通过代码介绍的非常详细,需要的朋友可... 目录前言文件为什么这么大?如何处理这个文件?分析文件内容(推荐)删除文件(如果不需要)查看错误来源如何避