您的位置:首页 > 编程语言 > Python开发

用Python实现Hadoop实时作业状态监控

2017-01-06 14:43 726 查看

基于Python的Hadoop实时作业状态监控

前言:

  任务需要,要求完成这么一个程序,恰好博主以前在虚拟机上部署过hadoop,但是部署完后一直没用过,这次就来尝试下吧。


进入正题:

一、环境及工具:

ubuntu14.04 LTS

Hadoop

Python

PycURL

二、关于 API

  先把语言放在一边,要想监控hadoop的作业状态,那hadoop至少要提供相应的API 吧,上官网一通猛翻,果然找到了,Hadoop RESTful API,

   [ Hadoop RESTful API ]

  这个链接比官网要详细一些,里面提示了hadoop提供了curl工具来使用它的API。curl是什么呢,直接上官网去看WebHDFS REST API例子吧,传送门如下:

  [ WebHDFS REST API ]

  不过看完上述两个链接,我还是不知道如何开始写我的程序,继续在官网奋战,终于找到了这个简单粗暴实用的API:hadoop提供了ResourceManager REST API。

恩,链接放在下面,让各位少走弯路(说多都是泪)。

  [ ResourceManager REST API ]

  API在手,我们就要考虑下如何利用这个资源了,前面说了要利用curl这个工具,但我们是用Python来实现这个监控程序,Python怎么来调用curl呢?带着这个疑问在网上走弯路,神奇的工具出现了,pycurl库,恩,看名字我想大家也明白了,前人的智慧不服不行。

PycURL is a Python interface to libcurl

关于pycurl的使用方法,传送门如下:

  [ PycURL库使用方法 ]

在环境中安装pycurl,安装方法如下:

   [ PycURL库安装 ]

  

  

三、利用PycURL获取json数据

  好了万事具备,我们可以上代码了:

b = StringIO.StringIO()
c = pycurl.Curl()
checkurl = "http://<rm http address:port>/ws/v1/cluster/apps" #需要监控的地址
c.setopt(pycurl.URL, checkurl)
c.setopt(pycurl.HTTPHEADER, ["Accept:"])
c.setopt(pycurl.WRITEFUNCTION, b.write) #回调
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.MAXREDIRS, 5) #重定向
c.setopt(pycurl.CONNECTTIMEOUT, 60) #链接超时
c.perform() #运行
status = c.getinfo(c.HTTP_CODE)
if status!=200:  #HTTP状态码,200表示成功
return
html = b.getvalue()


  通过运行这段程序以及官网上的API描述,我们发现html里面存储的信息是标准的json数据,这就非常好了,因为python提供了字典这种数据结构,若是我们可以把json数据转换为字典结构,那岂不是会方便很多?

  上网简单一搜索,发现python提供了json.loads()转换函数,果然厉害。

  

  

四、转换成Python字典结构

dic_a=json.loads(html)
dic_b=dic_a['apps']              #dic_b=second dic
list_c=dic_b['app']              #list_c= thrid list
for dic_d in list_c:             #dic_d= fourth dic
print dic_d['state']


  看上述代码也能明白,json数据转换完后的字典,嵌套着字典和列表,需要一层层访问,才能获得想要的信息,而我想要的就是
state
这个作业状态 信息。

  至此,我们已经获取到了想要的内容,大家可以根据自己的需求来合理利用。

 

 

五、完整代码

  下面是按我的需求写的完整程序,带有计时功能,如果没有作业处于运行态,就启动倒计时,时间到了仍没有作业运行就发出提示(或者运行其他内容),倒计时期间有其他作业运行就关闭计时,直到又没有作业执行了,再次重新开始倒计时。

# -*- encoding: utf-8 -*-
import time
import threading
import pycurl
import StringIO
import re
import json

st=0

def time_count():
global st
i=10
while i>0:
print "i:%d"%i
i-=1
time.sleep(1)
if st==1:
st=0
return

print "there is no running task"

def check_state():
st_num=0
b = StringIO.StringIO()
c = pycurl.Curl()
checkurl = "http://<rm http address:port>/ws/v1/cluster/apps" #需要监控的地址
c.setopt(pycurl.URL, checkurl)
c.setopt(pycurl.HTTPHEADER, ["Accept:"])
c.setopt(pycurl.WRITEFUNCTION, b.write) #回调
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.MAXREDIRS, 5) #重定向
c.setopt(pycurl.CONNECTTIMEOUT, 60) #链接超时
c.perform() #运行
status = c.getinfo(c.HTTP_CODE)

html = b.getvalue()

dic_a=json.loads(html)
dic_b=dic_a['apps']                  #dic_b=second dic
if dic_b!=None:                  #没有作业时,列表为空,非空才可获取
list_c=dic_b['app']              #list_c= thrid liebiao
for dic_d in list_c:             #dic_d= fourth dic
if dic_d['state']=='FINISHED':
st_num+=1
if st_num==len(list_c):          # 所有作业finished
return 0
else:
return 1

else:
return 0

c.close()
b.close()

if __name__ == '__main__':

t1=threading.Thread(target=time_count)
pnum=0;
while True:
pnum+=1
print 'check state %d\n'%pnum
re_state=check_state()
if re_state==0:                                        #no task is running :
if t1.is_alive() ==False:
t1=threading.Thread(target=time_count)
t1.start()
if re_state==1:                                        # task is running :
if t1.is_alive() ==True:
st=1                                   #shutdown thread
#other program
time.sleep(1)

#other program


一些说明

  本程序可以不使用线程,函数完全可以胜任,但由于后期还需改动,使用线程会更具模块化一些,方便扩展。

  

六、后记

  到这就已经结束了,但不知道有没有细心的朋友发现,所谓监控hadoop作业的状态,并且hadoop API还提供了json数据的返回形式,其实…

  来,我们看下面这段代码:

import urllib

url="http://<rm http address:port>/ws/v1/cluster/apps"
page = urllib.urlopen(url)
html = page.read()

reg = r'“state”:[A-Z]*'
imgre = re.compile(reg)
imglist = re.findall(imgre,html)
for content in imglist:
print content


  是不是有点懵,恩,博主程序写到一半的时候也发现了,既然监控只是要确认作业
state
信息,那么我们直接把html爬下来,利用正则分析下,不是一样么?

简单粗暴,高效快捷。

  不黑了,当然,对于我们这个程序,爬虫会更高效一些,但hadoop信息千千万,我们所需的数据远不止一个
state
这么简单,而利用API获得的json,则会有利于我们进行一些更加复杂的操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: