您的位置:首页 > 数据库 > Oracle

Python操作Oracle、Mysql及文本进行数据处理

2016-11-04 10:55 561 查看

1、脚本中中到的表结构信息

CREATE TABLE `static_sdk_room_messge_send` (
`xappkey` varchar(100) DEFAULT NULL,
`roomid` varchar(100) DEFAULT NULL,
`messge_send_cnt` bigint(20) DEFAULT NULL,
`pt` varchar(16) DEFAULT NULL,
`static_date` varchar(10) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `static_sdk_room_messge_send_x` (
`xappkey` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
`roomid` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
`messge_send_cnt` bigint(20) DEFAULT NULL,
`pt` varchar(16) CHARACTER SET utf8 DEFAULT NULL,
`static_date` varchar(10) CHARACTER SET utf8 DEFAULT NULL
) ENGINE=MyISAM DEFAULT CHARSET=latin1;

-- Create table
create table STATIC_SDK_ROOM_MESSGE_SEND_X
(
XAPPKEY VARCHAR2(100),
ROOMID VARCHAR2(100),
MESSGE_SEND_CNT INTEGER,
PT VARCHAR2(16),
STATIC_DATE VARCHAR2(10)
)
tablespace TV
pctfree 10
initrans 1
maxtrans 255
storage
(
initial 64K
next 1M
minextents 1
maxextents unlimited
);

2、Oracle连接

-----------------------------
db_oracle_oper_ex.py

# -*- coding=utf-8 -*-
import cx_Oracle
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

ora_conn = cx_Oracle.connect('c##tv/c##tv@172.16.96.31:1521/orcl')
cursor = ora_conn.cursor ()

sql_text = "SELECT max(roomid) from static_sdk_room_messge_send_x"
cursor.execute(sql_text)
row = cursor.fetchone()

sql_text="insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date)" \
"VALUES ('3d40e812075a0527','9233097',12,'2016-10-19.08','2016-11-01')"
print sql_text
cursor.execute(sql_text)

ora_conn.commit()
cursor.close()
ora_conn.close()

-----------------------------
db_ora.py
# -*- coding=utf-8 -*-
import cx_Oracle

db=cx_Oracle.connect('c##tv','c##tv','172.16.96.31:1521/orcl')
print db.version

db.close()

3、Mysql导出成文本文件

-----------------------------
mysql2file.py

# -*- coding=utf-8 -*-
import smtplib
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

db_config = {
'host': '172.16.96.34',
'user': 'root',
'passwd': '123',
'port': 3306,
'db': 'tableau_market'
}

def getDB():
try:
conn = MySQLdb.connect(host=db_config['host'],user=db_config['user'],passwd=db_config['passwd'],port=db_config['port'])
conn.autocommit(True)
curr = conn.cursor()
curr.execute("SET NAMES utf8");
curr.execute("USE %s" % db_config['db']);

return conn, curr
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
return None, None

conn,curr = getDB()

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

os.remove("/root/pystudy/data/static_sdk_room_messge_send_%s.dat" % (yesterday))

sql_text = "select count(*) from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
curr.execute(sql_text)
chg = curr.fetchall()
for chg_num in chg:
chg_num0 = chg_num[0]
print chg_num0

if (chg_num0 >= 1):
sql_text = "select xappkey,roomid,messge_send_cnt,pt,static_date from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
curr.execute(sql_text)
for item_room_message_send in curr.fetchall():
#item_room_message_sends = curr.fetchall()
#for item_room_message_send in item_room_message_sends:
xappkey = item_room_message_send[0]
roomid = item_room_message_send[1]
messge_send_cnt = item_room_message_send[2]
pt = item_room_message_send[3]
static_date = item_room_message_send[4]

#f=open("/root/pystudy/data/static_sdk_room_messge_send.dat",'a+')
f=open("/root/pystudy/data/static_sdk_room_messge_send_%s.dat" % (yesterday) ,'a+')
f.write(xappkey+'\t'+roomid+'\t'+str(messge_send_cnt)+'\t'+pt+'\t'+static_date+'\t\n')
f.close()
else:
print "%s,no data !" % (yesterday)

curr.close()
conn.close()

4、Mysql导到另一个库的Mysql

-----------------------------
mysql2mysql.py

# -*- coding=utf-8 -*-
import smtplib
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

db_config_src = {
'host': '172.16.96.34',
'user': 'root',
'passwd': '123',
'port': 3306,
'db': 'tableau_market'
}

def get_srcDB():
try:
src_conn = MySQLdb.connect(host=db_config_src['host'],user=db_config_src['user'],passwd=db_config_src['passwd'],port=db_config_src['port'])
src_conn.autocommit(True)
src_curr = src_conn.cursor()
src_curr.execute("SET NAMES utf8");
src_curr.execute("USE %s" % db_config_src['db']);

return src_conn, src_curr
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
return None, None

src_conn,src_curr = get_srcDB()

db_config_target = {
'host': '172.16.96.34',
'user': 'root',
'passwd': '123',
'port': 3306,
'db': 'tableau_market'
}

def get_targetDB():
try:
target_conn = MySQLdb.connect(host=db_config_target['host'],user=db_config_target['user'],passwd=db_config_target['passwd'],port=db_config_target['port'])
target_conn.autocommit(True)
target_curr = target_conn.cursor()
target_curr.execute("SET NAMES utf8");
target_curr.execute("USE %s" % db_config_target['db']);

return target_conn, target_curr
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
return None, None

target_conn,target_curr = get_targetDB()

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

select_sql_text = "select count(*) from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
src_curr.execute(select_sql_text)
chg = src_curr.fetchall()
for chg_num in chg:
chg_num0 = chg_num[0]
print chg_num0

if (chg_num0 >= 1):
select_sql_text = "select xappkey,roomid,messge_send_cnt,pt,static_date from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
src_curr.execute(select_sql_text)

delete_sql_text = "delete from static_sdk_room_messge_send_x where static_date='%s';" % (yesterday)
target_curr.execute(delete_sql_text)

i = 0
insert_sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) values "

for item_room_message_send in src_curr.fetchall():
xappkey = item_room_message_send[0]
roomid = item_room_message_send[1]
messge_send_cnt = item_room_message_send[2]
pt = item_room_message_send[3]
static_date = item_room_message_send[4]

i += 1

insert_sql_text = insert_sql_text + "( '%s','%s',%d,'%s','%s')," % (xappkey,roomid,messge_send_cnt,pt,static_date)

print i
if (i%5000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
target_curr.execute(insert_sql_text)
#print insert_sql_text

insert_sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
target_curr.execute(insert_sql_text)

print i

else:
print "%s,no data !" % (yesterday)

src_curr.close()
src_conn.close()
target_curr.close()
target_conn.close()

5、Mysql导到Oracle

-----------------------------
mysql2oracle_no_rpt_err.py

# -*- coding=utf-8 -*-
import cx_Oracle
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

db_config_src = {
'host': '172.16.96.34',
'user': 'root',
'passwd': '123',
'port': 3306,
'db': 'tableau_market'
}

def get_srcDB():
try:
src_conn = MySQLdb.connect(host=db_config_src['host'],user=db_config_src['user'],passwd=db_config_src['passwd'],port=db_config_src['port'])
src_conn.autocommit(True)
src_curr = src_conn.cursor()
src_curr.execute("SET NAMES utf8");
src_curr.execute("USE %s" % db_config_src['db']);

return src_conn, src_curr
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
return None, None

src_conn,src_curr = get_srcDB()

target_conn = cx_Oracle.connect('c##tv/c##tv@172.16.96.31:1521/orcl')
target_curr = target_conn.cursor()

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

select_sql_text = "select count(*) from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
src_curr.execute(select_sql_text)
chg = src_curr.fetchall()
for chg_num in chg:
chg_num0 = chg_num[0]
print chg_num0

if (chg_num0 >= 1):
select_sql_text = "select xappkey,roomid,messge_send_cnt,pt,static_date from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
src_curr.execute(select_sql_text)

delete_sql_text = "delete from static_sdk_room_messge_send_x where static_date='%s'" % (yesterday)
target_curr.execute(delete_sql_text)
target_conn.commit()

i = 0
insert_sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) "

for item_room_message_send in src_curr.fetchall():
xappkey = item_room_message_send[0]
roomid = item_room_message_send[1]
messge_send_cnt = item_room_message_send[2]
pt = item_room_message_send[3]
static_date = item_room_message_send[4]

i += 1

insert_sql_text = insert_sql_text + " select '%s','%s',%d,'%s','%s' from dual union all " % (xappkey,roomid,messge_send_cnt,pt,static_date)

print i
if (i%500 == 0):
insert_sql_text = insert_sql_text[0:-10] + ""
target_curr.execute(insert_sql_text)
target_conn.commit()
print insert_sql_text

insert_sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) "

insert_sql_text = insert_sql_text[0:-10] + ""
print insert_sql_text
if (chg_num0%500 <> 0 ):
target_curr.execute(insert_sql_text)
target_conn.commit()

print i

else:
print "%s,no data !" % (yesterday)

src_curr.close()
src_conn.close()
target_curr.close()
target_conn.close()

-----------------------------
mysql2oracle.py

# -*- coding=utf-8 -*-
import cx_Oracle
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

db_config_src = {
'host': '172.16.96.34',
'user': 'root',
'passwd': '123',
'port': 3306,
'db': 'tableau_market'
}

def get_srcDB():
try:
src_conn = MySQLdb.connect(host=db_config_src['host'],user=db_config_src['user'],passwd=db_config_src['passwd'],port=db_config_src['port'])
src_conn.autocommit(True)
src_curr = src_conn.cursor()
src_curr.execute("SET NAMES utf8");
src_curr.execute("USE %s" % db_config_src['db']);

return src_conn, src_curr
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
return None, None

src_conn,src_curr = get_srcDB()

target_conn = cx_Oracle.connect('c##tv/c##tv@172.16.96.31:1521/orcl')
target_curr = target_conn.cursor()

'''
sql_text = "delete from static_sdk_room_messge_send_x where static_date='%s'" % (yesterday)
target_curr.execute(sql_text)
target_conn.commit()
'''

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

select_sql_text = "select count(*) from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
src_curr.execute(select_sql_text)
chg = src_curr.fetchall()
for chg_num in chg:
chg_num0 = chg_num[0]
print chg_num0

if (chg_num0 >= 1):
select_sql_text = "select xappkey,roomid,messge_send_cnt,pt,static_date from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
src_curr.execute(select_sql_text)

delete_sql_text = "delete from static_sdk_room_messge_send_x where static_date='%s'" % (yesterday)
target_curr.execute(delete_sql_text)
target_conn.commit()

i = 0
insert_sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) "

for item_room_message_send in src_curr.fetchall():
xappkey = item_room_message_send[0]
roomid = item_room_message_send[1]
messge_send_cnt = item_room_message_send[2]
pt = item_room_message_send[3]
static_date = item_room_message_send[4]

i += 1

insert_sql_text = insert_sql_text + " select '%s','%s',%d,'%s','%s' from dual union all " % (xappkey,roomid,messge_send_cnt,pt,static_date)

print i
if (i%500 == 0):
insert_sql_text = insert_sql_text[0:-10] + ""
target_curr.execute(insert_sql_text)
target_conn.commit()
#print insert_sql_text

insert_sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) "

insert_sql_text = insert_sql_text[0:-10] + ""
target_curr.execute(insert_sql_text)
target_conn.commit()

print i

else:
print "%s,no data !" % (yesterday)

src_curr.close()
src_conn.close()
target_curr.close()
target_conn.close()

6、Tsv文本文件导入到Mysql

-----------------------------
tsv2mysql_batch_commit_1103.py

# -*- coding=utf-8 -*-
import smtplib
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

db_config = {
'host': '172.16.96.34',
'user': 'root',
'passwd': '123',
'port': 3306,
'db': 'tableau_market'
}

def getDB():
try:
conn = MySQLdb.connect(host=db_config['host'],user=db_config['user'],passwd=db_config['passwd'],port=db_config['port'])
conn.autocommit(True)
curr = conn.cursor()
curr.execute("SET NAMES utf8");
curr.execute("USE %s" % db_config['db']);

return conn, curr
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
return None, None

conn,curr = getDB()

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

sql_text = "delete from static_sdk_room_messge_send where static_date='%s';" % (str(yesterday))
curr.execute(sql_text)

i = 0

sql_text = "insert into static_sdk_room_messge_send(xappkey,roomid,messge_send_cnt,pt,static_date) values "

for col_data in open("/root/pystudy/data/static_sdk_room_messge_send_%s.dat" % (yesterday)):
xappkey = col_data.split('\t')[0]
roomid = col_data.split('\t')[1]
messge_send_cnt = int(col_data.split('\t')[2])
pt = col_data.split('\t')[3]
static_date = str(yesterday)

i += 1

sql_text = sql_text + "( '%s','%s',%d,'%s','%s')," % (xappkey,roomid,messge_send_cnt,pt,static_date)

print i
if (i%2000 == 0):
sql_text = sql_text[0:-1] + ";"
curr.execute(sql_text)
# print sql_text

sql_text = "insert into static_sdk_room_messge_send(xappkey,roomid,messge_send_cnt,pt,static_date) values "

sql_text = sql_text[0:-1] + ";"
print sql_text
curr.execute(sql_text)
print sql_text

print i

curr.close()
conn.close()

-----------------------------
tsv2mysql_batch_commit.py

# -*- coding=utf-8 -*-
import smtplib
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

db_config = {
'host': '172.16.96.34',
'user': 'root',
'passwd': '123',
'port': 3306,
'db': 'tableau_market'
}

def getDB():
try:
conn = MySQLdb.connect(host=db_config['host'],user=db_config['user'],passwd=db_config['passwd'],port=db_config['port'])
conn.autocommit(True)
curr = conn.cursor()
curr.execute("SET NAMES utf8");
curr.execute("USE %s" % db_config['db']);

return conn, curr
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
return None, None

conn,curr = getDB()

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

sql_text = "delete from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
curr.execute(sql_text)

i = 0

sql_text = "insert into static_sdk_room_messge_send(xappkey,roomid,messge_send_cnt,pt,static_date) values "

for col_data in open("/root/pystudy/data/static_sdk_room_messge_send_%s.dat" % (yesterday)):
xappkey = col_data.split('\t')[0]
roomid = col_data.split('\t')[1]
messge_send_cnt = int(col_data.split('\t')[2])
pt = col_data.split('\t')[3]
static_date = col_data.split('\t')[4]

i += 1

sql_text = sql_text + "( '%s','%s',%d,'%s','%s')," % (xappkey,roomid,messge_send_cnt,pt,static_date)

print i
if (i%2000 == 0):
sql_text = sql_text[0:-1] + ";"
curr.execute(sql_text)
# print sql_text

sql_text = "insert into static_sdk_room_messge_send(xappkey,roomid,messge_send_cnt,pt,static_date) values "

sql_text = sql_text[0:-1] + ";"
curr.execute(sql_text)
#print sql_text

print i

curr.close()
conn.close()

-----------------------------
tsv2mysql.py

# -*- coding=utf-8 -*-
import smtplib
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

db_config = {
'host': '172.16.96.34',
'user': 'root',
'passwd': '123',
'port': 3306,
'db': 'tableau_market'
}

def getDB():
try:
conn = MySQLdb.connect(host=db_config['host'],user=db_config['user'],passwd=db_config['passwd'],port=db_config['port'])
conn.autocommit(True)
curr = conn.cursor()
curr.execute("SET NAMES utf8");
curr.execute("USE %s" % db_config['db']);

return conn, curr
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
return None, None

conn,curr = getDB()

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

'''
sql_text = "delete from static_sdk_room_messge_send where static_date='%s';" % (yesterday)
curr.execute(sql_text)
'''

for col_data in open("/root/pystudy/data/static_sdk_room_messge_send_%s.dat" % (yesterday)):
xappkey = col_data.split('\t')[0]
roomid = col_data.split('\t')[1]
messge_send_cnt = int(col_data.split('\t')[2])
pt = col_data.split('\t')[3]
static_date = col_data.split('\t')[4]

#os.remove("/root/pystudy/data/static_sdk_room_messge_send_%s.dat" % (yesterday))

sql_text = "insert into static_sdk_room_messge_send(xappkey,roomid,messge_send_cnt,pt,static_date) select '%s','%s',%d,'%s','%s';" % (xappkey,roomid,messge_send_cnt,pt,static_date)
curr.execute(sql_text)

curr.close()
conn.close()

7、Tsv文本文件导入到Oracle

-----------------------------
tsv2oracle_batch_commit.py

# -*- coding=utf-8 -*-
import cx_Oracle
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

ora_conn = cx_Oracle.connect('c##tv/c##tv@172.16.96.31:1521/orcl')
cursor = ora_conn.cursor()

sql_text = "SELECT max(roomid) from static_sdk_room_messge_send_x"
cursor.execute(sql_text)
row = cursor.fetchone()

sql_text = "delete from static_sdk_room_messge_send_x where static_date='%s'" % (yesterday)
cursor.execute(sql_text)
ora_conn.commit()

i = 0

sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) "

for col_data in open("/root/pystudy/data/static_sdk_room_messge_send_%s.dat" % (yesterday)):
xappkey = col_data.split('\t')[0]
roomid = col_data.split('\t')[1]
messge_send_cnt = int(col_data.split('\t')[2])
pt = col_data.split('\t')[3]
static_date = col_data.split('\t')[4]

i += 1

sql_text = sql_text + " select '%s','%s',%d,'%s','%s' from dual union all " % (xappkey,roomid,messge_send_cnt,pt,static_date)
print i
if (i%1000 == 0):
sql_text = sql_text[0:-10] + ""
cursor.execute(sql_text)
#print sql_text
ora_conn.commit()

sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) "

sql_text = sql_text[0:-10] + ""
#print sql_text
cursor.execute(sql_text)

ora_conn.commit()
cursor.close()
ora_conn.close()


-----------------------------
tsv2oracle.py

# -*- coding=utf-8 -*-
import cx_Oracle
import MySQLdb
import warnings
import datetime
import os
import os.path
import shutil

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

print today
print yesterday

ora_conn = cx_Oracle.connect('c##tv/c##tv@172.16.96.31:1521/orcl')
cursor = ora_conn.cursor ()

sql_text = "SELECT max(roomid) from static_sdk_room_messge_send_x"
cursor.execute(sql_text)
row = cursor.fetchone()

sql_text = "delete from static_sdk_room_messge_send_x where static_date='%s'" % (yesterday)
cursor.execute(sql_text)

for col_data in open("/root/pystudy/data/static_sdk_room_messge_send_%s.dat" % (yesterday)):
xappkey = col_data.split('\t')[0]
roomid = col_data.split('\t')[1]
messge_send_cnt = int(col_data.split('\t')[2])
pt = col_data.split('\t')[3]
static_date = col_data.split('\t')[4]

sql_text = "insert into static_sdk_room_messge_send_x(xappkey,roomid,messge_send_cnt,pt,static_date) select '%s','%s',%d,'%s','%s' from dual" % (xappkey,roomid,messge_send_cnt,pt,static_date)
cursor.execute(sql_text)

ora_conn.commit()
cursor.close()
ora_conn.close()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: