您的位置:首页 > 数据库

[bigdata-039] pymysql + pymongo + Python 3：把数据从 MySQL 导入到 MongoDB 的通用代码

2017-01-25 15:44 387 查看
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import contextlib
import logging

import pymongo
import pymysql

def trans_data_from_mysql_to_mongo(source_param, target_param, trans_type,
                                   where_clause, ignore_columns=None,
                                   batch_size=100):
    """Copy rows selected from a MySQL table into a MongoDB collection.

    Each selected row becomes one document whose keys are the column names.
    Documents are written with ``insert_many`` in batches of ``batch_size``.

    Args:
        source_param: MySQL settings; a mapping with keys ``db_host``,
            ``db_port``, ``db_user``, ``db_passwd``, ``db_database`` and
            ``db_table``, plus an optional ``db_charset``
            (default ``'utf8mb4'``).
        target_param: MongoDB settings; a mapping with keys ``db_host``,
            ``db_user``, ``db_passwd``, ``db_database`` (also used as the
            authentication database) and ``db_table`` (collection name).
        trans_type: transfer mode. ``'删除目标表重新导入'`` ("drop the target
            collection and re-import") drops the collection before
            inserting; any other value appends to the existing collection.
        where_clause: raw SQL appended after ``select * from <table>``,
            e.g. ``"where id > 100"``, or ``""`` to copy every row.
        ignore_columns: optional iterable of column names to leave out of
            the documents.
        batch_size: number of documents per ``insert_many`` call.

    Returns:
        The number of documents actually inserted.

    Raises:
        ValueError: if ``batch_size`` is less than 1.
        Errors from PyMySQL / PyMongo propagate unchanged. Batches inserted
        before such an error are not rolled back.

    The source query runs before the target collection is dropped, so a
    failing query (e.g. a malformed ``where_clause``) leaves the target intact.

    SECURITY: ``db_table`` and ``where_clause`` are spliced into the SQL
    verbatim; they must come from trusted configuration, never user input.

    NOTE(review): values are stored exactly as PyMySQL returns them; types
    BSON cannot encode (e.g. ``decimal.Decimal``, ``datetime.date``) will
    make ``insert_many`` raise -- convert them upstream if the table has
    such columns.
    """
    if batch_size < 1:
        raise ValueError('batch_size must be >= 1, got %r' % (batch_size,))
    logger = logging.getLogger(__name__)
    # None (rather than a shared mutable [] default) means "ignore nothing";
    # a set makes the per-column membership test O(1).
    ignored = set(ignore_columns or ())

    with contextlib.ExitStack() as stack:
        # Each resource is registered for closing as soon as it exists, so
        # nothing leaks if a later step raises (closed in reverse order).
        mysql_conn = pymysql.connect(
            host=source_param['db_host'],
            port=source_param['db_port'],
            user=source_param['db_user'],
            password=source_param['db_passwd'],
            database=source_param['db_database'],
            # utf8mb4 rather than MySQL's 3-byte "utf8", so 4-byte
            # characters such as emoji survive the transfer.
            charset=source_param.get('db_charset', 'utf8mb4'),
        )
        stack.callback(mysql_conn.close)
        mysql_cursor = mysql_conn.cursor()
        stack.callback(mysql_cursor.close)

        # Credentials are given to the client itself: Database.authenticate()
        # was removed in PyMongo 4. authSource keeps the original behaviour of
        # authenticating against the target database.
        mongo_client = pymongo.MongoClient(
            target_param['db_host'],
            username=target_param['db_user'],
            password=target_param['db_passwd'],
            authSource=target_param['db_database'],
        )
        stack.callback(mongo_client.close)
        mongo_db = mongo_client[target_param['db_database']]

        # Query the source before touching the target (see docstring).
        # The database was already selected by connect(), so no "use" needed.
        mysql_cursor.execute('select * from %s %s'
                             % (source_param['db_table'], where_clause))
        # Column names come from this result set, so they always line up
        # with the row tuples (no separate "describe" query).
        columns = [desc[0] for desc in mysql_cursor.description]
        logger.info('source columns: %s', columns)

        if trans_type == '删除目标表重新导入':
            mongo_db.drop_collection(target_param['db_table'])
        mongo_table = mongo_db[target_param['db_table']]

        n_inserted = 0
        batch = []
        for row in mysql_cursor:
            batch.append({name: value
                          for name, value in zip(columns, row)
                          if name not in ignored})
            if len(batch) >= batch_size:
                mongo_table.insert_many(batch)
                n_inserted += len(batch)
                logger.debug('inserted %d documents so far', n_inserted)
                batch = []
        if batch:
            mongo_table.insert_many(batch)
            n_inserted += len(batch)

    logger.info('inserted %d documents into %s',
                n_inserted, target_param['db_table'])
    # Counting what was really inserted (instead of a separate count(*))
    # stays correct when where_clause contains LIMIT / GROUP BY.
    return n_inserted


每 100 条记录批量插入一次；可选择先删除目标表再导入；可过滤不需要的列（column）；返回插入的记录总数。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: