您的位置:首页 > 运维架构

hadoop入门之利用hadoop来对文档数据归类统计案例wordcount

2017-10-24 09:55 519 查看
一个庞大的数据,少则几十兆多则上TB,我们来统计文档的数据就得使用hadoop来进行了。

本文章通过一个案例的讲解,带大家了解使用mapreducer的方法。

现在我们有一个文件如图所示



可以看出每一行通过空格来划分,第一个为时间,第二个为提示类型,第三个为提示信息来源……

我们现在想统计一下提示类型分别有几种,以及各种提示信息来源有几种。

首先建一个sortCount的类

package pack;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class sortCount {
public static void main(String [] args) throws IOException
{
Configuration conf =new Configuration();
Job job=new Job(conf);
job.setJarByClass(sortCount.class);
FileInputFormat.addInputPath(job, new Path("/anaconda.syslog"));
//必须有,这就是示例文件
FileOutputFormat.setOutputPath(job,new Path("/output") );
//必须没有,输出的结果
job.setMapperClass(myMapper.class);
//设置Map类
job.setReducerClass(myReducer.class);
//设置Reducer类
job.setCombinerClass(myReducer.class);
//预先为reducer处理数据,见下文详解
job.setNumReduceTasks(2);
//设置两个Reduce来执行任务
job.setPartitionerClass(myPartitioner.class);
job.setOutputKeyClass(Text.class);设置输出的键的类型
job.setOutputValueClass(IntWritable.class);设置输出的值的类型
try {
System.exit(job.waitForCompletion(true) ? 0 : 1);  //执行语句
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}


上面用到combiner,现对其进行分析

假如现在有两个maptask,combiner的角色就相当于代收作业的班长来减轻老师的负担一样。Maptask1和Maptask2为两个班,combiner是各个班的班长,班长提前收好作业,老师reducer的负担就减轻了。



类myMapper

package pack;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class myMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
protected void map(LongWritable key ,Text value ,Context context) throws IOException, InterruptedException
{
//Text value 是一整行的数据     MAp源码会自动循环调用此函数只到把每一行都读入进来。
String [] str =value.toString().split(" ");
//for(String ss:str)
//{
//context.write(new Text(ss),new IntWritable(1));   //不知道把结果发给谁,用上下文
//}
//}//此注释代码用于wordCount示例
Text ss = new Text();
ss.set(new StringBuffer("loglevel::").append(str[1]).toString());//添加一个头部,用于区分
Text ss2 = new Text();
ss2.set(new StringBuffer("logresource::").append(str[2]).toString());
context.write(ss,new IntWritable(1));
context.write(ss2,new IntWritable(1));
//统计anaconda.syslog文件的各种提示出现次数
}}


类myReducer

package pack;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
public class myReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
//到reduce之前combiner已经自动把相同值(key)的数放到一起,现在只需要将其个数相加就可以了。
protected void reduce(Text key,Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum=0;
Iterator<IntWritable> it = values.iterator();
while(it.hasNext())
{
sum+=it.next().get();
}
context.write(key, new IntWritable(sum));
}
}
//ctrl+shift+t 查找类——>小技巧


类myPartitioner

package pack;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class myPartitioner extends Partitioner<Text, IntWritable>{
@Override
public int getPartition(Text key, IntWritable value, int arg2) //arg2为设置的reducer数量{
// TODO Auto-generated method stub
if(key.toString().startsWith("loglevel::"))
return 0;//分发到第一个reducer
else if(key.toString().startsWith("logresource::"))
return 1;//分发到第二个reducer    0,1要以次写出来
return 0;
}
}


打包执行,见下图说明操作成功





(●’◡’●)欢迎指教!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息