
Hadoop 2.5.1 Study Notes 7: Using Counters

2014-11-17
Some records could not have their tags crawled, so we need to evaluate how the missing tags affect the overall data analysis.

The code is simple, much simpler than the previous example; it was adapted directly from that one.
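Before the full code, here is a minimal sketch of the counter API itself; the group name "TagStats", the counter names, and the mapper class are made up purely for illustration. A mapper increments a counter through its Context, and the driver can read the total back from the Job after it completes.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper that only counts lines containing the word "tag".
public class CounterSketchMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().contains("tag")) {
            context.getCounter("TagStats", "Hit").increment(1); // group name, counter name
        } else {
            context.getCounter("TagStats", "Miss").increment(1);
        }
    }
}

// In the driver, after job.waitForCompletion(true) returns:
//     long hits = job.getCounters().findCounter("TagStats", "Hit").getValue();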

----------------------------------------------------------------------------

package com.dew.counter;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {
if (null == args || args.length < 4) {
System.err.println("usage: <host:port:db:collection> <hdfs dump path> <log input path> <output path>");
return 1;
}
List<ServerAddress> list = new ArrayList<ServerAddress>();
String[] array = args[0].split(":"); // args[0] = host:port:db:collection
list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
MongoClient mongoClient = new MongoClient(list);
DB database = mongoClient.getDB(array[2]);
DBCollection collection = database.getCollection(array[3]);

// query: only documents that have both a pkg and a tags field
BasicDBObject query = new BasicDBObject();
query.put("pkg", new BasicDBObject("$exists", true));
query.put("tags", new BasicDBObject("$exists", true));
BasicDBObject fields = new BasicDBObject();
fields.put("pkg", 1);
fields.put("tags", 1);

//write hdfs
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FSDataOutputStream outHandler = hdfs.create(new Path(args[1])); // tag dump file on HDFS

//write
DBCursor cursor = collection.find(query, fields);
while (cursor.hasNext()) {
BasicDBObject record = (BasicDBObject) cursor.next();
String pkg = record.getString("pkg");
ArrayList<String> als = (ArrayList<String>) record.get("tags");
String tags = "";
for (String s : als) {
tags += " " + s.trim();
}
tags = tags.trim();
String finalString = pkg + "\t" + tags + System.getProperty("line.separator");
outHandler.write(finalString.getBytes("UTF-8"));
}

//remove handle
outHandler.close();
cursor.close();
mongoClient.close();
return 0;
}

}
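That first class, PullMongoDB, just dumps pkg/tags pairs from MongoDB into a tab-separated file on HDFS: args[0] carries the Mongo address packed as host:port:db:collection, and args[1] is the HDFS path of the dump. The second class below pushes that dump to every mapper via the distributed cache and does the actual counting; args[2] is the log input path and args[3] a throw-away output path.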

package com.dew.counter;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class TagCounter extends Configured implements Tool {
private static String PREFIX = ""; // counter group name
private static String NULL = "Null"; // pkg not found in the tag table
private static String ZERO = "Zero"; // pkg found but its tag string is empty
private static String HIT = "Hit"; // pkg found with usable tags

// map
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

private Hashtable<String, String> joinData = new Hashtable<String, String>();

private void readFile(String file) {
BufferedReader joinReader = null;
String line = null;
try {
joinReader = new BufferedReader(new FileReader(file));
while ((line = joinReader.readLine()) != null) {
String[] array = line.split("\t");
if (null == array || array.length < 2)
continue;

String pkg = array[0];
if (null == pkg || pkg.length() <= 0)
continue;

String tagStr = array[1];
if (null == tagStr)
continue;
tagStr = tagStr.trim();
if (tagStr.length() <= 0)
continue;
joinData.put(pkg, tagStr);
System.out.println("[map,setup] " + pkg + "  |  " + tagStr);
}
} catch (Exception e) {
// the job can still run without the tag table; log the problem and continue
System.out.println("--------------------------------------------\n" + e.toString());
} finally {
if (null != joinReader)
try {
joinReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}

}

@Override
protected void setup(Context context) throws IOException, InterruptedException {
try {
// Configuration conf = context.getConfiguration();
URI[] cacheFiles = context.getCacheFiles();
if (null != cacheFiles && cacheFiles.length > 0) {

readFile(cacheFiles[0].getPath().toString());

}
} catch (IOException e) {
// ignore: without the cache file every pkg will simply be counted as Null
}
}

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

// key neglected
if (null == value)
return;
String content = value.toString();
if (null == content || content.trim().length() == 0)
return;
// split: each log line is expected to contain at least 29 tab-separated fields
String[] strArray = content.split("\t");
if (null == strArray || strArray.length < 29)
return;
String sender = strArray[12].trim();
String receiver = strArray[14].trim();
String pkg = strArray[28].trim(); // package name used to look up tags
if (null == sender || sender.length() == 0 || null == receiver
|| receiver.length() == 0 || null == pkg
|| pkg.length() == 0) {
return;
}
String tags = this.joinData.get(pkg);
if (null == tags) {
Counter c = context.getCounter(PREFIX, NULL);
c.increment(1);
} else if (tags.trim().length() == 0) {
Counter c = context.getCounter(PREFIX, ZERO);
c.increment(1);
} else {
Counter c = context.getCounter(PREFIX, HIT);
c.increment(1);
}
// output is intentionally suppressed: this job only collects counters

// context.write(new Text(sender), new Text(tags));
// context.write(new Text(receiver), new Text(tags));
}
}

@Override
public int run(String[] args) throws Exception {

Configuration conf = getConf();
Job job = Job.getInstance(conf, "ComputeProfileHDFSPlusMongoDB");

// add distributed file
job.addCacheFile(new Path(args[1]).toUri());
// DistributedCache.addCacheFile(new Path(args[1]).toUri(),
// job.getConfiguration());

// prepare
FileInputFormat.setInputPaths(job, new Path(args[2]));
FileOutputFormat.setOutputPath(job, new Path(args[3]));
// FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setJobName("TagCounter");
job.setJarByClass(TagCounter.class);
job.setMapperClass(MapClass.class);
// job.setCombinerClass(Combiner.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(NullOutputFormat.class);

// execute
int exitCode = job.waitForCompletion(true) ? 0 : 1;
try {
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(args[1]), true); // remove the temporary tag dump
fs.delete(new Path(args[3]), true); // remove the unused output directory
} catch (Exception e) {
// cleanup failure is not fatal
}
return exitCode;
}

public static String[] args = null; // shared so both tools see the same arguments

public static void main(String[] a) throws Exception {
args = a;

// first dump pkg/tags pairs from MongoDB to HDFS, then run the counting job
int res = ToolRunner.run(new Configuration(), new PullMongoDB(), a);
if (res != 0) {
System.exit(res);
}
res = ToolRunner.run(new Configuration(), new TagCounter(), args);
System.exit(res);
}

}
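The run() method above only relies on the counter summary that Hadoop prints once the job finishes. If you want the missing-tag ratio programmatically, a minimal sketch of reading the counters back could be placed right after waitForCompletion(true) in run(); it uses the PREFIX/NULL/ZERO/HIT constants from TagCounter and needs an extra import of org.apache.hadoop.mapreduce.Counters.

// Sketch: read the counters back inside run(), right after waitForCompletion(true).
Counters counters = job.getCounters();
long nullCount = counters.findCounter(PREFIX, NULL).getValue(); // pkg missing from the tag table
long zeroCount = counters.findCounter(PREFIX, ZERO).getValue(); // pkg present but tag string empty
long hitCount = counters.findCounter(PREFIX, HIT).getValue();   // pkg has usable tags
long total = nullCount + zeroCount + hitCount;
if (total > 0) {
    double missingPercent = 100.0 * (nullCount + zeroCount) / total;
    System.out.println("records without usable tags: " + missingPercent + "%");
}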


Very simple; not much to say.

Screenshot of the results:

Tags: Hadoop, counters