通过Load table命令将数据文件加载到Sybase IQ数据库里面的Python脚本
2014-03-25 21:44
633 查看
CREATE TABLE poc_app.sys_ftp_cfg
(
ftp_id varchar(100) NOT NULL, --话单文件名标记
ftp_cycle_id varchar(1) NOT NULL, --话单文件名周期
ftp_stage_filepath varchar(255) NOT NULL, --话单处理后路径
ftp_stage_filereg varchar(100) NOT NULL, --话单处理后名称格式
stage_schema varchar(100) NOT NULL, --schema名称
table_name varchar(100) NOT NULL, --表名
delimiter_type_id varchar(10) NOT NULL --分隔符
);
insert into poc_app.sys_ftp_cfg
values('jiang_test_d','D','/home/sybase/day','jiang_test_[YYYYMMDD].dat','poc_app','jiang_test','|');
#!/usr/bin/python
#-*- encoding: utf-8 -*-
####################################################################################
# name: SybaseIQ_LoadData.py
# describe: 通过Load table命令将数据文件加载到Sybase IQ数据库里面
####################################################################################
import os
import pyodbc
import string
import sys
from subprocess import Popen,PIPE
import ConfigParser
reload(sys)
sys.setdefaultencoding('utf8')
'''
将数据文件加载到Sybase IQ数据库里面
'''
class SybaseIQLoad:
debug = 0
def __init__(self,dbinfo):
self.UID = dbinfo[1]
self.PWD = dbinfo[2]
odbcinfo = 'DSN=%s;UID=%s;PWD=%s'%(dbinfo[0],dbinfo[1],dbinfo[2])
self.cnxn = pyodbc.connect(odbcinfo,autocommit=True,ansi=True)
self.cursor = self.cnxn.cursor()
def __del__(self):
if self.cursor:
self.cursor.close()
if self.cnxn:
self.cnxn.close()
def _printinfo(self,msg):
print "%s"%(msg)
print "\n"
def _GetStageName(self,ftp_stage_filereg,ftp_cycle_id,cur_static_time):
if ftp_cycle_id.lower() == 'h':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMMDDHH]',cur_static_time[0:10])
if ftp_cycle_id.lower() == 'd':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMMDD]',cur_static_time[0:8])
if ftp_cycle_id.lower() == 'w':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYY_WW]',cur_static_time[0:7])
if ftp_cycle_id.lower() == 'm':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMM]',cur_static_time[0:6])
return ftp_stage_filename
def _getLoadInfo(self,ftp_id):
sql = '''
select
ftp_cycle_id
,ftp_stage_filepath
,ftp_stage_filereg
,stage_schema
,delimiter_type_id
,table_name
from jiang.sys_ftp_cfg
where ftp_id = '%s'
''' %(ftp_id)
self.cursor.execute(sql.strip())
row = self.cursor.fetchone()
return row
def _getSybIQServInfo(self):
# 保存SybaseIQ的主机和端口号
sybservinfo = []
# ODBC配置文件绝对路径
unixodbc_file = "/etc/unixODBC/odbc.ini"
config = ConfigParser.ConfigParser()
config.read(unixodbc_file)
# 获取SybaseIQ的IP地址
ServerIP = config.get("SybaseIQDSN", "Server")
# 获取SybaseIQ的端口号
Port = config.get("SybaseIQDSN", "Port")
# 保存获取的IP地址和端口号
sybservinfo.append(ServerIP)
sybservinfo.append(Port)
return sybservinfo
def loaddata(self,ftp_id,cur_static_time):
#取文件加载相关配置信息
row = self._getLoadInfo(ftp_id)
ftp_cycle_id = row[0]
ftp_stage_filepath = row[1]
ftp_stage_filereg = row[2]
stage_schema = row[3]
delimiter_type_id = row[4]
table_name = row[5]
# 获取指定日期的文件名
ftp_stage_filename = self._GetStageName(ftp_stage_filereg,ftp_cycle_id,cur_static_time)
# 获取清洗后文件的绝对路径
ftp_stage_absolute_filename = os.path.join(ftp_stage_filepath,ftp_stage_filename)
# 对清洗后的文件再进行处理
#ftp_stage_absolute_filename_final = ftp_stage_absolute_filename + '*'
# 获取SybaseIQ的主机IP地址和端口号
sybaseiq_ipport = self._getSybIQServInfo()
# 获取表的所有字段
table_columns = '''
select column_name
from syscolumn a
join systable b
on a.table_id = b.table_id
where b.table_name = '%s' ># /tmp/table_name.log
'''%(table_name)
load_sql='''dbisql -c "uid=%s;pwd=%s" -Host %s -port %s -nogui "%s"'''%(self.UID,self.PWD,sybaseiq_ipport[0],sybaseiq_ipport[1],table_columns)
os.system(load_sql)
# 处理生成的表字段文件
columns_sql = '''
cat /tmp/table_name.log | sed "s/'//g" | awk '{printf "%s,",$0}'| sed 's/,$//g'
'''
result = Popen(columns_sql,shell=True,stdout=PIPE,stderr=PIPE)
right_info = result.stdout.read().strip('\xef|\xbb|\xbf')
err_info = result.stderr.read()
loadsql = '''
load table %s.cpms_area_user
(
%s
)
USING FILE '%s'
FORMAT ASCII
ESCAPES OFF
QUOTES OFF
NOTIFY 1000000
DELIMITED BY '%s'
WITH CHECKPOINT ON;
COMMIT;
'''%(stage_schema, right_info, ftp_stage_absolute_filename, delimiter_type_id)
try:
iserr = 0
print "*************Begin to execute load table command...*************\n"
if self.debug == 1:
self._printinfo(loadsql.strip())
#self.cursor.execute(loadsql.strip())
loadsql='''dbisql -c "uid=%s;pwd=%s" -Host %s -port %s -nogui "%s"'''%(self.UID,self.PWD,sybaseiq_ipport[0],sybaseiq_ipport[1],loadsql)
os.system(loadsql)
print "\n*************End to execute load table command...*************"
print "**************************Successful**************************"
except Exception,err:
iserr = 1
print "Return value %s,Error %s" % (iserr,err)
return iserr
#Main
def main():
# 检查传入参数个数
if len(sys.argv) < 6 :
print 'usage: python SybaseIQ_LoadData.py SybaseDSN username password ftp_id cur_static_time\n'
sys.exit(1)
# 定义连接Sybase IQ的信息
dbinfo = []
#dbinfo.append('SybaseIQDSN')
#dbinfo.append('jiang')
#dbinfo.append('jiang')
dbinfo.append(sys.argv[1])
dbinfo.append(sys.argv[2])
dbinfo.append(sys.argv[3])
ftp_id = sys.argv[4]
cur_static_time = sys.argv[5]
SIQ = SybaseIQLoad(dbinfo)
ret = SIQ.loaddata(ftp_id,cur_static_time)
return ret
if __name__ == '__main__':
sys.exit(main())
(
ftp_id varchar(100) NOT NULL, --话单文件名标记
ftp_cycle_id varchar(1) NOT NULL, --话单文件名周期
ftp_stage_filepath varchar(255) NOT NULL, --话单处理后路径
ftp_stage_filereg varchar(100) NOT NULL, --话单处理后名称格式
stage_schema varchar(100) NOT NULL, --schema名称
table_name varchar(100) NOT NULL, --表名
delimiter_type_id varchar(10) NOT NULL --分隔符
);
insert into poc_app.sys_ftp_cfg
values('jiang_test_d','D','/home/sybase/day','jiang_test_[YYYYMMDD].dat','poc_app','jiang_test','|');
#!/usr/bin/python
#-*- encoding: utf-8 -*-
####################################################################################
# name: SybaseIQ_LoadData.py
# describe: 通过Load table命令将数据文件加载到Sybase IQ数据库里面
####################################################################################
import os
import pyodbc
import string
import sys
from subprocess import Popen,PIPE
import ConfigParser
reload(sys)
sys.setdefaultencoding('utf8')
'''
将数据文件加载到Sybase IQ数据库里面
'''
class SybaseIQLoad:
debug = 0
def __init__(self,dbinfo):
self.UID = dbinfo[1]
self.PWD = dbinfo[2]
odbcinfo = 'DSN=%s;UID=%s;PWD=%s'%(dbinfo[0],dbinfo[1],dbinfo[2])
self.cnxn = pyodbc.connect(odbcinfo,autocommit=True,ansi=True)
self.cursor = self.cnxn.cursor()
def __del__(self):
if self.cursor:
self.cursor.close()
if self.cnxn:
self.cnxn.close()
def _printinfo(self,msg):
print "%s"%(msg)
print "\n"
def _GetStageName(self,ftp_stage_filereg,ftp_cycle_id,cur_static_time):
if ftp_cycle_id.lower() == 'h':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMMDDHH]',cur_static_time[0:10])
if ftp_cycle_id.lower() == 'd':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMMDD]',cur_static_time[0:8])
if ftp_cycle_id.lower() == 'w':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYY_WW]',cur_static_time[0:7])
if ftp_cycle_id.lower() == 'm':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMM]',cur_static_time[0:6])
return ftp_stage_filename
def _getLoadInfo(self,ftp_id):
sql = '''
select
ftp_cycle_id
,ftp_stage_filepath
,ftp_stage_filereg
,stage_schema
,delimiter_type_id
,table_name
from jiang.sys_ftp_cfg
where ftp_id = '%s'
''' %(ftp_id)
self.cursor.execute(sql.strip())
row = self.cursor.fetchone()
return row
def _getSybIQServInfo(self):
# 保存SybaseIQ的主机和端口号
sybservinfo = []
# ODBC配置文件绝对路径
unixodbc_file = "/etc/unixODBC/odbc.ini"
config = ConfigParser.ConfigParser()
config.read(unixodbc_file)
# 获取SybaseIQ的IP地址
ServerIP = config.get("SybaseIQDSN", "Server")
# 获取SybaseIQ的端口号
Port = config.get("SybaseIQDSN", "Port")
# 保存获取的IP地址和端口号
sybservinfo.append(ServerIP)
sybservinfo.append(Port)
return sybservinfo
def loaddata(self,ftp_id,cur_static_time):
#取文件加载相关配置信息
row = self._getLoadInfo(ftp_id)
ftp_cycle_id = row[0]
ftp_stage_filepath = row[1]
ftp_stage_filereg = row[2]
stage_schema = row[3]
delimiter_type_id = row[4]
table_name = row[5]
# 获取指定日期的文件名
ftp_stage_filename = self._GetStageName(ftp_stage_filereg,ftp_cycle_id,cur_static_time)
# 获取清洗后文件的绝对路径
ftp_stage_absolute_filename = os.path.join(ftp_stage_filepath,ftp_stage_filename)
# 对清洗后的文件再进行处理
#ftp_stage_absolute_filename_final = ftp_stage_absolute_filename + '*'
# 获取SybaseIQ的主机IP地址和端口号
sybaseiq_ipport = self._getSybIQServInfo()
# 获取表的所有字段
table_columns = '''
select column_name
from syscolumn a
join systable b
on a.table_id = b.table_id
where b.table_name = '%s' ># /tmp/table_name.log
'''%(table_name)
load_sql='''dbisql -c "uid=%s;pwd=%s" -Host %s -port %s -nogui "%s"'''%(self.UID,self.PWD,sybaseiq_ipport[0],sybaseiq_ipport[1],table_columns)
os.system(load_sql)
# 处理生成的表字段文件
columns_sql = '''
cat /tmp/table_name.log | sed "s/'//g" | awk '{printf "%s,",$0}'| sed 's/,$//g'
'''
result = Popen(columns_sql,shell=True,stdout=PIPE,stderr=PIPE)
right_info = result.stdout.read().strip('\xef|\xbb|\xbf')
err_info = result.stderr.read()
loadsql = '''
load table %s.cpms_area_user
(
%s
)
USING FILE '%s'
FORMAT ASCII
ESCAPES OFF
QUOTES OFF
NOTIFY 1000000
DELIMITED BY '%s'
WITH CHECKPOINT ON;
COMMIT;
'''%(stage_schema, right_info, ftp_stage_absolute_filename, delimiter_type_id)
try:
iserr = 0
print "*************Begin to execute load table command...*************\n"
if self.debug == 1:
self._printinfo(loadsql.strip())
#self.cursor.execute(loadsql.strip())
loadsql='''dbisql -c "uid=%s;pwd=%s" -Host %s -port %s -nogui "%s"'''%(self.UID,self.PWD,sybaseiq_ipport[0],sybaseiq_ipport[1],loadsql)
os.system(loadsql)
print "\n*************End to execute load table command...*************"
print "**************************Successful**************************"
except Exception,err:
iserr = 1
print "Return value %s,Error %s" % (iserr,err)
return iserr
#Main
def main():
# 检查传入参数个数
if len(sys.argv) < 6 :
print 'usage: python SybaseIQ_LoadData.py SybaseDSN username password ftp_id cur_static_time\n'
sys.exit(1)
# 定义连接Sybase IQ的信息
dbinfo = []
#dbinfo.append('SybaseIQDSN')
#dbinfo.append('jiang')
#dbinfo.append('jiang')
dbinfo.append(sys.argv[1])
dbinfo.append(sys.argv[2])
dbinfo.append(sys.argv[3])
ftp_id = sys.argv[4]
cur_static_time = sys.argv[5]
SIQ = SybaseIQLoad(dbinfo)
ret = SIQ.loaddata(ftp_id,cur_static_time)
return ret
if __name__ == '__main__':
sys.exit(main())
相关文章推荐
- Sybase IQ如何将大文件数据迅速加载到数据库
- Python脚本之django---mysql-记录主机性能数据到数据库-web站点管理数据库及web显示命令执行结果
- ubuntu下创建数据库的常用命令,及流程,以及sql脚本文件格式
- python写一段脚本代码自动完成输入(目录下的所有)文件的数据替换(修改数据和替换数据都是输入的)
- 备份文件命令SqlServer一键复制数据库脚本
- 将excel表中的数据写到txt文本文件的Python脚本
- java通过配置文件加载数据库(以Mysql为例)
- python连接oracel数据库,提取数据后制图并通过邮件发送
- 通过SQL脚本导入数据到不同数据库避免重复导入三种方式
- 通过句柄恢复Linux下误删除的数据库数据文件
- linux之用echo输入数据到文本末尾以及用open ssl命令在证书文件里面获取公钥
- 用命令文件生成数据库的脚本
- 利用python fabric模块写的批量操作远程主机脚本(命令执行,上传、下载文件)
- 测试必备技能系列1 :通过mysql命令进行脚本数据导入
- python通过串口读取GPS NMEA格式的数据,并保存为csv文件
- Python脚本---把MySQL数据库表中的数据导出生成csv格式文件
- 使用rdb文件进行redis数据迁移--python脚本
- Python -- 数据加载、存储与文件格式
- Java之JDBC 通过加载properties文件来的两种方式来连接数据库
- mysql 数据库通过命令导出数据库文件