您的位置:首页 > 数据库 > Redis

【原创】python-rq Redis-Queue

2017-03-20 14:48 507 查看
相信大家都看过官方文档,不过需要提醒一个大家,网上的示例代码,包括官方文档,又有坑,大坑!

在获取job的result或者return_value之前,需要job显示调用一个函数,这个函数到底是什么呢,我对着官方文档看啊看啊,打开各种搜索,搜啊搜啊,没有,真真的从中午弄到下午5点!!!!
这个函数是:perform() !!!

写一段结合gevent、rq的 生产者和消费者模式的例子。
文件一共有
config.py 配置信息
myfun.py 生产函数(必须单独写道一个文件或者模块中)
creater.py 生产者,必须:from myfun import xxxx
eater.py 消费者
server.py 服务中心

哎呀,键盘出问题了,ESC和TAB失灵了,明天再写吧,睡觉了

继续写完这篇
1. config.py 

1 # -*- coding: utf8 -*-
2 # author: zhipeng
3 # date: 2016-02-28
4
5 HOST = "127.0.0.1"
6 PORT = 7379
7 RQ_NAME = "test"

2. rq-server.py

1 # -*- coding: utf-8 -*-
2
3 # author: zhipeng
4 # data: 2016-02-28
5
6 from __future__ import (absolute_import, division, print_function,
7                         unicode_literals)
8
9 from rq import Connection, Queue, Worker
10 from redis import Redis
11
12 import config
13
14
15 if __name__ == '__main__':
16     listen = [config.RQ_NAME]
17     with Connection(Redis(HOST, PORT)):
18         #queue = Queue("test")
19         worker = Worker(map(Queue, listen))
20         worker.work()

3. create.py

1 # -*- coding: utf-8 -*-
2
3 from __future__ import (absolute_import, division, print_function,
4                         unicode_literals)
5
6 import os
7 import time
8
9 from rq import Connection, Queue
10 from redis import Redis
11
12 import config
13 from myfun import build_id, set_status
14
15
16 def main():
17     # Range of Fibonacci numbers to compute
18     num = 90
19     fib_range = range(num, num + 10)
20
21     # Kick off the tasks asynchronously
22     async_results = {}
23     q = Queue(config.RQ_NAME)
24     print("empty? ",q.is_empty())
25     print("set status", False)
26     status = q.enqueue(set_status, args=(False, ))
27     print("set status", status)
28     for x in fib_range:
29         # enqueue 添加boj,第一个参数是一个函数名,至关重要,这个函数必须单独写在一个文件或者模块中!
30         set_obj = q.enqueue(build_id, args=(x, x/2+1))
31         # perform() 显示调用至关重要, 最好写在消费者中!
32         # 如果在这里显示调用,生产者会把创建好的job运行,消费者会永远阻塞,等待生产结果。
33         # 从官方文档、官方demo、百度、谷歌等等出来的答案里面,有没有提到这个函数。
34         # 这次为了程序连贯,注释perform,如想看过程,可以去掉注释perform(), 查看对比输入。
35
143a8
# set_obj.perform()
36         print("set: ", set_obj)
37     print("set status", True)
38     status = q.enqueue(set_status, args=(True, ))
39     print("set status", status)
40
41     print("empty? ",q.is_empty(),)
42     get_obj = q.dequeue()
43     print("get: ", q.dequeue())
44     print("queue method: ", dir(q))
45     print("="*20)
46     print(dir(set_obj))
47     print("--> set queue: ", set_obj)
48     print("--> set value:", set_obj.result)
49     print("="*20)
50     print(dir(get_obj))
51     print("--> get queue: ", get_obj)
52     print("--> get value:", get_obj.result)
53     print("==> get value:", get_obj.result)
54     print("==> get value(return):", get_obj.return_value)
55     #print("==> get value(return):", get_obj())
56     while get_obj.result is None:
57         print("="*30)
58         for key in dir(get_obj):
59             if key[0] == "_":
60                 continue
61             try:
62                 print("call-->", key, getattr(get_obj, key)())
63             except TypeError:
64                 print("get-->", key, getattr(get_obj, key))
65             except:
66                 pass
67         print("="*30)
68         time.sleep(1)
69     print("|==> run obj.job:", get_obj.perform())
70     print("|==> get obj result:", get_obj.result)
71
72
73 if __name__ == '__main__':
74     with Connection(Redis(config.HOST, config.PORT)):
75         main()

4. eat.py

1 # -*- coding: utf-8 -*-
2 # author: zhipeng
3 # date: 2016-02-28
4
5 from __future__ import (absolute_import, division, print_function,
6                         unicode_literals)
7
8 import os
9 import time
10
11 import gevent
12 from gevent import Greenlet
13
14 from rq import Connection, Queue
15 from redis import Redis
16
17 import config
18
19
20 class GreenThread(Greenlet):
21
22     def __init__(self, n, queue):
23         Greenlet.__init__(self)
24         self.name = "Thread-%s" % n
25         self._queue = queue
26
27     def run(self):
28         done = False
29         while True:
30             if self._queue.is_empty():
31                 #  如果队列为空,并且没有标记完成,等待任务
32                 if not done:
33                     print("queue is empty, wait 2sec...")
34                     gevent.sleep(3)
35                     continue
36                 # 如果队列为空,并且标记完成,任务结束
37                 else:
38                     print("queue is empty, done mark is: True. end all job.")
39                     break
40
41             obj = self._queue.dequeue()
42             # 这是坑,一定要显示调用perform函数! 写在生产者里面任务就自己完成了。
43             # 如果perform在这里调用,可以认为,生产者只是生产了任务(函数),这里才是真正意思开始生产,并消费。
44             obj.perform()
45             print("obj:", obj)
46             print("try get-> result", obj.result)
47             print("try get-> return value", obj.return_value)
48             time.sleep(2)
49             print("re try get-> result", obj.result)
50             print("re try get-> return value", obj.return_value)
51
52             # 如果一直没有结果,则一直等待任务执行完成.
53             while obj.return_value is None and obj.result is None:
54                 print("job return value is None, wait...")
55                 gevent.sleep(2)
56
57             print("==> result", obj.result)
58             print("==> return value", obj.return_value)
59
60             if obj.result == True:
61                 done = True
62                 continue
63
64             self._do_some(obj.result)
65             continue
66
67     def _do_some(self, create_value):
68         print(self.name, "I'm got a creater.job value:", create_value)
69
70
71 def main():
72     queue = Queue(config.RQ_NAME)
73     threads = []
74     for i in range(10):
75         threads.append(GreenThread(i, queue))
76
77     for i in threads:
78         i.start()
79
80     for i in threads:
81         try:
82             i.join()
83         except gevent.hub.LoopExit, e:
84             print("join error")
85     print('Done')
86
87
88 if __name__ == '__main__':
89     with Connection(Redis(config.HOST, config.PORT)):
90         main()
91

5. myfun.py

1 # -*- coding: utf-8 -*-
2 # author: zhipeng
3 # date: 2016-02-28
4
5
6 from __future__ import (absolute_import, division, print_function,
7                         unicode_literals)
8
9 import time
10
11 def build_id(x, y):
12     time.sleep(x % 3 + 3)
13     num = x*y if x % 3 == 1 else x
14     print ("call slow fib result:", num)
15     return num
16
17 def set_status(status=False):
18     return status


运行效果如下:
1. rq-server.py




2. create.py








3. eat.py




嗯,可以看到,先把server
run起来,然后creater和eater顺序随意启动,生产者创建任务,消费者获取到任务,显示调用perform()
开始执行函数。
但有个BUG,不知道你们是否发现,就是消费者没有按照预定的生产者的status标记退出循环,而是一直等待中。

原因是:我在消费者中注释掉perform,生产者创建好任务后,看server,就会发现,server竟然在run任务。也就是说,会和perform()抢着run任务。。。等run完以后,我run
消费者,发现一直是空队列。。。。

----

为了解决这个BUG,查看了一下源码,发现在/usr/lib/python2.7/site-packages/rq/worker.py
中work函数中,大概431行,self.execute_job(job),坑爹啊,本应是服务端只做调度,竟然生产数据,导致的结果是消费者拿不到生产者的result
~当然,人家类名叫worker,本身就是生成和消费的一体的
我是好奇,如果不通过dequeue().result的方式获取数据,还有什么函数或者方式可以获取生产者的结果呢?
首先验证一下数据有没有存进Redis,Key值为rq:job
hgetall
"rq:job:2ee02db4-dabc-465e-9b18-08777b7ea9b7"
hget
"rq:job:2ee02db4-dabc-465e-9b18-08777b7ea9b7" result
hmget
"rq:job:2ee02db4-dabc-465e-9b18-08777b7ea9b7" result
============

127.0.0.1:7379> hgetall
"rq:job:5ac70054-8e0c-4e5b-8d4f-8c020c13d541"
 1) "status"
 2) "finished"
 3) "origin"
 4) "test"
 5) "description"
 6) "myfun.build_id(93, 47.5)"
 7) "created_at"
 8) "2016-03-01T10:57:05Z"
 9) "enqueued_at"
10) "2016-03-01T10:57:05Z"
11) "timeout"
12) "180"
13) "data"
14)
"\x80\x02(X\x0e\x00\x00\x00myfun.build_idq\x01NK]G@G\xc0\x00\x00\x00\x00\x00\x86q\x02}q\x03tq\x04."
15) "ended_at"
16) "2016-03-01T10:57:11Z"
17) "result"
18) "\x80\x02K]."
19) "ttl"
20) "-1"

============

127.0.0.1:7379> hgetall
"rq:job:5ac70054-8e0c-4e5b-8d4f-8c020c13d541"
(empty list or set)
============

数据确实存在于Redis,但result结果是做过处理的,理应是float类型,但"\x80\x02K]."什么鬼...
其实就是说,只要把work.execute_job(job)->main_work_horse(self,
job) --> perform_job(self, job)
重载一下就可以避免server
run任务了。
尝试一下 重载main_work_horse
在rq-server中定义函数 
worker = Worker(map(Queue, listen))
worker.main_work_horse = (lambda x: 1) 即可




把server run起来,create一下任务,可以看到server,任务没有被运行了。

执行一下eat,竟然是空队列,看着这个函数不能改,那只能对perform_job 下手试试看效果。
结果还是不行,看来只能对perform_job进行重构,不能直接pass.
发现也不行,里面也不行。我一直试了一个小时,然后静下心来想,job
job,已经拿到job了!这是从消息队列里面取出来的,无论做什么操作,都白搭!我开始以为server(worker)这边是消费者生产者发来的通知,不会影响消息队列大小。错误,错误。
需要在 对
work()中调用的 self.dequeue_job_and_maintain_ttl()做重构,因为这里面会dequeue_any(),导致消费者任务异常,怎么改呢?
这样改最合适
def work(self, burst=False, call_func=True):
    do some
... 
    try:
     
  while True:
     
      do some
....
     
      timeout =
None if burst else max(1, self.default_worker_ttl - 60)
     
     if not call_func:
   
     
     
 time.sleep(1)

   
     
     
 continue

     
     
 result =
self.dequeue_job_and_maintain_ttl(timeout)
    except:
     
  pass
红色部分添加是添加的代码,可以直接继承Worker
重写work;
也可以直接修改源码,这样做并不会影响其他任何程序。
然后在rq-server.py中worker.work()
改成worker.work(call_func=False)

源码修改示例:




server端就没有任何输出了,生产者不变(生产函数),消费者运行生产者函数。
坑爹,消费者协程模式run不起来了,不用join,改成主线程while True也是这样。阻塞了。。。




暂时先不管这个了,还有个问题要说。。。
这个模式里面感觉其实还是有问题的,perform默认是会返回数据的。
试着把生产者create.py中的perform不注释,把eat.py 中的perform注释掉。

发现竟然可以一边生产函数、执行函数返回结果,另一半消费者可以直接拿到数据了。生产了14条,有5条拿到了return_value。
也是个Bug啊。。。。







这个问题其实可以解决,理论上:既然eat中能拿到job-id,只是没有返回值。在rq源码job.py中,把perform也稍稍微改改,或者直接在在create.py中改,先将obj状态改回,然后
obj.save()
将job重新写入redis,eat.py中dequeue的时候是从redis中根据任务id获取到任务列表,然后再去获取job的,如果这样获取不到只能那id直接从redis查询。

2016-03-02 03:42:54  不搞了,睡觉(~﹃~)~zZ
-------

而且这个是会定时清理Redis队列的,解决办法:enqueue(fun,
args)的时候,加上参数result_ttl=-1,0表示立即删除。
就会变成这样子如下:




具体可以查看代码:/usr/lib/python2.7/site-packages/rq/worker.py
557行左右:perform_job(self, job) 
还有一个参数是ttl 这个默认就可以,作用是创建任务后,多久没消费就删除任务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: