您的位置:首页 > 运维架构

Hadoop使用DATAJOIN软件包链接不同来源的数据

2016-03-24 22:26 399 查看
具体参见《Hadoop in action》

这里说一下实践中遇到的几个问题,它们都在 Stack Overflow 上找到了解决办法:

(1)如何输入多个文件

将多个文件放入一个文件夹,输入路径写文件夹的路径

MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,MapClass.class);

(2)TaggedWritable要定义一个无参的构造函数,后面reduce反射的时候会用到

(3)调用data.readFields的时候,data有可能为空,而且此时并不知道data的具体类型。所以在TaggedWritable的write方法中,序列化data之前先写入data的类名;在readFields中先读出这个类名,如果data为空或类型不一致,就通过反射创建对应类型的实例,然后再调用data.readFields。

代码如下:

package Chapter5;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Set;

import javax.lang.model.SourceVersion;
import javax.print.DocFlavor.STRING;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.util.EnumCounters.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

/**
 * A {@link TaggedMapOutput} that carries an arbitrary {@link Writable} payload.
 *
 * <p>Wire format, in order: the tag, the payload's class name (UTF), then the
 * payload itself. The class name is stored so that {@link #readFields} can
 * create a payload of the right type when this object was built with the
 * no-arg constructor.
 */
public static class TaggedWritable extends TaggedMapOutput {

    /** Wrapped payload; {@code null} until set or until {@link #readFields} creates it. */
    private Writable data;

    /**
     * No-arg constructor, needed for reflective instantiation before
     * {@link #readFields} is called. Leaves the payload unset.
     */
    public TaggedWritable() {
        this.tag = new Text();
    }

    /**
     * Wraps {@code data} with an empty tag.
     *
     * @param data payload to carry
     */
    public TaggedWritable(Writable data) {
        this.tag = new Text("");
        this.data = data;
    }

    /**
     * Restores the tag and the payload from {@code in}.
     *
     * @param in stream positioned at data produced by {@link #write}
     * @throws IOException if reading fails or the recorded payload class
     *         cannot be loaded
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        String dataClz = in.readUTF();
        // Re-create the payload when there is none yet or when the existing
        // instance is of a different class than the serialized one.
        if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
            try {
                this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
            } catch (ClassNotFoundException e) {
                // Fail loudly: continuing would either hit a null payload or
                // deserialize the bytes into an object of the wrong type.
                throw new IOException("Cannot load payload class: " + dataClz, e);
            }
        }
        this.data.readFields(in);
    }

    /**
     * Serializes the tag, the payload's class name and the payload.
     * The payload must have been set; a {@code null} payload fails here.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        this.tag.write(out);
        out.writeUTF(this.data.getClass().getName());
        this.data.write(out);
    }

    /** @return the wrapped payload, possibly {@code null} */
    @Override
    public Writable getData() {
        return data;
    }

    /**
     * Replaces the wrapped payload.
     *
     * @param data new payload
     */
    public void setData(Writable data) {
        this.data = data;
    }
}
/**
 * Mapper side of the join: tags every record with its data source and groups
 * records by the first comma-separated field.
 */
public static class MapClass extends DataJoinMapperBase {

    /**
     * Uses the first comma-separated field of the record text as the join key.
     *
     * @param arg0 tagged record whose payload is a {@link Text} line
     * @return the text before the first comma
     */
    @Override
    protected Text generateGroupKey(TaggedMapOutput arg0) {
        Text record = (Text) arg0.getData();
        String[] fields = record.toString().split(",");
        return new Text(fields[0]);
    }

    /**
     * Derives the source tag from the part of {@code arg0} before the first '-'.
     *
     * @param arg0 presumably the input file name supplied by the base mapper
     *        — confirm against DataJoinMapperBase
     * @return the prefix of {@code arg0} up to the first '-'
     */
    @Override
    protected Text generateInputTag(String arg0) {
        String[] nameParts = arg0.split("-");
        return new Text(nameParts[0]);
    }

    /**
     * Wraps an input value in a {@link TaggedWritable} carrying this mapper's input tag.
     *
     * @param arg0 input value; cast to {@link Text}
     * @return the tagged record
     */
    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object arg0) {
        TaggedWritable tagged = new TaggedWritable((Text) arg0);
        tagged.setTag(this.inputTag);
        return tagged;
    }
}

/**
 * Reducer side of the join: merges one record from each source into a single
 * comma-separated line.
 */
public static class Reduce extends DataJoinReducerBase {

    /**
     * Joins the non-key part of every value with commas.
     *
     * <p>Each value's text is split at the first comma; the part after it is
     * appended to the result. A record that has no comma (key only) contributes
     * an empty field instead of aborting the task.
     *
     * @param tags   tags of the records being combined
     * @param values the records, each a {@link TaggedWritable} wrapping a {@link Text}
     * @return the joined record tagged with {@code tags[0]}, or {@code null}
     *         when fewer than two tags are present (presumably the base
     *         reducer then drops the combination — confirm against
     *         DataJoinReducerBase)
     */
    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        if (tags.length < 2) {
            return null;
        }

        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                joined.append(",");
            }
            TaggedWritable tagged = (TaggedWritable) values[i];
            String line = ((Text) tagged.getData()).toString();
            // Limit 2: element 0 is the join key, element 1 is everything after it.
            String[] tokens = line.split(",", 2);
            if (tokens.length > 1) {
                joined.append(tokens[1]);
            }
        }

        TaggedWritable retv = new TaggedWritable(new Text(joined.toString()));
        retv.setTag((Text) tags[0]);
        return retv;
    }
}
/**
 * Configures and runs the join job.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 after the job has run, 2 when the arguments are missing
 * @throws Exception if job submission or execution fails
 */
@Override
public int run(String[] args) throws Exception {
    // Reject a wrong invocation with a usage message instead of an
    // ArrayIndexOutOfBoundsException further down.
    if (args == null || args.length < 2) {
        System.err.println("Usage: DataJoin <input path> <output path>");
        ToolRunner.printGenericCommandUsage(System.err);
        return 2;
    }

    Configuration configuration = getConf();
    JobConf job = new JobConf(configuration, DataJoin.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setJobName("DataJoin");
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TaggedWritable.class);
    // Separate key and value with a comma in the text output.
    job.set("mapred.textoutputformat.separator", ",");
    JobClient.runJob(job);
    return 0;
}
/**
 * Command-line entry point: runs the tool through {@link ToolRunner} and
 * exits with the status it returns.
 *
 * @param args command-line arguments passed on to {@link #run}
 * @throws Exception if the tool fails
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DataJoin tool = new DataJoin();
    System.exit(ToolRunner.run(conf, tool, args));
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: