您的位置:首页 > 其它

一个完整的mapreduce程序

2015-05-07 21:24 375 查看
package com.hadoop.mapreduce.dc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce driver that aggregates per-phone upload/download traffic.
 *
 * <p>Input: tab-separated log lines where field 1 is the phone number,
 * field 8 the upstream payload and field 9 the downstream payload.
 * Output: one record per phone number with the summed traffic wrapped
 * in a {@code DataBean}.
 */
public class DataCount {

    /**
     * Job entry point: wires up the mapper/reducer and submits the job.
     *
     * @param args args[0] = input path, args[1] = output path
     * @throws Exception if job configuration or submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Locate the jar that contains this driver class.
        job.setJarByClass(DataCount.class);

        job.setMapperClass(DCMapper.class);
        // Map output types must be declared explicitly here; they could only
        // be omitted if they matched the reducer's (job-level) output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataBean.class);

        // Input/output paths come from the command line so the same jar
        // works against any dataset.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate the job result as the process exit code so shell scripts
        // and schedulers can detect failure. The original version discarded
        // the boolean returned by waitForCompletion and always exited 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Mapper: parses one tab-separated log line and emits (phoneNumber, DataBean). */
    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // value holds one raw log line.
            String line = value.toString();
            String[] fields = line.split("\t");

            // Guard against blank or truncated lines so a single malformed
            // record does not fail the whole job with
            // ArrayIndexOutOfBoundsException (fields 1, 8 and 9 are read).
            if (fields.length < 10) {
                return;
            }

            String telNo = fields[1];
            long upPayLoad = Long.parseLong(fields[8]);
            long downPayLoad = Long.parseLong(fields[9]);

            // Wrap the extracted values in the bean used as the map output value.
            DataBean bean = new DataBean(telNo, upPayLoad, downPayLoad);
            context.write(new Text(telNo), bean);
        }
    }

    /** Reducer: sums the up/down traffic of all records sharing one phone number. */
    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {

        @Override
        protected void reduce(Text key, Iterable<DataBean> values, Context context)
                throws IOException, InterruptedException {
            // Accumulators for upstream and downstream traffic.
            long upSum = 0;
            long downSum = 0;
            for (DataBean bean : values) {
                upSum += bean.getUpPayLoad();
                downSum += bean.getDownPayLoad();
            }

            // NOTE(review): the bean is written with an empty phone number
            // because the key already carries it — confirm DataBean.toString()
            // does not need the number, otherwise pass key.toString() here.
            DataBean result = new DataBean("", upSum, downSum);
            context.write(key, result);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐