Oracle的pipelined函数实现高性能大数据处理(2)
2013-03-12 17:24
204 查看
关键字 pipe row 的作用是将obj_target插入到typ_array_target类型的数组中,管道函数自动返回这些数据。
因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。这种转换称为上下文切换。
Sql代码
function pipe_target_array(p_source_data in sys_refcursor,
p_limit_size in pls_integer default c_default_limit)
return typ_array_target
pipelined is
r_target_data obj_target := obj_target(null, null, null);
type typ_source_data is table of t_ss%rowtype index by pls_integer;
aa_source_data typ_source_data;
begin
loop
fetch p_source_data bulk collect
into aa_source_data;
exit when aa_source_data.count = 0;
for i in 1 .. aa_source_data.count loop
r_target_data.owner := aa_source_data(i).owner;
r_target_data.object_name := aa_source_data(i).object_name;
r_target_data.comm := 'xxx';
pipe row(r_target_data);
end loop;
end loop;
close p_source_data;
return;
end;
procedure load_target_array is
begin
insert into t_target
(owner, object_name, comm)
select owner, object_name, comm
from table(pipe_target_array(cursor (select * from t_ss_normal),
100));
commit;
end;
还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。
Sql代码
function pipe_target_parallel(p_source_data in sys_refcursor,
p_limit_size in pls_integer default c_default_limit)
return typ_array_target
pipelined
parallel_enable(partition p_source_data by any) is
r_target_data obj_target := obj_target(null, null, null);
type typ_source_data is table of t_ss%rowtype index by pls_integer;
aa_source_data typ_source_data;
begin
loop
fetch p_source_data bulk collect
into aa_source_data;
exit when aa_source_data.count = 0;
for i in 1 .. aa_source_data.count loop
r_target_data.owner := aa_source_data(i).owner;
r_target_data.object_name := aa_source_data(i).object_name;
r_target_data.comm := 'xxx';
pipe row(r_target_data);
end loop;
end loop;
close p_source_data;
return;
end;
procedure load_target_parallel is
begin
execute immediate 'alter session enable parallel dml';
insert /*+parallel(t,4)*/
into t_target t
(owner, object_name, comm)
select owner, object_name, comm
from table(pipe_target_array(cursor (select /*+parallel(s,4)*/
*
from t_ss_normal s),
100));
commit;
end;
在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。
在此数据处理操作中,涉及到集合(collection)、表函数、管道函数、流函数、bulk collect、游标等知识点。
因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。这种转换称为上下文切换。
Sql代码
function pipe_target_array(p_source_data in sys_refcursor,
p_limit_size in pls_integer default c_default_limit)
return typ_array_target
pipelined is
r_target_data obj_target := obj_target(null, null, null);
type typ_source_data is table of t_ss%rowtype index by pls_integer;
aa_source_data typ_source_data;
begin
loop
fetch p_source_data bulk collect
into aa_source_data;
exit when aa_source_data.count = 0;
for i in 1 .. aa_source_data.count loop
r_target_data.owner := aa_source_data(i).owner;
r_target_data.object_name := aa_source_data(i).object_name;
r_target_data.comm := 'xxx';
pipe row(r_target_data);
end loop;
end loop;
close p_source_data;
return;
end;
procedure load_target_array is
begin
insert into t_target
(owner, object_name, comm)
select owner, object_name, comm
from table(pipe_target_array(cursor (select * from t_ss_normal),
100));
commit;
end;
还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。
Sql代码
function pipe_target_parallel(p_source_data in sys_refcursor,
p_limit_size in pls_integer default c_default_limit)
return typ_array_target
pipelined
parallel_enable(partition p_source_data by any) is
r_target_data obj_target := obj_target(null, null, null);
type typ_source_data is table of t_ss%rowtype index by pls_integer;
aa_source_data typ_source_data;
begin
loop
fetch p_source_data bulk collect
into aa_source_data;
exit when aa_source_data.count = 0;
for i in 1 .. aa_source_data.count loop
r_target_data.owner := aa_source_data(i).owner;
r_target_data.object_name := aa_source_data(i).object_name;
r_target_data.comm := 'xxx';
pipe row(r_target_data);
end loop;
end loop;
close p_source_data;
return;
end;
procedure load_target_parallel is
begin
execute immediate 'alter session enable parallel dml';
insert /*+parallel(t,4)*/
into t_target t
(owner, object_name, comm)
select owner, object_name, comm
from table(pipe_target_array(cursor (select /*+parallel(s,4)*/
*
from t_ss_normal s),
100));
commit;
end;
在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。
在此数据处理操作中,涉及到集合(collection)、表函数、管道函数、流函数、bulk collect、游标等知识点。
PLSQL集合类型的使用总结
使用bulk collect insert实现大数据快速迁移
Oracle的pipelined函数提升数据输出性能
相关文章推荐
- Oracle的pipelined函数实现高性能大数据处理(1)
- Oracle的pipelined函数实现高性能大数据处理
- Oracle的pipelined函数实现高性能大数据处理
- Oracle的pipelined函数实现高性能大数据处理
- Oracle的pipelined函数实现高性能大数据处理
- 【转】Oracle的pipelined函数实现高性能大数据处理
- Oracle的pipelined函数实现高性能大数据处理
- 用于大数据处理高性能计算的4个实现步骤
- 用于大数据处理高性能计算的4个实现步骤
- Oracle 利用管道函数(pipelined)实现高性能大数据处理
- 深入分析JavaWeb Item29 -- 使用JDBC处理大数据(MySql + Oracle)
- 在Oracle中实现各种日期处理完全版
- 如何在Oracle中实现时间相加处理?[原创]
- 在Oracle中实现各种日期处理完全版
- 在Oracle中实现各种日期处理
- MSSQL和Oracle的区别及大数据处理(面试会问,加分亮点)
- 高性能服务通信框架Gaea的详细实现--server请求处理流程
- 用ADO.Net实现Oracle大批量数据更新优化处理方法
- Oracle副总裁Thomas Kyte谈大数据处理和未来的DBA
- 批处理实现从Excel导入Oracle