Python封装的访问MySQL数据库的类及DEMO
2014-04-15 19:31
561 查看
# Filename:mysql_class.py # Author:Rain.Zen; Date: 2014-04-15 import MySQLdb class MyDb: '''初始化[类似于构造函数]''' def __init__(self, host, user, password, charset = 'utf8'): self.host = host self.user = user self.password = password self.charset = charset try: self.conn = MySQLdb.connect(host = self.host, user = self.user, passwd = self.password, charset = self.charset) self.cur = self.conn.cursor() except MySQLdb.Error as e: print('MySQL Error %d: %s' % (e.args[0], e.args[1])) '''组合字符串''' def joinData(self, data, char = ','): return char.join(data) '''解析条件语句,参数类型[{dict}, 'AND|OR'] | {dict}''' def parseMap(self, condition) : if isinstance(condition, list) : dictMap = condition[0] logic = ' '+ condition[1] +' ' else : dictMap = condition logic = ' AND ' if isinstance(dictMap, dict) : for k, v in dictMap.items(): dictMap[k] = "`"+ k +"` = '"+ str(v) +"'" return self.joinData(dictMap.values(), logic) else : return '1 = 1' '''选择数据表''' def selectDb(self, db): try : self.conn.select_db(db) except MySQLdb.Error as e: print('MySQL Error %d: %s' % (e.args[0], e.args[1])) '''执行SQL语句''' def query(self, sql): try : return self.cur.execute(sql) except MySQLdb.Error as e: print 'MySQL Error: %s \nSQL: %s' % (e, sql) '''添加一条信息''' def addOne(self, table, dictData): for field, value in dictData.items(): dictData[field] = "'"+ str(value) +"'" self.query('INSERT INTO ' + table + "("+ self.joinData(dictData.keys()) +') VALUES('+ self.joinData(dictData.values()) +')') return self.cur.lastrowid '''修改一条信息[根据条件]''' def saveOne(self, table, dictData, condition): for key, val in dictData.items(): dictData[key] = "`"+ key +"` = '"+ str(val) +"'" save_sql = 'UPDATE ' + table + ' SET ' + self.joinData(dictData.values()) + ' WHERE ' + self.parseMap(condition) return self.query(save_sql) '''删除信息[根据条件]''' def deleteOne(self, table, condition): return self.query('DELETE FROM ' + table + ' WHERE ' + self.parseMap(condition)) '''读取单条信息[根据条件]''' def fetchOne(self, tabel, condition, fields = '*', dataType = 0): field = isinstance(fields, list) and self.joinData(fields) or fields self.query('SELECT '+ field +' FROM ' + tabel + ' WHERE ' + self.parseMap(condition)) tupleData = self.cur.fetchone() if dataType == 0 or fields == '*' : return tupleData else : dictData = {} n = 0 for k in fields: dictData[k] = tupleData n = n + 1 return dictData '''读取多条信息[根据条件]''' def fetchSome(self, tabel, condition, fields = '*', dataType = 0): field = isinstance(fields, list) and self.joinData(fields) or fields self.query('SELECT '+ field +' FROM ' + tabel + ' WHERE ' + self.parseMap(condition)) tupleData = self.cur.fetchall() count = self.cur.rowcount if count > 0: if dataType == 0 or fields == '*' : return (int(count), tupleData) else : listData = [] for row in tupleData: dictData = {} n = 0 for k in fields: dictData[k] = row n = n + 1 listData.append(dictData) return (int(count), listData) else: return False '''获取信息总数[根据条件]''' def getCount(self, tabel, condition = 0): where = isinstance(condition, dict) and ' WHERE ' + self.parseMap(condition) or '' self.query('SELECT 0 FROM ' + tabel + where) return self.cur.rowcount '''提交事务''' def commit(self): self.conn.commit() '''关闭句柄和连接''' def close(self): self.cur.close() self.conn.close()
DEMO
# Filename: test_mysql.py from mysql_class import MyDb '''测试开始''' '''连接数据库[实例化]''' my_db = MyDb('127.0.0.1', 'python_user', '123123') '''选择数据库''' my_db.selectDb('test') '''修改单条信息 dictData = {'class' : 'DD', 'name' : 'Pitt.C', 'subject' : 'Match_01', 'score' : 60} condition = [{'class' : 'DD', 'name' : 'Tom'}, 'OR'] print my_db.saveOne('cla_info', dictData, condition)''' '''删除信息 condition = [{'class':'DD', 'name':'bd', 'id':17}, 'OR'] print my_db.deleteOne('cla_info', condition)''' '''获取单条信息 fields = ['class', 'name', 'subject', 'score', 'comment'] condition = {'id':6} print my_db.fetchOne('cla_info', condition, fields)''' '''新增[单条] dictData = {'class' : 'DDD', 'name' : 'Pitt', 'subject' : 'Match', 'score' : 97} in_id = my_db.addOne('cla_info', dictData) print 'Successful add data: ID(%d)' % in_id''' '''查询数据 field = ['class', 'name', 'subject', 'score', 'comment'] condition = {'name':'小明'} ary = my_db.fetchSome('cla_info', condition, field, 1) if ary or False : print 'The total is:',ary[0] print ary[1]''' '''获取总数 condition = {'name':'小明'} print my_db.getCount('cla_info')''' '''事务处理''' my_db.commit() '''关闭句柄和连接''' my_db.close()
相关文章推荐
- selenium2.0关于python的常用函数
- python2.5访问数据库出现中文乱码的问题。
- 写一个程序,分析一个文本文件中各个词出现的频率,并且把频率最高的10个词打印出来。文本文件大约是30KB~300KB大小。
- Learning Python
- 【Python脚本】Python创建删除文件-----------我的第一个Python脚本
- python内置对象的一些知识
- python 类继承与子类实例初始化
- Python程序语言快速上手教程
- Python开发——理解With语句
- python简明教程 argv
- Python单元测试框架使用unittestpyUnit
- 如何使用Python3读取配置文件(ini)
- python判断两个文件是否相同
- Python学习(3)——if语句
- mac os下Python模块安装错误
- Eclipse+PyDev+Django+Mysql搭建Python web开发环境
- Python开发——字符串format
- Python调试
- python类型转换、数值操作(转)
- Python自动单元测试框架