Oracle 利用管道函数(pipelined)实现高性能大数据处理

2024-05-29 21:38

本文主要是介绍Oracle 利用管道函数(pipelined)实现高性能大数据处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言

在plsql开发中,会涉及到一些大数据量表的数据处理,如将某记录数超亿的表的记录经过处理转换插入到另外一张或几张表。
常规的操作方法固然可以实现,但时间、磁盘IO、redo日志等等都非常大。Oracle 提供了一种高级函数,可以将这种数据处理的性能提升到极限。这种函数称为管道函数。在实际项目中,管道函数会和表函数、数据流函数(即表函数和CURSOR结合)、数据集合、并行度一起使用,达到大数据处理的性能顶峰。转自:(http://mikixiyou.iteye.com/blog/1673672)

一、普通方法处理大数据

下面是一个例子,将表t_ss_normal的记录插入到表t_target中,插入过程中有部分转换操作。我分成四个方法来实现这个数据处理操作。

这是源表和目标表的表结构。现在源表有200W条,其数据来自dba_objects视图。

create table T_SS_NORMAL
(owner          VARCHAR2(30),object_name    VARCHAR2(128),subobject_name VARCHAR2(30),object_id      NUMBER,data_object_id NUMBER,object_type    VARCHAR2(19),created        DATE,last_ddl_time  DATE,timestamp      VARCHAR2(19),status         VARCHAR2(7),temporary      VARCHAR2(1),generated      VARCHAR2(1),secondary      VARCHAR2(1)
);
/create table T_TARGET
(owner       VARCHAR2(30),object_name VARCHAR2(128),comm        VARCHAR2(10)
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

1、一个insert into select语句搞定这个数据处理,简单。

create or replace package pkg_test isprocedure load_target_normal;
end pkg_test;create or replace package body pkg_test isprocedure load_target_normal isbegin  insert into t_target (owner, object_name, comm)select owner, object_name, 'xxx' from t_ss_normal;  commit;  end;
beginnull;
end pkg_test; 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、采用管道函数实现这个数据处理。

create type obj_target as object(
owner VARCHAR2(30), object_name VARCHAR2(128), comm varchar2(10)
);
/
create or replace type typ_array_target as table of obj_target;
/create or replace package pkg_test is  function pipe_target(p_source_data in sys_refcursor) return typ_array_targetpipelined;procedure load_target;
end pkg_test;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

首先创建两个自定义的类型。obj_target的定义和t_target的表结构一致,用于存储每一条目标表记录。typ_array_target用于管道函数的返回值。
接着定义一个管道函数。
普通函数的结尾加一个pipelined关键字,就是管道函数。这个函数的返回参数类型为集合,这是为了使其能作为表函数使用。表函数就是在from子句中以table(v_resultset)调用的,v_resultset就是一个集合类型的参数。
最后定义一个调用存储过程。

在包体中定义该管道函数和调用存储过程。管道函数pipe_target的传入参数一个sys_refcursor类型。这是一个游标,可以理解为使用select * from table才能得到的结果集。
你也可以不用这个传入的游标,取而代之,在函数中定义一个游标,也一样使用。

  function pipe_target(p_source_data in sys_refcursor) return typ_array_targetpipelined isr_target_data obj_target := obj_target(null, null, null);r_source_data t_ss%rowtype; beginloopfetch p_source_datainto r_source_data;exit when p_source_data%notfound;    r_target_data.owner       := r_source_data.owner;r_target_data.object_name := r_source_data.object_name;r_target_data.comm        := 'xxx';    pipe row(r_target_data);end loop;close p_source_data;return;end;procedure load_target isbegin  insert into t_target(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target(cursor(select * from t_ss_normal)));  commit;  end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

关键字 pipe row 的作用是将obj_target插入到typ_array_target类型的数组中,管道函数自动返回这些数据。因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。这种转换称为上下文切换。

  function pipe_target_array(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelined is  r_target_data obj_target := obj_target(null, null, null); type typ_source_data is table of t_ss%rowtype index by pls_integer;aa_source_data typ_source_data;beginloopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;for i in 1 .. aa_source_data.count loopr_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';pipe row(r_target_data);end loop;end loop;close p_source_data;return;end;procedure load_target_array isbegininsert into t_target(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select * from t_ss_normal),100));  commit;  end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

  function pipe_target_parallel(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelinedparallel_enable(partition p_source_data by any) isr_target_data obj_target := obj_target(null, null, null);type typ_source_data is table of t_ss%rowtype index by pls_integer;  aa_source_data typ_source_data;begin  loopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;    for i in 1 .. aa_source_data.count loop      r_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';      pipe row(r_target_data);      end loop;    end loop;  close p_source_data;return;end;procedure load_target_parallel isbeginexecute immediate 'alter session enable parallel dml';  insert /*+parallel(t,4)*/into t_target t(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select /*+parallel(s,4)*/*from t_ss_normal s),100));  commit;end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

  function pipe_target_parallel(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelinedparallel_enable(partition p_source_data by any) isr_target_data obj_target := obj_target(null, null, null);type typ_source_data is table of t_ss%rowtype index by pls_integer;  aa_source_data typ_source_data;begin  loopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;    for i in 1 .. aa_source_data.count loop      r_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';      pipe row(r_target_data);      end loop;    end loop;  close p_source_data;return;end;procedure load_target_parallel isbeginexecute immediate 'alter session enable parallel dml';  insert /*+parallel(t,4)*/into t_target t(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select /*+parallel(s,4)*/*from t_ss_normal s),100));  commit;end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。

这篇关于Oracle 利用管道函数(pipelined)实现高性能大数据处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1014709

相关文章

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Python函数作用域与闭包举例深度解析

《Python函数作用域与闭包举例深度解析》Python函数的作用域规则和闭包是编程中的关键概念,它们决定了变量的访问和生命周期,:本文主要介绍Python函数作用域与闭包的相关资料,文中通过代码... 目录1. 基础作用域访问示例1:访问全局变量示例2:访问外层函数变量2. 闭包基础示例3:简单闭包示例4

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S

JavaScript对象转数组的三种方法实现

《JavaScript对象转数组的三种方法实现》本文介绍了在JavaScript中将对象转换为数组的三种实用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友... 目录方法1:使用Object.keys()和Array.map()方法2:使用Object.entr

k8s中实现mysql主备过程详解

《k8s中实现mysql主备过程详解》文章讲解了在K8s中使用StatefulSet部署MySQL主备架构,包含NFS安装、storageClass配置、MySQL部署及同步检查步骤,确保主备数据一致... 目录一、k8s中实现mysql主备1.1 环境信息1.2 部署nfs-provisioner1.2.