您的位置:首页 > 其它

ETL应用:一种处理接口的Pro*C实现方法

2015-10-10 22:23 453 查看
2007年,当时项目所有ETL采用C编写,实现了ETL基本功能。当接口很多时,为保证文件获取效率,做好接口可配置;文件维护中经常会出现接口晚到情况,需要有一种方法能将接口晚到信息写入数据库,便于短信告警。当时刚学习Pro*C不久,就实现了该方法, 如下

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <sys/wait.h>
#include <errno.h>
#include <sys/types.h>
#include <limits.h>
#include <sqlda.h>
#include <sqlcpr.h>
/*定义数据库连接信息*/
#define USERNAME    "masaetl"
#define PASSWORD    "masaetl"
#define ORACLESID   "DWDB2"
#define MAX_MSG_LENGTH 128
#define MAX_CMD_LENGTH 1024
#define  TRUE         1
/*定义告警文件生成目录*/
const char *alarm_path="/ETL_FS/etlfs/interface/toptea";
/*定义处理日期,内部链接的静态变量*/
static char filedate[9];
/*调试代码时使用*/
#ifndef DEBUG
#define DEBUG   // 定义调试开关
#endif
/*SQL语句返回值定义*/
/*定义返回值,成功为0,错误<0,警告>0  */
#define SQLCODE   sqlca.sqlcode
/*SQL语句返回错误解释 */
#define SQLERRMC  sqlca.sqlerrm.sqlerrmc
/*包含数据库信息*/
EXEC SQL INCLUDE sqlca;
EXEC SQL INCLUDE sqlda;

/*SQL语句返回值定义*/
/*定义返回值,成功为0,错误<0,警告>0*/
#define SQLCODE   sqlca.sqlcode
/*SQL语句返回错误解释*/
#define SQLERRMC  sqlca.sqlerrm.sqlerrmc
/*执行应用程序说明*/
/*getlinkdata程序每执行一次得到最后提供的晚到接口后都会执行*/
const char  * getlinkfile="/ETL_FS/etlfs/interface/service/public/proc/getlinkdata";
/*getlatefile.sh根据得到的晚到接口列表自动提取接口文件并进行链接*/
const char  * getlatefile="/ETL_FS/etlfs/interface/service/public/proc/getlatefile.sh";
void sqlerror();           /*处理SQL错误*/
int ConnectDataBase();     /*链接数据库*/
void DisConnectDataBase(); /*断开数据库连接*/
char *getYestDate(char *szDate);   /*得到前一天的日期,格式"YYYYMMDD"*/
char *getFileName(const char *szFilePath,char *szFileName); /*从字符串中得到文件名*/
int getAlarmFile(const char *alarmFile);     /*生成告警文件*/
char *substitute(char *src,const char *sMatch,const char *sReplace);    /*将源字符串中子字符串替换*/
/**************************************
*** 功能: 提取运行在数据库级报错信息
*** 输入变量:
*** 输出变量:
***************************************/
void sqlerror()
{
/*为了避免错误处理时发生死循环,应给出此说明*/
EXEC SQL WHENEVER SQLERROR CONTINUE;
printf("\n Oracle error detected: ");
printf("\n%s",SQLERRMC);  /*错误信息*/

/*断开数据库链接*/
EXEC SQL ROLLBACK WORK RELEASE;
exit(1);
}
/**************************************
*** 功能:  连接数据库
*** 输入变量:
*** 输出变量: -1 连接数据库失败 0 成功
***************************************/
int ConnectDataBase()
{
EXEC SQL BEGIN DECLARE SECTION;
char  username[8];
char  password[11];
char  oraclesid[7];
EXEC SQL END DECLARE  SECTION;

strcpy(username,USERNAME);  /*用户名*/
//username.len=strlen(username.arr);

strcpy(password,PASSWORD);  /*密码*/
//password.len=strlen(password.arr);

strcpy(oraclesid,ORACLESID);  /*Oralce SID*/
//oraclesid.len=strlen(oraclesid.arr);

/*链接数据库*/
EXEC SQL CONNECT :username IDENTIFIED BY :password USING :oraclesid;
if( SQLCODE ) {

return -1;
}
return 0;
}
/**************************************
*** 功能:  断开数据库连接
*** 输入变量:
*** 输出变量:
***************************************/
void DisConnectDataBase()
{
/*断开连接*/
EXEC SQL ROLLBACK WORK RELEASE;
exit(0);
}
/***************************************
**  功能 :    得到前一天的日期
*** 输入变量:
*** 输出变量:
****************************************/
char *getYestDate(char *szDate)
{
time_t yest;
struct tm ptime={'\0'};
char szChar[5];

//若输入的参数为空,默认提取系统前一天的日期
if ( *szDate =='\0' ){

yest=time(NULL)-(time_t)(60*60*24);
strftime(szDate,9,"%Y%m%d",localtime(&yest));

return szDate;
}
//提取年
memset(szChar,0,sizeof(szChar));
strncpy(szChar,szDate,4);
ptime.tm_year=atoi(szChar)-1900;

//提取月
memset(szChar,0,sizeof(szChar));
strncpy(szChar,szDate+4,2);
ptime.tm_mon=atoi(szChar)-1;

//提取日
memset(szChar,0,sizeof(szChar));
strncpy(szChar,szDate+6,2);
ptime.tm_mday=atoi(szChar);

yest=mktime(&ptime)-(time_t)(60*60*24);
strftime(szDate,9,"%Y%m%d",localtime(&yest));

return szDate;
}
/****************************************
**  功能: 判断当日是否有接口文件晚到
**     有,则将接口晚到信息写入告警文件中
**  输入参数:  告警文件名称
**  输出参数:  接口晚到个数
*****************************************/
int getAlarmFile(const char *alarmFile)
{
FILE *pf;

/*申明接口编号,接口处理日期*/
EXEC SQL BEGIN DECLARE SECTION;
char szSql[MAX_CMD_LENGTH];
char szDate[9];
int v_count=0;
long order_id;
EXEC SQL END DECLARE SECTION;

/*得到日期*/
strcpy(szDate,filedate);

/*连接数据库*/
if( ConnectDataBase() <0 ){

printf("连接数据库失败,失败原因[%d][%s]",SQLCODE,SQLERRMC);
return -1;
}
/*得到接口晚到个数*/
EXEC SQL SELECT COUNT(DISTINCT A.ORDER_ID) INTO :v_count
FROM  MASAETL.ETL_INTERFACE_ALARM A,
(SELECT * FROM MASAETL.ETL_LINK_FILE WHERE FILEDATE=:szDate) B
WHERE A.ORDER_ID=B.ORDER_ID(+) AND B.ORDER_ID IS NULL AND A.IF_ALARM=1;

/*检查SQL是否可以正常执行*/
if(SQLCODE)
{
printf("line%d:执行检查接口晚到数目SQL失败,错误号:[%d][%s]\n", __LINE__, SQLCODE, SQLERRMC);
return 0;
}

/*判断当日是否有接口晚到*/
if ( v_count >0 ){

/*只有在有接口未到时才生成告警文件*/
if ( ( pf=fopen(alarmFile,"a+") ) ==NULL ){

printf("创建当日告警文件失败.\n");
exit(1);
}

/*申明游标*/
EXEC SQL DECLARE later_cursor CURSOR FOR
SELECT  DISTINCT A.ORDER_ID
FROM  MASAETL.ETL_INTERFACE_ALARM A,
(SELECT * FROM MASAETL.ETL_LINK_FILE WHERE FILEDATE=:szDate) B
WHERE A.ORDER_ID=B.ORDER_ID(+) AND B.ORDER_ID IS NULL AND A.IF_ALARM=1;

/*打开游标*/
EXEC SQL OPEN  later_cursor;
EXEC SQL WHENEVER NOT FOUND DO break;

while ( TRUE ){

/*提取记录*/
EXEC SQL FETCH later_cursor INTO :order_id;

/*任务记录完,退出while循环*/
if ( SQLCODE == 1403 ){

printf("提取记录为空!!!!\n");
break;
}
else if (SQLCODE){

printf("无法正常取得接口编号,退出!!!!\n");
printf("错误原因 SQLCODE=[%d][%s]\n", SQLCODE, SQLERRMC);
continue;
}
/*将告警信息写入文件*/
fprintf(pf,"%ld|",order_id);

}
/*关闭游标*/
EXEC SQL CLOSE  later_cursor;
/*关闭文件*/
fclose(pf);
}

/*断开数据库连接*/
DisConnectDataBase();

return v_count;
}
/****************************************
**  功能:  将字符串中的模式字段替换成日期
**     字段,这里处理的是日接口,模式是固定的YYYYMMDD
**  输入参数:
**             src 需要替换的源字符串
**             subpattern  为YYYYMMDD
**             subdest     为日期变量filedate
**  输出参数:  替换后字符串
*****************************************/
char *substitute(char *src,const char *sMatch,const char *sReplace)
{
int  stringLen;
char sNewString[MAX_MSG_LENGTH];
char *FindPos =NULL;

FindPos=strstr(src, sMatch);

while( FindPos ){

memset(sNewString, 0, sizeof(sNewString));
stringLen = FindPos - src;
strncpy(sNewString, src, stringLen);
strcat(sNewString, sReplace);
strcat(sNewString, FindPos + strlen(sMatch));
strcpy(src, sNewString);
FindPos = strstr(src, sMatch);
}

return src;
}
/***************************************
**  功能 :    主程序
****************************************/
int main(int argc,char **argv)
{
char alarmfile[128];     /*告警文件格式,IYYYYMMDD.chk*/
const char *moduel="YYYYMMDD";  /*字符串替换模式*/
char cmdString[MAX_CMD_LENGTH];            /*shell字符串*/

pid_t pid;  /*子进程ID*/
int status; /*子进程终止状态*/

EXEC SQL BEGIN DECLARE SECTION;
char pDate[9];

long order_id;                     /*接口编号*/
char file_pattern[33];             /*文件模式*/
char source_ip[33];                /*源文件IP地址*/
char source_username[33];          /*源主机用户名*/
char source_password[33];          /*源主机用户密码*/
char source_path[128];             /*源主机文件所在目录*/
char destination_path[128];        /*目的主机文件目录*/
char link_path[128];               /*目的文件主机链接目录*/

short ind_file_pattern;
short ind_source_ip;
short ind_source_username;
short ind_source_password;
short ind_source_path;
short ind_destination_path;
short ind_link_path;
EXEC SQL END DECLARE SECTION;

/*得到处理日期,就是上一天的日期*/
getYestDate(filedate);
strcpy(pDate,filedate);

sprintf(alarmfile,"%s/IB%s.chk",alarm_path,filedate);
#ifdef DEBUG
printf("告警文件名:[%s].\n",alarmfile);
#endif
/*调用getlinkfile得到最新的系统文件信息*/
if ( system(getlinkfile) <0 ){

printf("执行getlinkfile应用程序失败.\n");
exit(1);
}

/*使用子进程,第一次执行时将FTP下来后生成告警文件,后面无需再处理告警文件*/
if( ( pid =fork() ) ==0 ){

if ( access(alarmfile,F_OK) <0 ){

if ( getAlarmFile(alarmfile) ==0 ){

printf("当日无接口晚到.\n");
}
}
}
/*等待子进程结束*/
if( wait(&status)!=pid ){

perror((char *)status);
}

/*连接数据库*/
if( ConnectDataBase() <0 ){

printf("连接数据库失败,失败原因[%d][%s]",SQLCODE,SQLERRMC);
return -1;
}

/*使用游标得到数据库记录*/
EXEC SQL DECLARE ftp_cursor CURSOR FOR
SELECT  A.ORDER_ID, A.FILE_PATTERN,A.SOURCE_IP,A.SOURCE_USERNAME,
A.SOURCE_PASSWORD,A.SOURCE_PATH,A.DESTINATION_PATH,A.LINK_PATH
FROM  MASAETL.ETL_INTERFACE_ALARM A,
(SELECT * FROM MASAETL.ETL_LINK_FILE WHERE FILEDATE=:pDate) B
WHERE A.ORDER_ID=B.ORDER_ID(+) AND B.ORDER_ID IS NULL AND A.IF_CHECK=1;

/*检查SQL执行情况*/
if( SQLCODE )
{
printf("执行提取晚到接口SQL失败,失败原因:[%d][%s].\n", SQLCODE, SQLERRMC);
exit(1);
}

/*打开游标*/
EXEC SQL OPEN ftp_cursor;
EXEC SQL WHENEVER NOT FOUND DO break;

while ( TRUE )
{
/*提取记录*/
EXEC SQL FETCH ftp_cursor INTO :order_id, :file_pattern:ind_file_pattern, :source_ip:ind_source_ip,
:source_username:ind_source_username, :source_password:ind_source_password,
:source_path:ind_source_path, :destination_path:ind_destination_path, :link_path:ind_link_path;

/*任务记录完,退出while循环*/
if ( SQLCODE ){

printf("提取接口编号失败,错误原因 SQLCODE=[%d][%s]\n", SQLCODE, SQLERRMC);
continue;
}

/*判断记录的有效性*/
if (ind_file_pattern==-1){

printf("文件模式为空.\n");
continue;
}
if (ind_source_ip==-1){

printf("源IP地址为空.\n");
continue;
}
if (ind_source_username==-1){

printf("源主机用户名为空.\n");
continue;
}
if (ind_source_password==-1){

printf("源主机密码为空.\n");
continue;
}
if (ind_source_path==-1){

printf("源主机目录为空.\n");
continue;
}
if (ind_destination_path==-1){

printf("目的主机目录为空.\n");
continue;
}
if (ind_link_path==-1){

printf("目录链接目录为空.\n");
continue;
}

/*对文件模式字段进行字符串替换*/
substitute(file_pattern,moduel,filedate);

/*对源主机目录字段进行字符串替换*/
substitute(destination_path,moduel,filedate);

/*执行shell程序ftp数据文件*/
memset(cmdString,0,sizeof(cmdString));
sprintf(cmdString,"%s %s %s %s %s %s %s %s",getlatefile,source_ip,source_username,source_password,
source_path,file_pattern,destination_path,link_path);
#ifdef DEBUG
printf("生成提取接口文件命令[%s].\n",cmdString);
#endif

/*执行文件提取及链接*/
if ( system(cmdString) <0 ){

printf("获取接口[%ld]文件失败.\n",order_id);
}

}
/*关闭游标*/
EXEC SQL CLOSE ftp_cursor;

/*断开数据库连接*/
DisConnectDataBase();

return 0;
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: