Joining data from different sources with Hadoop's DataJoin package
2016-03-24 22:26
For the full background, see *Hadoop in Action*.
A few issues came up along the way; each was resolved with help from Stack Overflow:
(1) How to feed in multiple input files
Put the files in a single directory and pass that directory as the input path. Alternatively, register each input explicitly:
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapClass.class);
(2) TaggedWritable must define a no-argument constructor, because the reduce side later instantiates it via reflection.
(3) When data.readFields is called, data may be null, and its concrete type is unknown. So in TaggedWritable's write method, serialize the class name of data before the data itself, and check that name in readFields before deserializing.
The code follows:
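The class-name trick from points (2) and (3) can be illustrated outside Hadoop with plain java.io streams. The sketch below is a simplified stand-in (the `TextLike` class is hypothetical, not Hadoop's Text or TaggedWritable): the writer records the payload's class name before the payload, and the reader, which starts with no payload at all, uses that name to instantiate the right class reflectively.

```java
import java.io.*;

public class RoundTrip {
    // Minimal stand-in for a Writable-style payload: one UTF string field.
    public static class TextLike {
        String value = "";
        void write(DataOutput out) throws IOException { out.writeUTF(value); }
        void readFields(DataInput in) throws IOException { value = in.readUTF(); }
    }

    public static void main(String[] args) throws Exception {
        TextLike src = new TextLike();
        src.value = "hello";

        // Writer side: record the payload's class name before the payload itself.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF(src.getClass().getName());
        src.write(out);

        // Reader side: the payload is initially null, so recover the class
        // name and instantiate it reflectively. This is exactly why the
        // no-arg constructor in point (2) is required.
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        String clz = in.readUTF();
        TextLike dst = (TextLike) Class.forName(clz)
                .getDeclaredConstructor().newInstance();
        dst.readFields(in);
        System.out.println(dst.value);  // prints "hello"
    }
}
```

Hadoop's ReflectionUtils.newInstance plays the same role as the `getDeclaredConstructor().newInstance()` call here, and fails in the same way if the no-arg constructor is missing.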
package Chapter5;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

    public static class TaggedWritable extends TaggedMapOutput {
        private Writable data;

        // No-arg constructor: required because the reduce side creates
        // instances via reflection (see issue (2) above).
        public TaggedWritable() {
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            // data may be null or of the wrong type here (issue (3)), so read
            // the class name written by write() and instantiate it if needed.
            String dataClz = in.readUTF();
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            this.data.readFields(in);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            // Record the concrete class of data so readFields can rebuild it.
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        @Override
        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }
    }

    public static class MapClass extends DataJoinMapperBase {
        @Override
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            // The join key is the first comma-separated field of the record.
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            return new Text(tokens[0]);
        }

        @Override
        protected Text generateInputTag(String inputFile) {
            // Tag each record with its data source, taken from the part of
            // the file name before the first '-'.
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        @Override
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedMapOutput res = new TaggedWritable((Text) value);
            res.setTag(this.inputTag);
            return res;
        }
    }

    public static class Reduce extends DataJoinReducerBase {
        @Override
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            // Inner join: drop keys that do not appear in every source.
            if (tags.length < 2) return null;
            String res = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0) res += ",";
                TaggedWritable tmp = (TaggedWritable) values[i];
                String line = ((Text) tmp.getData()).toString();
                // Strip the join key; keep only the payload fields.
                String[] tokens = line.split(",", 2);
                res += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(res));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        JobConf job = new JobConf(configuration, DataJoin.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }
}