您的位置:首页 > 编程语言 > Python开发

Python使用DB封装

2014-08-12 09:01 344 查看
自己在项目中使用db操作时的一点封装,觉得用得好的话还挺好用的,记录一下。当然也有很多开源的好用的db封装库,不去比较,自己用的顺手就是好的。

一般每一个数据库表对应一个model,首先有一个所有model都继承的Entity类,为了更多方便的操作,这里Entity继承了dict。

class Entity(dict):

def __getattr__(self, name, default = None):
try:
return self[name]
except:
return default

def __setattr__(self, name, value = ""):
self[name] = value

def clr_and_set(self, **kwargs):
self.clear()
self.update(kwargs)

def key_dict(self, obtain_time=True):
if hasattr(self.__class__,'table_keys'):
for key in self.keys():
if key not in self.__class__.table_keys:
self.pop(key)
if obtain_time:
if "created_at" in self.__class__.table_keys:
self.update(created_at=sec2str(time.time()))
if "updated_at" in self.__class__.table_keys:
self.update(updated_at=sec2str(time.time()))
return self

def db_insert(self, callback=None):
DBHelper().insert(self.__class__.table_name, self.key_dict(), callback=callback)

def db_multi_insert(self, chunksize=100, datas=[], callback=None):
DBHelper().multi_insert(self.__class__.table_name, chunksize, datas, callback=callback)

def db_replace(self, callback=None):
DBHelper().replace(self.__class__.table_name, self.key_dict(), callback=callback)

def db_multi_replace(self, chunksize=100, datas=[], callback=None):
DBHelper().multi_replace(self.__class__.table_name, chunksize, datas, callback=callback)

#key_dict, set_dict
def db_update(self, set_dict, callback=None):
if "updated_at" in self.__class__.table_keys:
set_dict.update(updated_at=sec2str(time.time()))
DBHelper().update(self.__class__.table_name, set_dict, self.key_dict(False), callback=callback)

def db_delete(self, callback=None):
DBHelper().delete(self.__class__.table_name, self.key_dict(False), callback=callback)

#field in field_tuple need add `` sometimes
def db_get(self, field_tuple, callback, more_cond=""):
def _on_get(result, ex):
if result and not ex:
callback(self.__class__(**result), None)
else:
callback(None, ex)
DBHelper().get(self.__class__.table_name, field_tuple, self.key_dict(False), callback=_on_get, more_cond=more_cond)

#field in field_tuple need add `` sometimes
def db_query(self, field_tuple, callback, more_cond=""):
def _on_query(result, ex):
if result and not ex:
callback([self.__class__(**ret) for ret in result], None)
else:
callback([], ex)
DBHelper().query(self.__class__.table_name, field_tuple, self.key_dict(False), callback=_on_query, more_cond=more_cond)


DBHelper做进一步封装,adb为具体的实例db,这里为封装了一层的异步db:

class DBHelper():

def insert(self, table_name, key_dict, callback = None):
sql = construct_insert_sql(table_name,key_dict)
adb.insert(sql, callback=callback)

def multi_insert(self, table_name, chunksize = 100, datas = [], callback = None):
def _on_insert_continue(result, ex):
if not ex:
self.cur_chunk += 1
key_list = datas[(self.cur_chunk -1) * chunksize:self.cur_chunk * chunksize]
bulk_insert_sql = construct_bulk_insert_sql(table_name,key_list)
adb.insert(bulk_insert_sql, callback=self.cur_chunk==chunk and callback or _on_insert_continue)
else:
callback and callback(result, ex)

size = len(datas)
chunk = size / chunksize + (size % chunksize and 1 or 0)
self.cur_chunk = 0
_on_insert_continue(True, None)

def replace(self, table_name, key_dict, callback = None):
sql = construct_replace_sql(table_name,key_dict)
adb.replace(sql, callback=callback)

def multi_replace(self, table_name, chunksize = 100, datas = [], callback = None):
def _on_replace_continue(result, ex):
if not ex:
self.cur_chunk += 1
key_list = datas[(self.cur_chunk -1) * chunksize:self.cur_chunk * chunksize]
bulk_replace_sql = construct_bulk_replace_sql(table_name,key_list)
adb.replace(bulk_replace_sql, callback=self.cur_chunk==chunk and callback or _on_replace_continue)
else:
callback and callback(result, ex)

size = len(datas)
chunk = size / chunksize + (size % chunksize and 1 or 0)
self.cur_chunk = 0
_on_replace_continue(True, None)

def update(self, table_name, key_dict, where_dict, callback = None, more_cond = ""):
sql = construct_update_sql(table_name,key_dict,where_dict,more_cond)
adb.update(sql, callback=callback)

def delete(self, table_name, where_dict, callback = None, more_cond = ""):
sql = construct_delete_sql(table_name,where_dict,more_cond)
adb.delete(sql, callback=callback)

def get(self, table_name, field_tuple, where_dict, callback, more_cond = ""):
sql = construct_select_sql(table_name,field_tuple,where_dict,more_cond)
adb.get(sql, callback=callback)

def query(self, table_name, field_tuple, where_dict, callback, more_cond = ""):
sql = construct_select_sql(table_name,field_tuple,where_dict,more_cond)
adb.query(sql, callback=callback)


组装sql的几个方法:

def construct_where_sql(where_dict):
if isinstance(where_dict,dict):
l = len(where_dict)
if l == 0:
return "1=1"
else:
return ' AND '.join(["`%s`=%s" % (k, format_value(v)) for k, v in where_dict.items()])
else:
raise ValueError(" where_dict type is invalid")

def construct_insert_sql(table_name, a_dict):
sql_template = "INSERT INTO `{0}`({1}) VALUES ({2})"
keys = ','.join(["`%s`" % k for k in a_dict.keys()])
values = ','.join(["%s" % format_value(a_dict[k]) for k in a_dict.keys() ])
return sql_template.format( table_name , keys, values )

def construct_bulk_insert_sql(table_name, a_list):
sql_template = "INSERT IGNORE INTO `{0}`({1}) VALUES {2}"
dict_0 = a_list[0]
keys = ','.join(["`%s`" % k for k in dict_0.keys()])
values = ','.join(["%s" % format_value(dict_0[k]) for k in dict_0.keys() ])
values = '(%s)' % (values)
for a_dict in a_list[1:]:
value = ','.join(["%s" % format_value(a_dict[k]) for k in a_dict.keys() ])
values += ',(%s)' % value
return sql_template.format( table_name , keys, values )

def construct_replace_sql(table_name, a_dict):
sql_template = "REPLACE INTO `{0}`({1}) VALUES ({2})"
keys = ','.join(["`%s`" % k for k in a_dict.keys()])
values = ','.join(["%s" % format_value(a_dict[k]) for k in a_dict.keys() ])
return sql_template.format( table_name , keys, values )

def construct_bulk_replace_sql(table_name, a_list):
sql_template = "REPLACE INTO `{0}`({1}) VALUES {2}"
dict_0 = a_list[0]
keys = ','.join(["`%s`" % k for k in dict_0.keys()])
values = ','.join(["%s" % format_value(dict_0[k]) for k in dict_0.keys() ])
values = '(%s)' % (values)
for a_dict in a_list[1:]:
value = ','.join(["%s" % format_value(a_dict[k]) for k in a_dict.keys() ])
values += ',(%s)' % value
return sql_template.format( table_name , keys, values )

def construct_update_sql(table_name, set_dict, where_dict, more_cond = ""):
return "UPDATE `{0}` SET {1} WHERE {2} {3}".format(table_name, ','.join(["`%s`=%s" % (k,format_value(v)) for k,v in set_dict.items()]), construct_where_sql(where_dict), more_cond)

def construct_delete_sql(table_name, where_dict, more_cond = ""):
return "DELETE FROM `{0}` WHERE {1} {2}".format(table_name, construct_where_sql(where_dict), more_cond)

def construct_select_sql(table_name, field_tuple, where_dict, more_cond = ""):
return "SELECT {0} FROM `{1}` WHERE {2} {3}".format(",".join(field_tuple), table_name, construct_where_sql(where_dict), more_cond)

class MysqlExpr(object):
def __init__(self,s):
super(MysqlExpr,self).__init__()
self._function = s

def __str__(self):
return self._function

def format_value(v):
if isinstance(v,(int,long,float)):
return str(v)
elif isinstance(v,unicode):
v = v.encode("UTF-8")
return "'%s'"  % MySQLdb.escape_string(v)
elif isinstance(v,str):
return "'%s'"  % MySQLdb.escape_string(v)
elif isinstance(v,MysqlExpr):
return v
elif v is None:
return ''


最后,看下model层使用上是不是方便些:
class Account(Entity):

table_name = TABLE_NAMES.ACCOUNT
#对应db中字段
table_keys = (
"id", "name", "sex", "created_at", "updated_at"
)

def __init__(self, **kwargs):
super(Account, self).__init__(kwargs)

def main():
ac = Account(id=1,name="test")
ac.db_insert()
ac.db_update(sex=1)
ac.db_delete()

if __name__ == "__main__":
main()
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: