MapReduce Tool 接口
2017-12-21 22:19
113 查看
一 关于Tool 接口的MapReduce编码
1 自定义序列化类
2 MapReduce编码
二 eclipse编译打包成mapreduce2.jar
三 MapReduce测试
1 准备数据
2 运行MapReduce
3 测试结果
四 参考
http://www.jikexueyuan.com/course/2710.html
1 自定义序列化类
package com.cakin.hadoop.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Hadoop serialization type carrying a user's id, income, expenses
 * and their difference (sum). Used as the map-output value and the
 * reduce-output key in CountMapReduce2.
 *
 * Setters return {@code this} so callers can chain them fluently.
 */
public class UserWritable implements WritableComparable<UserWritable> {

    private Integer id;
    private Integer income;
    private Integer expenses;
    private Integer sum;

    /** Serializes the four fields in a fixed order; must mirror readFields. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeInt(income);
        out.writeInt(expenses);
        out.writeInt(sum);
    }

    /** Deserializes the fields in the exact order written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.income = in.readInt();
        this.expenses = in.readInt();
        this.sum = in.readInt();
    }

    public Integer getId() {
        return id;
    }

    public UserWritable setId(Integer id) {
        this.id = id;
        return this;
    }

    public Integer getIncome() {
        return income;
    }

    public UserWritable setIncome(Integer income) {
        this.income = income;
        return this;
    }

    public Integer getExpenses() {
        return expenses;
    }

    public UserWritable setExpenses(Integer expenses) {
        this.expenses = expenses;
        return this;
    }

    public Integer getSum() {
        return sum;
    }

    public UserWritable setSum(Integer sum) {
        this.sum = sum;
        return this;
    }

    /**
     * Orders by id.
     *
     * FIX: the original returned {@code this.id>o.getId()?1:-1}, which never
     * returns 0 for equal ids and so violates the compareTo contract
     * (sgn(x.compareTo(y)) == -sgn(y.compareTo(x)) fails when ids are equal),
     * which can break sorting and equal-key grouping.
     */
    @Override
    public int compareTo(UserWritable o) {
        return this.id.compareTo(o.getId());
    }

    /** Equality consistent with {@link #compareTo}: two records are equal iff their ids are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof UserWritable)) {
            return false;
        }
        UserWritable other = (UserWritable) obj;
        return id == null ? other.id == null : id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return id == null ? 0 : id.hashCode();
    }

    @Override
    public String toString() {
        return id + "\t" + income + "\t" + expenses + "\t" + sum;
    }
}
2 MapReduce编码
package com.cakin.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Tool-based MapReduce job: parses tab-separated "id income expenses" lines,
 * sums income and expenses per user id, and emits one UserWritable per id
 * holding the totals and their difference.
 *
 * Usage: yarn jar mapreduce2.jar &lt;inputPath&gt; &lt;outputPath&gt;
 */
public class CountMapReduce2 implements Tool {

    /** Maps a raw text line to (id, UserWritable). Malformed lines are skipped. */
    public static class CountMapper
            extends Mapper<LongWritable, Text, IntWritable, UserWritable> {

        // Reused across map() calls, per the standard Hadoop object-reuse idiom.
        private final UserWritable userWritable = new UserWritable();
        private final IntWritable id = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, IntWritable, UserWritable>.Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split("\t");
            // FIX: the original called context.write() OUTSIDE this guard, so a
            // malformed line re-emitted the previous record (or a record with
            // null fields, NPE-ing in write()). Emit only after a clean parse.
            if (words.length == 3) {
                int userId = Integer.parseInt(words[0]);
                int income = Integer.parseInt(words[1]);
                int expenses = Integer.parseInt(words[2]);
                userWritable.setId(userId)
                        .setIncome(income)
                        .setExpenses(expenses)
                        .setSum(income - expenses);
                id.set(userId);
                context.write(id, userWritable);
            }
        }
    }

    /** Sums income/expenses for all records sharing an id. */
    public static class CountReducer
            extends Reducer<IntWritable, UserWritable, UserWritable, NullWritable> {
        /*
         * Example input:
         * <1, {[1,1000,0,1000], [1,2000,1000,1000]}>
         * <2, {[2,500,300,200], [2,500,200,300]}>
         */
        private final UserWritable userWritable = new UserWritable();
        private final NullWritable n = NullWritable.get();

        @Override
        protected void reduce(IntWritable key, Iterable<UserWritable> values,
                Reducer<IntWritable, UserWritable, UserWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            int income = 0;
            int expenses = 0;
            for (UserWritable u : values) {
                income += u.getIncome();
                expenses += u.getExpenses();
            }
            userWritable.setId(key.get())
                    .setIncome(income)
                    .setExpenses(expenses)
                    .setSum(income - expenses);
            context.write(userWritable, n);
        }
    }

    private Configuration conf;

    public static void main(String[] args) throws Exception {
        // NOTE: this tool's run() returns 1 on success and -1 on failure
        // (non-standard; ToolRunner convention is 0 == success). main()
        // checks against that local convention.
        int status = ToolRunner.run(new CountMapReduce2(), args);
        if (status != 1) {
            System.err.println("任务失败------");
        }
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Configures and submits the job, blocking until completion.
     *
     * @return 1 when the job succeeds, -1 when it fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // FIX: the original built a fresh Configuration here, discarding the
        // one injected by ToolRunner via setConf() — which defeats the whole
        // point of implementing Tool (generic -D/-conf options were ignored,
        // hence the "Implement the Tool interface" warning in the job log).
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }
        conf.setBoolean("mapreduce.job.ubertask.enable", true);
        /*
         * When cluster nodes do not ship the config files, set them here:
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         */
        Job job = Job.getInstance(conf, "countMR");

        // FIX: the original passed CountMapReduce.class (a different class);
        // the jar must be located from THIS driver class.
        job.setJarByClass(CountMapReduce2.class);

        // Mapper/Reducer classes for this job.
        job.setMapperClass(CountMapper.class);
        job.setReducerClass(CountReducer.class);

        // Map-output key/value types.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(UserWritable.class);

        // Final output key/value types.
        // FIX: the original called setOutputKeyClass twice, so the value class
        // was never declared and the key class was clobbered to NullWritable.
        job.setOutputKeyClass(UserWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output HDFS paths from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit and wait for the cluster to finish before returning.
        boolean res = job.waitForCompletion(true);
        if (!res) {
            System.err.println("任务执行失败--------");
        }
        return res ? 1 : -1;
    }
}
二 eclipse编译打包成mapreduce2.jar
三 MapReduce测试
1 准备数据
[root@centos hadoop-2.7.4]# bin/hdfs dfs -cat /input/data 1 1000 0 2 500 300 1 2000 1000 2 500 200
2 运行MapReduce
[root@centos hadoop-2.7.4]# bin/yarn jar /root/jar/mapreduce2.jar /input/data /output4 17/12/21 22:11:35 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 17/12/21 22:11:36 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 17/12/21 22:11:38 INFO input.FileInputFormat: Total input paths to process : 1 17/12/21 22:11:38 INFO mapreduce.JobSubmitter: number of splits:1 17/12/21 22:11:39 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1513865426338_0001 17/12/21 22:11:40 INFO impl.YarnClientImpl: Submitted application application_1513865426338_0001 17/12/21 22:11:41 INFO mapreduce.Job: The url to track the job: http://centos:8088/proxy/application_1513865426338_0001/ 17/12/21 22:11:41 INFO mapreduce.Job: Running job: job_1513865426338_0001 17/12/21 22:12:07 INFO mapreduce.Job: Job job_1513865426338_0001 running in uber mode : true 17/12/21 22:12:07 INFO mapreduce.Job: map 0% reduce 0% 17/12/21 22:12:12 INFO mapreduce.Job: map 100% reduce 0% 17/12/21 22:12:15 INFO mapreduce.Job: map 100% reduce 100% 17/12/21 22:12:15 INFO mapreduce.Job: Job job_1513865426338_0001 completed successfully 17/12/21 22:12:16 INFO mapreduce.Job: Counters: 52 File System Counters FILE: Number of bytes read=220 FILE: Number of bytes written=346 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=310 HDFS: Number of bytes written=261779 HDFS: Number of read operations=35 HDFS: Number of large read operations=0 HDFS: Number of write operations=8 Job Counters Launched map tasks=1 Launched reduce tasks=1 Other local map tasks=1 Total time spent by all maps in occupied slots (ms)=4379 Total time spent by all reduces in occupied slots (ms)=2899 TOTAL_LAUNCHED_UBERTASKS=2 NUM_UBER_SUBMAPS=1 NUM_UBER_SUBREDUCES=1 Total time spent by all map tasks (ms)=4379 Total time 
spent by all reduce tasks (ms)=2899 Total vcore-milliseconds taken by all map tasks=4379 Total vcore-milliseconds taken by all reduce tasks=2899 Total megabyte-milliseconds taken by all map tasks=4484096 Total megabyte-milliseconds taken by all reduce tasks=2968576 Map-Reduce Framework Map input records=4 Map output records=4 Map output bytes=80 Map output materialized bytes=94 Input split bytes=94 Combine input records=0 Combine output records=0 Reduce input groups=2 Reduce shuffle bytes=94 Reduce input records=4 Reduce output records=2 Spilled Records=8 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=103 CPU time spent (ms)=1630 Physical memory (bytes) snapshot=519172096 Virtual memory (bytes) snapshot=6041153536 Total committed heap usage (bytes)=273686528 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=41 File Output Format Counters Bytes Written=32
3 测试结果
[root@centos hadoop-2.7.4]# bin/hdfs dfs -cat /output4/part-r-00000 1 3000 1000 2000 2 1000 500 500
四 参考
http://www.jikexueyuan.com/course/2710.html
相关文章推荐
- HTTP服务端接口模拟工具-HttpServerMockTool
- Hadoop MapReduce编程模型之InputFormat接口学习
- Hadoop详解(三)——MapReduce原理和执行过程,远程Debug,Writable序列化接口,MapReduce程序编写
- mapreduce 辅助类GenericOptionsParser,Tool和ToolRunner
- hadoop学习第四天-Writable和WritableComparable序列化接口的使用&&MapReduce中传递javaBean的简单例子
- Http接口测试工具HttpTestTool
- CodeBuilder之Tool接口
- 关于Tool接口--------hadoop接口:extends Configured implements Tool 和 ToolRunner.run
- mapreduce 比较接口
- HBase之java api接口调用与mapreduce整合即从hdfs中通过mapreduce来导入数据到hbase中
- Hadoop MapReduce概念学习系列之在MapReduce编程时,三大接口抉择(十六)
- org.apache.hadoop.mapreduce(接口、类、枚举)——学习的先后顺序
- Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)
- Hadoop:MapReduce编程接口体系结构
- 利用MapReduce的java编程接口完成数据的统计
- MapReduce编程模型--接口体系结构--架构设计--《hadoop技术内幕》读书笔记
- codeTool代码自动生成工具(参数类sql数据增删改查语句,实体类,接口代码)
- cxf客户端代码调用cxf的webservice接口,报错:XJC,classnotfound,引入了tool.jar,却还是报classnotfound
- p3:An open source pcap packet and NetFlow file analysis tool using Hadoop MapReduce and Hive.
- MapReduce 编程接口体系结构