第一个MapReduce程序----wordcount(编写并运行)
2016-03-17 22:33
519 查看
感谢段海涛老师
先引入common和mapreduce及其依赖的jar包
WordCountMapper
写完之后导出jar包
先生成一个测试的数据,之后上传到hdfs,然后运行,最后查看结果。也可直接在eclipse中run
也可直接在eclipse中run,但要先导出jar包,并在WordCountRunner中加入这个jar包的位置等信息。还要导入hdfs的jar包及其依赖,不然会报 Exception in thread "main" java.io.IOException: No FileSystem for scheme: hdfs
也可本地运行,把yarn目录下的jar包引入后,把WordCountRunner的地址改为本地,直接run就可以了。
先引入common和mapreduce及其依赖的jar包
WordCountMapper
package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Word-count mapper: for every whitespace-separated token of an input line,
 * emits the pair {@code <word, 1>}.
 *
 * <p>Input key: byte offset of the line ({@link LongWritable});
 * input value: the line itself ({@link Text}).
 * Output key: a single word ({@link Text}); output value: the constant 1.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Reuse output Writables across map() calls instead of allocating two
    // objects per token: Hadoop serializes the values as soon as
    // context.write() is called, so reuse is safe and avoids GC pressure
    // on large inputs (standard MapReduce practice).
    private final Text outWord = new Text();
    private static final LongWritable ONE = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Get the content of one input line.
        String line = value.toString();
        // Split the line into words on spaces. StringUtils.split skips empty
        // tokens, so runs of consecutive spaces never yield an empty "word".
        String[] words = StringUtils.split(line, " ");
        // Emit <word, 1> for every word of the line.
        for (String word : words) {
            outWord.set(word);
            context.write(outWord, ONE);
        }
    }
}
WordCountReducer
package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Word-count reducer: sums all the 1s emitted by the mappers for a given
 * word and writes a single {@code <word, total>} pair to the output.
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // Reused output value: avoids allocating one LongWritable per distinct
    // word (Hadoop serializes it immediately on context.write, so reuse is safe).
    private final LongWritable outCount = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // Accumulate the counts for this word.
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // Emit the <word, count> pair.
        outCount.set(count);
        context.write(key, outCount);
    }
}
WordCountRunner
package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Describes the word-count job (which mapper/reducer classes to use, where
 * the input lives, where the output goes) and submits it to the Hadoop cluster.
 *
 * <p>Optional CLI arguments: {@code args[0]} = input path,
 * {@code args[1]} = output path. When omitted, the original hard-coded
 * paths are used, so existing invocations keep working.
 *
 * @author guo
 */
// club.drguo.hadoop.mapreduce.WordCountRunner
public class WordCountRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Allow the paths to be overridden on the command line; fall back to
        // the original defaults for backward compatibility.
        String input = args.length > 0 ? args[0] : "hdfs://localhost:9000/data/input";
        String output = args.length > 1 ? args[1] : "/data/output/firstmr";

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Jar that carries the job's classes, identified by the main class.
        job.setJarByClass(WordCountRunner.class);
        // Mapper and reducer classes for this job.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Key/value types produced by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Key/value types produced by the reducer (final output).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // The input is a directory: every file under it is processed.
        FileInputFormat.setInputPaths(job, input);

        // Delete a pre-existing output directory, otherwise job submission fails.
        Path path = new Path(output);
        // Resolve the FileSystem from the path itself rather than FileSystem.get(conf):
        // the latter uses fs.defaultFS, which may not be the HDFS instance the
        // input/output paths actually point at.
        FileSystem fileSystem = path.getFileSystem(configuration);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true); // true = recursive delete
        }
        FileOutputFormat.setOutputPath(job, path);

        // Submit, wait with progress reporting, and propagate success/failure
        // in the process exit code so `hadoop jar` callers can detect failures.
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "完成" : "未完成");
        System.exit(b ? 0 : 1);
    }
}
写完之后导出jar包
先生成一个测试的数据,之后上传到hdfs,然后运行,最后查看结果。也可直接在eclipse中run
guo@guo:~$ touch test.log guo@guo:~$ vi test.log guo@guo:~$ vi test.log guo@guo:~$ hdfs dfs -put ./test.log /data/input guo@guo:~$ hdfs dfs ls /data/input ls: Unknown command Did you mean -ls? This command begins with a dash. guo@guo:~$ hdfs dfs -ls /data/input Found 1 items -rw-r--r-- 1 guo supergroup 37 2016-03-17 22:22 /data/input/test.log guo@guo:~$ hadoop jar /home/guo/firstwordcount.jar club.drguo.hadoop.mapreduce.WordCountRunner 16/03/17 22:24:47 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 16/03/17 22:24:47 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 16/03/17 22:24:48 INFO input.FileInputFormat: Total input paths to process : 1 16/03/17 22:24:48 INFO mapreduce.JobSubmitter: number of splits:1 16/03/17 22:24:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1458222408729_0002 16/03/17 22:24:48 INFO impl.YarnClientImpl: Submitted application application_1458222408729_0002 16/03/17 22:24:48 INFO mapreduce.Job: The url to track the job: http://guo:8088/proxy/application_1458222408729_0002/ 16/03/17 22:24:48 INFO mapreduce.Job: Running job: job_1458222408729_0002 16/03/17 22:24:52 INFO mapreduce.Job: Job job_1458222408729_0002 running in uber mode : false 16/03/17 22:24:52 INFO mapreduce.Job: map 0% reduce 0% 16/03/17 22:24:55 INFO mapreduce.Job: map 100% reduce 0% 16/03/17 22:24:59 INFO mapreduce.Job: map 100% reduce 100% 16/03/17 22:25:00 INFO mapreduce.Job: Job job_1458222408729_0002 completed successfully 16/03/17 22:25:00 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=71 FILE: Number of bytes written=234977 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=143 HDFS: Number of bytes written=41 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: 
Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=1306 Total time spent by all reduces in occupied slots (ms)=1475 Total time spent by all map tasks (ms)=1306 Total time spent by all reduce tasks (ms)=1475 Total vcore-milliseconds taken by all map tasks=1306 Total vcore-milliseconds taken by all reduce tasks=1475 Total megabyte-milliseconds taken by all map tasks=1337344 Total megabyte-milliseconds taken by all reduce tasks=1510400 Map-Reduce Framework Map input records=5 Map output records=3 Map output bytes=59 Map output materialized bytes=71 Input split bytes=106 Combine input records=0 Combine output records=0 Reduce input groups=3 Reduce shuffle bytes=71 Reduce input records=3 Reduce output records=3 Spilled Records=6 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=66 CPU time spent (ms)=790 Physical memory (bytes) snapshot=439631872 Virtual memory (bytes) snapshot=3839033344 Total committed heap usage (bytes)=324009984 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=37 File Output Format Counters Bytes Written=41 完成 guo@guo:~$ hdfs dfs -cat /data/output/firstmr/* hello 2 kitty 1 world 1
也可直接在eclipse中run,但要先导出jar包,并在WordCountRunner中加入这个jar包的位置等信息。还要导入hdfs的jar包及其依赖,不然会报 Exception in thread "main" java.io.IOException: No FileSystem for scheme: hdfs
package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job driver for running directly from Eclipse. Identical to the cluster
 * runner except that the location of the exported job jar is set explicitly
 * on the Configuration before the Job is created.
 *
 * @author guo
 */
// club.drguo.hadoop.mapreduce.WordCountRunner
public class WordCountRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Tell the framework where the exported jar lives so it can be shipped
        // to the cluster (the jar is kept directly inside the project here).
        conf.set("mapreduce.job.jar", "wordcount.jar");

        Job wcJob = Job.getInstance(conf);
        // Jar carrying the job's classes, identified by the main class.
        wcJob.setJarByClass(WordCountRunner.class);

        // Wire up the mapper and reducer.
        wcJob.setMapperClass(WordCountMapper.class);
        wcJob.setReducerClass(WordCountReducer.class);

        // Types emitted by the mapper.
        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(LongWritable.class);
        // Types emitted by the reducer (final output).
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(LongWritable.class);

        // Input is a directory: every file underneath is processed.
        FileInputFormat.setInputPaths(wcJob, "hdfs://localhost:9000/data/input");

        // Remove any leftover output directory, otherwise submission fails.
        FileSystem fs = FileSystem.get(conf);
        Path outDir = new Path("/data/output/firstmr");
        if (fs.exists(outDir)) {
            fs.delete(outDir, true); // recursive
        }
        FileOutputFormat.setOutputPath(wcJob, outDir);

        // Submit and wait, printing progress as the job runs.
        boolean success = wcJob.waitForCompletion(true);
        System.out.println(success ? "完成" : "未完成");
    }
}
也可本地运行,把yarn目录下的jar包引入后,把WordCountRunner的地址改为本地,直接run就可以了。
package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Local-mode job driver: identical wiring to the cluster runner, but the
 * input and output locations are directories on the local filesystem.
 *
 * @author guo
 */
// club.drguo.hadoop.mapreduce.WordCountRunner
public class WordCountRunner2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job localJob = Job.getInstance(new Configuration());

        // Jar carrying the job's classes, identified by the main class.
        localJob.setJarByClass(WordCountRunner2.class);

        // Mapper / reducer wiring.
        localJob.setMapperClass(WordCountMapper.class);
        localJob.setReducerClass(WordCountReducer.class);

        // Mapper output types.
        localJob.setMapOutputKeyClass(Text.class);
        localJob.setMapOutputValueClass(LongWritable.class);
        // Reducer (final) output types.
        localJob.setOutputKeyClass(Text.class);
        localJob.setOutputValueClass(LongWritable.class);

        // Local input directory: every file underneath is processed.
        FileInputFormat.setInputPaths(localJob, "/home/guo/data/input");
        // Local output directory for the results.
        FileOutputFormat.setOutputPath(localJob, new Path("/home/guo/data/output"));

        // Submit and wait, printing progress as the job runs.
        boolean success = localJob.waitForCompletion(true);
        System.out.println(success ? "完成" : "未完成");
    }
}
相关文章推荐
- Python最长公共子串和最长公共子序列的实现
- Qt5的Log打印
- VK Cup 2016 - Qualification Round 1 (Russian-Speaking Only, for VK Cup teams) A. Voting for Photos 水题
- Redis 集群
- 第二篇学习笔记
- session和cookie的区别
- JEECMS子站访问
- CSS实现垂直、水平居中方式
- 微信的那个老外产品经理又写了一篇《中国移动应用设计趋势》
- STL set集合算法
- Android开发最佳实践
- nyoj523亡命逃窜
- 找最小值(只用一个变量)
- 百度地图API试用--(初次尝试)
- 对代理方法的理解iOS
- 二叉树重建
- 数组
- linux日志
- windows server git
- 设计模式C++学习笔记之四(Multition多例模式)