您的位置:首页 > 编程语言 > Python开发

gearman 分布式系统学习 python

2014-11-05 17:34 141 查看
前段时间遇到了这样一个需求

某一客户端向一个服务器提交任务

服务器再将分发下去由对于的工作人员来完成

前辈告诉我用gearman搭建一个分布式系统。gearman有三个部分,client、service和worker

client:提交任务

service:分配任务

worker:执行任务

它可以实现的效果就是一台机器上搭建好了服务器

另外可以多台机器作为客户端,可以一起提交任务,然后服务器会用个队列来存起来

然后就是起多台机器作为worker去服务器要任务

把每个步骤都分布到了不同的主机上,这就是典型的分布式系统。

(感觉这个这就是一个加强版的多线程机制,一个线程提交任务到队列,起多个线程去队列中去。就是一个网游一个是单机)

详细介绍可以看官网Gearman,官网上还有各种语言的例子

配置过程的话跟着网上来就可以,前辈说linux就是配置特别麻烦,等配置好了之后,用起来就方便了

gearman client

提交任务用法很简单,以下是用例

gm_client = gearman.GearmanClient(['localhost:4730'])

gm_client.submit_job(GRARMAN_TASK_NAME, data, priority=gearman.PRIORITY_HIGH, background=True)


记得先引包 import gearman

['localhost:4730'] 就是gearman服务器的位置,端口默认是4730

然后client有一个submit_job的方法,下面有该函数的源码,有一堆参数意思就和名字一样。如background 参数就是提交后台任务False就是等待返回结果,适用于大量提交任务。

其中最重要的参数 就是 task 在测试代码中我填写是 GRARMAN_TASK_NAME 。这个是任务的唯一标识,就是可以有多个client提交任务,但服务器怎么识别任务呢,就是靠这参数,当然worker也是靠这个参数识别。(我第一次写的时候把这个参数写成了“echo”结果提交了一坨任务,但我自己的worker只收到几个,我调试了好久才发现。。。)

gearman.client.submit_job 源码

1     def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
2         """Submit a single job to any gearman server"""
3         job_info = dict(task=task, data=data, unique=unique, priority=priority)
4         completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout)
5         return gearman.util.unlist(completed_job_list)


gearman worker

worker里面的话,除了GRARMAN_TASK_NAME 和 ['localhost:4730']值得注意意外,还有一个地方就是每个worker都需要注册一下自己要做的任务,就是下面代码中的

gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)

注册的名字需要和对应client中的一致,后面一个参数是一个方法,写的格式也是代码中的那样,相当于就是拿到了任务怎么干,这个怎么干的过程就写到方法里面

gm_worker = gearman.GearmanWorker(['localhost:4730'])

def task_listener_reverse(gearman_worker, gearman_job):

#gearman_job 就是client端传过来的数据

print "这里是想要干的事"

return gearman_job.data[::-1]#返回数据的逆序

#GRARMAN_TASK_NAME  这个名字需要是任务的唯一标识
gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)


GearmanAdminClient

今天有一个需求

得到gearman服务上有多少个job,又有多少worker正在工作

然后根据job和worker的数量进行一些相应的调整工作

突然发现gearman中GearmanAdminClient有以下两个方法,瞬间完成任务

def get_workers(self):
"""Retrieves a list of workers and reports what tasks they're operating on"""
self.establish_admin_connection()
self.current_handler.send_text_command

(GEARMAN_SERVER_COMMAND_WORKERS)
return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_WORKERS)

def get_status(self):
"""Retrieves a list of all registered tasks and reports how many items/workers are in

the queue"""
self.establish_admin_connection()
self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_STATUS)
return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_STATUS)


测试代码

ad_client=gearman.GearmanAdminClient(['localhost:4730'])

list=ad_client.get_workers()
for row in list:
print row

print "\n"

list=ad_client.get_status()
for row in list:
print row


部分结果展示

{'file_descriptor': '34', 'tasks': (), 'client_id': '-', 'ip': '127.0.0.1'}
{'file_descriptor': '50', 'tasks': ('resize', 'like', 'dislike'), 'client_id': '-', 'ip': '127.0.0.1'}
{'file_descriptor': '46', 'tasks': ('resize', 'like', 'dislike'), 'client_id': '-', 'ip': '127.0.0.1'}
{'file_descriptor': '59', 'tasks': ('add_phone_info',), 'client_id': '-', 'ip': '127.0.0.1'}
{'file_descriptor': '55', 'tasks': ('add_phone_info',), 'client_id': '-', 'ip': '127.0.0.1'}

{'workers': 0, 'running': 0, 'task': 'apkcrawler', 'queued': 22028}
{'workers': 0, 'running': 0, 'task': 'reverse', 'queued': 0}
{'workers': 1, 'running': 0, 'task': 'echo', 'queued': 0}
{'workers': 10, 'running': 0, 'task': 'add_phone_info', 'queued': 0}
{'workers': 0, 'running': 0, 'task': 'write_hbase', 'queued': 0}
{'workers': 0, 'running': 0, 'task': 'write_amazon', 'queued': 0}
{'workers': 10, 'running': 0, 'task': 'dislike', 'queued': 0}


  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: