
Notes on Running the WordCount Program on a Hadoop Cluster

2016-03-25 08:21
1. Write the program on Linux

WordCountMapper: the map stage, which splits each line into words

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;

// input: <LongWritable, Text> (byte offset, line); output: <Text, LongWritable> (word, 1)
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // get the content of the current line
        String line = value.toString();
        // split on spaces; commons-lang split collapses adjacent
        // separators and never returns empty tokens
        String[] words = StringUtils.split(line, ' ');

        // emit <word, 1> for each word
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}


WordCountReducer: the reduce stage, which aggregates the <word, 1> pairs emitted by the mapper

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// tjut.edu.dcx.hadoop.mapreduce.wordcount.WordCountReducer
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // e.g. key: hello, values: {1, 1, 1, 1}
        // accumulator for this word's count
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // emit the <word, count> pair
        context.write(key, new LongWritable(count));
    }
}


WordCountRunner: configures and submits the job

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Describes a job (which mapper class, which reducer class, where the input
 * files live, where the output goes, ...) and submits it to the Hadoop cluster.
 * @author ding
 * tjut.edu.dcx.hadoop.mapreduce.wordcount.WordCountRunner
 */
public class WordCountRunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // conf.set("mapreduce.job.jar", "wcount.jar");
        Job wcjob = Job.getInstance(conf);
        // the jar that contains the job's classes
        wcjob.setJarByClass(WordCountRunner.class);
        // which mapper class the job uses
        wcjob.setMapperClass(WordCountMapper.class);
        // which reducer class the job uses
        wcjob.setReducerClass(WordCountReducer.class);

        // key/value types emitted by the mapper
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(LongWritable.class);

        // key/value types emitted by the reducer
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(LongWritable.class);

        // path of the raw input data to process
        FileInputFormat.setInputPaths(wcjob, "hdfs://master:9000/upload/test.txt");

        // path where the results are written
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://master:9000/upload/output1"));

        boolean res = wcjob.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
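One caveat worth noting: FileOutputFormat refuses to start a job whose output directory already exists, so remove the old output before re-running (a sketch, using the Hadoop 2.x shell syntax):

hadoop fs -rm -r hdfs://master:9000/upload/output1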


2. Package into a jar and run

The input data path and the result output path are already defined in the runner.
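A sketch of the submission step, assuming the project was exported as wcount.jar (the name used in the commented-out conf.set above) with the package path from the class comments:

hadoop jar wcount.jar tjut.edu.dcx.hadoop.mapreduce.wordcount.WordCountRunner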



3. Check the results



View the output files
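For example, the output directory configured in the runner can be listed with:

hadoop fs -ls hdfs://master:9000/upload/output1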



View the run results
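The reduce output typically lands in a file named part-r-00000 inside that directory, so the word counts can be printed with:

hadoop fs -cat hdfs://master:9000/upload/output1/part-r-00000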



Success!

PS:

LongWritable and Text are Hadoop's own serialization types, implementations of its Writable standard. You can think of them roughly as long and String.
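A minimal sketch of that correspondence, using only the standard Writable API (WritableDemo is a hypothetical class name, not part of the job above):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Hypothetical demo class illustrating the Writable wrappers.
public class WritableDemo {
    public static void main(String[] args) {
        Text word = new Text("hello");            // wraps a String
        LongWritable count = new LongWritable(2); // wraps a long

        String s = word.toString(); // unwrap back to a plain String
        long n = count.get();       // unwrap back to a plain long

        System.out.println(s + ": " + n); // prints "hello: 2"
    }
}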

A rough sketch of how it works:



In the map phase:

The key for each input line is its byte offset in the file (hence the LongWritable input key), and each word is emitted with a count of 1. For example, for the line "long long ago" the mapper emits:

long 1

long 1

ago 1

In the reduce phase, the framework groups the values by key:

long, {1, 1}

ago, {1}

and summing each group gives the final counts:

long: 2

ago: 1
Tags: hadoop