您的位置:首页 > 运维架构

Hadoop开篇之MapReduce实现多类别流量统计的两种方式

2015-09-04 19:57 381 查看
1、环境:hadoop2.6伪分布式

2、输入:流量日志文件

1)日志具体内容:



2)日志各列含义:



3、运行流程:

1)编写步骤4的实现代码,实现mapreduce业务逻辑

2)把java文件打成jar包,如“traffic2.jar”(注意:打包时必须在jar的清单中指定包含main函数的主类,因为步骤5的运行命令没有显式给出类名)

3)上传jar包至linux下,本例放在hadoop安装目录的自定义目录mytestdata中

4)上传要统计流量的日志文件“HTTP_20130313143750.dat” 到hdfs的“/testdir”目录下,作为输入文件

5)执行“traffic2.jar”文件,其中out3为hdfs下的指定输出目录(命令:hadoop jar mytestdata/traffic2.jar /testdir /out3)

4、代码实现:

方式一:使用hadoop原始数据类型作为流量的输入输出

package com.crxy.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Driver for the traffic-statistics MapReduce job that carries the four
 * traffic counters as a single tab-separated {@link Text} value.
 *
 * <p>Usage: {@code hadoop jar traffic2.jar <input path(s)> <output dir>}.
 * The first argument is handed to {@code FileInputFormat.addInputPaths}, the
 * second is the output directory, which is deleted first if it already exists.
 */
public class TrafficSumApp2 {

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args {@code args[0]} input path(s), {@code args[1]} output directory
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: TrafficSumApp2 <input path(s)> <output dir>");
            System.exit(2);
        }
        String inputPaths = args[0];
        String outUrl = args[1];

        // Initialise the job driver.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, TrafficSumApp2.class.getSimpleName());
        job.setJarByClass(TrafficSumApp2.class);

        // Input file(s) and input format.
        FileInputFormat.addInputPaths(job, inputPaths);
        job.setInputFormatClass(TextInputFormat.class);

        // Mapper class and its output types.
        job.setMapperClass(TrafficMapper2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer class and its output types.
        job.setReducerClass(TrafficReducer2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Output directory and output format.
        FileOutputFormat.setOutputPath(job, new Path(outUrl));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Remove a previous output directory of the same name, if any.
        deleteOutDir(configuration, outUrl);

        // Run the job; propagate failure to the shell through the exit status.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

    /**
     * Recursively deletes {@code outUrl} if it exists.
     *
     * <p>The file system is resolved from the {@link Path} itself rather than
     * through {@code new URI(outUrl)}, so path strings that are not valid URIs
     * (for example ones containing spaces) are still accepted.
     *
     * @param configuration configuration used to resolve the file system
     * @param outUrl        output directory to remove
     * @throws IOException        if the file system cannot be reached or the delete fails
     * @throws URISyntaxException retained for signature compatibility; no longer thrown
     */
    private static void deleteOutDir(Configuration configuration, String outUrl)
            throws IOException, URISyntaxException {
        Path outPath = new Path(outUrl);
        FileSystem fileSystem = outPath.getFileSystem(configuration);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
    }

}

/**
 * Mapper that splits each input line on tabs and emits column 1 as the key
 * and columns 6-9, joined with tabs, as the value.
 *
 * <p>Lines with fewer than {@link #MIN_COLUMNS} columns are skipped and counted
 * under the {@code TrafficMapper2 / MALFORMED_RECORDS} counter instead of
 * failing the task with an {@code ArrayIndexOutOfBoundsException}.
 */
class TrafficMapper2 extends Mapper<LongWritable, Text, Text, Text> {

    /** Columns 0..9 are indexed below, so at least ten must be present. */
    private static final int MIN_COLUMNS = 10;

    // Reused for every record to avoid allocating two Text objects per input line.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * @param k1      byte offset of the line (unused)
     * @param v1      one line of the input file
     * @param context sink for the (column 1, columns 6-9) pair
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String[] splits = v1.toString().split("\t");
        if (splits.length < MIN_COLUMNS) {
            context.getCounter("TrafficMapper2", "MALFORMED_RECORDS").increment(1L);
            return;
        }
        outKey.set(splits[1]);
        outValue.set(splits[6] + "\t" + splits[7] + "\t" + splits[8] + "\t" + splits[9]);
        context.write(outKey, outValue);
    }

}

/**
 * Reducer that sums, per key, the four tab-separated numeric fields carried in
 * each value and writes the four totals back as one tab-separated line.
 */
class TrafficReducer2 extends Reducer<Text, Text, Text, Text> {

    /**
     * @param k2      grouping key, written through unchanged
     * @param v2      values for the key, each holding four tab-separated longs
     * @param context sink for the (key, totals) pair
     */
    @Override
    protected void reduce(Text k2, Iterable<Text> v2, Context context)
            throws IOException, InterruptedException {
        // One running total per field position.
        long[] totals = new long[4];

        for (Text record : v2) {
            String[] fields = record.toString().split("\t");
            for (int i = 0; i < totals.length; i++) {
                totals[i] += Long.parseLong(fields[i]);
            }
        }

        StringBuilder line = new StringBuilder();
        line.append(totals[0]).append("\t")
            .append(totals[1]).append("\t")
            .append(totals[2]).append("\t")
            .append(totals[3]);

        Text summed = new Text();
        summed.set(line.toString());
        context.write(k2, summed);
    }

}


方式二:使用自定义类型作为流量输入输出

package com.crxy.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Driver for the traffic-statistics MapReduce job that carries the four
 * traffic counters in the custom {@code TrafficWritable} value type.
 *
 * <p>Usage: {@code hadoop jar <jar> <input path> <output dir>}. The output
 * directory is deleted first if it already exists.
 */
public class TrafficSumApp {

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output directory
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: TrafficSumApp <input path> <output dir>");
            System.exit(2);
        }
        String outUrl = args[1];

        // Define the job driver.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, TrafficSumApp.class.getSimpleName());
        job.setJarByClass(TrafficSumApp.class);

        // Input path and input format.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Mapper class and its output types.
        job.setMapperClass(TrafficMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TrafficWritable.class);

        // Output directory and output format.
        FileOutputFormat.setOutputPath(job, new Path(outUrl));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Reducer class and its output types.
        job.setReducerClass(TrafficReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TrafficWritable.class);

        // Remove a previous output directory of the same name, if any.
        deleteOutDir(configuration, outUrl);

        // Run the job; propagate failure to the shell through the exit status.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

    /**
     * Recursively deletes {@code outUrl} if it exists.
     *
     * <p>The file system is resolved from the {@link Path} itself rather than
     * through {@code new URI(outUrl)}, so path strings that are not valid URIs
     * (for example ones containing spaces) are still accepted.
     *
     * @param configuration configuration used to resolve the file system
     * @param outUrl        output directory to remove
     * @throws IOException        if the file system cannot be reached or the delete fails
     * @throws URISyntaxException retained for signature compatibility; no longer thrown
     */
    private static void deleteOutDir(Configuration configuration, String outUrl)
            throws IOException, URISyntaxException {
        Path outPath = new Path(outUrl);
        FileSystem fileSystem = outPath.getFileSystem(configuration);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
    }

}

/**
* 实现自己mapper重写key,value
* @author Administrator
*
*/
class TrafficMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TrafficWritable>.Context context)
4000
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Text text = new Text();
TrafficWritable trafficWritable = new TrafficWritable();
String[] splt = value.toString().split("\t");
text.set(splt[1]);
trafficWritable.set(splt[6], splt[7], splt[8], splt[9]);
context.write(text, trafficWritable);
}

}

/**
* 实现自定义reducer对分区后key,value进行排序组合并输出
* @author Administrator
*
*/
class TrafficReducer extends Reducer<Text, TrafficWritable, Text, TrafficWritable>{

protected void reduce(Text key, Iterable<TrafficWritable> values,
Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
TrafficWritable trafficWritable = new TrafficWritable();
long sumVal1=0L;
long sumVal2=0L;
long sumVal3=0L;
long sumVal4=0L;
for(TrafficWritable val:values){
sumVal1+=val.val1;
sumVal2+=val.val2;
sumVal3+=val.val3;
sumVal4+=val.val4;
}
trafficWritable.set(sumVal1, sumVal2, sumVal3, sumVal4);
context.write(key, trafficWritable);
}

}

/**
 * Custom value type holding four {@code long} counters.
 *
 * <p>The counters are written and read as four consecutive longs in the order
 * {@code val1, val2, val3, val4}; {@link #toString()} renders them separated by tabs.
 */
class TrafficWritable implements Writable {
    long val1;
    long val2;
    long val3;
    long val4;

    /**
     * Sets the four counters from their decimal string forms.
     *
     * @throws NumberFormatException if any argument is not a parsable long
     */
    public void set(String str1, String str2, String str3, String str4) {
        // Long.parseLong returns a primitive long, so no Long object is created here.
        this.val1 = Long.parseLong(str1);
        this.val2 = Long.parseLong(str2);
        this.val3 = Long.parseLong(str3);
        this.val4 = Long.parseLong(str4);
    }

    /** Sets the four counters directly. */
    public void set(long l1, long l2, long l3, long l4) {
        this.val1 = l1;
        this.val2 = l2;
        this.val3 = l3;
        this.val4 = l4;
    }

    /** Reads the four counters in the same order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.val1 = in.readLong();
        this.val2 = in.readLong();
        this.val3 = in.readLong();
        this.val4 = in.readLong();
    }

    /** Writes the four counters as consecutive longs, {@code val1} first. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.val1);
        out.writeLong(this.val2);
        out.writeLong(this.val3);
        out.writeLong(this.val4);
    }

    /** Returns the four counters joined by tab characters. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(val1).append("\t");
        text.append(val2).append("\t");
        text.append(val3).append("\t");
        text.append(val4);
        return text.toString();
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息