Oracle 利用管道函数(pipelined)实现高性能大数据处理

2024-05-29 21:38

本文主要是介绍Oracle 利用管道函数(pipelined)实现高性能大数据处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言

在plsql开发中,会涉及到一些大数据量表的数据处理,如将某记录数超亿的表的记录经过处理转换插入到另外一张或几张表。
常规的操作方法固然可以实现,但时间、磁盘IO、redo日志等等都非常大。Oracle 提供了一种高级函数,可以将这种数据处理的性能提升到极限。这种函数称为管道函数。在实际项目中,管道函数会和表函数、数据流函数(即表函数和CURSOR结合)、数据集合、并行度一起使用,达到大数据处理的性能顶峰。转自:(http://mikixiyou.iteye.com/blog/1673672)

一、普通方法处理大数据

下面是一个例子,将表t_ss_normal的记录插入到表t_target中,插入过程中有部分转换操作。我分成四个方法来实现这个数据处理操作。

这是源表和目标表的表结构。现在源表有200W条,其数据来自dba_objects视图。

create table T_SS_NORMAL
(owner          VARCHAR2(30),object_name    VARCHAR2(128),subobject_name VARCHAR2(30),object_id      NUMBER,data_object_id NUMBER,object_type    VARCHAR2(19),created        DATE,last_ddl_time  DATE,timestamp      VARCHAR2(19),status         VARCHAR2(7),temporary      VARCHAR2(1),generated      VARCHAR2(1),secondary      VARCHAR2(1)
);
/create table T_TARGET
(owner       VARCHAR2(30),object_name VARCHAR2(128),comm        VARCHAR2(10)
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

1、一个insert into select语句搞定这个数据处理,简单。

create or replace package pkg_test isprocedure load_target_normal;
end pkg_test;create or replace package body pkg_test isprocedure load_target_normal isbegin  insert into t_target (owner, object_name, comm)select owner, object_name, 'xxx' from t_ss_normal;  commit;  end;
beginnull;
end pkg_test; 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、采用管道函数实现这个数据处理。

create type obj_target as object(
owner VARCHAR2(30), object_name VARCHAR2(128), comm varchar2(10)
);
/
create or replace type typ_array_target as table of obj_target;
/create or replace package pkg_test is  function pipe_target(p_source_data in sys_refcursor) return typ_array_targetpipelined;procedure load_target;
end pkg_test;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

首先创建两个自定义的类型。obj_target的定义和t_target的表结构一致,用于存储每一条目标表记录。typ_array_target用于管道函数的返回值。
接着定义一个管道函数。
普通函数的结尾加一个pipelined关键字,就是管道函数。这个函数的返回参数类型为集合,这是为了使其能作为表函数使用。表函数就是在from子句中以table(v_resultset)调用的,v_resultset就是一个集合类型的参数。
最后定义一个调用存储过程。

在包体中定义该管道函数和调用存储过程。管道函数pipe_target的传入参数一个sys_refcursor类型。这是一个游标,可以理解为使用select * from table才能得到的结果集。
你也可以不用这个传入的游标,取而代之,在函数中定义一个游标,也一样使用。

  function pipe_target(p_source_data in sys_refcursor) return typ_array_targetpipelined isr_target_data obj_target := obj_target(null, null, null);r_source_data t_ss%rowtype; beginloopfetch p_source_datainto r_source_data;exit when p_source_data%notfound;    r_target_data.owner       := r_source_data.owner;r_target_data.object_name := r_source_data.object_name;r_target_data.comm        := 'xxx';    pipe row(r_target_data);end loop;close p_source_data;return;end;procedure load_target isbegin  insert into t_target(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target(cursor(select * from t_ss_normal)));  commit;  end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

关键字 pipe row 的作用是将obj_target插入到typ_array_target类型的数组中,管道函数自动返回这些数据。因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。这种转换称为上下文切换。

  function pipe_target_array(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelined is  r_target_data obj_target := obj_target(null, null, null); type typ_source_data is table of t_ss%rowtype index by pls_integer;aa_source_data typ_source_data;beginloopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;for i in 1 .. aa_source_data.count loopr_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';pipe row(r_target_data);end loop;end loop;close p_source_data;return;end;procedure load_target_array isbegininsert into t_target(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select * from t_ss_normal),100));  commit;  end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

  function pipe_target_parallel(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelinedparallel_enable(partition p_source_data by any) isr_target_data obj_target := obj_target(null, null, null);type typ_source_data is table of t_ss%rowtype index by pls_integer;  aa_source_data typ_source_data;begin  loopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;    for i in 1 .. aa_source_data.count loop      r_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';      pipe row(r_target_data);      end loop;    end loop;  close p_source_data;return;end;procedure load_target_parallel isbeginexecute immediate 'alter session enable parallel dml';  insert /*+parallel(t,4)*/into t_target t(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select /*+parallel(s,4)*/*from t_ss_normal s),100));  commit;end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

  function pipe_target_parallel(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelinedparallel_enable(partition p_source_data by any) isr_target_data obj_target := obj_target(null, null, null);type typ_source_data is table of t_ss%rowtype index by pls_integer;  aa_source_data typ_source_data;begin  loopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;    for i in 1 .. aa_source_data.count loop      r_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';      pipe row(r_target_data);      end loop;    end loop;  close p_source_data;return;end;procedure load_target_parallel isbeginexecute immediate 'alter session enable parallel dml';  insert /*+parallel(t,4)*/into t_target t(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select /*+parallel(s,4)*/*from t_ss_normal s),100));  commit;end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。

这篇关于Oracle 利用管道函数(pipelined)实现高性能大数据处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1014709

相关文章

Python函数作用域示例详解

《Python函数作用域示例详解》本文介绍了Python中的LEGB作用域规则,详细解析了变量查找的四个层级,通过具体代码示例,展示了各层级的变量访问规则和特性,对python函数作用域相关知识感兴趣... 目录一、LEGB 规则二、作用域实例2.1 局部作用域(Local)2.2 闭包作用域(Enclos

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

MySQL count()聚合函数详解

《MySQLcount()聚合函数详解》MySQL中的COUNT()函数,它是SQL中最常用的聚合函数之一,用于计算表中符合特定条件的行数,本文给大家介绍MySQLcount()聚合函数,感兴趣的朋... 目录核心功能语法形式重要特性与行为如何选择使用哪种形式?总结深入剖析一下 mysql 中的 COUNT

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

MySQL 中 ROW_NUMBER() 函数最佳实践

《MySQL中ROW_NUMBER()函数最佳实践》MySQL中ROW_NUMBER()函数,作为窗口函数为每行分配唯一连续序号,区别于RANK()和DENSE_RANK(),特别适合分页、去重... 目录mysql 中 ROW_NUMBER() 函数详解一、基础语法二、核心特点三、典型应用场景1. 数据分