Python模拟进行Mysql和Redis数据传输总结
2016-07-22 10:59
555 查看
本来分开用两个脚本完成,后来发现有很多可多复用的东西,尤其是在python对mysql的操作过程中。然后写可以优化的地方,比如对mysql的操作封装成一个类,就如后面redis的队列类一样。不过懒病犯了,不想动了。简单整合了一下代码,我的心已经飞到办公室外的世界了,准确的是召唤师峡谷。。。
# -*- coding:utf-8 -*- import MySQLdb import schedule import time import datetime import random import string import redis # use python to automatic generate the data # and insert to mysql (db: test; table: test) class SaveToSql(object): def __init__(self, conn): self.conn = conn def random_str(self, randomlength=random.randint(1,10)): a = list(string.ascii_letters) random.shuffle(a) return ''.join(a[:randomlength]) def save(self): cursor = self.conn.cursor() try: sql = "insert into test(time, name, type, data) values('%s','%s', '%s', '%s')"% (datetime.datetime.now(),self.random_str(),self.random_str(),self.random_str()) cursor.execute(sql) print('Insert the data: ', sql) rs = cursor.rowcount if rs != 1: raise Exception("Error of data inserting.") self.conn.rollback() self.conn.commit() finally: cursor.close() # get the data from mysql class FromSql(object): def __init__(self, conn): self.conn = conn def acquire(self): cursor = self.conn.cursor() try: #sql =select * from test where to_days(t) = to_days(now()) sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1" cursor.execute(sql) rs = cursor.fetchall() #print (rs) for eve in rs: print('%s, %s, %s, %s' % eve) copy_rs = rs cursor.close() return copy_rs except Exception as e: print("The error: %s" % e) class RedisQueue(object): def __init__(self, name, namespace='queue', **redis_kwargs): """The default connection parameters are: host='localhost', port=6379, db=0""" self.__db= redis.Redis(**redis_kwargs) self.key = '%s:%s' %(namespace, name) def qsize(self): return self.__db.llen(self.key) def put(self, item): self.__db.rpush(self.key, item) def get(self, block=True, timeout=None): if block: item = self.__db.blpop(self.key, timeout=timeout) else: item = self.__db.lpop(self.key) if item: item = item[1] return item def get_nowait(self): return self.get(False) if __name__ == "__main__": # connect mysqldb conn_sql = MySQLdb.connect( host = '127.0.0.1', port = 3306, user = 'root', passwd = '', db = 'test', charset = 'utf8' ) # main function def job_for_mysql(): save_to_mysql = SaveToSql(conn_sql) data = save_data.save() def job_for_redis(): get_data = FromSql(conn_sql) data = get_data.acquire() q = RedisQueue('test',host='localhost', port=6379, db=0) for single_data in data: for meta_data in single_data: q.put(meta_data) print(meta_data) print("All data had been inserted.") """ try: schedule.every(10).seconds.do(job_for_mysql) schedule.every().day.at("00:00").do(job_for_redis) except Exception as e: print('Error: %s'% e) # finally: # conn.close() while True: schedule.run_pending() time.sleep(1) """
相关文章推荐
- MySQL中的integer 数据类型
- MySQL存储过程
- Python动态类型的学习---引用的理解
- Python3写爬虫(四)多线程实现数据爬取
- 垃圾邮件过滤器 python简单实现
- 下载并遍历 names.txt 文件,输出长度最长的回文人名。
- mysql中int、bigint、smallint 和 tinyint的区别与长度
- mysql load data 导出、导入 csv
- install and upgrade scrapy
- source命令执行SQL脚本文件
- Scrapy的架构介绍
- Centos6 编译安装Python
- 使用Python生成Excel格式的图片
- 让Python文件也可以当bat文件运行
- [Python]推算数独
- redis安装问题小结
- MySQL创建用户及权限控制