
A Solution for Hadoop MapReduce Jobs That Need More Counters Than the Default Limit of 120

2015-06-29 19:43
My previous Hadoop post, 《Hadoop Map Reduce 限制counter的默认数量120》, described a problem that was still waiting for a solution: a MapReduce job that needs more Counters than the default limit of 120. I finally found time to implement a workaround; the approach is as follows:

Do the counting in the reduce phase and write the totals with the write method of a MultipleOutputs object instead of incrementing MapReduce Counters. For how to use MultipleOutputs, see the post 《hadoop文件输出控制,多路径输出到不同文件》.
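Before the full job, here is a condensed sketch of the reducer-side pattern. The class name CounterSumReducer is purely illustrative; the complete job below implements the same idea with its own classes and registers the named output under the name "monitor".

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Condensed sketch: the mapper emits each counter name with its increment as an
// ordinary (Text, LongWritable) record; this reducer sums those records and writes
// the totals to a named output instead of incrementing MapReduce Counters.
public class CounterSumReducer extends Reducer<Text, LongWritable, Text, NullWritable> {

    // Typed to the reducer's regular output types; the named-output write() used
    // below is generic and accepts (Text, LongWritable) pairs independently of these.
    private MultipleOutputs<Text, NullWritable> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0L;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // "monitor" must be registered up front on the job with
        // MultipleOutputs.addNamedOutput(job, "monitor", TextOutputFormat.class,
        //                                Text.class, LongWritable.class);
        multipleOutputs.write("monitor", key, new LongWritable(sum));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}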

Here is the full, commented code:

public class UserProfileListCreditJob {

    private static Logger logger = LoggerFactory.getLogger(UserProfileListCreditJob.class);

    private final static String PROPERTIES = "hbasezookeeper.properties";

    private final static String MONITOR_PATH = "monitor";

    /**
     * Mapper
     */
    public static class UserProfileListCreditMapper extends TableMapper<Text, LongWritable> {

        private UserProfileListCreditExtractor userProfileListCreditExtractor;
        private Text outValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            userProfileListCreditExtractor = UserProfileListCreditExtractor.getInstance();
            super.setup(context);
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            Map<String, Long> counters = new HashMap<String, Long>();
            String result = userProfileListCreditExtractor.extractFormat(context.getConfiguration(), value, counters);
            // Write both the extraction result and every counter entry to the Context
            if (result != null && result.length() > 0) {
                outValue.set(result);
                LongWritable zero = new LongWritable();
                context.write(outValue, zero);

                Iterator<Map.Entry<String, Long>> it = counters.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, Long> entry = it.next();
                    Text monitorKey = new Text();
                    monitorKey.set(entry.getKey());
                    LongWritable monitorValue = new LongWritable();
                    monitorValue.set(entry.getValue());

                    context.write(monitorKey, monitorValue);
                }
            }
        }
    }

    /**
     * Reducer
     */
    public static class UserProfileListCreditReducer extends Reducer<Text, LongWritable, Text, NullWritable> {

        private MultipleOutputs<Text, NullWritable> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Initialize the MultipleOutputs instance
            multipleOutputs = new MultipleOutputs<Text, NullWritable>(context);

            super.setup(context);
        }

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            if (key.toString().contains(UserProfileListCreditExtractor.DELIMITER)) {
                // Extraction-result keys always contain ${UserProfileListCreditExtractor.DELIMITER},
                // so they go to the regular job output
                context.write(key, NullWritable.get());
            } else {
                // Counter keys never contain that delimiter; sum their values and
                // write the total to the named output through MultipleOutputs
                long sum = 0L;
                for (LongWritable longValue : values) {
                    sum += longValue.get();
                }
                multipleOutputs.write(MONITOR_PATH, key, new LongWritable(sum));
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();

            super.cleanup(context);
        }
    }

    /**
     * Create the Hadoop job
     * @param conf the Configuration
     * @return boolean whether the job completed successfully
     * @throws java.io.IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static boolean createJob(Configuration conf)
            throws IOException, ClassNotFoundException, InterruptedException {
        String tableName = conf.get("table");
        String output = conf.get("output");

        // Delete the output directory if requested
        final FileSystem fs = FileSystem.get(conf);
        if (conf.getBoolean("cleanup-output", true)) {
            fs.delete(new Path(output), true);
        }

        InputStream is = UserProfileListCreditJob.class.getClassLoader().getResourceAsStream(PROPERTIES);
        String hbaseCluster = "";
        if (is != null) {
            try {
                BufferedReader in = new BufferedReader(new InputStreamReader(is));
                String line = in.readLine();
                String[] props = line.split("=");
                if (props.length == 2) {
                    hbaseCluster = props[1];
                }
            } catch (IOException e) {
                logger.error("Could not set hbase.zookeeper.quorum");
                System.exit(2);
            }
        }

        if (hbaseCluster.length() > 0) {
            conf = NameService.resolve(hbaseCluster).createClusterConf(conf);
        }
        // Check that the HBase table exists and is enabled
        if (!HTable.isTableEnabled(conf, tableName)) {
            logger.error("Hbase table: " + tableName + " does not exist!");
            System.exit(2);
        }

        MicloudMRJob job = MicloudMRJob.getInstance(conf, "User Profile List Credit Job: " + tableName);
        job.setJarByClass(UserProfileListCreditJob.class);
        job.setMapperClass(UserProfileListCreditMapper.class);
        job.setReducerClass(UserProfileListCreditReducer.class);
        job.setNumReduceTasks(10);
        job.setMapOutputKeyClass(Text.class);
        // The mapper emits LongWritable values, so the map output value class must match
        job.setMapOutputValueClass(LongWritable.class);

        Scan scan = new Scan();
        scan.setCaching(100);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(tableName, scan, UserProfileListCreditMapper.class, Text.class, LongWritable.class, job);
        MultipleInputs.addInputPath(job, new Path("dummy"), TableInputFormat.class, UserProfileListCreditMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(output));
        // Register the named output that receives the counter totals; note that
        // org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class must be used here
        MultipleOutputs.addNamedOutput(job, MONITOR_PATH, org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class, Text.class, LongWritable.class);

        return job.waitForCompletion(true);
    }

    /**
     * Entry point
     * @param args args
     * @throws java.io.IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        new GenericOptionsParser(conf, args).getRemainingArgs();

        // table: name of the HBase table to scan
        if (StringUtils.isBlank(conf.get("table"))) {
            logger.error("jvm args: -Dtable must be specified!");
            System.exit(2);
        }

        // output: path for the result files
        if (StringUtils.isBlank(conf.get("output"))) {
            logger.error("jvm args: -Doutput must be specified!");
            System.exit(2);
        }

        if (!createJob(conf)) {
            System.exit(-1);
        }
    }
}
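Since main() feeds the arguments through GenericOptionsParser, the table name and output path are passed as generic options when launching, along the lines of hadoop jar <job-jar> <main-class> -Dtable=<hbase-table> -Doutput=<hdfs-output-path> (the jar and main-class names depend on how the project is packaged).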


The job produces the following output files:

monitor-r-00008 monitor-r-00009 part-r-00000 part-r-00001 part-r-00002 part-r-00003 part-r-00004 part-r-00005 part-r-00006 part-r-00007 part-r-00008 part-r-00009 _SUCCESS
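The part-r-* files hold the extraction results written through the regular job output, while the monitor-r-* files hold the summed counters written through the "monitor" named output. Because MultipleOutputs only creates a file on the first write, a monitor-r-* file appears only for reducers that actually received counter keys, which is presumably why just monitor-r-00008 and monitor-r-00009 show up here. The counter totals can then be read back with something like hadoop fs -cat <output>/monitor-r-*, free of the 120-counter limit.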