您的位置:首页 > 数据库 > Oracle

利用Oracle dbms_pipe实现存储过程之间的通信

2016-12-28 16:29 405 查看
应用程序开发人员的需求是这样的:
1. 根据条件给每一个国家的商品生成唯一7位随机代码,不同国家之间的商品代码可以相同
2. 如果输入标准分隔符的字符串,则解析该字符串作为需要生成的商品ID,为其生成代码,否则为商品表中所有商品ID生成代码
3. 代码的每一位要符合相应的规则,例如第一位的规则是[0123],则这位只能是0、1、2、3中的一个数
4. 由于可能一次生成大量的代码,这个过程需要较长时间,所以需要用进度条提示生成进度
5. 可以在生成过程执行中终止过程
6. 返回需要生成的代码个数和实际生成的代码个数
7. 每次提交的个数可以通过参数定义,例如共要生成10万的代码,每次提交1000个 设计思路:
1. 为每个国家预生成0到9999999一千万个随机数作为候选代码池表,每生成一个代码就从代码池中删除一个,避免查重操作
2. 用一个存储过程生成代码,另一个过程用来终止生成过程,两个过程间用dbms_pipe进行通信
3. 用一个函数返回当前已经生成的代码个数,供显示进度条的外部程序调用,生成代码过程与该函数用dbms_pipe进行通信
4. 用bulk collect批量处理提高效率源代码:[c-sharp] view plain copy-- 1. 生成可用代码池,重复执行过程为每一个国家生成一个可用代码池表,表名为org_code_加上两位国家代码  
CREATE OR REPLACE PROCEDURE p_org_code  
IS  
BEGIN  
   -- tmp是一个提交后删除行的全局临时表  
   FOR i IN 0 .. 9999999  
   LOOP  
      INSERT INTO tmp  
           VALUES (LPAD (i, 7, '0'));  
   END LOOP;  
  
   INSERT INTO org_code_00  
      SELECT   *  
          FROM tmp  
      ORDER BY DBMS_RANDOM.VALUE;  
  
   COMMIT;  
END;  
  
-- 2. 代码生成过程  
CREATE OR REPLACE PROCEDURE p_gen_code (  
   p_country_code   IN       VARCHAR2,              -- 国家代码  
   p_p1             IN       VARCHAR2,                          -- 第一位规则字符串,如012345  
   p_p2             IN       VARCHAR2,                          -- 第二位规则字符串,如012345  
   p_p3             IN       VARCHAR2,                          -- 第三位规则字符串,如012345  
   p_p4             IN       VARCHAR2,                          -- 第四位规则字符串,如012345  
   p_p5             IN       VARCHAR2,                          -- 第五位规则字符串,如012345  
   p_p6             IN       VARCHAR2,                          -- 第六位规则字符串,如012345  
   p_p7             IN       VARCHAR2,                          -- 第七位规则字符串,如012345  
   p_instr          IN       VARCHAR2,                          -- 商品ID字符串,用,做分隔符   
   p_count          IN       NUMBER DEFAULT 1000,   -- 每次提交的个数  
   r_outstr         OUT      VARCHAR2               -- 输出需要生成的代码个数和实际生成的代码个数  
)  
IS  
   l_id                       DBMS_SQL.varchar2_table;  
   l_mc                       DBMS_SQL.varchar2_table;  
   l_code                     DBMS_SQL.varchar2_table;  
   l_mc_str                   VARCHAR2 (255);  
   l_code_str                 VARCHAR2 (7);  
   l_id_count                 INT                     := 0;  
   l_code_count               INT                     := 0;  
   l_idx                      INT;  
   l_start                    INT                     := 1;  
   l_substr                   VARCHAR2 (32);  
   l_instr                    VARCHAR2 (2000);  
   l_length                   INT;  
   l_tablename                VARCHAR2 (30)  := 'org_code_' || p_country_code;  
   l_sql                      VARCHAR2 (2000);  
   l_count                    INT                     := 0;  
   l_pointer                  INT                     := 0;  
   l_real_count               INT                     := 0;  
   -- IPC  
   l_pipename        CONSTANT VARCHAR2 (12)           := 'mypipe';  
   l_pipe_getcount   CONSTANT VARCHAR2 (12)           := 'getcount';  
   l_pipe_retcount   CONSTANT VARCHAR2 (12)           := 'retcount';  
   l_send_result              INT;  
BEGIN  
   DBMS_PIPE.PURGE (l_pipename);  
  
   IF p_instr IS NULL  
   THEN  
      BEGIN  
         SELECT ID, mc  
         BULK COLLECT INTO l_id, l_mc  
           FROM product;  
  
         l_id_count := l_id.COUNT;  
         l_sql :=  
               'select code1 from (select code1 from '  
            || l_tablename  
            || ' where instr('''  
            || p_p1  
            || ''',substr(code1,1,1)) > 0 and instr('''  
            || p_p2  
            || ''',substr(code1,2,1)) > 0 and instr('''  
            || p_p3  
            || ''',substr(code1,3,1)) > 0 and instr('''  
            || p_p4  
            || ''',substr(code1,4,1)) > 0 and instr('''  
            || p_p5  
            || ''',substr(code1,5,1)) > 0 and instr('''  
            || p_p6  
            || ''',substr(code1,6,1)) > 0 and instr('''  
            || p_p7  
            || ''',substr(code1,7,1)) > 0 where rownum <= '  
            || l_id_count;  
  
         EXECUTE IMMEDIATE l_sql  
         BULK COLLECT INTO l_code;  
  
         l_code_count := l_code.COUNT;  
  
         WHILE (l_count < l_code_count)  
         LOOP  
            IF DBMS_PIPE.receive_message (l_pipename, 0) = 0  
            THEN  
               DBMS_PIPE.unpack_message (l_pipebuf);  
               EXIT WHEN l_pipebuf = 'stop';  
            END IF;  
  
            IF DBMS_PIPE.receive_message (l_pipe_getcount, 0) = 0  
            THEN  
               DBMS_PIPE.PURGE (l_pipe_retcount);  
               dbms_pipe_pack_message (TO_CHAR (l_count) || '|running');  
               l_send_result := DBMS_PIPE.send_message (l_pipe_retcount);  
            END IF;  
  
            l_real_count :=  
                           LEAST (p_count, l_code_count - l_pointer * p_count);  
  
            FOR i IN 1 .. l_real_count  
            LOOP  
               INSERT INTO product_code  
                    VALUES (l_code (l_pointer * p_count + i),  
                            l_mc (l_pointer * p_count + i),  
                            p_country_code);  
  
               EXECUTE IMMEDIATE    'delete from '  
                                 || l_tablename  
                                 || ' where code1=:x'  
                           USING l_code (l_pointer * p_count + i);  
  
               UPDATE product  
                  SET status = 1  
                WHERE ID = l_id (l_pointer * p_count + i);  
            END LOOP;  
  
            COMMIT;  
            l_pointer := pointer + 1;  
            l_count := l_count + l_real_count;  
         END LOOP;  
  
         DBMS_PIPE.PURGE (l_pipe_retcount);  
         DBMS_PIPE.pack_message (TO_CHAR (l_count) || '|end');  
         l_send_result := DBMS_PIPE.send_message (l_pipe_retcount);  
         r_outstr := l_id_count || '|' || l_count;  
         DBMS_OUTPUT.put_line (r_outstr);  
      EXCEPTION  
         WHEN NO_DATA_FOUND  
         THEN  
            COMMIT;  
         WHEN OTHERS  
         THEN  
            RAISE;  
      END;  
   ELSE  
      IF SUBSTR (p_instr, -1, 1) = ','  
      THEN  
         l_instr := p_instr;  
      ELSE  
         l_instr := p_instr || ',';  
      END IF;  
  
      l_length := LENGTH (l_instr);  
  
      <<outer_loop>>  
      WHILE l_start < l_length  
      LOOP  
         l_id_count := l_id_count + 1;  
         l_idx := INSTR (l_instr, ',', l_start);  
         l_substr := SUBSTR (l_instr, l_start, l_idx - l_start);  
         l_start := l_idx + 1;  
  
         IF DBMS_PIPE.receive_message (l_pipename, 0) = 0  
         THEN  
            DBMS_PIPE.unpack_message (l_pipebuf);  
            EXIT WHEN l_pipebuf = 'stop';  
         END IF;  
  
         IF DBMS_PIPE.receive_message (l_pipe_getcount, 0) = 0  
         THEN  
            DBMS_PIPE.PURGE (l_pipe_retcount);  
            DBMS_PIPE.pack_message (l_code_count || '|running');  
            l_send_result := DBMS_PIPE.send_message (l_pipe_retcount);  
         END IF;  
  
         BEGIN  
            SELECT mc  
              INTO l_mc_str  
              FROM product  
             WHERE ID = l_substr;  
         EXCEPTION  
            WHEN OTHERS  
            THEN  
               GOTO outer_loop;  
         END;  
  
         l_sql :=  
               'select code1 from (select code1 /*+ first_rows */ from '  
            || l_tablename  
            || ' where instr('''  
            || p_p1  
            || ''',  
         substr(code1,1,1)) > 0 and instr('''  
            || p_p2  
            || ''',  
         substr(code1,2,1)) > 0 and instr('''  
            || p_p3  
            || ''',  
         substr(code1,3,1)) > 0 and instr('''  
            || p_p4  
            || ''',  
         substr(code1,4,1)) > 0 and instr('''  
            || p_p5  
            || ''',  
         substr(code1,5,1)) > 0 and instr('''  
            || p_p6  
            || ''',  
         substr(code1,6,1)) > 0 and instr('''  
            || p_p7  
            || ''',  
         substr(code1,1,1)) > 0 ) where rownum=1 ';  
  
         BEGIN  
            EXECUTE IMMEDIATE l_sql  
                         INTO l_code_str;  
         EXCEPTION  
            WHEN NO_DATA_FOUND  
            THEN  
               EXIT;  
         END;  
  
         IF SQL%ROWCOUNT = 1  
         THEN  
            l_code_count := l_code_count + 1;  
         END IF;  
  
         INSERT INTO product_code  
              VALUES (l_code_str, l_wzmc_str, p_country_code);  
  
         EXECUTE IMMEDIATE 'delete from ' || l_tablename || ' where code1=:x'  
                     USING l_code_str;  
  
         UPDATE product  
            SET status = 1  
          WHERE ID = l_substr;  
      END LOOP;  
  
      COMMIT;  
      DBMS_PIPE.PURGE (l_pipe_retcount);  
      DBMS_PIPE.pack_message (TO_CHAR (l_code_count) || '|end');  
      l_send_result := DBMS_PIPE.send_message (l_pipe_retcount);  
      r_outstr :=  
            LENGTH (l_instr)  
         -  LENGTH (REPLACE (l_instr, ',', ''))  
         || '|'  
         || l_code_count;  
      DBMS_OUTPUT.put_line (r_outstr);  
   END IF;  
END p_gen_code;  
  
-- 3. 终止过程  
CREATE OR REPLACE PROCEDURE p_stop  
IS  
   l_pipename        VARCHAR2 (12) := 'mypipe';  
   l_create_result   INTEGER       := DBMS_PIPE.create_pipe (l_pipename);  
   l_send_result     INTEGER;  
BEGIN  
   DBMS_PIPE.PURGE (l_pipename);  
   DBMS_PIPE.pack_message ('stop');  
   l_send_result := DBMS_PIPE.send_message (l_pipename);  
   DBMS_OUTPUT.put_line ('l_send_result: ' || l_send_result);  
END p_stop;  
  
-- 4. 取得当前已经生成代码的个数  
CREATE OR REPLACE FUNCTION fn_getcount (p_timeout NUMBER)  
   RETURN VARCHAR2  
IS  
   l_pipebuf         VARCHAR2 (20);  
   l_pipe_getcount   VARCHAR2 (12) := 'getcount';  
   l_pipe_retcount   VARCHAR2 (12) := 'retcount';  
   l_status          NUMBER;  
   l_receive         NUMBER;  
BEGIN  
   DBMS_PIPE.PURGE (l_pipe_getcount);  
   DBMS_PIPE.pack_message (l_pipe_getcount);  
   l_status := DBMS_PIPE.send_message (l_pipe_getcount);  
   l_receive :=  
              DBMS_PIPE.receive_message (l_pipe_retcount, NVL (p_timeout, 0));  
  
   IF l_receive = 0  
   THEN  
      DBMS_PIPE.unpack_message (l_pipebuf);  
      RETURN TO_CHAR (l_pipebuf);  
   ELSE  
      RETURN TO_CHAR ('Timed out!');  
   END IF;  
END fn_getcount;  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: