Hadoop开篇之Mapreduce实现多类别流量统计的两种实现方式
2015-09-04 19:57
381 查看
1、环境:hadoop2.6伪分布式
2、输入:流量日志文件
1)日志具体内容:
2)日志各列含义:
3、运行流程:
1)编写步骤4的实现代码,实现mapreduce业务逻辑
2)把java文件打成jar包,如“traffic2.jar”(注意:打包时必须在jar的清单文件中指定包含main函数的主类,因为步骤5的运行命令中没有显式给出类名)
3)上传jar包至linux下,本例放在hadoop安装目录的自定义目录mytestdata中
4)上传要统计流量的日志文件“HTTP_20130313143750.dat” 到hdfs的“/testdir”目录下,作为输入文件
5)执行“traffic2.jar”文件,其中out3为hdfs下的指定输出目录(命令:hadoop jar mytestdata/traffic2.jar /testdir /out3)
4、代码实现:
方式一:使用hadoop原始数据类型作为流量的输入输出
方式二:使用自定义类型作为流量输入输出
2、输入:流量日志文件
1)日志具体内容:
2)日志各列含义:
3、运行流程:
1)编写步骤4的实现代码,实现mapreduce业务逻辑
2)把java文件打成jar包,如“traffic2.jar”(注意:打包时必须在jar的清单文件中指定包含main函数的主类,因为步骤5的运行命令中没有显式给出类名)
3)上传jar包至linux下,本例放在hadoop安装目录的自定义目录mytestdata中
4)上传要统计流量的日志文件“HTTP_20130313143750.dat” 到hdfs的“/testdir”目录下,作为输入文件
5)执行“traffic2.jar”文件,其中out3为hdfs下的指定输出目录(命令:hadoop jar mytestdata/traffic2.jar /testdir /out3)
4、代码实现:
方式一:使用hadoop原始数据类型作为流量的输入输出
package com.crxy.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Traffic statistics with MapReduce, using the built-in Hadoop {@link Text}
 * type to carry the composite (four-counter) value between map and reduce.
 *
 * <p>Usage: {@code hadoop jar <jar> <input path(s)> <output path>}. The output
 * directory is deleted first if it already exists.
 *
 * @author Administrator
 */
public class TrafficSumApp2 {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path (a comma-separated list is
     *             accepted); {@code args[1]} is the output directory
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: TrafficSumApp2 <input path(s)> <output path>");
            System.exit(2);
        }

        // Set up the job driver.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, TrafficSumApp2.class.getSimpleName());
        job.setJarByClass(TrafficSumApp2.class);

        // Input files and input format.
        FileInputFormat.addInputPaths(job, args[0]);
        job.setInputFormatClass(TextInputFormat.class);

        // Mapper class and its output types.
        job.setMapperClass(TrafficMapper2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer class and its output types.
        job.setReducerClass(TrafficReducer2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Output directory and output format.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Remove a pre-existing output directory, otherwise the job refuses to start.
        deleteOutDir(configuration, args[1]);

        // Run the job; report failure through the process exit status
        // instead of silently returning 0.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

    /**
     * Recursively deletes the output directory if it already exists.
     *
     * @param configuration Hadoop configuration used to resolve the file system
     * @param outUrl        output directory path or URI
     * @throws IOException        if the file system cannot be accessed
     * @throws URISyntaxException if {@code outUrl} is not a valid URI
     */
    private static void deleteOutDir(Configuration configuration, String outUrl)
            throws IOException, URISyntaxException {
        FileSystem fileSystem = FileSystem.get(new URI(outUrl), configuration);
        Path outPath = new Path(outUrl);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
    }
}

/**
 * Mapper: splits each tab-separated log line, emits field 1 as the key and
 * fields 6-9 (joined by tabs) as the value.
 *
 * <p>Lines that have fewer than ten fields, or whose fields 6-9 are not valid
 * longs, are skipped and counted under the {@code Traffic/MALFORMED_LINES}
 * counter instead of failing the task.
 *
 * @author Administrator
 */
class TrafficMapper2 extends Mapper<LongWritable, Text, Text, Text> {

    /** Index of the field used as the grouping key. */
    private static final int KEY_FIELD = 1;
    /** Index of the first of the four numeric counter fields. */
    private static final int FIRST_VALUE_FIELD = 6;
    /** Index of the last of the four numeric counter fields. */
    private static final int LAST_VALUE_FIELD = 9;

    // Reused across map() calls; context.write serializes them immediately.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String[] splits = v1.toString().split("\t");
        if (!isWellFormed(splits)) {
            context.getCounter("Traffic", "MALFORMED_LINES").increment(1);
            return;
        }
        outKey.set(splits[KEY_FIELD]);
        outValue.set(splits[6] + "\t" + splits[7] + "\t" + splits[8] + "\t" + splits[9]);
        context.write(outKey, outValue);
    }

    /** Returns true when the record has all required fields and the counters parse as longs. */
    private static boolean isWellFormed(String[] fields) {
        if (fields.length <= LAST_VALUE_FIELD) {
            return false;
        }
        for (int i = FIRST_VALUE_FIELD; i <= LAST_VALUE_FIELD; i++) {
            try {
                Long.parseLong(fields[i]);
            } catch (NumberFormatException e) {
                return false;
            }
        }
        return true;
    }
}

/**
 * Reducer: for each key, sums the four tab-separated counters of every value
 * and writes the four totals, tab-separated, as the new value.
 *
 * @author Administrator
 */
class TrafficReducer2 extends Reducer<Text, Text, Text, Text> {

    /** Number of counters carried in each value. */
    private static final int COUNTER_COUNT = 4;

    // Reused across reduce() calls.
    private final Text v3 = new Text();

    @Override
    protected void reduce(Text k2, Iterable<Text> v2, Context context)
            throws IOException, InterruptedException {
        long[] totals = new long[COUNTER_COUNT];
        for (Text ct : v2) {
            String[] splits = ct.toString().split("\t");
            for (int i = 0; i < COUNTER_COUNT; i++) {
                totals[i] += Long.parseLong(splits[i]);
            }
        }
        v3.set(totals[0] + "\t" + totals[1] + "\t" + totals[2] + "\t" + totals[3]);
        context.write(k2, v3);
    }
}
方式二:使用自定义类型作为流量输入输出
package com.crxy.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Traffic statistics with MapReduce, using the custom {@link TrafficWritable}
 * type to carry the four counters between map and reduce.
 *
 * <p>Usage: {@code hadoop jar <jar> <input path> <output path>}. The output
 * directory is deleted first if it already exists.
 *
 * @author Administrator
 */
public class TrafficSumApp {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path; {@code args[1]} is the
     *             output directory
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: TrafficSumApp <input path> <output path>");
            System.exit(2);
        }

        // Set up the job driver.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, TrafficSumApp.class.getSimpleName());
        job.setJarByClass(TrafficSumApp.class);

        // Input path and input format.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Mapper class and its output types.
        job.setMapperClass(TrafficMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TrafficWritable.class);

        // Output directory and output format.
        String outUrl = args[1];
        FileOutputFormat.setOutputPath(job, new Path(outUrl));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Reducer class and its output types.
        job.setReducerClass(TrafficReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TrafficWritable.class);

        // Remove a pre-existing output directory, otherwise the job refuses to start.
        deleteOutDir(configuration, outUrl);

        // Run the job; report failure through the process exit status
        // instead of silently returning 0.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

    /**
     * Recursively deletes the output directory if it already exists.
     *
     * @param configuration Hadoop configuration used to resolve the file system
     * @param outUrl        output directory path or URI
     * @throws IOException        if the file system cannot be accessed
     * @throws URISyntaxException if {@code outUrl} is not a valid URI
     */
    private static void deleteOutDir(Configuration configuration, String outUrl)
            throws IOException, URISyntaxException {
        FileSystem fileSystem = FileSystem.get(new URI(outUrl), configuration);
        Path outPath = new Path(outUrl);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
    }
}

/**
 * Mapper: splits each tab-separated log line, emits field 1 as the key and
 * fields 6-9 packed into a {@link TrafficWritable} as the value.
 *
 * <p>Lines that have fewer than ten fields, or whose fields 6-9 are not valid
 * longs, are skipped and counted under the {@code Traffic/MALFORMED_LINES}
 * counter instead of failing the task.
 *
 * @author Administrator
 */
class TrafficMapper extends Mapper<LongWritable, Text, Text, TrafficWritable> {

    /** Index of the field used as the grouping key. */
    private static final int KEY_FIELD = 1;
    /** Index of the last field read by this mapper. */
    private static final int LAST_VALUE_FIELD = 9;

    // Reused across map() calls; context.write serializes them immediately.
    private final Text text = new Text();
    private final TrafficWritable trafficWritable = new TrafficWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] splt = value.toString().split("\t");
        if (splt.length <= LAST_VALUE_FIELD) {
            context.getCounter("Traffic", "MALFORMED_LINES").increment(1);
            return;
        }
        try {
            trafficWritable.set(splt[6], splt[7], splt[8], splt[9]);
        } catch (NumberFormatException e) {
            // A non-numeric counter field: skip the record rather than kill the task.
            context.getCounter("Traffic", "MALFORMED_LINES").increment(1);
            return;
        }
        text.set(splt[KEY_FIELD]);
        context.write(text, trafficWritable);
    }
}

/**
 * Reducer: for each key, sums the four counters of every
 * {@link TrafficWritable} value and writes the totals as a single
 * {@link TrafficWritable}.
 *
 * @author Administrator
 */
class TrafficReducer extends Reducer<Text, TrafficWritable, Text, TrafficWritable> {

    // Reused across reduce() calls.
    private final TrafficWritable trafficWritable = new TrafficWritable();

    @Override
    protected void reduce(Text key, Iterable<TrafficWritable> values, Context context)
            throws IOException, InterruptedException {
        long sumVal1 = 0L;
        long sumVal2 = 0L;
        long sumVal3 = 0L;
        long sumVal4 = 0L;
        for (TrafficWritable val : values) {
            sumVal1 += val.val1;
            sumVal2 += val.val2;
            sumVal3 += val.val3;
            sumVal4 += val.val4;
        }
        trafficWritable.set(sumVal1, sumVal2, sumVal3, sumVal4);
        context.write(key, trafficWritable);
    }
}

/**
 * Custom value type holding four long counters. Serialized as four
 * consecutive longs; rendered by {@link #toString()} as four tab-separated
 * numbers.
 *
 * @author Administrator
 */
class TrafficWritable implements Writable {
    long val1 = 0L;
    long val2 = 0L;
    long val3 = 0L;
    long val4 = 0L;

    /**
     * Sets the four counters from their decimal string forms.
     *
     * <p>All four strings are parsed before any field is assigned, so a parse
     * failure leaves the object unchanged.
     *
     * @throws NumberFormatException if any argument is not a valid long
     */
    public void set(String str1, String str2, String str3, String str4) {
        // Long.parseLong returns a primitive long rather than a boxed Long.
        long l1 = Long.parseLong(str1);
        long l2 = Long.parseLong(str2);
        long l3 = Long.parseLong(str3);
        long l4 = Long.parseLong(str4);
        set(l1, l2, l3, l4);
    }

    /** Sets the four counters directly. */
    public void set(long l1, long l2, long l3, long l4) {
        val1 = l1;
        val2 = l2;
        val3 = l3;
        val4 = l4;
    }

    /** Reads the four counters in the same order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        val1 = in.readLong();
        val2 = in.readLong();
        val3 = in.readLong();
        val4 = in.readLong();
    }

    /** Writes the four counters as consecutive longs. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(val1);
        out.writeLong(val2);
        out.writeLong(val3);
        out.writeLong(val4);
    }

    /** Returns the four counters separated by tabs; this is what the text output format writes. */
    @Override
    public String toString() {
        return this.val1 + "\t" + this.val2 + "\t" + this.val3 + "\t" + this.val4;
    }
}
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Listener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- 详解HDFS Short Circuit Local Reads
- PropertyChangeListener简单理解
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树