您的位置:首页 > 数据库 > Redis

Python模拟进行Mysql和Redis数据传输总结

2016-07-22 10:59 555 查看
本来分开用两个脚本完成,后来发现有很多可多复用的东西,尤其是在python对mysql的操作过程中。然后写可以优化的地方,比如对mysql的操作封装成一个类,就如后面redis的队列类一样。不过懒病犯了,不想动了。简单整合了一下代码,我的心已经飞到办公室外的世界了,准确的是召唤师峡谷。。。

# -*- coding:utf-8 -*-

import MySQLdb
import schedule
import time
import datetime
import random
import string
import redis

# use python to automatic generate the data
# and insert to mysql (db: test; table: test)
class SaveToSql(object):
def __init__(self, conn):
self.conn = conn

def random_str(self, randomlength=random.randint(1,10)):
a = list(string.ascii_letters)
random.shuffle(a)
return ''.join(a[:randomlength])

def save(self):
cursor = self.conn.cursor()

try:
sql = "insert into test(time, name, type, data) values('%s','%s', '%s', '%s')"% (datetime.datetime.now(),self.random_str(),self.random_str(),self.random_str())
cursor.execute(sql)

print('Insert the data: ', sql)
rs = cursor.rowcount
if rs != 1:
raise Exception("Error of data inserting.")
self.conn.rollback()
self.conn.commit()
finally:
cursor.close()

# get the data from mysql
class FromSql(object):
def __init__(self, conn):
self.conn = conn

def acquire(self):
cursor = self.conn.cursor()
try:
#sql =select * from test where to_days(t) = to_days(now())
sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"

cursor.execute(sql)
rs = cursor.fetchall()
#print (rs)
for eve in rs:

print('%s, %s, %s, %s' % eve)
copy_rs = rs
cursor.close()

return copy_rs

except Exception as e:
print("The error: %s" % e)

class RedisQueue(object):

def __init__(self, name, namespace='queue', **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db= redis.Redis(**redis_kwargs)
self.key = '%s:%s' %(namespace, name)

def qsize(self):
return self.__db.llen(self.key)

def put(self, item):
self.__db.rpush(self.key, item)

def get(self, block=True, timeout=None):

if block:
item = self.__db.blpop(self.key, timeout=timeout)
else:
item = self.__db.lpop(self.key)

if item:
item = item[1]
return item

def get_nowait(self):
return self.get(False)

if __name__ == "__main__":
# connect mysqldb
conn_sql = MySQLdb.connect(
host = '127.0.0.1',
port = 3306,
user = 'root',
passwd = '',
db = 'test',
charset = 'utf8'
)

# main function
def job_for_mysql():

save_to_mysql = SaveToSql(conn_sql)
data = save_data.save()

def job_for_redis():
get_data = FromSql(conn_sql)
data = get_data.acquire()

q = RedisQueue('test',host='localhost', port=6379, db=0)
for single_data in data:
for meta_data in single_data:
q.put(meta_data)
print(meta_data)
print("All data had been inserted.")

"""
try:
schedule.every(10).seconds.do(job_for_mysql)
schedule.every().day.at("00:00").do(job_for_redis)
except Exception as e:
print('Error: %s'% e)
#   finally:
#       conn.close()

while True:
schedule.run_pending()
time.sleep(1)
"""
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  python redis mysql