MapReduce案例10——多数据文件依赖计算
2018-03-21 23:04
387 查看
题目:描述:求所有数对应位置的叠加和
比如
0001.txt 文件有数据:
1
2
3
4
5
.....
0002.txt 文件有数据:
10
10
10
10
10
返回结果是:
1 1
2 3
3 6
4 10
5 15
.....
10 25
10 35
10 45
10 55
10 65
也就是每一行数字的后面都追加一个累加到该数字的综合
总体解题思路:
1、先求出每个文件的总和
2、然后按照文件的顺序进行叠加
难点:怎么让一个文件的多个数据块可以顺序叠加?思路:采用mapjoin思想,分多步完成,第一步将每个文件的文件名、当前文件数字求和、以及当前文件之前所有文件和。
形式如下所示,然后将其作为配置文件,加载到内存中去,通过匹配文件名,获取当前文件数字的起始累加和,然后进行求和即可。0001.txt 55 0
0002.txt 550 55
0003.txt 10 605
0005.txt 200 615
0006.txt 50 815实现代码如下:
20 85
30 115
40 155
50 205
60 265
70 335
80 415
90 505
100 605
比如
0001.txt 文件有数据:
1
2
3
4
5
.....
0002.txt 文件有数据:
10
10
10
10
10
返回结果是:
1 1
2 3
3 6
4 10
5 15
.....
10 25
10 35
10 45
10 55
10 65
也就是每一行数字的后面都追加一个累加到该数字的综合
总体解题思路:
1、先求出每个文件的总和
2、然后按照文件的顺序进行叠加
难点:怎么让一个文件的多个数据块可以顺序叠加?思路:采用mapjoin思想,分多步完成,第一步将每个文件的文件名、当前文件数字求和、以及当前文件之前所有文件和。
形式如下所示,然后将其作为配置文件,加载到内存中去,通过匹配文件名,获取当前文件数字的起始累加和,然后进行求和即可。0001.txt 55 0
0002.txt 550 55
0003.txt 10 605
0005.txt 200 615
0006.txt 50 815实现代码如下:
/** * @author: lpj * @date: 2018年3月16日 下午7:16:47 * @Description: */ package lpj.reduceWork; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang.ObjectUtils.Null; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.shell.Count; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import lpj.reduceWork.BigNumFileSortMR2.BigNumFileSortMR2_Mapper; import lpj.reduceWork.BigNumFileSortMR2.BigNumFileSortMR2_Reducer; /** * */ public class SumFielsNumMR { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop02:9000"); System.setProperty("HADOOP_USER_NAME", "hadoop");//使用集群 //----------------------------- FileSystem fs = FileSystem.get(conf);//默认使用本地 Job job = Job.getInstance(conf); job.setJarByClass(SumFielsNumMR.class); job.setMapperClass(SumFielsNumMR_Mapper.class); job.setReducerClass(SumFielsNumMR_Reducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path inputPath = new Path("/a/homework10/input"); Path outputPath = new Path("/a/homework10/output"); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); //--------------------------------------------- FileSystem fs2 = FileSystem.get(conf);//默认使用本地 Job job2 = Job.getInstance(conf); job2.setJarByClass(SumFielsNumMR.class); job2.setMapperClass(SumFielsNumMR2_Mapper.class); job2.setNumReduceTasks(0); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(NullWritable.class); URI uri = new URI("/a/homework10/output/part-r-00000"); job2.addCacheFile(uri); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); Path inputPath2 = new Path("/a/homework10/input");//读入多个文件 Path outputPath2 = new Path("/a/homework10/output2");//输出多个文件 if (fs2.exists(outputPath2)) { fs2.delete(outputPath2, true); } String filename = "ok"; FileInputFormat.setInputPaths(job2, inputPath2); FileOutputFormat.setOutputPath(job2, outputPath2); //-------------------------------------- ControlledJob aJob = new ControlledJob(job.getConfiguration()); ControlledJob bJob = new ControlledJob(job2.getConfiguration()); aJob.setJob(job); bJob.setJob(job2); JobControl jc = new JobControl("jc"); jc.addJob(aJob); jc.addJob(bJob); bJob.addDependingJob(aJob); Thread thread = new Thread(jc); thread.start(); while(!jc.allFinished()){ thread.sleep(1000); } jc.stop(); } //求文件信息--------------------------------------- public static class SumFielsNumMR_Mapper extends Mapper<LongWritable, Text, Text, Text>{ Text kout = new Text(); Text valueout = new Text(); @Override protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException { InputSplit inputSplit = context.getInputSplit(); FileSplit fileSplit = (FileSplit) inputSplit; String kk = fileSplit.getPath().getName(); kout.set(kk); context.write(kout, value); } } public static class SumFielsNumMR_Reducer extends Reducer<Text, Text, Text, Text>{ Text kout = new Text(); Text valueout = new Text(); long sumfile = 0; @Override protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { long sum = 0; for(Text text : values){ sum += Integer.parseInt(text.toString()); } context.write(key, new Text(sum + "\t" + sumfile)); sumfile += sum; } } //---------------------------将文件信息加载到内存----------------------------------- public static class SumFielsNumMR2_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{ Text kout = new Text(); Text valueout = new Text(); Map<String, Long> fileSumMap = new HashMap<>(); String fname = null; long fsum = 0; @SuppressWarnings("deprecation") @Override protected void setup(Context context)throws IOException, InterruptedException { Path[] paths = context.getLocalCacheFiles(); String str = paths[0].toUri().toString(); BufferedReader bf = new BufferedReader(new FileReader(new File(str))); String readline = null; while((readline = bf.readLine()) != null){ String[] split = readline.split("\t"); String filename = split[0]; long sum = Long.parseLong(split[2]); fileSumMap.put(filename, sum); } IOUtils.closeStream(bf); // fileSumMap.put("0001.txt", 0L); // fileSumMap.put("0002.txt", 55L); // fileSumMap.put("0003.txt", 605L); // fileSumMap.put("0005.txt", 615L); // fileSumMap.put("0006.txt", 815L); // fileSumMap.put("7.txt", 865L); } int count = 0; long firsetnum = 0; @Override protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException { count ++; if (count == 1) { long num = Long.parseLong(value.toString()); String kk = num + "\t" + (fsum + num); kout.set(kk); context.write(kout, NullWritable.get()); firsetnum = fsum + num; }else { long num = Long.parseLong(value.toString()); String kk = num + "\t" + (num + firsetnum); kout.set(kk); context.write(kout, NullWritable.get()); firsetnum = num + firsetnum; } } } public static class SumFielsNumMR2_Reducer extends Reducer<Text, Text, Text, Text>{ protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { } } }运行结果:如果想要将结果输出到一个文件里面,需要添加reduce程序。10 65
20 85
30 115
40 155
50 205
60 265
70 335
80 415
90 505
100 605
相关文章推荐
- Oracle 数据文件 实际使用量 计算说明
- 一个mapreduce得到需要计算单词概率的基础数据
- MapReduce — 数据分类输出和小文件合并
- 云计算 数据文件系统 性能 架构 选择
- 大数据笔记10:大数据之Hadoop的MapReduce的原理
- 背水一战 Windows 10 (101) - 应用间通信: 通过协议打开指定的 app 并传递数据以及获取返回数据, 将本 app 沙盒内的文件共享给其他 app 使用
- 一共81个,开源大数据处理工具汇总:查询引擎、流式计算、迭代计算、离线计算、键值存储、表格存储、文件存储、资源管理、日志收集系统、消息系统、分布式服务、集群管理、基础设施、搜索引擎、数据挖掘=监控
- 大数据时代之hadoop(五):hadoop 分布式计算框架(MapReduce)
- vue.js计算属性用法(computed)技巧,依赖其他vue实例的数据
- Hadoop环境搭建之二配置启动HDFS及本地模式运行MapReduce案例(使用HDFS上数据)
- Python利用pandas计算多个CSV文件数据值
- 集算器比较csv文件中数据差异_润乾软件|数据处理-数据计算引擎
- 大数据采集、清洗、处理:使用MapReduce进行离线数据分析完整案例
- Java基础知识强化之IO流笔记10:File类输出指定目录下指定后缀名的文件名称案例(File类的文件过滤器方法改进list( FilenameFilter ff))
- 将HDFS中的数据通过MapReduce产生HFile,然后将HFile导入到HBase具体案例分析
- 利用mapreduce计算框架向hbase插入数据(python脚本)
- Oracle 数据文件 实际使用量 计算说明
- 文件数据云计算学习笔记---Hadoop HDFS和MapReduce 架构浅析
- 华为3COM NAS 存储 XFS文件系统数据恢复案例及方案
- 误操作删除数据文件恢复案例讨论