hadoop之MapReduce调用R的一次失败的总结~(续一)
2015-07-17 14:03
363 查看
问题原因虽没有追溯到,但有可以去避免它的发生!所以我改写了同事的MapReduce,严格控制Map的数量!下面贴上代码。
然而问题并没有解决,我检索github(https://github.com/RevolutionAnalytics/RHadoop/wiki/user%3Erhbase%3EHome)中,
有句话引起了我的注意,This package was built and tested using Thrift 0.8
所以我决定把thrift版本回退到0.8。
进入thrift0.9然后执行
进入thrift0.8的目录,按github的指引操作,make遇到了一点错误,如下
我检索了下,的确没有该文件。好在它是和ruby相关的东西。我也不懂他要干嘛,索性
其它按github的流程都能进行下去,也不过多累述了。
但是换完版本测试,发现问题居然还在~~~
不过我已经想好对策了,数据抽取部分,还是暂时放弃rhbase好了,直接Map中抽取完数据传递给R,虽然也许更麻烦,性能或许会变得更差些,但是前进的道路还得继续!
package mytest; import java.io.IOException; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.rosuda.JRI.Rengine; public class MapRLanguage { public static int mapNum = 12; public static final String RDIR_H="hdfs://bd110:9000/user/hadoop/"; public static final String RDIR_L="/home/hadoop/yibei/R/"; public static class RMapper extends Mapper<Object, Text, NullWritable, NullWritable>{ //每个文件只有一行 public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { //run R System.out.println(value); String args[] = value.toString().split("\\|"); Rengine re=new Rengine(new String[] { "--vanilla" }, false,null); if (!re.waitForR()) { System.out.println("Cannot load R"); return; } re.eval("setwd('"+RDIR_L+"')"); re.eval("source('main2.R')"); String rcmd="kpi.forecastByDBWithTime('"+args[4]+"','"+args[5]+"','"+args[6]+"','"+args[7]+"','"+args[1]+"','" + args[0] +"','" + args[2] +"','" + args[3] + "')"; re.eval(rcmd); re.end(); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String dbtbl = "stat_plan_result_"+args[0].trim(); String dbname = args[1].trim(); String cells = args[2].trim(); String kpis = args[3].trim(); String hbtime = args[4].trim(); String hetime = args[5].trim(); String fbtime = args[6].trim(); String fetime = args[7].trim(); //2种思路,1种设置多少Size拆分一个Map使用FileInputFormat.setMaxInputSplitSize(),另1种利用每个文件一个Map的特性处理它。按文件处理也可以使用job.setInputFormatClass(); // String cells="1,2,3,4,5,6,7,8,9,10,11,12,13,14"; String cellarr[] = cells.split(","); String currentDir =RDIR_H+new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())+"/in/"; String currentoutDir =RDIR_H+new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())+"/out/"; if(cellarr.length<=mapNum){ for(int i =0;i<cellarr.length;i++){ //输出文件 String filepath = currentDir+i; FileSystem fs = FileSystem.get(URI.create(filepath), new Configuration()); FSDataOutputStream out = fs.create(new Path(filepath)); out.write((dbtbl+"|"+dbname+"|"+cellarr[i]+"|"+kpis+"|"+hbtime+"|"+hetime+"|"+fbtime+"|"+fetime).getBytes("UTF-8")); out.close(); } }else{ int s = cellarr.length/mapNum; int y = cellarr.length%mapNum; for(int i =0,j=y;i<mapNum;i++){ //2种分配方案,1种是循环每个cell,加入文件。另1种是计算出,每次分配的数量。 int cellNum=s; if(j>0){ cellNum+=1; j--; } String celldata=""; for(int k =0;k < cellNum; k++){ //每个Map的小区,从cellarr中获取。 int cellindex=k+i*s+(cellNum>s?i:y); celldata+=cellarr[cellindex]+","; } String filepath = currentDir+i; FileSystem fs = FileSystem.get(URI.create(filepath), new Configuration()); FSDataOutputStream out = fs.create(new Path(filepath)); out.write((dbtbl+"|"+dbname+"|"+celldata.substring(0,celldata.length()-1)+"|"+kpis+"|"+hbtime+"|"+hetime+"|"+fbtime+"|"+fetime).getBytes("UTF-8")); out.close(); } } Job job = Job.getInstance(new Configuration()); job.getConfiguration().setInt("mapreduce.task.timeout", 0);//关闭超时 job.setJarByClass(MapRLanguage.class); job.setMapperClass(RMapper.class); job.setOutputKeyClass(NullWritable.class);//和setMapOutputKeyClass的区别?也许是没有reduce的时候指的是一样的,有reduce的时候是只reduce的输出? job.setOutputValueClass(NullWritable.class);//同上 FileInputFormat.addInputPath(job,new Path(currentDir)); FileOutputFormat.setOutputPath(job,new Path(currentoutDir)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
然而问题并没有解决,我检索github(https://github.com/RevolutionAnalytics/RHadoop/wiki/user%3Erhbase%3EHome)中,
有句话引起了我的注意,This package was built and tested using Thrift 0.8
所以我决定把thrift版本回退到0.8。
进入thrift0.9然后执行
make uninstall
进入thrift0.8的目录,按github的指引操作,make遇到了一点错误,如下
我检索了下,的确没有该文件。好在它是和ruby相关的东西。我也不懂他要干嘛,索性
make clean ./configure --without-ruby
其它按github的流程都能进行下去,也不过多累述了。
但是换完版本测试,发现问题居然还在~~~
不过我已经想好对策了,数据抽取部分,还是暂时放弃rhbase好了,直接Map中抽取完数据传递给R,虽然也许更麻烦,性能或许会变得更差些,但是前进的道路还得继续!
相关文章推荐
- Activity 重载方法 onStart和onResume、onPause和onStop的区别
- 最近有个需求,就是把某个网址跳转到另外一个网址
- 编译安装LNMP(在centos6.4系统下)
- Linux下黑色屏幕中蓝色字体颜色 修改方法
- CentOS7安装pidgin-lwqq
- linux 多线程那点事
- 尝试制作在A20上(CB2)自己的Linux系统
- CentOS 6.6配置安装(Apache+PHP5+MySQL)LAMP服务器
- Linux定时器
- openlayers 注册事件例子
- Linux中history用法15例
- ubuntu下的环境变量
- shell 常用命令
- 取消进程释放资源
- Centos 6 克隆导致网卡eth0变成eth1、及修改网卡名的方法
- vbs经典回顾之[键盘输入方法]wshShell.SendKeys
- Looper、Message、Handler
- 学习网页网站
- CentOs怎么开机直接进入命令行模式
- Linux(centos)的常用基本命令