Hadoop Map Reduce的Counter数量超过默认值120的解决方案
2015-06-29 19:43
387 查看
前一篇Hadoop的文章《Hadoop Map Reduce 限制counter的默认数量120》提到的一个MapReduce的Counter数量超过默认值120的问题的解决方案,今天终于有空去实现了,实现方法如下:
使用类MultipleOutputs的对象write方法在reduce中进行统计计算,MultipleOutputs的使用可参考博文《hadoop文件输出控制,多路径输出到不同文件》.
直接上有注释的代码:
计算结果文件如下:
monitor-r-00008 monitor-r-00009 part-r-00000 part-r-00001 part-r-00002 part-r-00003 part-r-00004 part-r-00005 part-r-00006 part-r-00007 part-r-00008 part-r-00009 _SUCCESS
使用类MultipleOutputs的对象write方法在reduce中进行统计计算,MultipleOutputs的使用可参考博文《hadoop文件输出控制,多路径输出到不同文件》.
直接上有注释的代码:
public class UserProfileListCreditJob { private static Logger logger = LoggerFactory.getLogger(UserProfileListCreditJob.class); private final static String PROPERTIES = "hbasezookeeper.properties"; private final static String MONITOR_PATH = "monitor"; /** * Mapper */ public static class UserProfileListCreditMapper extends TableMapper<Text, LongWritable> { private UserProfileListCreditExtractor userProfileListCreditExtractor; private Text outValue = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException{ userProfileListCreditExtractor = UserProfileListCreditExtractor.getInstance(); super.setup(context); } @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { Map<String, Long> counters = new HashMap<String,Long>(); String result = userProfileListCreditExtractor.extractFormat(context.getConfiguration(), value, counters); // 将计算结果和Counter结果一股脑 write到Context中 if (result != null && result.length() > 0) { outValue.set(result); LongWritable zero = new LongWritable(); context.write(outValue, zero); Iterator<Map.Entry<String, Long>> it = counters.entrySet().iterator(); while (it.hasNext()) { Map.Entry<String, Long> entry = it.next(); Text monitorKey = new Text(); monitorKey.set(entry.getKey()); LongWritable monitorValue = new LongWritable(); monitorValue.set(entry.getValue()); context.write(monitorKey, monitorValue); } } } } /** * Reducer */ public static class UserProfileListCreditReducer extends Reducer <Text, LongWritable, Text, NullWritable> { private MultipleOutputs multipleOutputs; @Override protected void setup (Context context) throws IOException, InterruptedException { // 初始化MultipleOutputs对象 multipleOutputs = new MultipleOutputs(context); super.setup(context); } @Override protected void reduce (Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{ if 
(key.toString().contains(UserProfileListCreditExtractor.DELIMITER)) { // 计算结果key中一定包含字符串${UserProfileListCreditExtractor.DELIMITER} // 将计算结果write到Context中 context.write(key, NullWritable.get()); } else { // Counters key中一定不包含上述字符串,则进行求和计算, // 最终结果write到multipleOutputs中 long sum = 0L; for (LongWritable longValues : values) { sum += longValues.get(); } multipleOutputs.write(MONITOR_PATH, key, sum); } } @Override protected void cleanup (Context context) throws IOException, InterruptedException { multipleOutputs.close(); super.cleanup(context); } } /** * 创建Hadoop Job * @param conf Configuration变量 * @return boolean 任务运行是否完成 * @throws java.io.IOException * @throws InterruptedException * @throws ClassNotFoundException */ public static boolean createJob(Configuration conf) throws IOException, ClassNotFoundException, InterruptedException{ String tableName = conf.get("table"); String output = conf.get("output"); // Delete output file final FileSystem fs = FileSystem.get(conf); if (conf.getBoolean("cleanup-output", true)) { fs.delete(new Path(output), true); } InputStream is = UserProfileListCreditJob.class.getClassLoader().getResourceAsStream(PROPERTIES); String hbaseCluster = ""; if (is != null) { try { BufferedReader in = new BufferedReader(new InputStreamReader(is)); String line = in.readLine(); String[] props = line.split("="); if (props.length == 2) { hbaseCluster = props[1]; } } catch (IOException e) { logger.error("Could not set hbase.zookeeper.quorum"); System.exit(2); } } if (hbaseCluster.length() > 0) { conf = NameService.resolve(hbaseCluster).createClusterConf(conf); } // check hbase table exist or not if (!HTable.isTableEnabled(conf, tableName)) { logger.error("Hbase table: " + tableName + " does not exist!"); System.exit(2); } MicloudMRJob job = MicloudMRJob.getInstance(conf, "User Profile List Credit Job: " + tableName); job.setJarByClass(UserProfileListCreditJob.class); job.setMapperClass(UserProfileListCreditMapper.class); 
job.setReducerClass(UserProfileListCreditReducer.class); job.setNumReduceTasks(10); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); Scan scan = new Scan(); scan.setCaching(100); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob(tableName, scan, UserProfileListCreditMapper.class, Text.class, LongWritable.class, job); MultipleInputs.addInputPath(job, new Path("dummy"), TableInputFormat.class, UserProfileListCreditMapper.class); FileOutputFormat.setOutputPath(job, new Path(output)); // 注册添加NameOutput--counters计算结果存放文件路径,注意此处要使用 // org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class MultipleOutputs.addNamedOutput(job, MONITOR_PATH, org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class, Text.class, LongWritable.class); return job.waitForCompletion(true); } /** * 运行入口 * @param args args * @throws java.io.IOException * @throws InterruptedException * @throws ClassNotFoundException */ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{ Configuration conf = new Configuration(); new GenericOptionsParser(conf, args).getRemainingArgs(); // table : HBase扫描的表名 if (StringUtils.isBlank(conf.get("table"))) { logger.error("jvm args: -Dtable must be specified!"); System.exit(2); } // output : 结果文件输出路径 if (StringUtils.isBlank(conf.get("output"))) { logger.error("jvm args: -Doutput must be specified!"); System.exit(2); } if (!createJob(conf)) { System.exit(-1); } } }
计算结果文件如下:
monitor-r-00008 monitor-r-00009 part-r-00000 part-r-00001 part-r-00002 part-r-00003 part-r-00004 part-r-00005 part-r-00006 part-r-00007 part-r-00008 part-r-00009 _SUCCESS
相关文章推荐
- 网站变为黑白
- Linux内存管理--逻辑层面分析
- linux 使用expect
- Linux学习笔记之RedHat Enterprise Linux 6.4 使用 Centos 6 的yum源问题
- Linux下CMake使用介绍
- Linux下CMake使用介绍
- linux操作系统-两台linux服务器SSH免密码登录
- Linux时间同步
- csu - 1538: Shopping (贪心)
- tomcat6.0 配置不同端口访问不同项目
- linux下查看和添加PATH环境变量
- inet_pton, inet_ntop
- Apache+Tomcat集群配置
- Linux学习笔记(九)
- 【图文教程】de4dot实战字符串解密(演示:hishop微分销系统)
- linux C判断文件是否存在
- linux基础命令
- 导入Opencv自带Sample项目 face-detection 报错
- How To Set Up a Private Docker Registry on Ubuntu 14.04
- Linux下找不到IFCONFIG命令---ifconfig