用Python实现Hadoop实时作业状态监控
2017-01-06 14:43
726 查看
基于Python的Hadoop实时作业状态监控
前言:
任务需要,要求完成这么一个程序,恰好博主以前在虚拟机上部署过hadoop,但是部署完后一直没用过,这次就来尝试下吧。
进入正题:
一、环境及工具:
ubuntu14.04 LTSHadoop
Python
PycURL
二、关于 API
先把语言放在一边,要想监控hadoop的作业状态,那hadoop至少要提供相应的API 吧,上官网一通猛翻,果然找到了,Hadoop RESTful API,[ Hadoop RESTful API ]
这个链接比官网要详细一些,里面提示了hadoop提供了curl工具来使用它的API。curl是什么呢,直接上官网去看WebHDFS REST API例子吧,传送门如下:
[ WebHDFS REST API ]
不过看完上述两个链接,我还是不知道如何开始写我的程序,继续在官网奋战,终于找到了这个简单粗暴实用的API:hadoop提供了ResourceManager REST API。
恩,链接放在下面,让各位少走弯路(说多都是泪)。
[ ResourceManager REST API ]
API在手,我们就要考虑下如何利用这个资源了,前面说了要利用curl这个工具,但我们是用Python来实现这个监控程序,Python怎么来调用curl呢?带着这个疑问在网上走弯路,神奇的工具出现了,pycurl库,恩,看名字我想大家也明白了,前人的智慧不服不行。
PycURL is a Python interface to libcurl
关于pycurl的使用方法,传送门如下:
[ PycURL库使用方法 ]
在环境中安装pycurl,安装方法如下:
[ PycURL库安装 ]
三、利用PycURL获取json数据
好了万事具备,我们可以上代码了:b = StringIO.StringIO() c = pycurl.Curl() checkurl = "http://<rm http address:port>/ws/v1/cluster/apps" #需要监控的地址 c.setopt(pycurl.URL, checkurl) c.setopt(pycurl.HTTPHEADER, ["Accept:"]) c.setopt(pycurl.WRITEFUNCTION, b.write) #回调 c.setopt(pycurl.FOLLOWLOCATION, 1) c.setopt(pycurl.MAXREDIRS, 5) #重定向 c.setopt(pycurl.CONNECTTIMEOUT, 60) #链接超时 c.perform() #运行 status = c.getinfo(c.HTTP_CODE) if status!=200: #HTTP状态码,200表示成功 return html = b.getvalue()
通过运行这段程序以及官网上的API描述,我们发现html里面存储的信息是标准的json数据,这就非常好了,因为python提供了字典这种数据结构,若是我们可以把json数据转换为字典结构,那岂不是会方便很多?
上网简单一搜索,发现python提供了json.loads()转换函数,果然厉害。
四、转换成Python字典结构
dic_a=json.loads(html) dic_b=dic_a['apps'] #dic_b=second dic list_c=dic_b['app'] #list_c= thrid list for dic_d in list_c: #dic_d= fourth dic print dic_d['state']
看上述代码也能明白,json数据转换完后的字典,嵌套着字典和列表,需要一层层访问,才能获得想要的信息,而我想要的就是
state这个作业状态 信息。
至此,我们已经获取到了想要的内容,大家可以根据自己的需求来合理利用。
五、完整代码
下面是按我的需求写的完整程序,带有计时功能,如果没有作业处于运行态,就启动倒计时,时间到了仍没有作业运行就发出提示(或者运行其他内容),倒计时期间有其他作业运行就关闭计时,直到又没有作业执行了,再次重新开始倒计时。# -*- encoding: utf-8 -*- import time import threading import pycurl import StringIO import re import json st=0 def time_count(): global st i=10 while i>0: print "i:%d"%i i-=1 time.sleep(1) if st==1: st=0 return print "there is no running task" def check_state(): st_num=0 b = StringIO.StringIO() c = pycurl.Curl() checkurl = "http://<rm http address:port>/ws/v1/cluster/apps" #需要监控的地址 c.setopt(pycurl.URL, checkurl) c.setopt(pycurl.HTTPHEADER, ["Accept:"]) c.setopt(pycurl.WRITEFUNCTION, b.write) #回调 c.setopt(pycurl.FOLLOWLOCATION, 1) c.setopt(pycurl.MAXREDIRS, 5) #重定向 c.setopt(pycurl.CONNECTTIMEOUT, 60) #链接超时 c.perform() #运行 status = c.getinfo(c.HTTP_CODE) html = b.getvalue() dic_a=json.loads(html) dic_b=dic_a['apps'] #dic_b=second dic if dic_b!=None: #没有作业时,列表为空,非空才可获取 list_c=dic_b['app'] #list_c= thrid liebiao for dic_d in list_c: #dic_d= fourth dic if dic_d['state']=='FINISHED': st_num+=1 if st_num==len(list_c): # 所有作业finished return 0 else: return 1 else: return 0 c.close() b.close() if __name__ == '__main__': t1=threading.Thread(target=time_count) pnum=0; while True: pnum+=1 print 'check state %d\n'%pnum re_state=check_state() if re_state==0: #no task is running : if t1.is_alive() ==False: t1=threading.Thread(target=time_count) t1.start() if re_state==1: # task is running : if t1.is_alive() ==True: st=1 #shutdown thread #other program time.sleep(1) #other program
一些说明:
本程序可以不使用线程,函数完全可以胜任,但由于后期还需改动,使用线程会更具模块化一些,方便扩展。
六、后记
到这就已经结束了,但不知道有没有细心的朋友发现,所谓监控hadoop作业的状态,并且hadoop API还提供了json数据的返回形式,其实…来,我们看下面这段代码:
import urllib url="http://<rm http address:port>/ws/v1/cluster/apps" page = urllib.urlopen(url) html = page.read() reg = r'“state”:[A-Z]*' imgre = re.compile(reg) imglist = re.findall(imgre,html) for content in imglist: print content
是不是有点懵,恩,博主程序写到一半的时候也发现了,既然监控只是要确认作业
state信息,那么我们直接把html爬下来,利用正则分析下,不是一样么?
简单粗暴,高效快捷。
不黑了,当然,对于我们这个程序,爬虫会更高效一些,但hadoop信息千千万,我们所需的数据远不止一个
state这么简单,而利用API获得的json,则会有利于我们进行一些更加复杂的操作。
相关文章推荐
- 用Python实现一个细粒度hadoop作业监控分析工具
- python 实现实时监控snapshot 状态
- 监控redis数据库应用状态:python,tornado实现
- 用python实现监控网站状态,并发送告警邮件
- hadoop命令行运行wordcount程序 web端监控不到作业状态
- 树莓派--python实现实时监控
- 用python实现监控网站状态,并发送告警邮件
- 基于邮件系统的远程实时监控系统的实现 Python版
- python实现实时监控文件的方法
- lvs+keepalived实现实时监控节点健康状态,并根据算法接管资源
- 如何实现实时监控数据库主从同步的状态
- zabbix使用Python实现监控MongoDB副本集状态
- python2.7 实现的实时监控指定股票价格的小程序
- Python实现数据可视化看如何监控你的爬虫状态【推荐】
- WCF服务通过TCP实时监控客户端状态,并可以向客户端广播推送消息,实现双向通信
- python实现实时监控文件的方法
- 通过nagios实现MD5实时监控iptables状态 推荐
- Qt实现 实时监控文件夹状态
- 使用swatch实时监控cisco路由器和H3C交换机端口状态
- 本周目标--用python实现系统监控测试