Hadoop: Passing Values into Map/Reduce Tasks, a Worked Example
2017-01-10 14:35
I have recently been reading through some map/reduce programs and ran into a problem: a field defined on the job class cannot be read directly by the MapReduce tasks. The code in question is shown below.
public class KeyJob {

    public static class myMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private final static List<String> target_words = new ArrayList<String>();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] items = value.toString().split(",");
            for (String item : items) {
                if (target_words.contains(item)) {
                    word.set(item);
                    context.write(word, one);
                }
            }
        }

        public static void add(String word) {
            target_words.add(word);
            for (String s : target_words) {
                System.out.println(s);
            }
            System.out.println("------");
        }
    }

    public static class myReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        String job_name = "keywordcount";
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://bigdata:9000");
        conf.set("mapred.job.tracker", "bigdata:9001");

        Scanner scanner = new Scanner(System.in);
        String str = scanner.next();
        // Add to target (this runs in the client JVM)
        String[] target_words = str.split(",");
        for (String word : target_words) {
            myMap.add(word.toLowerCase());
        }

        Job job = new Job(conf, job_name);
        job.setJarByClass(KeyJob.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(myMap.class);
        job.setReducerClass(myReduce.class);
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/input/" + job_name + "/"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/out/" + job_name));
        job.waitForCompletion(true);
    }
}
Read purely as Java, the logic looks fine.
Yet a strange pitfall appears.
At runtime target_words is empty, so execution never enters the if block and the words we wanted to count produce no output, even though we clearly populated the list beforehand.
Some digging revealed that the map/reduce tasks simply cannot see the values in target_words. The main function populates target_words in the JVM of the client that submits the job, while the target_words read inside map() lives in a different JVM, the one running the map task; a static field exists per JVM and is not shipped with the job, so the value never makes it across.
The solution
Pass the parameter through the Configuration object: in main, put the value into conf with conf.set(...).
Then read it back inside the task, for example in the Mapper's setup method, via context.getConfiguration().get(...).
Problem solved.
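As a minimal, self-contained sketch of just this handoff (not the author's full program): it reuses the "keywords" key and the comma-separated format from the source below, while the class names ConfPassingSketch and SketchMap are invented for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class ConfPassingSketch {

    // Task side: runs in the map task's JVM and reads the value back out of the Configuration.
    public static class SketchMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final List<String> target_words = new ArrayList<String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String keywords = context.getConfiguration().get("keywords");
            for (String w : keywords.split(",")) {
                target_words.add(w);
            }
        }
    }

    // Client side: runs in the job-submission JVM and stores the value before the job is built.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("keywords", "you,do,to,and,what"); // set on conf before constructing the Job
        Job job = new Job(conf, "keywordcount");
        job.setJarByClass(ConfPassingSketch.class);
        job.setMapperClass(SketchMap.class);
        // Input/output paths and the reducer are omitted here; see the full source below.
    }
}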
Full source
Below is the complete source, shared freely. This problem cost me a whole day, so I hope discussing it here saves others from the same trap.

package com.zlf.job;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.zlf.util.HdUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

public class Run {

    private static List<String> target_words = new ArrayList<String>();

    public static class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        protected void setup(Context context) throws IOException, InterruptedException {
            /** Read the value that was passed in via conf */
            Configuration conf = context.getConfiguration();
            String keywords = conf.get("keywords");
            String[] key_words = keywords.split(",");
            for (String word : key_words) {
                target_words.add(word);
                System.out.println(word);
            }
        }

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            String[] items = value.toString().split(",");
            /** Manually populate the list (for testing) */
            // target_words.add("what");
            // target_words.add("do");
            // target_words.add("to");
            for (String item : items) {
                /** Test: plain word count */
                // word.set(target_words.get(1)); // without the manual adds, fetching an element
                // throws an index-out-of-bounds error, which shows target_words is empty
                // context.write(word, one);
                /** Test: keyword count via contains() */
                if (target_words.contains(item)) {
                    System.out.println(item);
                    word.set(item);
                    context.write(word, one);
                }
            }
        }
    }

    public static class Myreduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        String job_name = "keywordcount";
        boolean flag = HdUtil.removeDir("/user/hadoop/out/" + job_name);
        System.out.println(flag);

        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://bigdata:9000");
        conf.set("mapred.job.tracker", "bigdata:9001");

        if (args.length < 1) {
            System.out.println("Usage: wordcount <input_path> <output_path> <keyword_list>");
            return;
        }

        /** Read the keywords from the console */
        Scanner scanner = new Scanner(System.in);
        String str = scanner.next();
        // String[] target_words = str.split(",");
        // for (String word : target_words) {
        //     add(word.toLowerCase());
        // }
        conf.set("keywords", str); // store the input in conf so the map tasks can read it

        /** Read the keywords from the command line */
        // String[] target_words = args[0].split(",");
        // for (String word : target_words) {
        //     add(word.toLowerCase());
        // }

        Job job = new Job(conf, job_name);
        job.setJarByClass(Run.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WordMap.class);
        job.setReducerClass(Myreduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/input/" + job_name + "/"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/out/" + job_name));
        job.waitForCompletion(true);
    }
}
The program above counts occurrences of the specified keywords; a sample invocation is sketched below.
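For reference, launching the job might look like the following. The jar name keywordcount.jar is an assumption (the post does not give it); the class name com.zlf.job.Run, the requirement of at least one command-line argument, and the keyword list read from standard input all come from the source above.

# Hypothetical invocation: the jar name is an assumption.
# main() only checks that at least one argument is present; the argument's value is not used.
hadoop jar keywordcount.jar com.zlf.job.Run any-arg
# The program then waits on standard input for one comma-separated keyword list, e.g.:
you,do,to,and,what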
Input data file
There,are,moments,in,life,when,you,miss,someone,so,much,that
you,just,want,to,pick,them,from,your,dreams,and,hug,them
for,real!,Dream,what,you,want,to,dream;go,where,you,want
to,go;be,what,you,want,to,be,because,you,have,only,one,life
and,one,chance,to,do,all,the,things,you,want,to,do
Run log
文件夹是否存在:true
true
you,do,to,and,what
17/01/10 14:21:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
17/01/10 14:21:12 INFO input.FileInputFormat: Total input paths to process : 1
17/01/10 14:21:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/10 14:21:12 WARN snappy.LoadSnappy: Snappy native library not loaded
17/01/10 14:21:12 INFO mapred.JobClient: Running job: job_201701100106_0025
17/01/10 14:21:13 INFO mapred.JobClient: map 0% reduce 0%
17/01/10 14:21:17 INFO mapred.JobClient: map 100% reduce 0%
17/01/10 14:21:24 INFO mapred.JobClient: map 100% reduce 33%
17/01/10 14:21:26 INFO mapred.JobClient: map 100% reduce 100%
17/01/10 14:21:26 INFO mapred.JobClient: Job complete: job_201701100106_0025
17/01/10 14:21:26 INFO mapred.JobClient: Counters: 26
17/01/10 14:21:26 INFO mapred.JobClient:   Job Counters
17/01/10 14:21:26 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=4622
17/01/10 14:21:26 INFO mapred.JobClient:     Launched reduce tasks=1
17/01/10 14:21:26 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
17/01/10 14:21:26 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
17/01/10 14:21:26 INFO mapred.JobClient:     Launched map tasks=1
17/01/10 14:21:26 INFO mapred.JobClient:     Data-local map tasks=1
17/01/10 14:21:26 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=8716
17/01/10 14:21:26 INFO mapred.JobClient:   FileSystemCounters
17/01/10 14:21:26 INFO mapred.JobClient:     FILE_BYTES_READ=190
17/01/10 14:21:26 INFO mapred.JobClient:     HDFS_BYTES_READ=416
17/01/10 14:21:26 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=116939
17/01/10 14:21:26 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=29
17/01/10 14:21:26 INFO mapred.JobClient:   Map-Reduce Framework
17/01/10 14:21:26 INFO mapred.JobClient:     Map input records=5
17/01/10 14:21:26 INFO mapred.JobClient:     Reduce shuffle bytes=190
17/01/10 14:21:26 INFO mapred.JobClient:     Spilled Records=38
17/01/10 14:21:26 INFO mapred.JobClient:     Map output bytes=146
17/01/10 14:21:26 INFO mapred.JobClient:     Total committed heap usage (bytes)=160501760
17/01/10 14:21:26 INFO mapred.JobClient:     CPU time spent (ms)=1020
17/01/10 14:21:26 INFO mapred.JobClient:     Combine input records=0
17/01/10 14:21:26 INFO mapred.JobClient:     SPLIT_RAW_BYTES=127
17/01/10 14:21:26 INFO mapred.JobClient:     Reduce input records=19
17/01/10 14:21:26 INFO mapred.JobClient:     Reduce input groups=5
17/01/10 14:21:26 INFO mapred.JobClient:     Combine output records=0
17/01/10 14:21:26 INFO mapred.JobClient:     Physical memory (bytes) snapshot=248950784
17/01/10 14:21:26 INFO mapred.JobClient:     Reduce output records=5
17/01/10 14:21:26 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=3875409920
17/01/10 14:21:26 INFO mapred.JobClient:     Map output records=19
Output file
and 2
do 2
to 6
what 2
you 7
This post draws on the blog of 数据手艺人:
http://www.cnblogs.com/zhengrunjian/p/4536572.html