
Learning MapReduce Programming (1): A Brief Analysis with Source Code

2016-02-20 18:45
After finishing the fully distributed Hadoop cluster setup, I was tied up teaching and grading two courses, C Programming and Fundamentals of Computer Applications. When winter break finally came, I spent the first few days preparing lectures for two more courses, LAMP and Oracle. Lesson preparation is a teacher's eternal lot. I suddenly realized that apart from putting on a few pounds, I had made no progress in my own studies at all. So I spent a bit over three hours working through the basics of MapReduce programming, and it turned out to be fairly approachable, since the code follows a fixed template. Below are a few examples for reference. Disclosure: some of the examples and code come from the Dataguru (炼数成金) course materials (no advertising intended).

Example 1: The classic WordCount program:

package wordCount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class wordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(wordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



Then set the input and output paths as program arguments in MyEclipse's Run Configurations dialog (the original post showed this step as a screenshot) and run the program.
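
To see what the job computes without touching the cluster, here is a minimal plain-Java sketch of my own (not from the course materials; the sample lines are made up and Java 8 is required) that collapses map, shuffle, and reduce into a single loop:

import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class WordCountSimulation {
    public static void main(String[] args) {
        String[] lines = { "hello hadoop", "hello mapreduce" };  // stand-in for the input split
        Map<String, Integer> counts = new TreeMap<>();           // TreeMap mimics the sorted reducer input
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);     // same whitespace tokenizer as TokenizerMapper
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);  // emit (word, 1) and sum in one step
            }
        }
        counts.forEach((word, count) -> System.out.println(word + "\t" + count));
        // prints: hadoop 1 / hello 2 / mapreduce 1
    }
}

Incidentally, the job registers IntSumReducer both as combiner and as reducer; that is safe here because summing partial counts is associative and commutative, so pre-aggregating on the map side cannot change the final result.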

Example 2: Calling/called number statistics

The task is to transform a file of call records, one per line in the form "<calling number> <called number>" (shown as the left screenshot in the original post, omitted here), into one line per called number listing every number that called it, separated by "|" (the right screenshot).

The idea of the algorithm (originally a diagram): the mapper splits each line and emits the pair (called number, calling number), so the shuffle phase groups all callers of the same callee together; the reducer then simply concatenates the grouped values.

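To illustrate that data flow, here is a minimal plain-Java sketch of my own (no Hadoop involved; the phone numbers are made up) that mimics the map, shuffle, and reduce phases of this job:

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class CallLogSimulation {
    public static void main(String[] args) {
        // Each input line: "<calling number> <called number>"
        String[] lines = { "13599999999 10086", "13699999999 10086", "13799999999 120" };
        TreeMap<String, List<String>> grouped = new TreeMap<>();  // the shuffle: group by called number
        for (String line : lines) {
            String[] split = line.split(" ");
            grouped.computeIfAbsent(split[1], k -> new ArrayList<>()).add(split[0]);
        }
        grouped.forEach((callee, callers) -> {                    // the reduce: concatenate callers
            StringBuilder out = new StringBuilder();
            for (String caller : callers) out.append(caller).append("|");
            System.out.println(callee + "\t" + out);
        });
        // prints: 10086  13599999999|13699999999|
        //         120    13799999999|
    }
}

Note the trailing "|" after the last caller: the real reducer below produces it as well, because it appends the separator after every value.
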
package test2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class test_2 extends Configured implements Tool {

    enum Counter {
        LINESKIP,  // lines that failed to parse
    }

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            try {
                String[] lineSplit = line.split(" ");
                String anum = lineSplit[0];                     // calling number
                String bnum = lineSplit[1];                     // called number
                context.write(new Text(bnum), new Text(anum));  // key by the called number
            } catch (ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1);  // count and skip malformed lines
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder out = new StringBuilder();
            for (Text value : values) {
                out.append(value.toString()).append("|");
            }
            context.write(key, new Text(out.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "test_2");
        job.setJarByClass(test_2.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
        System.out.println("Job name: " + job.getJobName());
        System.out.println("Job successful: " + (job.isSuccessful() ? "Yes" : "No"));
        System.out.println("Lines skipped: " + job.getCounters().findCounter(Counter.LINESKIP).getValue());
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new test_2(), args);
        System.exit(res);
    }
}
A side note on exporting the MapReduce program: the original post walked through this with screenshots, which are omitted here. In short, the project is exported as a JAR (via the IDE's Export > JAR file wizard) and can then be submitted to the cluster with the hadoop jar command.

Example 3: Merging a carrier's location data and Internet-access data, bucketed by time period

(The original post illustrated the input tables and the expected output with several screenshots, omitted here; the record layouts are documented in the comments at the top of the code.)

There are two main Java source files: the MapReduce job itself (BaseStationDataPreprocess.java) and a helper class (TableLine.java) that parses and validates each input record.

package cn.dataguru.hadoop;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Location records:
// IMSI|IMEI|UPDATETYPE|CGI|TIME
// Internet-access records:
// IMSI|IMEI|CGI|TIME|CGI|URL

/**
 * Aggregates the base station data:
 * computes how long each user stayed at each base station within each time bucket.
 * Input parameters: < input path > < output path > < date > < timepoint >
 * Example: "/base /output 2012-09-12 09-17-24"
 * means: read from "/base", write to "/output", process the data for 2012-09-12,
 * and split the day into the three buckets 00-09, 09-17 and 17-24.
 * Output format: "IMSI|CGI|TIMEFLAG|STAY_TIME"
 */
public class BaseStationDataPreprocess extends Configured implements Tool {

    /**
     * Counters for the various kinds of bad data.
     */
    enum Counter {
        TIMESKIP,       // malformed timestamp
        OUTOFTIMESKIP,  // timestamp outside the requested date or buckets
        LINESKIP,       // malformed source line
        USERSKIP        // an entire user/time-bucket group was dropped
    }

    /**
     * Reads one line and emits it keyed by "IMSI|time bucket".
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        String date;
        String[] timepoint;
        boolean dataSource;

        /**
         * Initialization.
         */
        public void setup(Context context) throws IOException {
            this.date = context.getConfiguration().get("date");                       // the requested date
            this.timepoint = context.getConfiguration().get("timepoint").split("-");  // bucket boundaries

            // Determine the record layout from the input file's name
            FileSplit fs = (FileSplit) context.getInputSplit();
            String fileName = fs.getPath().getName();
            if (fileName.startsWith("POS"))
                dataSource = true;   // location data
            else if (fileName.startsWith("NET"))
                dataSource = false;  // Internet-access data
            else
                throw new IOException("File name should start with POS or NET");
        }

        /**
         * MAP task:
         * reads a base station record, finds the time bucket it belongs to,
         * and emits KEY = IMSI + bucket, VALUE = CGI + time.
         */
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            TableLine tableLine = new TableLine();

            // Parse the line
            try {
                tableLine.set(line, this.dataSource, this.date, this.timepoint);
            } catch (LineException e) {
                if (e.getFlag() == -1)
                    context.getCounter(Counter.OUTOFTIMESKIP).increment(1);
                else
                    context.getCounter(Counter.TIMESKIP).increment(1);
                return;
            } catch (Exception e) {
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }

            context.write(tableLine.outKey(), tableLine.outValue());
        }
    }

    /**
     * For one IMSI within one time bucket,
     * totals the time spent at each CGI.
     */
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
        private String date;
        private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Initialization.
         */
        public void setup(Context context) {
            this.date = context.getConfiguration().get("date");  // the requested date
        }

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String imsi = key.toString().split("\\|")[0];
            String timeFlag = key.toString().split("\\|")[1];

            // Collect the records ordered by time in a TreeMap
            TreeMap<Long, String> uploads = new TreeMap<Long, String>();
            String valueString;

            for (Text value : values) {
                valueString = value.toString();
                try {
                    uploads.put(Long.valueOf(valueString.split("\\|")[1]), valueString.split("\\|")[0]);
                } catch (NumberFormatException e) {
                    context.getCounter(Counter.TIMESKIP).increment(1);
                    continue;
                }
            }

            try {
                // Append an "OFF" sentinel at the end of the time bucket
                Date tmp = this.formatter.parse(this.date + " " + timeFlag.split("-")[1] + ":00:00");
                uploads.put((tmp.getTime() / 1000L), "OFF");

                // Aggregate the stay times
                HashMap<String, Float> locs = getStayTime(uploads);

                // Emit one output line per CGI
                for (Entry<String, Float> entry : locs.entrySet()) {
                    StringBuilder builder = new StringBuilder();
                    builder.append(imsi).append("|");
                    builder.append(entry.getKey()).append("|");
                    builder.append(timeFlag).append("|");
                    builder.append(entry.getValue());

                    context.write(NullWritable.get(), new Text(builder.toString()));
                }
            } catch (Exception e) {
                context.getCounter(Counter.USERSKIP).increment(1);
                return;
            }
        }

        /**
         * Computes the stay time per location from the time-ordered records.
         */
        private HashMap<String, Float> getStayTime(TreeMap<Long, String> uploads) {
            Entry<Long, String> upload, nextUpload;
            HashMap<String, Float> locs = new HashMap<String, Float>();
            // Initialization
            Iterator<Entry<Long, String>> it = uploads.entrySet().iterator();
            upload = it.next();
            // Walk consecutive pairs of records
            while (it.hasNext()) {
                nextUpload = it.next();
                float diff = (float) (nextUpload.getKey() - upload.getKey()) / 60.0f;
                if (diff <= 60.0) {  // a gap of more than an hour means the phone was off
                    if (locs.containsKey(upload.getValue()))
                        locs.put(upload.getValue(), locs.get(upload.getValue()) + diff);
                    else
                        locs.put(upload.getValue(), diff);
                }
                upload = nextUpload;
            }
            return locs;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        conf.set("date", args[2]);
        conf.set("timepoint", args[3]);

        Job job = new Job(conf, "BaseStationDataPreprocess");
        job.setJarByClass(BaseStationDataPreprocess.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path

        job.setMapperClass(Map.class);      // the Map class above as the map task
        job.setReducerClass(Reduce.class);  // the Reduce class above as the reduce task
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);

        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.err.println("");
            System.err.println("Usage: BaseStationDataPreprocess < input path > < output path > < date > < timepoint >");
            System.err.println("Example: BaseStationDataPreprocess /user/james/Base /user/james/Output 2012-09-12 07-09-17-24");
            System.err.println("Warning: Each timepoint should be a zero-padded two-digit hour and the last timepoint should be 24");
            System.err.println("Counter:");
            System.err.println("\t" + "TIMESKIP" + "\t" + "Lines which contain a wrong date format");
            System.err.println("\t" + "OUTOFTIMESKIP" + "\t" + "Lines whose times are out of range");
            System.err.println("\t" + "LINESKIP" + "\t" + "Lines which are invalid");
            System.err.println("\t" + "USERSKIP" + "\t" + "User/time-bucket groups which are invalid");
            System.exit(-1);
        }

        // Run the job
        int res = ToolRunner.run(new Configuration(), new BaseStationDataPreprocess(), args);

        System.exit(res);
    }
}
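
Before moving on to the second file, a quick sanity check of the stay-time logic. Since getStayTime is a private method of the reducer, the following is a standalone re-implementation of my own (the timestamps are made up; Java 8 required), just to show the pairwise-difference idea:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class StayTimeCheck {
    public static void main(String[] args) {
        // Epoch seconds -> CGI, time-ordered; the reducer appends an "OFF" sentinel at the bucket's end
        TreeMap<Long, String> uploads = new TreeMap<>();
        uploads.put(1347440400L, "CGI-A");  // e.g. 09:00:00
        uploads.put(1347442200L, "CGI-B");  // 09:30:00
        uploads.put(1347444000L, "OFF");    // 10:00:00 sentinel
        Map<String, Float> locs = new HashMap<>();
        Long prevTime = null;
        String prevCgi = null;
        for (Map.Entry<Long, String> e : uploads.entrySet()) {
            if (prevTime != null) {
                float diff = (e.getKey() - prevTime) / 60.0f;  // minutes between consecutive records
                if (diff <= 60.0f)                             // gaps over an hour count as power-off
                    locs.merge(prevCgi, diff, Float::sum);
            }
            prevTime = e.getKey();
            prevCgi = e.getValue();
        }
        System.out.println(locs);  // {CGI-A=30.0, CGI-B=30.0} (HashMap order may vary)
    }
}

The second source file, TableLine.java, encapsulates the per-line parsing and the time-bucket computation: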
package cn.dataguru.hadoop;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.Text;

/**
 * Exception type for rejected lines; the flag distinguishes
 * "outside the requested time range" (-1) from "malformed timestamp" (0).
 */
class LineException extends Exception {
    private static final long serialVersionUID = 8245008693589452584L;
    int flag;

    public LineException(String msg, int flag) {
        super(msg);
        this.flag = flag;
    }

    public int getFlag() {
        return flag;
    }
}

/**
 * Parses one line of input and extracts the required fields.
 */
public class TableLine {
    private String imsi, position, time, timeFlag;
    private Date day;
    private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Initializes the fields and validates the line.
     */
    public void set(String line, boolean source, String date, String[] timepoint) throws LineException {
        String[] lineSplit = line.split("\t");
        if (source) {             // location record
            this.imsi = lineSplit[0];
            this.position = lineSplit[3];
            this.time = lineSplit[4];
        } else {                  // Internet-access record
            this.imsi = lineSplit[0];
            this.position = lineSplit[2];
            this.time = lineSplit[3];
        }

        // Validate the date: the year-month-day part must match the requested date
        if (!this.time.startsWith(date))
            throw new LineException("", -1);

        try {
            this.day = this.formatter.parse(this.time);
        } catch (ParseException e) {
            throw new LineException("", 0);
        }

        // Determine which time bucket the record belongs to
        int i = 0, n = timepoint.length;
        int hour = Integer.valueOf(this.time.split(" ")[1].split(":")[0]);
        while (i < n && Integer.valueOf(timepoint[i]) <= hour)
            i++;
        if (i < n) {
            if (i == 0)
                this.timeFlag = ("00-" + timepoint[i]);
            else
                this.timeFlag = (timepoint[i - 1] + "-" + timepoint[i]);
        } else {                  // the hour is past the last boundary
            throw new LineException("", -1);
        }
    }

    /**
     * The output KEY.
     */
    public Text outKey() {
        return new Text(this.imsi + "|" + this.timeFlag);
    }

    /**
     * The output VALUE.
     */
    public Text outValue() {
        long t = (day.getTime() / 1000L);  // epoch seconds as the record's time
        return new Text(this.position + "|" + String.valueOf(t));
    }
}
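
Finally, a small usage sketch of TableLine (my own, placed in the same package; the IMSI, IMEI and CGI values are made up), showing how one tab-separated location ("POS") record is turned into the mapper's key/value pair:

package cn.dataguru.hadoop;

public class TableLineDemo {
    public static void main(String[] args) throws Exception {
        TableLine tl = new TableLine();
        // Fields: IMSI | IMEI | UPDATETYPE | CGI | TIME; source=true selects the location layout
        tl.set("460001234567890\t861234567890\t1\tCGI-A\t2012-09-12 10:15:30",
               true, "2012-09-12", new String[] { "09", "17", "24" });
        System.out.println(tl.outKey());    // 460001234567890|09-17 (hour 10 falls into the 09-17 bucket)
        System.out.println(tl.outValue());  // CGI-A|<epoch seconds of 2012-09-12 10:15:30>
    }
}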