
Oracle 11g: Using dbms_parallel_execute to run a parallel update on a large table

2011-07-13 14:45

1. About dbms_parallel_execute

Updating Large Tables in Parallel
The DBMS_PARALLEL_EXECUTE package enables you to incrementally update the data in a large table in parallel, in two high-level steps:
(1) Group sets of rows in the table into smaller chunks.
(2) Apply the desired UPDATE statement to the chunks in parallel, committing each time you have finished processing a chunk.
-- In short, dbms_parallel_execute works in two steps: first it splits the large table into many small chunks, then it processes those chunks in parallel.

This technique is recommended whenever you are updating a lot of data. Its advantages are:
(1) You lock only one set of rows at a time, for a relatively short time, instead of locking the entire table.
(2) You do not lose work that has been done if something fails before the entire operation finishes.
(3) You reduce rollback space consumption.
(4) You improve performance.

See Also:
Oracle Database PL/SQL Packages and Types Reference for more information about the DBMS_PARALLEL_EXECUTE package

http://download.oracle.com/docs/cd/E11882_01/appdev.112/e16760/d_parallel_ex.htm#ARPLS233
-- This link has the detailed usage reference for the package.

Parallelism can improve SQL performance to a certain extent. My blog has a write-up on parallel execution:
Oracle Parallel Execution
/article/1449254.html

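I mention that article because of one point it raises:
Oracle restricts parallel DELETE, UPDATE and MERGE: the operation runs in parallel only when its target is a partitioned table. The reason is that, for a partitioned table, Oracle starts one parallel server process per partition and processes the partitions concurrently, which has no equivalent for a non-partitioned table. (This restriction stems from older releases; check the parallel DML documentation for the release you run.)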
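
For comparison, native parallel DML looks roughly like the sketch below. The table sales_part, its amount column and the degree of 4 are made up for illustration; ALTER SESSION ENABLE PARALLEL DML and the PARALLEL hint are standard Oracle syntax.

```sql
-- Hypothetical partitioned table; substitute your own.
ALTER SESSION ENABLE PARALLEL DML;

UPDATE /*+ PARALLEL(s 4) */ sales_part s
SET    s.amount = s.amount * 1.1;

-- A parallel DML transaction has to be committed (or rolled back)
-- before the same session can read the modified table again.
COMMIT;
```

Unlike dbms_parallel_execute, this runs as a single transaction, so a failure rolls back the whole update.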

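If we need to update a large table that is not partitioned, we can use the dbms_parallel_execute package to do the work in parallel.
dbms_parallel_execute splits the large table into many small chunks and then processes the chunks in parallel, which is a bit like turning a non-partitioned table into a partitioned one.
Note that the package is only available from Oracle 11g (Release 2) onward.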


2. Usage

The following content is reproduced from:
http://www.oracle-base.com/articles/11g/dbms_parallel_execute_11gR2.php

2.1 The operation needs the CREATE JOB privilege, so grant it first

SQL> conn / as sysdba;
Connected.
SQL> grant create job to icd;
Grant succeeded.
SQL> conn icd/icd;
Connected.

2.2 Create the test table and insert data

SQL> CREATE TABLE test_tab (
       id          NUMBER,
       description VARCHAR2(50),
       num_col     NUMBER,
       CONSTRAINT test_tab_pk PRIMARY KEY (id)
     );
Table created.
SQL> INSERT /*+ APPEND */ INTO test_tab
     SELECT level,
            'Description for ' || level,
            CASE
              WHEN MOD(level, 5) = 0 THEN 10
              WHEN MOD(level, 3) = 0 THEN 20
              ELSE 30
            END
     FROM   dual
     CONNECT BY level <= 500000;
500000 rows created.
SQL> commit;
Commit complete.

2.3 Gather statistics

SQL> EXEC DBMS_STATS.gather_table_stats(USER, 'TEST_TAB', cascade => TRUE);
PL/SQL procedure successfully completed.
SQL> SELECT num_col, COUNT(*)
     FROM   test_tab
     GROUP BY num_col
     ORDER BY num_col;
   NUM_COL   COUNT(*)
---------- ----------
        10     100000
        20     133333
        30     266667

2.4 Create a task

The CREATE_TASK procedure is used to create a new task. It requires a task name to be specified, but can also include an optional task comment.

SQL> BEGIN
       DBMS_PARALLEL_EXECUTE.create_task(task_name => 'test_task');
     END;
     /
PL/SQL procedure successfully completed.

Information about existing tasks is displayed using the [DBA|USER]_PARALLEL_EXECUTE_TASKS views.

SQL> COLUMN task_name FORMAT A10
SQL> SELECT task_name,
            status
     FROM   user_parallel_execute_tasks;

TASK_NAME  STATUS
---------- -------------------
test_task  CREATED

The GENERATE_TASK_NAME function returns a unique task name if you do not want to name the task manually.

SQL> SELECT DBMS_PARALLEL_EXECUTE.generate_task_name FROM dual;

GENERATE_TASK_NAME
-----------------------------------------------------
TASK$_1
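
The two calls above can be combined in one block. The following is a minimal sketch: the comment text is invented for illustration, and the second argument is passed positionally as the optional task comment.

```sql
SET SERVEROUTPUT ON

DECLARE
  l_task VARCHAR2(30);
BEGIN
  -- Let Oracle pick a unique name instead of hard-coding one.
  l_task := DBMS_PARALLEL_EXECUTE.generate_task_name;

  -- Second argument: the optional task comment (illustrative text).
  DBMS_PARALLEL_EXECUTE.create_task(l_task, 'Bump num_col in TEST_TAB');

  DBMS_OUTPUT.put_line('Created task ' || l_task);
END;
/
```

Keep the generated name in a variable or table, since every later call (chunking, running, dropping) needs it.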


2.5 Split the workload into chunks

There are three ways to split a large table into chunks:
(1) CREATE_CHUNKS_BY_ROWID
(2) CREATE_CHUNKS_BY_NUMBER_COL
(3) CREATE_CHUNKS_BY_SQL

Chunks that have already been created can be removed with drop_chunks.

2.5.1 CREATE_CHUNKS_BY_ROWID

The CREATE_CHUNKS_BY_ROWID procedure splits the data by rowid into chunks specified by the CHUNK_SIZE parameter. If the BY_ROW parameter is set to TRUE, the CHUNK_SIZE refers to the number of rows, otherwise it refers to the number of blocks.

SQL> BEGIN
       dbms_parallel_execute.create_chunks_by_rowid(task_name   => 'test_task',
                                                    table_owner => 'ICD',
                                                    table_name  => 'TEST_TAB',
                                                    by_row      => TRUE,
                                                    chunk_size  => 10000);
     END;
     /
PL/SQL procedure successfully completed.

Once the chunks have been created, the task status changes to CHUNKED.
SQL> COLUMN task_name FORMAT A10
SQL> SELECT task_name,
            status
     FROM   user_parallel_execute_tasks;

TASK_NAME  STATUS
---------- -------------------
test_task  CHUNKED

The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.

SQL> SELECT chunk_id, status, start_rowid, end_rowid
     FROM   user_parallel_execute_chunks
     WHERE  task_name = 'test_task'
     ORDER BY chunk_id;

  CHUNK_ID STATUS               START_ROWID        END_ROWID
---------- -------------------- ------------------ ------------------
         2 UNASSIGNED           AAATMCAAMAABSMIAAA AAATMCAAMAABSMPCcP
         3 UNASSIGNED           AAATMCAAMAABSMgAAA AAATMCAAMAABSMnCcP
         4 UNASSIGNED           AAATMCAAMAABSMoAAA AAATMCAAMAABSMvCcP
...
        73 UNASSIGNED           AAATMCAAMAABS0yAAA AAATMCAAMAABS1jCcP
        74 UNASSIGNED           AAATMCAAMAABS1kAAA AAATMCAAMAABS1/CcP

73 rows selected.

Drop the chunks:
SQL> BEGIN
       dbms_parallel_execute.drop_chunks('test_task');
     END;
     /
PL/SQL procedure successfully completed.

Check the task status again: it has gone back to CREATED.
SQL> SELECT task_name,
            status
     FROM   user_parallel_execute_tasks;

TASK_NAME  STATUS
---------- -------------------
test_task  CREATED

2.5.2 CREATE_CHUNKS_BY_NUMBER_COL

The CREATE_CHUNKS_BY_NUMBER_COL procedure divides the workload up based on a number column. It uses the specified column's min and max values along with the chunk size to split the data into approximately equal chunks. For the chunks to be equally sized the column must contain a continuous sequence of numbers, like that generated by a sequence.

BEGIN
  dbms_parallel_execute.create_chunks_by_number_col(task_name    => 'test_task',
                                                    table_owner  => 'ICD',
                                                    table_name   => 'TEST_TAB',
                                                    table_column => 'ID',
                                                    chunk_size   => 10000);
END;
/

The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.

SQL> SELECT chunk_id, status, start_id, end_id
     FROM   user_parallel_execute_chunks
     WHERE  task_name = 'test_task'
     ORDER BY chunk_id;

  CHUNK_ID STATUS                 START_ID     END_ID
---------- -------------------- ---------- ----------
        75 UNASSIGNED                    1      10000
        76 UNASSIGNED                10001      20000
        77 UNASSIGNED                20001      30000
        78 UNASSIGNED                30001      40000
...
       122 UNASSIGNED               470001     480000
       123 UNASSIGNED               480001     490000
       124 UNASSIGNED               490001     500000

50 rows selected.
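
The ranges listed above follow directly from the column's minimum, its maximum and the chunk size. The query below is only an approximation of that arithmetic, written against the TEST_TAB table from section 2.2 with a chunk size of 10000; it is not the package's internal implementation.

```sql
-- Rebuild the id ranges from MIN(id), MAX(id) and a chunk size of 10000.
WITH bounds AS (
  SELECT MIN(id) AS lo, MAX(id) AS hi FROM test_tab
)
SELECT lo + (LEVEL - 1) * 10000          AS start_id,
       LEAST(lo + LEVEL * 10000 - 1, hi) AS end_id
FROM   bounds
CONNECT BY lo + (LEVEL - 1) * 10000 <= hi
ORDER BY start_id;
```

Because the ranges depend only on the minimum and maximum, gaps in the id values leave some chunks with far fewer rows than others, which is why a continuous sequence gives the most even split.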

2.5.3 CREATE_CHUNKS_BY_SQL

The CREATE_CHUNKS_BY_SQL procedure divides the workload based on a user-defined query. If the BY_ROWID parameter is set to TRUE, the query must return a series of start and end rowids. If it's set to FALSE, the query must return a series of start and end IDs.

First drop the chunks created earlier:
SQL> exec dbms_parallel_execute.drop_chunks('test_task');
PL/SQL procedure successfully completed.

DECLARE
  l_stmt CLOB;
BEGIN
  -- Each distinct num_col value becomes one chunk (start_id = end_id).
  l_stmt := 'SELECT DISTINCT num_col, num_col FROM test_tab';

  DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => 'test_task',
                                             sql_stmt  => l_stmt,
                                             by_rowid  => FALSE);
END;
/

The [DBA|USER]_PARALLEL_EXECUTE_CHUNKS views display information about the individual chunks.

SQL> SELECT chunk_id, status, start_id, end_id
     FROM   user_parallel_execute_chunks
     WHERE  task_name = 'test_task'
     ORDER BY chunk_id;

  CHUNK_ID STATUS                 START_ID     END_ID
---------- -------------------- ---------- ----------
       141 UNASSIGNED                   10         10
       142 UNASSIGNED                   30         30
       143 UNASSIGNED                   20         20
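
The DISTINCT num_col query yields only three chunks of very different sizes (100000, 133333 and 266667 rows). As an alternative sketch, a chunking query can use NTILE to put roughly the same number of rows into each range even when the id values have gaps. The bucket count of 50 is an arbitrary choice for illustration.

```sql
DECLARE
  l_stmt CLOB;
BEGIN
  -- Start from a clean task: remove whatever chunks exist already.
  DBMS_PARALLEL_EXECUTE.drop_chunks('test_task');

  -- 50 buckets with (almost) the same number of rows each, reported
  -- as the lowest and highest id found in the bucket.
  l_stmt := 'SELECT MIN(id), MAX(id)
             FROM   (SELECT id, NTILE(50) OVER (ORDER BY id) AS bucket
                     FROM   test_tab)
             GROUP BY bucket';

  DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => 'test_task',
                                             sql_stmt  => l_stmt,
                                             by_rowid  => FALSE);
END;
/
```

With chunks defined this way, the statement that is run later has to filter on the same column, for example WHERE id BETWEEN :start_id AND :end_id.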

2.6 Run the task

Running a task involves running a specific statement for each defined chunk of work. The documentation only shows examples using updates of the base table, but this is not the only use of this functionality. The statement associated with the task can be a procedure call, as shown in one of the examples at the end of the article.

There are two ways to run a task and several procedures to control a running task.

2.6.1 RUN_TASK

The RUN_TASK procedure runs the specified statement in parallel by scheduling jobs to process the workload chunks. The statement specifying the actual work to be done must include a reference to ':start_id' and ':end_id', which represent a range of rowids or column IDs to be processed, as specified in the chunk definitions. The degree of parallelism is controlled by the number of scheduled jobs, not the number of chunks defined. The scheduled jobs take an unassigned workload chunk, process it, then move on to the next unassigned chunk.

DECLARE
  l_sql_stmt VARCHAR2(32767);
BEGIN
  -- The ROWID hint has to name the alias used in the statement (t).
  l_sql_stmt := 'UPDATE /*+ ROWID (t) */ test_tab t
                 SET    t.num_col = t.num_col + 10
                 WHERE  rowid BETWEEN :start_id AND :end_id';

  DBMS_PARALLEL_EXECUTE.run_task(task_name      => 'test_task',
                                 sql_stmt       => l_sql_stmt,
                                 language_flag  => DBMS_SQL.NATIVE,
                                 parallel_level => 10);
END;
/

The RUN_TASK procedure waits for the task to complete. On completion, the status of the task must be assessed to know what action to take next.

2.6.2 User-defined framework

The DBMS_PARALLEL_EXECUTE package allows you to manually code the task run. The GET_ROWID_CHUNK and GET_NUMBER_COL_CHUNK procedures return the next available unassigned chunk. You can then manually process the chunk and set its status. The example below shows the processing of a workload chunked by rowid.

DECLARE
  l_sql_stmt    VARCHAR2(32767);
  l_chunk_id    NUMBER;
  l_start_rowid ROWID;
  l_end_rowid   ROWID;
  l_any_rows    BOOLEAN;
BEGIN
  -- The ROWID hint has to name the alias used in the statement (t).
  l_sql_stmt := 'UPDATE /*+ ROWID (t) */ test_tab t
                 SET    t.num_col = t.num_col + 10
                 WHERE  rowid BETWEEN :start_id AND :end_id';

  LOOP
    -- Get next unassigned chunk.
    DBMS_PARALLEL_EXECUTE.get_rowid_chunk(task_name   => 'test_task',
                                          chunk_id    => l_chunk_id,
                                          start_rowid => l_start_rowid,
                                          end_rowid   => l_end_rowid,
                                          any_rows    => l_any_rows);

    EXIT WHEN l_any_rows = FALSE;

    BEGIN
      -- Manually execute the work.
      EXECUTE IMMEDIATE l_sql_stmt USING l_start_rowid, l_end_rowid;

      -- Set the chunk status as processed.
      DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task',
                                             chunk_id  => l_chunk_id,
                                             status    => DBMS_PARALLEL_EXECUTE.PROCESSED);
    EXCEPTION
      WHEN OTHERS THEN
        -- Record chunk error.
        DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => 'test_task',
                                               chunk_id  => l_chunk_id,
                                               status    => DBMS_PARALLEL_EXECUTE.PROCESSED_WITH_ERROR,
                                               err_num   => SQLCODE,
                                               err_msg   => SQLERRM);
    END;

    -- Commit work.
    COMMIT;
  END LOOP;
END;
/

2.6.3 Task control

A running task can be stopped and restarted using the STOP_TASK and RESUME_TASK procedures respectively.

The PURGE_PROCESSED_CHUNKS procedure deletes all chunks with a status of 'PROCESSED' or 'PROCESSED_WITH_ERROR'.

The ADM_DROP_CHUNKS, ADM_DROP_TASK, ADM_TASK_STATUS and ADM_STOP_TASK routines have the same function as their namesakes, but they allow the operations to be performed on tasks owned by other users. In order to use these routines the user must have been granted the ADM_PARALLEL_EXECUTE_TASK role.
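
The control procedures take the task name as their argument. The calls below are a rough sketch of how they might be issued against the test_task task used in this article; they are meant to be run at different moments, not as one script.

```sql
-- From another session: ask the running task to stop.
BEGIN
  DBMS_PARALLEL_EXECUTE.stop_task('test_task');
END;
/

-- Later: pick the task up again. The Oracle reference describes
-- RESUME_TASK for tasks in the CRASHED or FINISHED_WITH_ERROR state,
-- so check TASK_STATUS first if the call is rejected.
BEGIN
  DBMS_PARALLEL_EXECUTE.resume_task('test_task');
END;
/

-- Housekeeping: delete the chunks already marked PROCESSED
-- or PROCESSED_WITH_ERROR.
BEGIN
  DBMS_PARALLEL_EXECUTE.purge_processed_chunks('test_task');
END;
/
```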

2.7 Check the task status

The simplest way to check the status of a task is to use the TASK_STATUS function. After execution of the task, the only possible return values are the 'FINISHED' or 'FINISHED_WITH_ERROR' constants. If the status is not 'FINISHED', then the task can be resumed using the RESUME_TASK procedure.

DECLARE
  l_try    NUMBER;
  l_status NUMBER;
BEGIN
  -- If there is an error, RESUME it at most 2 times.
  l_try    := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task');
  WHILE (l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED)
  LOOP
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task('test_task');
    l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task');
  END LOOP;
END;
/
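
TASK_STATUS returns a number, which is compared against the package constants above. When a readable name is more convenient, for example in a log line, a small block like the following sketch can translate it. The output text is invented for illustration.

```sql
SET SERVEROUTPUT ON

DECLARE
  l_status NUMBER;
  l_name   VARCHAR2(60);
BEGIN
  l_status := DBMS_PARALLEL_EXECUTE.task_status('test_task');

  -- Map the numeric status onto the name of the matching constant.
  l_name := CASE l_status
              WHEN DBMS_PARALLEL_EXECUTE.CREATED             THEN 'CREATED'
              WHEN DBMS_PARALLEL_EXECUTE.CHUNKING            THEN 'CHUNKING'
              WHEN DBMS_PARALLEL_EXECUTE.CHUNKING_FAILED     THEN 'CHUNKING_FAILED'
              WHEN DBMS_PARALLEL_EXECUTE.CHUNKED             THEN 'CHUNKED'
              WHEN DBMS_PARALLEL_EXECUTE.PROCESSING          THEN 'PROCESSING'
              WHEN DBMS_PARALLEL_EXECUTE.FINISHED            THEN 'FINISHED'
              WHEN DBMS_PARALLEL_EXECUTE.FINISHED_WITH_ERROR THEN 'FINISHED_WITH_ERROR'
              WHEN DBMS_PARALLEL_EXECUTE.CRASHED             THEN 'CRASHED'
              ELSE 'UNKNOWN (' || l_status || ')'
            END;

  DBMS_OUTPUT.put_line('test_task is ' || l_name);
END;
/
```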

The status of the task and the chunks can also be queried.

COLUMN task_name FORMAT A10
SELECT task_name,
       status
FROM   user_parallel_execute_tasks;

TASK_NAME  STATUS
---------- -------------------
test_task  FINISHED

If there were errors, the chunks can be queried to identify the problems.

SELECT status, COUNT(*)
FROM   user_parallel_execute_chunks
GROUP BY status
ORDER BY status;

STATUS                 COUNT(*)
-------------------- ----------
PROCESSED_WITH_ERROR          3
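
The count only says how many chunks failed. To see why, the same view can be queried for the error recorded against each failed chunk. This is a small sketch that assumes the task was chunked by ID; for rowid chunks, select start_rowid and end_rowid instead.

```sql
COLUMN error_message FORMAT A60

SELECT chunk_id,
       start_id,
       end_id,
       error_code,
       error_message
FROM   user_parallel_execute_chunks
WHERE  task_name = 'test_task'
AND    status    = 'PROCESSED_WITH_ERROR'
ORDER BY chunk_id;
```
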
The [DBA|USER]_PARALLEL_EXECUTE_TASKS views contain a record of the JOB_PREFIX used when scheduling the chunks of work.

SELECT job_prefix
FROM   user_parallel_execute_tasks
WHERE  task_name = 'test_task';

JOB_PREFIX
------------------------------
TASK$_368
This value can be used to query information about the individual jobs used during the process. The number of jobs scheduled should match the degree of parallelism specified in the RUN_TASK procedure.

COLUMN job_name FORMAT A20

SELECT job_name, status
FROM   user_scheduler_job_run_details
WHERE  job_name LIKE (SELECT job_prefix || '%'
                      FROM   user_parallel_execute_tasks
                      WHERE  task_name = 'test_task');

JOB_NAME             STATUS
-------------------- ------------------------------
TASK$_205_3          SUCCEEDED
TASK$_205_9          SUCCEEDED
TASK$_205_5          SUCCEEDED
TASK$_205_7          SUCCEEDED
TASK$_205_1          SUCCEEDED
TASK$_205_2          SUCCEEDED
TASK$_205_6          SUCCEEDED
TASK$_205_8          SUCCEEDED
TASK$_205_4          SUCCEEDED
TASK$_205_10         SUCCEEDED

2.8 Drop the task

Once the job is complete you can drop the task, which will drop the associated chunk information also.

BEGIN
  DBMS_PARALLEL_EXECUTE.drop_task('test_task');
END;
/



3. Examples

3.1 Test 1

The following example shows the processing of a workload chunked by rowid.

DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_sql_stmt VARCHAR2(32767);
  l_try      NUMBER;
  l_status   NUMBER;
BEGIN
  DBMS_PARALLEL_EXECUTE.create_task(task_name => l_task);

  DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => l_task,
                                               table_owner => 'TEST',  -- schema that owns TEST_TAB (ICD in the session above)
                                               table_name  => 'TEST_TAB',
                                               by_row      => TRUE,
                                               chunk_size  => 10000);

  -- The ROWID hint has to name the alias used in the statement (t).
  l_sql_stmt := 'UPDATE /*+ ROWID (t) */ test_tab t
                 SET    t.num_col = t.num_col + 10
                 WHERE  rowid BETWEEN :start_id AND :end_id';

  DBMS_PARALLEL_EXECUTE.run_task(task_name      => l_task,
                                 sql_stmt       => l_sql_stmt,
                                 language_flag  => DBMS_SQL.NATIVE,
                                 parallel_level => 10);

  -- If there is an error, RESUME it at most 2 times.
  l_try    := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  WHILE (l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED)
  LOOP
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task(l_task);
    l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  END LOOP;

  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/

3.2 Test 2

The following example shows the processing of a workload chunked by a number column. Notice that the workload is actually a stored procedure in this case.

CREATE OR REPLACE PROCEDURE process_update (p_start_id IN NUMBER, p_end_id IN NUMBER) AS
BEGIN
  -- The range is on ID rather than rowid, so no ROWID hint is used here.
  UPDATE test_tab t
  SET    t.num_col = t.num_col + 10
  WHERE  id BETWEEN p_start_id AND p_end_id;
END;
/

DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_sql_stmt VARCHAR2(32767);
  l_try      NUMBER;
  l_status   NUMBER;
BEGIN
  DBMS_PARALLEL_EXECUTE.create_task(task_name => l_task);

  DBMS_PARALLEL_EXECUTE.create_chunks_by_number_col(task_name    => l_task,
                                                    table_owner  => 'TEST',  -- schema that owns TEST_TAB (ICD in the session above)
                                                    table_name   => 'TEST_TAB',
                                                    table_column => 'ID',
                                                    chunk_size   => 10000);

  l_sql_stmt := 'BEGIN process_update(:start_id, :end_id); END;';

  DBMS_PARALLEL_EXECUTE.run_task(task_name      => l_task,
                                 sql_stmt       => l_sql_stmt,
                                 language_flag  => DBMS_SQL.NATIVE,
                                 parallel_level => 10);

  -- If there is an error, RESUME it at most 2 times.
  l_try    := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  WHILE (l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED)
  LOOP
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task(l_task);
    l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  END LOOP;

  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/

3.3 Test 3

The following example shows a workload chunked by an SQL statement and processed by a user-defined framework.

DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_stmt     CLOB;
  l_sql_stmt VARCHAR2(32767);
  l_chunk_id NUMBER;
  l_start_id NUMBER;
  l_end_id   NUMBER;
  l_any_rows BOOLEAN;
BEGIN
  DBMS_PARALLEL_EXECUTE.create_task(task_name => l_task);

  l_stmt := 'SELECT DISTINCT num_col, num_col FROM test_tab';

  DBMS_PARALLEL_EXECUTE.create_chunks_by_sql(task_name => l_task,
                                             sql_stmt  => l_stmt,
                                             by_rowid  => FALSE);

  -- The range is on NUM_COL rather than rowid, so no ROWID hint is used here.
  l_sql_stmt := 'UPDATE test_tab t
                 SET    t.num_col = t.num_col
                 WHERE  num_col BETWEEN :start_id AND :end_id';

  LOOP
    -- Get next unassigned chunk.
    DBMS_PARALLEL_EXECUTE.get_number_col_chunk(task_name => l_task,
                                               chunk_id  => l_chunk_id,
                                               start_id  => l_start_id,
                                               end_id    => l_end_id,
                                               any_rows  => l_any_rows);

    EXIT WHEN l_any_rows = FALSE;

    BEGIN
      -- Manually execute the work.
      EXECUTE IMMEDIATE l_sql_stmt USING l_start_id, l_end_id;

      -- Set the chunk status as processed.
      DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => l_task,
                                             chunk_id  => l_chunk_id,
                                             status    => DBMS_PARALLEL_EXECUTE.PROCESSED);
    EXCEPTION
      WHEN OTHERS THEN
        -- Record chunk error.
        DBMS_PARALLEL_EXECUTE.set_chunk_status(task_name => l_task,
                                               chunk_id  => l_chunk_id,
                                               status    => DBMS_PARALLEL_EXECUTE.PROCESSED_WITH_ERROR,
                                               err_num   => SQLCODE,
                                               err_msg   => SQLERRM);
    END;

    -- Commit work.
    COMMIT;
  END LOOP;

  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/



-------------------------------------------------------------------------------------------------------
Blog:http://blog.csdn.net/tianlesoftware
Email:dvd.dba@gmail.com