
HBase (11): HBase and MapReduce - A Small Case Study Summary

Abstract: Build an inverted index over several files and write the result into an HBase table.

I. HBaseAndMapReduce case study summary:

A directory in HDFS contains several text files. The goal is to build an inverted index over the contents of these files and write the result into an HBase table. Two MapReduce jobs are used: the first builds the inverted index and writes it as text to HDFS, and the second reads that output and loads it into the HBase table invertedindex. The code is as follows:

1.InvertedIndexMapper

public class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {

    private Text keyInfo = new Text();    // holds the "word:URI" combination
    private Text valueInfo = new Text();  // holds the term frequency
    private FileSplit split;              // holds the FileSplit of the current record

    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {

        System.out.println("key-->: " + key + "\n value --> : " + value);
        // Get the FileSplit that this <key, value> pair belongs to.
        split = (FileSplit) context.getInputSplit();
        System.out.println("split---> " + split.toString());
        StringTokenizer itr = new StringTokenizer(value.toString());

        while (itr.hasMoreTokens()) {
            // The key is made up of the word and the file URI.
            keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
            // The term frequency starts at 1.
            valueInfo.set("1");
            context.write(keyInfo, valueInfo);
        }
    }
}
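
For the sample input files shown at the end of this post, each map() call emits one <word:URI, 1> pair per token. For the line "Hello HBase" in invertedindex2.txt, for example, the intermediate output would look roughly like:

Hello:hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt	1
HBase:hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt	1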

2.InvertedIndexCombiner

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

    private Text info = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Sum the term frequency for this "word:URI" key.
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        int splitIndex = key.toString().indexOf(":");
        // The new value is made up of the URI and the term frequency.
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);

        // The new key is just the word.
        key.set(key.toString().substring(0, splitIndex));
        context.write(key, info);
    }
}
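
The combiner runs on the map side, so it sums a word's occurrences within one file and moves the URI from the key into the value. Continuing the example above, the pair emitted by the mapper becomes roughly:

Hello	hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1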

3.InvertedIndexReducer

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {

        // Build the document list: "URI:frequency" entries separated by ";".
        StringBuilder fileList = new StringBuilder();
        for (Text value : values) {
            fileList.append(value.toString()).append(";");
        }
        result.set(fileList.toString());
        context.write(key, result);
    }
}
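
The reducer concatenates all "URI:frequency" entries for a word into a single ";"-separated list. With the default TextOutputFormat this produces tab-separated lines in part-r-00000, for example (for the sample data below):

Hello	hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;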

4.HBaseAndInvertedIndex

public class HBaseAndInvertedIndex {

    private static Path outPath;

    public static void main(String[] args) throws Exception {
        run();
        System.out.println("\n\n************************");
        runHBase();
    }

    public static void run() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Hadoop-InvertedIndex");

        job.setJarByClass(HBaseAndInvertedIndex.class);

        // The map function turns each input <key, value> pair into intermediate results.
        job.setMapperClass(InvertedIndexMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/txt/invertedindex/"));
        // Use a timestamp as the output directory name so repeated runs do not collide.
        DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssS");
        String filename = df.format(new Date());
        outPath = new Path("hdfs://192.168.226.129:9000/rootdir/invertedindexhbase/result/" + filename + "/");
        FileOutputFormat.setOutputPath(job, outPath);

        // Abort if the first job fails; otherwise the HBase job would read a missing output file.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

    public static void runHBase() throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");

        Job job = Job.getInstance(conf, "HBase-InvertedIndex");
        job.setJarByClass(HBaseAndInvertedIndex.class);

        // KeyValueTextInputFormat splits each line of the first job's output at the first tab:
        // the word becomes the key and the "URI:frequency;" list becomes the value.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Read the first job's result file and write the data into HBase.
        FileInputFormat.addInputPath(job, new Path(outPath.toString() + "/part-r-00000"));
        System.out.println("path---> " + outPath.toString());
        TableMapReduceUtil.initTableReducerJob("invertedindex", InvertedIndexHBaseReducer.class, job);

        // Make sure the target table exists before the job starts.
        checkTable(conf);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    private static void checkTable(Configuration conf) throws Exception {
        // HTableDescriptor/HColumnDescriptor are the HBase 1.x API;
        // a sketch of the 2.x builder equivalent follows after this class.
        Connection con = ConnectionFactory.createConnection(conf);
        Admin admin = con.getAdmin();
        TableName tn = TableName.valueOf("invertedindex");
        if (!admin.tableExists(tn)) {
            HTableDescriptor htd = new HTableDescriptor(tn);
            HColumnDescriptor hcd = new HColumnDescriptor("indexKey");
            htd.addFamily(hcd);
            admin.createTable(htd);
            System.out.println("Table did not exist, created it successfully....");
        }
        admin.close();
        con.close();
    }

    /**
     * 1. The map side still reads plain text from HDFS, so it needs no changes. The reduce side
     *    writes its results into HBase, so it extends TableReducer<KEYIN, VALUEIN, KEYOUT>.
     *    There is no VALUEOUT type parameter because TableReducer fixes the output value type
     *    to a Mutation, i.e. a Put or Delete instance.
     *
     * 2. ImmutableBytesWritable is a byte-sequence wrapper that can be used as a key or value type.
     */
    public static class InvertedIndexHBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
        @Override
        protected void reduce(
                Text key,
                Iterable<Text> values,
                Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            System.out.println("key---> " + key.toString());
            // Note how the row key is built: the word itself is the row key.
            Put put = new Put(key.toString().getBytes());
            put.addColumn(Bytes.toBytes("indexKey"), Bytes.toBytes("indexUrlWeight"),
                    values.iterator().next().getBytes());
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
    }
}
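
The checkTable() method above uses the HBase 1.x HTableDescriptor/HColumnDescriptor API. If you run against HBase 2.x, the same check can be written with the builder API from org.apache.hadoop.hbase.client; a minimal sketch, assuming the same table and column family names as above:

    private static void checkTable(Configuration conf) throws Exception {
        try (Connection con = ConnectionFactory.createConnection(conf);
             Admin admin = con.getAdmin()) {
            TableName tn = TableName.valueOf("invertedindex");
            if (!admin.tableExists(tn)) {
                // Build a table descriptor with a single column family "indexKey".
                TableDescriptor td = TableDescriptorBuilder.newBuilder(tn)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("indexKey"))
                        .build();
                admin.createTable(td);
                System.out.println("Table did not exist, created it successfully....");
            }
        }
    }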


Original data files in the input directory:

invertedindex1.txt

Hello I will Learning Hadoop
HDFS MapReduce
Other I will Learning HBase

invertedindex2.txt:

Hello HBase
MapReduce HDFS


View the result with scan:

hbase(main):002:0> scan 'invertedindex'
ROW                             COLUMN+CELL
HBase                          column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex2.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex1.txt:1;
HDFS                           column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex2.txt:1;
Hadoop                         column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex2.txt:1;
Hello                          column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex2.txt:1;
I                              column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex2.txt:1;
Learning                       column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex2.txt:1;
MapReduce                      column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex2.txt:1;
Other                          column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex2.txt:1;
will                           column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:900
0/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/in
vertedindex2.txt:1;
9 row(s) in 0.2240 seconds
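
Besides scanning from the shell, a single row can be read back programmatically. A minimal sketch, assuming the table, column family, and qualifier used above, with the row key "Hello" taken from the sample data:

    public static void readRow(Configuration conf) throws Exception {
        try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(TableName.valueOf("invertedindex"))) {
            Get get = new Get(Bytes.toBytes("Hello"));
            Result result = table.get(get);
            byte[] value = result.getValue(Bytes.toBytes("indexKey"), Bytes.toBytes("indexUrlWeight"));
            // Prints the ";"-separated "URI:frequency" list stored by the reducer.
            System.out.println(Bytes.toString(value));
        }
    }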