
Operating SQLite from Python: an example that supports concurrent access from multiple processes/threads

2013-01-30 14:16
Sample code for operating SQLite from Python:

import time
import threading
import sqlite3

def normal_producer(conn):
    '''
    @summary: producer definition
    '''
    counter = 0
    conn.isolation_level = None  # autocommit mode, so the commented-out conn.commit() below is not needed
    conn.row_factory = sqlite3.Row

    while True:
        # insert into db
        cur = conn.cursor()
        cur.execute("INSERT INTO datas(content, flag) VALUES (?, ?);", ("content %s" % counter, False))
        counter = counter + 1
        # conn.commit()
        time.sleep(0.1)

def normal_consumer(conn):
    '''
    @summary: consumer definition
    '''
    conn.isolation_level = None
    conn.row_factory = sqlite3.Row
    while True:
        # select data
        cur = conn.cursor()
        cur.execute("SELECT * FROM datas ORDER BY id LIMIT 10;")
        records = cur.fetchall()
        if len(records) > 0:
            print "begin to delete: "
            print records
            # delete records
            for r in records:
                conn.execute("DELETE FROM datas WHERE id = ?;", (r["id"], ))
        time.sleep(0.5)

if __name__ == "__main__":
    # init db
    conn = sqlite3.connect('./db.sqlite', check_same_thread=False)
    # conn = sqlite3.connect('./db.sqlite')
    # create the table if it does not exist (schema inferred from the statements above)
    conn.execute("CREATE TABLE IF NOT EXISTS datas (id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT, flag BOOLEAN);")
    conn.commit()
    # init threads
    producer = threading.Thread(target=normal_producer, args=(conn,))
    consumer = threading.Thread(target=normal_consumer, args=(conn,))

    # start threads
    producer.start()
    consumer.start()


The example above drives SQLite from multiple threads using a producer/consumer pattern. There is nothing unusual about it, but note one thing: when creating the sqlite3 connection you must pass check_same_thread=False, otherwise the connection may only be used from the thread that created it.
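To see why the flag matters, here is a minimal sketch (the database file is not part of the article's example, and no table is needed; a plain SELECT 1 is enough) of what happens when a connection created with the default check_same_thread=True is touched from another thread: the sqlite3 module raises a ProgrammingError.

import sqlite3
import threading

conn = sqlite3.connect('./db.sqlite')  # default: check_same_thread=True

def use_from_other_thread():
    try:
        conn.execute("SELECT 1;")
    except sqlite3.ProgrammingError as e:
        # e.g. "SQLite objects created in a thread can only be used in that same thread ..."
        print "got the expected error:", e

t = threading.Thread(target=use_from_other_thread)
t.start()
t.join()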
In addition, to be truly thread-safe you can wrap Python's sqlite3 module one level further so that only a single thread ever touches SQLite. The principle is simple: push every operation request onto a queue, let one worker thread execute them in order, and hand results back through another queue. Sample code follows:

import sqlite3
from Queue import Queue
from threading import Thread

class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.

    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).

    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.setDaemon(True)  # python2.5-compatible
        self.start()

    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        while True:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                cursor.execute(req, arg)
                if res:
                    for rec in cursor:
                        res.put(rec)
                    res.put('--no more--')
                if self.autocommit:
                    conn.commit()
        conn.close()

    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.

        """
        self.reqs.put((req, arg or tuple(), res))

    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)

    def select(self, req, arg=None):
        """
        Unlike sqlite's native select, this select doesn't handle iteration efficiently.

        The result of `select` starts filling up with values as soon as the
        request is dequeued, and although you can iterate over the result normally
        (`for res in self.select(): ...`), the entire result will be in memory.

        """
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        while True:
            rec = res.get()
            if rec == '--no more--':
                break
            yield rec

    def select_one(self, req, arg=None):
        """Return only the first row of the SELECT, or None if there are no matching rows."""
        try:
            return iter(self.select(req, arg)).next()
        except StopIteration:
            return None

    def commit(self):
        self.execute('--commit--')

    def close(self):
        self.execute('--close--')

#endclass SqliteMultithread
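A minimal usage sketch of the wrapper might look like the following. The database file, the kv table and the SQL here are illustrative assumptions, not part of the original article; the point is that execute() is fire-and-forget, while select()/select_one() block until the worker thread has produced the rows.

# illustrative usage only: the database file and the kv table are assumptions
db = SqliteMultithread('./wrapped.sqlite', autocommit=True, journal_mode='DELETE')
db.execute('CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value TEXT)')
db.execute('INSERT OR REPLACE INTO kv VALUES (?, ?)', ('answer', '42'))

# select() drains the result queue, so it also acts as a synchronization point
for row in db.select('SELECT key, value FROM kv'):
    print row

print db.select_one('SELECT value FROM kv WHERE key = ?', ('answer',))

db.close()  # queue the sentinel that stops the worker thread
db.join()   # wait for the worker to drain the queue and close the connection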