如何用python实现一个多线程定时器
2017-11-04 21:57
351 查看
因为自已要写一个和时间有关的方法,每过几秒钟之后要运行一个函数,但在主线程里写一个死循环来作定时器总是觉得不好。正好今天学习了一下python的多线程,可以拿来练手。写了下边的python定时器类,使用这个类你可以在你的代码中加入一个定时器。代码如下(pytimer.py):#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
import time
from Queue import Queue
class _timerThread(threading.Thread):
def __init__(self, t_name,queue,cond):
threading.Thread.__init__(self, name=t_name)
self.threadtimes = []
self.threadFunc = {}
self.lasttimes = {}
self.queue = queue
def setNewTimer(self,newobj):
if newobj.func == None:
self.threadtimes.remove(newobj.secendtime)
self.threadFunc[str(newobj.secendtime)] = None
self.lasttimes[str(newobj.secendtime)] = None
else:
if newobj.secendtime in self.threadtimes:
self.threadFunc[str(newobj.secendtime)] = newobj.func
self.lasttimes[str(newobj.secendtime)] = int(time.time())
else:
self.threadtimes.append(newobj.secendtime)
self.threadFunc[str(newobj.secendtime)] = newobj.func
self.lasttimes[str(newobj.secendtime)] = int(time.time())
def run(self):
while(True):
if not self.queue.empty():
objtmp = self.queue.get()
self.setNewTimer(objtmp)
timetmp = int(time.time())
for tx in self.threadtimes:
if timetmp - self.lasttimes[str(tx)] >= tx:
self.lasttimes[str(tx)] = timetmp
self.threadFunc[str(tx)](timetmp)
self.condition.release()
class _timerObj():
def __init__(self,secendt,funct):
self.secendtime = secendt
self.func = funct
class pytimer():
def __init__(self):
self.queue = Queue()
self.cond = threading.Condition()
self.t_thread = _timerThread(str(int(time.time())),self.queue, self.cond)
self._timers = []
self._initTimer()
def _initTimer(self):
self.t_thread.setDaemon(True)
self.t_thread.start()
def getTimers(self):
return self._timers
def setTimer(self,secendTime,Func):
objtmp = _timerObj(secendTime,Func)
self._timers.append(secendTime)
self.queue.put(objtmp)
def removeTimer(self,secendTime):
objtmp = _timerObj(secendTime,None)
self._timers.remove(secendTime)
self.queue.put(objtmp)
功能测试(test.py):#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytimer
def timerCallBack(timex):
print timex
def main():
timerx = pytimer.pytimer()
timerx.setTimer(2, timerCallBack)
while(True):
pass
if __name__ == '__main__':
main()
运行测试程序后输出结果:1446334639
1446334641
1446334643
1446334645
1446334647
1446334649
1446334651
1446334653...运行结果与预期想要的结果相同。每两秒调用了一次定时器返回函数。源码获取地址:https://github.com/fengmm521/pytimer/tree/master
# -*- coding: utf-8 -*-
import threading
import time
from Queue import Queue
class _timerThread(threading.Thread):
def __init__(self, t_name,queue,cond):
threading.Thread.__init__(self, name=t_name)
self.threadtimes = []
self.threadFunc = {}
self.lasttimes = {}
self.queue = queue
def setNewTimer(self,newobj):
if newobj.func == None:
self.threadtimes.remove(newobj.secendtime)
self.threadFunc[str(newobj.secendtime)] = None
self.lasttimes[str(newobj.secendtime)] = None
else:
if newobj.secendtime in self.threadtimes:
self.threadFunc[str(newobj.secendtime)] = newobj.func
self.lasttimes[str(newobj.secendtime)] = int(time.time())
else:
self.threadtimes.append(newobj.secendtime)
self.threadFunc[str(newobj.secendtime)] = newobj.func
self.lasttimes[str(newobj.secendtime)] = int(time.time())
def run(self):
while(True):
if not self.queue.empty():
objtmp = self.queue.get()
self.setNewTimer(objtmp)
timetmp = int(time.time())
for tx in self.threadtimes:
if timetmp - self.lasttimes[str(tx)] >= tx:
self.lasttimes[str(tx)] = timetmp
self.threadFunc[str(tx)](timetmp)
self.condition.release()
class _timerObj():
def __init__(self,secendt,funct):
self.secendtime = secendt
self.func = funct
class pytimer():
def __init__(self):
self.queue = Queue()
self.cond = threading.Condition()
self.t_thread = _timerThread(str(int(time.time())),self.queue, self.cond)
self._timers = []
self._initTimer()
def _initTimer(self):
self.t_thread.setDaemon(True)
self.t_thread.start()
def getTimers(self):
return self._timers
def setTimer(self,secendTime,Func):
objtmp = _timerObj(secendTime,Func)
self._timers.append(secendTime)
self.queue.put(objtmp)
def removeTimer(self,secendTime):
objtmp = _timerObj(secendTime,None)
self._timers.remove(secendTime)
self.queue.put(objtmp)
功能测试(test.py):#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytimer
def timerCallBack(timex):
print timex
def main():
timerx = pytimer.pytimer()
timerx.setTimer(2, timerCallBack)
while(True):
pass
if __name__ == '__main__':
main()
运行测试程序后输出结果:1446334639
1446334641
1446334643
1446334645
1446334647
1446334649
1446334651
1446334653...运行结果与预期想要的结果相同。每两秒调用了一次定时器返回函数。源码获取地址:https://github.com/fengmm521/pytimer/tree/master
相关文章推荐
- 教你如何用 Python 来实现一个大数据搜索引擎
- 学python(03)—— 如何使用函数实现一个随机字符串里的大小写字符互换
- 看我如何用 Python 实现一个轻量型数据库
- python代码实现:如何反序的迭代一个序列?
- 如何用Python实现任一个英文的纯文本文件,统计其中的单词出现的个数?
- 如何实现从WinForm中打开一个需要身份验证的Web系统?
- 发布一个用Python实现的“法语动词变位工具”
- [Python]如何取出一个超大文本文件的最后几行
- 如何完成一个实现Pause和Continue这两个功能的Windows Service
- 自己写一个strcpy(char*dest,char*src),如何在函数内部实现防御性溢出?
- 如何遍历一个窗体中的某一种控件 c#实现
- 如何用C#写一个实现像股票行情的波动曲线图
- 如何多次包含一个含有实现的头文件
- 如何自定义一个Remoting中Sink对象实现特定协议上的消息转发
- 如何实现同一用户只能存在一个实例?
- [Python]Python中实现一个时间(日期)型加几个月的运算
- 如何在ASP.net中实现限制一个用户名在多个客户端IE登陆的方法
- [Python]如何取出一个超大文本文件的最后几行
- linux下如何实现为一个网卡绑定多个IP地址
- 如何编程实现删除一个sap的在线用户