mapreduce实现与流程解析—Hadoop2.6.0
2015-10-29 16:54
330 查看
MapReduce实现流程
主要类
Mapper实现
Reducer实现
运行结果:
主要类
package wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the word-count MapReduce job: wires the mapper, reducer,
 * input/output paths and key/value types together and submits the job.
 */
public class WordCount {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Ship the jar containing this class to the cluster.
        job.setJarByClass(WordCount.class);

        // Mapper settings: emits <word, 1> pairs.
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Input can also be read from the local filesystem instead of HDFS.
        FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop1:9000/inputfile/words"));

        // Reducer settings: sums the counts for each word.
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Results are written to a local directory.
        FileOutputFormat.setOutputPath(job, new Path("D:/wordCountResult"));

        // Optional combiner: pre-aggregates map output to reduce shuffle traffic.
        // job.setCombinerClass(WCReducer.class);

        // Fix: the original ignored waitForCompletion's return value, so the
        // process exited 0 even when the job failed. Propagate success/failure
        // as the exit code instead.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Mapper实现
package wc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map phase of word count.
 *
 * <p>The framework calls {@link #map} once per input line: the key is the
 * line's byte offset in the file, the value is the line's text. For every
 * space-separated token on the line this mapper emits a {@code <word, 1>}
 * pair.
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /** Constant count of 1; shared across all calls to avoid reallocation. */
    private static final LongWritable ONE = new LongWritable(1);

    // Reused output key. map() runs once per line, so allocating a fresh
    // Text per emitted word (as the original did) creates needless garbage.
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(" ");
        for (String token : tokens) {
            if (token.isEmpty()) {
                // Fix: consecutive spaces make split(" ") yield empty tokens;
                // the original counted those as a word "".
                continue;
            }
            word.set(token);
            context.write(word, ONE);
        }
    }
}
Reducer实现
package wc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce phase of word count.
 *
 * <p>Between map and reduce the shuffle groups all map outputs that share a
 * key, so each call to {@link #reduce} receives one word together with the
 * list of 1s emitted for it, e.g. {@code <word, (1, 1, ...)>}. The reducer
 * sums that list and writes {@code <word, total>}.
 */
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> counts, Context context)
            throws IOException, InterruptedException {
        long total = 0L;
        for (LongWritable count : counts) {
            total += count.get();
        }
        context.write(key, new LongWritable(total));
    }
}
运行结果:
相关文章推荐
- [学习OpenCV攻略][010][写入AVI文件]
- CentOS 6.5 系统上安装SVN服务器端的方法及步骤
- Android启动:Linux启动流程
- Nginx反向代理和负载均衡部署指南
- linux安装activeMQ
- Nginx负载均衡配置实例详解
- CentOS下部署web2py服务器
- linux下的ssh端口转发
- Centos 内存占满 释放内存
- Linux 下编译安装 PHP 5.6
- 转载:Linux查看设置系统时区
- Linux 下bin格式软件的安装与卸载
- hadoop 安装教程 转载
- nginx 模块的加载与初始化
- linux 系统常用命令
- Tomcat内存优化4.1 内存泄漏——内存分析工具 MAT 的使用
- 生产环境 centos6 源码编辑安装lnmp
- linux实现c多进程
- Linux线程属性总结
- centos系统无法使用ssh远程工具连接方法