您的位置:首页 > 数据库

pro*C 使用动态sql方法四实现数据批量导入导出

2016-07-26 09:44 881 查看
    这段时间,有一个数据同步的问题,把数据从一个库同步到另一个库,需要同步的数据表比较多,如果每个表都写一个方法同步,感觉没什么意义,而且以后如果表结构变更,还要一起更新代码,太过繁琐,在查询oracle官方文档:proc 动态方法四  以及一些资料之后,实现动态批量导出导入。代码如下:

    #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <stdarg.h>
#include <syslog.h>
#include <sqlca.h>
#include <sqlda.h>
#include <sqlcpr.h>

/* 使用PRO*C 动态sql 方法4,通过指定select和insert语句,同步数据
* 参数说明:
* pSelectSql select动态语句,仅支持 select ... from ...
* 不支持where子句, 另外,还会截取该sql,生成查询
* 记录总数量的sql,并查询需同步的数量
* pInsertSql 插入动态sql,插入字段用占位符表示,和select一致
* list_count 查询最大列数,执行之后,返回实际的查询列数
* sFromtns 同步源数据库连接串,like : hsqs/hsqs@ora11g
* sTotns 目标库连接串
* errLen 错误信息缓冲区长度
* sErrMsg 错误信息指针
* 返回值:
同步成功,返回0 否则返回-1
*/
static int
syntaxTable(const char *pSelectSql,
const char *pInsertSql,
unsigned int *list_count,
const char *sFromtns,
const char *sTotns,
size_t errLen,
char *sErrMsg)
{
EXEC SQL BEGIN DECLARE SECTION;
char username[20];
char passwd[20];
char server[50];
char usernameTo[20];
char passwdTo[20];
char serverTo[50];
char toDB[50];
char *sql;
char *sql_insert;
int array_size;
int row_count;
char sql_count[2048];
EXEC SQL END DECLARE SECTION;

int i;
int j;
int k;
int cyc;
int null_ok;
int precision;
int scale;
SQLDA *select_dp;
SQLDA *bind_dp;
char **var;
char *pt;
if (sErrMsg == NULL){
return -1;
}

/* 解析数据库连接串,连接数据库,其中源表所在数据库为默认库,目的表数据库连接由toDB标示 */
if (3 != sscanf(sFromtns, "%49[^/]%*[/]%49[^@]%*[@]%49s", username, passwd, server)){
snprintf(sErrMsg, errLen, "数据库连接串[%s] 解析失败", sFromtns);
return -1;
}
if (3 != sscanf(sTotns, "%49[^/]%*[/]%49[^@]%*[@]%49s", usernameTo, passwdTo, serverTo)){
snprintf(sErrMsg, errLen, "数据库连接串[%s] 解析失败", sTotns);
return -1;
}

EXEC SQL CONNECT :username IDENTIFIED BY :passwd USING :server;

if (sqlca.sqlcode != 0) {
snprintf(sErrMsg, errLen, "连接数据库[%s]失败:[%s]", sFromtns, sqlca.sqlerrm.sqlerrmc);
return -1;
}

strcpy(toDB, serverTo);
EXEC SQL CONNECT :usernameTo IDENTIFIED BY :passwdTo AT :toDB USING :serverTo;

if (sqlca.sqlcode != 0) {
snprintf(sErrMsg, errLen, "连接数据库[%s]失败:[%s]", sTotns, sqlca.sqlerrm.sqlerrmc);
EXEC SQL ROLLBACK WORK RELEASE;
return -1;
}
/* 数据库连接 end */

/* 拼接查询记录数量的sql,查询需同步的记录数 */
row_count = 0;
if ((pt = strstr(pSelectSql, " from ")) == 0){
pt = strstr(pSelectSql, " FROM ");
}
snprintf(sql_count, sizeof(sql_count), "select count(*) %s", pt);

EXEC SQL WHENEVER SQLERROR GOTO l_end_func;
EXEC SQL PREPARE S_ROWCOUNT FROM :sql_count;
EXEC SQL DECLARE C_ROWCOUNT CURSOR FOR S_ROWCOUNT;
EXEC SQL OPEN C_ROWCOUNT;
for (;;){
EXEC SQL FETCH C_ROWCOUNT INTO :row_count;
break;
}
EXEC SQL CLOSE C_ROWCOUNT;
/* 同步记录查询 end */

sql = (char*)pSelectSql; //动态sql 需设置预编译选项mode=ansi, 必须在定义部分定义宿主变量,使用指针sql指向查询sql
sql_insert = (char*)pInsertSql;

/*
* 调用 SQLDA *SQLSQLDAAlloc 函数给描述字分配空间。
* 对函数的返回值:如果>0,则标识返回SQLDA 结构的指针地址,为0 则表示函数调用失败。
* 函数原型: SQLDA *SQLSQLDAAlloc(void *context, unsigned int max_vars,
* unsigned int max_name, unsigned int max_ind_name);
* 参数说明:context 指定运行上下文, 0表示单线程应用
* max_vars 最大能DESCRIBE 的SELECT-LIST列和占位符号的数目
* max_name 占位符和SELECT-LIST列名称的最大长度
* max_ind_name 最大指示变量长度,这个参数只用于分配绑定描述字(BIND DESCRIPTOR)空间,
* 对分配检索描述字(SELECT DESCRIPTOR) 空间时,赋0即可
*/

if ((select_dp = SQLSQLDAAlloc((void*)0, *list_count, 30, 0)) == (SQLDA*)0){ //为查询选择描述区分配空间
snprintf(sErrMsg, errLen, "给选择描述区分配空间失败");
EXEC SQL ROLLBACK WORK RELEASE;
EXEC SQL AT :toDB ROLLBACK WORK RELEASE;
return -1;
}

if ((bind_dp = SQLSQLDAAlloc((void*)0, *list_count, 30, 30)) == (SQLDA*)0){ //为插入绑定选择描述区分配空间
snprintf(sErrMsg, errLen, "给选择描述区分配空间失败");
SQLSQLDAFree(0, select_dp);
EXEC SQL ROLLBACK WORK RELEASE;
EXEC SQL AT :toDB ROLLBACK WORK RELEASE;
return -1;
}

EXEC SQL PREPARE S FROM :sql;
EXEC SQL DECLARE C CURSOR FOR S;
EXEC SQL OPEN C;
EXEC SQL DESCRIBE SELECT LIST FOR S INTO select_dp;

/* 如果select_dp->F是一个负数,说明查询结果列数比select_dp描述的max_vars多 */
if (select_dp->F < 0) {
snprintf(sErrMsg, errLen, "查询列数量过多");
EXEC SQL ROLLBACK WORK RELEASE;
EXEC SQL AT :toDB ROLLBACK WORK RELEASE;
return -1;
}

select_dp->N = select_dp->F; //重置查询列数目为实际数目
bind_dp->N = select_dp->F;
*list_count = select_dp->F;
cyc = (1 << 13); //每次插入行

var = (char**)malloc(sizeof(char*) * select_dp->F); //数据缓冲区指针

for (i = 0; i < select_dp->F; ++i){
SQLColumnNullCheck((void*)0, (unsigned short*)&(select_dp->T[i]), (unsigned short*)&(select_dp->T[i]), &null_ok);/* 清除NULL标示位 */
switch (select_dp->T[i]){
case 1: break; /* CHAR datatype */
case 2: /*NUMBER datatype: use SQLNumberPrecV6() to extract precision and scale. */
SQLNumberPrecV6((void*)0, (unsigned long*)&(select_dp->L[i]), &precision, &scale);
if (precision == 0)
precision = 40;
if (scale > 0)
select_dp->L[i] = sizeof(double);
else
select_dp->L[i] = sizeof(long);
break;
case 11: /* ROWID datatype */
select_dp->L[i] = 18;
break;
default: break;
}

/* 为number型重新指定类型,如果有小数位,则为double,否则为int */
if (select_dp->T[i] == 2)
select_dp->T[i] = (scale > 0 ? 22: 3);

/* 为指示变量的值分配存储空间 */
select_dp->I[i] = (short*)malloc(sizeof(short*) * cyc);

var[i] = malloc(sizeof(char) * (select_dp->L[i] + 1) * cyc);

/* 设置描述区 */
select_dp->V[i] = var[i];
bind_dp->V[i] = (char*)var[i];
bind_dp->T[i] = select_dp->T[i];
bind_dp->L[i] = select_dp->L[i];
bind_dp->I[i] = (short*)select_dp->I[i];
}

EXEC SQL WHENEVER SQLERROR GOTO l_end_free;

k = 0; //已经同步的记录数
while (1){
array_size = (row_count - k - cyc) > 0 ? cyc : (row_count - k);

EXEC SQL FOR :array_size FETCH C USING DESCRIPTOR select_dp;
EXEC SQL AT :toDB PREPARE S_I FROM :sql_insert;
EXEC SQL AT :toDB DESCRIBE BIND VARIABLES FOR S_I INTO bind_dp;
EXEC SQL AT :toDB FOR :array_size
EXECUTE S_I USING DESCRIPTOR bind_dp;
EXEC SQL AT :toDB COMMIT;
k += array_size;
if (k == row_count)
break;
}
EXEC SQL CLOSE C;
EXEC SQL COMMIT RELEASE;
EXEC SQL AT :toDB COMMIT RELEASE;
goto l_end_free;

l_end_free:
for (i = 0; i < *list_count; ++i){
free(select_dp->I[i]);
free(var[i]);
}
free(var);
SQLSQLDAFree(0, select_dp);
SQLSQLDAFree(0, bind_dp);
if (sqlca.sqlcode != 0){
goto l_end_func;
}
return 0;

l_end_func:
snprintf(sErrMsg + strlen(sErrMsg), errLen - strlen(sErrMsg) - 1, "[%s]", sqlca.sqlerrm.sqlerrmc);
EXEC SQL ROLLBACK WORK RELEASE;
EXEC SQL AT :toDB ROLLBACK WORK RELEASE;
return -1;
}

int main(int argc, char *argv[])
{
char sqlbuffer[10240];
char sqlbuffer1[10240];
char sErrMsg[1024];
int list_count = 100;

snprintf(sqlbuffer, sizeof(sqlbuffer), "select a, b, c, d from table1");

snprintf(sqlbuffer1, sizeof(sqlbuffer1), "insert into table2 (a, b, c, d) values (:a, :b, :c, :d)");
syntaxTable(sqlbuffer,
sqlbuffer1,
&list_count,
"luoxc/luoxc@192.168.54.222/ora11g",
"luoxc/luoxc@192.168.54.222/ora11g",
sizeof(sErrMsg) - 1,
sErrMsg);
printf("%s\n", sErrMsg);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: