基于大数据开发套件定时调度带资源文件的MapReduce作业
2017-03-15 11:12
639 查看
MaxCompute里的MR作业,很少是只要跑一次就好了的。如果需要周期性调度,目前MaxCompute(原名ODPS)只提供了计算引擎,任务调度可以使用大数据开发套件来实现。这篇帖子从基础开始,介绍了3种周期性调度的方法。同时还介绍了如何使用资源文件。
在这个基础上,增加资源文件的读取方法,修改Reduce类。主要的逻辑是读取资源文件,资源文件里的数据格式是字符串1,字符串2。代码逻辑是如果word count里的word如果有在字符串1里出现的话,就替换成字符串2。
具体资源文件的用法可以参考文档 ,这里就不再多解释了。
我们先用手工调度来跑这个MR,这里跑通了后后面的所有的配置就很容易明白了。
首先需要把代码打出的jar包,和这个resource.txt文件上传到服务器上
然后通过命令行来调用
这里的-resources引用的是跑在服务器上的,-classpath是用来找到main方法的。理解这个对后面配置同步任务很有帮助。可以参阅文档
你可以先用
在Linux服务器上运行任务。注意安装odpscmd配置前需要先配置好java环境。然后后面的Crontab的配置就不展开了。
可以在右边看到可以配置任务的调度周期和上下游依赖,从而实现每天的定时调度,而且还能是保证上游的数据导入、预处理完成后才开始做MR操作,非常好用。
Shell任务需要先参考文档 先配置调度的ECS信息,这里不再展开。完成后写一个Shell脚本,内容为
要把里面的Access id/key,Project 替换成你自己的,然后开始测试代码。需要特别注意的是,** Shell任务是在机器上的admin账号下运行的** ,如果发现各种奇怪的错误,比如明明存在的文件找不到一类的错误,可以先su - admin,调试下Shell命令,或者访问下对应的文件,看看是否是环境变量,文件目录权限的问题。另外也可以把错误日志重定向到某个文件里,比如/tmp文件夹下的某个临时日志文件里,方便事后调试。大家可以在admin账号下把shell调试通过后再放到数加上去调用。
另外Shell任务可以调整调度的机器,可以参考
原文链接:
代码开发
代码以文档里的WordCount 作为例子。在这个基础上,增加资源文件的读取方法,修改Reduce类。主要的逻辑是读取资源文件,资源文件里的数据格式是字符串1,字符串2。代码逻辑是如果word count里的word如果有在字符串1里出现的话,就替换成字符串2。
public static class SumReducer extends ReducerBase { private Record result = null; private Map<String,String> maps = null; @Override public void setup(TaskContext context) throws IOException { result = context.createOutputRecord(); maps = new HashMap<String,String>(); StringBuilder importdata = new StringBuilder(); BufferedInputStream bufferedInput = null; try { byte[] buffer = new byte[1024]; int bytesRead = 0; //读取资源文件的内容 bufferedInput = context.readResourceFileAsStream("resource.txt"); while ((bytesRead = bufferedInput.read(buffer)) != -1) { String chunk = new String(buffer, 0, bytesRead); importdata.append(chunk); } //解析资源文件的内容,把替换前,替换后的数据放到map里 String lines[] = importdata.toString().split("\n"); for (int i = 0; i < lines.length; i++) { String[] ss = lines[i].split(","); maps.put(ss[0].trim(), ss[1].trim()); System.out.println(ss[0]+"->"+ss[1]); } } catch (FileNotFoundException ex) { throw new IOException(ex); } catch (IOException ex) { throw new IOException(ex); } finally { } } @Override public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException { long count = 0; while (values.hasNext()) { Record val = values.next(); count += (Long) val.get(0); } String value = key.get(0).toString(); if(maps.containsKey(value)){ System.out.println(value+"->"+maps.get(value)); value = maps.get(value); } result.set(0, value); result.set(1, count); context.write(result); } }
具体资源文件的用法可以参考文档 ,这里就不再多解释了。
客户端调用
对于测试数据,源文件的内容为odps,MaxCompute hello,Hello
我们先用手工调度来跑这个MR,这里跑通了后后面的所有的配置就很容易明白了。
首先需要把代码打出的jar包,和这个resource.txt文件上传到服务器上
>add jar D:\cx_word_count.jar -f; OK: Resource 'cx_word_count.jar' have been updated. >add file D:\resource.txt -f; OK: Resource 'resource.txt' have been updated.
然后通过命令行来调用
jar -resources cx_word_count.jar,resource.txt -classpath D:\cx_word_count.jar com.aliyun.odps.mr.WordCount;
这里的-resources引用的是跑在服务器上的,-classpath是用来找到main方法的。理解这个对后面配置同步任务很有帮助。可以参阅文档
Crontab调用
odpscmd客户端有一个参数,是-e,可以在shell里直接调用jar命令来跑MR,当然也可以使用odpscmd -f来再调用一个脚本文件,但是这样有点麻烦了。这里就直接用-e来做。你可以先用
/odps/cmd/bin/odpscmd -e "jar -resources cx_word_count.jar,resource.txt -classpath /odps/cx_word_count.jar co 4000 m.aliyun.odps.mr.WordCount;"
在Linux服务器上运行任务。注意安装odpscmd配置前需要先配置好java环境。然后后面的Crontab的配置就不展开了。
MR作业
配置DataIDE的MR作业的界面,很容易就让人想到MR任务的main方法。其实就是DataIDE会根据配置自己生成main方法,然后去调用MaxCompute上的任务。具体的配置可以参考这个截图:可以在右边看到可以配置任务的调度周期和上下游依赖,从而实现每天的定时调度,而且还能是保证上游的数据导入、预处理完成后才开始做MR操作,非常好用。
Shell任务
上述的MR任务简单方便,但是DataIDE出于安全考虑,不让用户自己写main方法。如果需要用到诸如传参数之类的功能,可以自己写Shell任务,但是调度让DataIDE来做。这样就集上面两个方法之长了。Shell任务需要先参考文档 先配置调度的ECS信息,这里不再展开。完成后写一个Shell脚本,内容为
##@resource_reference{"cx_word_count.jar,resource.txt"} /opt/taobao/tbdpapp/odpswrapper/odpsconsole/bin/odpscmd -u testid -p testkey --project=testproject --endpoint=http://service.odps.aliyun.com/api -e "jar -resources cx_word_count.jar,resource.txt -classpath /odps/cx_word_count.jar com.aliyun.odps.mr.WordCount"
要把里面的Access id/key,Project 替换成你自己的,然后开始测试代码。需要特别注意的是,** Shell任务是在机器上的admin账号下运行的** ,如果发现各种奇怪的错误,比如明明存在的文件找不到一类的错误,可以先su - admin,调试下Shell命令,或者访问下对应的文件,看看是否是环境变量,文件目录权限的问题。另外也可以把错误日志重定向到某个文件里,比如/tmp文件夹下的某个临时日志文件里,方便事后调试。大家可以在admin账号下把shell调试通过后再放到数加上去调用。
另外Shell任务可以调整调度的机器,可以参考
原文链接:
http://click.aliyun.com/m/13937/ |
相关文章推荐
- 基于大数据开发套件定时调度带资源文件的MapReduce作业
- 基于大数据开发套件定时调度带资源文件的MapReduce作业
- 【大数据系列】基于MapReduce的数据处理 SequenceFile序列化文件
- 基于ASM开发的一个关于class文件加密程序,可对整个jar进行加密且不影响资源文件
- Quartz.net通过配置文件来完成作业调度
- SSH(struts+spring+hibernate)迅速开发--第五章 定义配置和资源文件
- 基于数据库和ASP的网上教学资源管理系统的开发
- 利用.net2.0的资源文件实现多语言开发!
- .net开发笔记之七:定时清除临时文件
- sql 2000 作业调度(定时执行存储过程)
- 利用 DWR 开发基于 Ajax 的文件上载 portlet
- 利用.net2.0的资源文件实现多语言开发!
- 通过作业调度现实sql server数据库定时自动备份
- PC Camera开发日志(十五)---- 基于MFC的大型数据文件处理方法
- 海洋工作室——网站建设专家:【原】基于文件驱动的网站开发,易于维护,且使用简单,所见即所得!
- sql 2000 作业调度(定时执行存储过程)
- 首届“二六三”杯浙江大学软件设计开发大奖赛——基于P2P的多媒体应用软件设计(组委会提供以下开源的参考资源)
- NETCF开发之使用资源文件
- net2.0的资源文件实现多语言开发! 2222
- 移动项目开发笔记(总结MasterPage中的资源文件引用路径)