您的位置:首页 > 理论基础 > 计算机网络

基于Oozie实现MapReduce作业的自动提交功能

2016-01-12 17:16 429 查看
Oozie是一个Hadoop工作流管理系统。OozieClient RestFul API官方参考如下。它提供了JAVA API 和 RESTFul API 两种形式使用Oozie客户端可以向Oozie服务端提交workflow。

workflow即工作流,在Oozie中使用workflow来配置各种类型的动作节点,如MapReduce类型的action,这些action就是完成具体功能的程序。而workflow负责组织各个动作节点,形成有向无环图。

对于用户而言,传统的MapReduce程序的提交运行过程如下:

a) 将编写好的模型打包成jar文件并上传到Hadoop集群的HDFS上;

b) 根据具体的作业配置好程序的输入目录和输出目录;

c) 使用hadoop jar 命令提交计算作业。

采用Oozie之后,可以借助Oozie来负责命令的提交。下面介绍使用Oozie RESTFul API 提交作业。

使用Oozie提交作业的流程如下:

a) 用户先提前将作业jar包以及相应的workflow.xml配置文件上传到HDFS上的合适目录中

b)运行ooziejob -oozie ..... -run 命令或者通过编程实现Oozie RestFul API提交作业。

Oozie的提交原理简介

当Oozie Server接收到用户的提交请求后,Oozie Server解析本次作业的workflow.xml文件,执行工作流,并启动一个特殊的mapreduce作业(没有reducer),由这个特殊的mpareduce作业负责真正地提交Hadoop作业执行(如MapReduce的wordcount)。正是由于Oozie Server通过启动一个特殊的MR作业来向Hadoop集群提交作业,事先是不知道这个特殊的MapReduce作业运行在哪个机器上的,因此这也是为什么需要提前将写好的程序jar文件以及配置文件workflow.xml先上传到HDFS上的原因。

作业提交的生产者--消费者模型

当有许多作业进行提交时,可以使用生产者--消费者模型来管理这些作业。

本生产者--消费者的设计思路如下:

当向Oozie Server提交一个作业之后,会返回一个jobId,生产者负责将jobId添加到链表中,交将作业的一些基本的静态信息保存到数据库中;而消费者则从链表中取出jobId,并根据此jobId来查询本次作业的执行时间、执行状态等信息。以此为基础,可以统计出所有向Oozie Server提交的作业。

提交系统提供了两种方式URI给用户进行作业提交:一种是Servelet实现,一种是WebService实现。

用户只需要通过发送HTTP请求,携带用户名和AppPath(提前配置好的上传到HDFS中的作业lib文件夹所在目录)两个参数,就可以完成作业的提交。用户名可以进行一定的权限验证,而AppPath则是告诉Oozie Server待执行的作业在哪里,Oozie Server根据AppPath找到作业,生成工作流,然后将作业提交给Hadoop计算,并监控作业直到完成。

作业提交的部分代码如下:

String jsonJobId = oozieClient.submitJob(userName, appPath);//submit job to oozie server
jobId = Parse.getJobId(jsonJobId);//get job id
System.out.println("job " + jobId + " submited at " + new Date());

//start producer thread
new Thread(new AddJob(this)).start();
//start consumer thread
new Thread(new JobStatistic(oozieClient)).start();


由于每一个作业都有一个唯一的作业ID,故生产者线程先将本次作业提交相关信息保存到数据库中,然后再将作业ID添加到生产者-消费者队列中(一个LinkedList)。生产者的部分代码如下:

String jobId = schedulerAction.getJobId();
String userName = schedulerAction.getUserName();
String appPath = schedulerAction.getAppPath();
long inputSize = CommonUtil.getFileSizeWithSplitPath(appPath, userName);
try{
db.insertJobProperty(userName, appPath, inputSize, jobId);
System.out.println("insert into mysql success");
}catch(SQLException e){
e.printStackTrace();
System.out.println("SQL Exeception");
}
//after producer insert job property into mysql, add jobId to LinkedList.so consumer always update successful.
addId(jobId);//as producer thread add jobId in LinkedList.
其中inputSize是本次作业运行时所处理的输入大小。这可以通过WebHDFS访问输入目录获得。

消费者负责从LinkedList中取出jobId,并根据此jobId向Oozie Server发送HTTP请求查询当前作业的执行状态以及作业执行完成后所花的时间。消费者部分代码如下:

String jobId = null;
synchronized (jobIds) {
try {
if (jobIds.size() == 0)
jobIds.wait();//LinkedList is empty
jobId = jobIds.removeLast();
System.out.println("remove a job from LinkedList");
} catch (InterruptedException e) {
e.printStackTrace();
}
}// end sync

try {
db.updateRunTime(runTime, jobId);
System.out.println("update job run Time success");
} catch (SQLException e) {
e.printStackTrace();
}


Client向Oozie Server发送HTTP请求通过 Apache HTTP Client 包来实现。

需要改进的地方:

1,每来一个提交请求时,就会启动二个线程,一个是生产者线程另一个是消费者线程。而对于消费者线程而言,可以预先启动起来,作为后台线程一直在运行,负责查询作业的执行情况。而不是一次请求就启动一个消费者线程。

2,一个提交请求会触发两次写数据库。一次发生在生产者将作业提交的相关信息写入数据库,一次发生在消费者将作业运行相关信息写入数据库。可以先将这些信息缓存在内存中,当提交的请求次数达到一定程度时,再统一写入数据库。从而减少访问数据库的次数。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息