您的位置:首页 > 编程语言 > Python开发

通过Load table命令将数据文件加载到Sybase IQ数据库里面的Python脚本

2014-03-25 21:44 633 查看
-- Per-feed configuration read by SybaseIQ_LoadData.py.
-- The loader looks a row up by ftp_id and uses the first row returned,
-- so ftp_id is declared as the primary key to rule out ambiguous duplicates.
CREATE TABLE poc_app.sys_ftp_cfg
(
    ftp_id             varchar(100) NOT NULL, -- CDR (call detail record) file-name tag; lookup key
    ftp_cycle_id       varchar(1)   NOT NULL, -- file cycle: H, D, W or M (hour/day/week/month)
    ftp_stage_filepath varchar(255) NOT NULL, -- directory holding the processed files
    ftp_stage_filereg  varchar(100) NOT NULL, -- processed file-name pattern, e.g. name_[YYYYMMDD].dat
    stage_schema       varchar(100) NOT NULL, -- schema (owner) of the target table
    table_name         varchar(100) NOT NULL, -- target table name
    delimiter_type_id  varchar(10)  NOT NULL, -- field delimiter used in the data file
    PRIMARY KEY (ftp_id)
);



-- Sample configuration row: a daily ('D') feed whose files are named
-- jiang_test_[YYYYMMDD].dat under /home/sybase/day, '|'-delimited,
-- loaded into poc_app.jiang_test.
insert into poc_app.sys_ftp_cfg
    (ftp_id, ftp_cycle_id, ftp_stage_filepath, ftp_stage_filereg,
     stage_schema, table_name, delimiter_type_id)
values
    ('jiang_test_d', 'D', '/home/sybase/day', 'jiang_test_[YYYYMMDD].dat',
     'poc_app', 'jiang_test', '|');

#!/usr/bin/python

#-*- encoding: utf-8 -*-

####################################################################################

# name: SybaseIQ_LoadData.py

# describe: 通过Load table命令将数据文件加载到Sybase IQ数据库里面

####################################################################################

import os

import pyodbc

import string

import sys

from subprocess import Popen,PIPE

import ConfigParser

# Python 2 only: reload(sys) is the usual workaround for regaining access to
# sys.setdefaultencoding, which is then used to switch the implicit
# str <-> unicode codec to UTF-8.  The two statements must stay in this order.
# NOTE(review): this is a widely discouraged hack; neither reload() as a
# builtin nor sys.setdefaultencoding exists on Python 3.
reload(sys)

sys.setdefaultencoding('utf8')

# No-op string expression kept from the original (it is not a docstring
# because it follows other statements). English: "Load data files into the
# Sybase IQ database".
'''

将数据文件加载到Sybase IQ数据库里面

'''

class SybaseIQLoad:
    """Load one data file into a Sybase IQ table with LOAD TABLE.

    The file directory, file-name pattern, target schema/table and field
    delimiter for each feed (``ftp_id``) are read from the configuration
    table named by ``cfg_table``.  The LOAD TABLE statement is executed
    through the ``dbisql`` command-line client, connecting to the host/port
    configured for the DSN in the unixODBC ini file.
    """

    # Set to 1 to print the generated LOAD TABLE statement before running it.
    debug = 0

    # Configuration table queried by _getLoadInfo.  Matches the DDL that
    # ships with this script; override if your copy lives in another schema.
    cfg_table = 'poc_app.sys_ftp_cfg'

    # unixODBC ini file holding the DSN section with Server/Port keys.
    odbc_ini_file = '/etc/unixODBC/odbc.ini'

    # Cycle id (case-insensitive) -> (placeholder in ftp_stage_filereg,
    # number of leading characters of cur_static_time substituted for it).
    _CYCLE_PATTERNS = {
        'h': ('[YYYYMMDDHH]', 10),
        'd': ('[YYYYMMDD]', 8),
        'w': ('[YYYY_WW]', 7),
        'm': ('[YYYYMM]', 6),
    }

    def __init__(self, dbinfo):
        """Open an autocommit ODBC connection.

        :param dbinfo: sequence of (DSN name, user id, password).
        """
        # Pre-set so __del__ is safe even if connect() raises below.
        self.cnxn = None
        self.cursor = None
        self.DSN = dbinfo[0]
        self.UID = dbinfo[1]
        self.PWD = dbinfo[2]
        odbcinfo = 'DSN=%s;UID=%s;PWD=%s' % (dbinfo[0], dbinfo[1], dbinfo[2])
        self.cnxn = pyodbc.connect(odbcinfo, autocommit=True, ansi=True)
        self.cursor = self.cnxn.cursor()

    def __del__(self):
        """Close the cursor and connection, tolerating a half-built object."""
        cursor = getattr(self, 'cursor', None)
        if cursor is not None:
            cursor.close()
        cnxn = getattr(self, 'cnxn', None)
        if cnxn is not None:
            cnxn.close()

    def _printinfo(self, msg):
        """Print *msg* followed by a blank line."""
        print("%s" % (msg))
        print("\n")

    def _GetStageName(self, ftp_stage_filereg, ftp_cycle_id, cur_static_time):
        """Expand the date placeholder in a file-name pattern.

        :param ftp_stage_filereg: pattern such as ``name_[YYYYMMDD].dat``.
        :param ftp_cycle_id: H, D, W or M (case-insensitive).
        :param cur_static_time: time string whose leading characters replace
            the placeholder (10 for H, 8 for D, 7 for W, 6 for M).
        :returns: the concrete file name.
        :raises ValueError: for an unsupported cycle id.
        """
        try:
            placeholder, width = self._CYCLE_PATTERNS[ftp_cycle_id.lower()]
        except KeyError:
            raise ValueError('unsupported ftp_cycle_id %r (expected H, D, W or M)'
                             % (ftp_cycle_id,))
        return ftp_stage_filereg.replace(placeholder, cur_static_time[0:width])

    def _getLoadInfo(self, ftp_id):
        """Fetch the configuration row for *ftp_id*, or None if absent.

        Column order: ftp_cycle_id, ftp_stage_filepath, ftp_stage_filereg,
        stage_schema, delimiter_type_id, table_name.
        """
        sql = ('select ftp_cycle_id, ftp_stage_filepath, ftp_stage_filereg, '
               'stage_schema, delimiter_type_id, table_name '
               'from %s where ftp_id = ?' % self.cfg_table)
        # Bind ftp_id as a parameter instead of formatting it into the SQL.
        self.cursor.execute(sql, ftp_id)
        return self.cursor.fetchone()

    def _getSybIQServInfo(self, dsn=None):
        """Return ``[server, port]`` for a DSN from the unixODBC ini file.

        :param dsn: ini section to read; defaults to the DSN this object
            connected with (falling back to 'SybaseIQDSN').
        :raises IOError: if the ini file cannot be read.
        """
        section = dsn or getattr(self, 'DSN', None) or 'SybaseIQDSN'
        config = ConfigParser.ConfigParser()
        if not config.read(self.odbc_ini_file):
            raise IOError('cannot read ODBC config file %s' % self.odbc_ini_file)
        return [config.get(section, 'Server'), config.get(section, 'Port')]

    def _getTableColumns(self, stage_schema, table_name):
        """Return the column names of stage_schema.table_name in table order.

        LOAD TABLE maps file fields to the listed columns positionally, so
        the list is ordered by column_id.
        """
        sql = ('select a.column_name '
               'from syscolumn a join systable b on a.table_id = b.table_id '
               'where b.table_name = ? and b.creator = user_id(?) '
               'order by a.column_id')
        self.cursor.execute(sql, table_name, stage_schema)
        return [row[0] for row in self.cursor.fetchall()]

    def _sqlLiteral(self, value):
        """Quote *value* as a SQL string literal (doubling embedded quotes)."""
        return "'%s'" % value.replace("'", "''")

    def _buildLoadSql(self, stage_schema, table_name, columns, filename, delimiter):
        """Build the LOAD TABLE ... ; COMMIT; script for one file."""
        lines = [
            'load table %s.%s' % (stage_schema, table_name),
            '(',
            ','.join(columns),
            ')',
            'USING FILE %s' % self._sqlLiteral(filename),
            'FORMAT ASCII',
            'ESCAPES OFF',
            'QUOTES OFF',
            'NOTIFY 1000000',
            'DELIMITED BY %s' % self._sqlLiteral(delimiter),
            'WITH CHECKPOINT ON;',
            'COMMIT;',
        ]
        return '\n'.join(lines)

    def _buildDbisqlCmd(self, sql, ipport):
        """Build the dbisql argv list that runs *sql* against ipport=[host, port]."""
        return ['dbisql', '-c', 'uid=%s;pwd=%s' % (self.UID, self.PWD),
                '-Host', ipport[0], '-port', ipport[1], '-nogui', sql]

    def _runDbisql(self, sql, ipport):
        """Run *sql* through dbisql (no shell) and return its exit status.

        dbisql's output goes to this process's stdout/stderr.
        """
        proc = Popen(self._buildDbisqlCmd(sql, ipport))
        return proc.wait()

    def loaddata(self, ftp_id, cur_static_time):
        """Load the file for *ftp_id* and *cur_static_time* into its table.

        :returns: 0 on success, 1 if the LOAD TABLE step failed (dbisql
            could not be started or exited non-zero).
        :raises ValueError: if no configuration row exists, the cycle id is
            unsupported, or the target table has no columns.
        """
        row = self._getLoadInfo(ftp_id)
        if row is None:
            raise ValueError('no row in %s for ftp_id %r' % (self.cfg_table, ftp_id))
        (ftp_cycle_id, ftp_stage_filepath, ftp_stage_filereg,
         stage_schema, delimiter_type_id, table_name) = row

        ftp_stage_filename = self._GetStageName(ftp_stage_filereg, ftp_cycle_id,
                                                cur_static_time)
        ftp_stage_absolute_filename = os.path.join(ftp_stage_filepath, ftp_stage_filename)
        sybaseiq_ipport = self._getSybIQServInfo()

        columns = self._getTableColumns(stage_schema, table_name)
        if not columns:
            raise ValueError('table %s.%s not found or has no columns'
                             % (stage_schema, table_name))

        # Load into the configured table (the original always used cpms_area_user).
        loadsql = self._buildLoadSql(stage_schema, table_name, columns,
                                     ftp_stage_absolute_filename, delimiter_type_id)

        iserr = 0
        try:
            print("*************Begin to execute load table command...*************\n")
            if self.debug == 1:
                self._printinfo(loadsql)
            returncode = self._runDbisql(loadsql, sybaseiq_ipport)
            # os.system's status used to be ignored; surface a failed load.
            if returncode != 0:
                raise RuntimeError('dbisql exited with status %s' % returncode)
            print("\n*************End to execute load table command...*************")
            print("**************************Successful**************************")
        except Exception as err:
            iserr = 1
            print("Return value %s,Error %s" % (iserr, err))

        return iserr

#Main

def main():
    """Command-line entry point.

    Expects five arguments: SybaseDSN username password ftp_id cur_static_time.
    Exits with status 1 after printing a usage line when any is missing;
    otherwise returns whatever SybaseIQLoad.loaddata returns.
    """
    # Guard clause: every one of the five positional arguments is required.
    if len(sys.argv) < 6:
        print('usage: python SybaseIQ_LoadData.py SybaseDSN username password ftp_id cur_static_time\n')
        sys.exit(1)

    # Connection info in the order SybaseIQLoad expects: [DSN, user id, password].
    dbinfo = list(sys.argv[1:4])
    ftp_id, cur_static_time = sys.argv[4], sys.argv[5]

    loader = SybaseIQLoad(dbinfo)
    return loader.loaddata(ftp_id, cur_static_time)


if __name__ == '__main__':
    sys.exit(main())
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: