
MapReduce Case 10: Dependent Computation Across Multiple Data Files

2018-03-21 23:04
Problem: for every number, append the cumulative sum of all numbers up to and including that position, across all files taken in order.

For example:

File 0001.txt contains:
1
2
3
4
5
.....

File 0002.txt contains:
10
10
10
10
10

The expected output is:
1 1
2 3
3 6
4 10
5 15
.....

10 25
10 35
10 45
10 55
10 65

That is, each number is followed by the running total accumulated up to and including that number, across all files in order.

Overall approach:
1. First compute the sum of each file.
2. Then accumulate the sums in file order.

The hard part: how can the numbers, spread over several files and their data blocks, be accumulated in sequence? The idea is a map-side join done in two steps. The first job produces one line per file containing the file name, the sum of that file's numbers, and the running total of all files before it, in the form shown below:

0001.txt	55	0
0002.txt	550	55
0003.txt	10	605
0005.txt	200	615
0006.txt	50	815

This output is then treated as a lookup file and loaded into memory; by matching on the file name, each map task of the second job obtains the starting offset for its file and adds every number to a running sum seeded with that offset.
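To make the two-pass idea concrete before the MapReduce code, here is a minimal single-process sketch (the class name and local file paths are illustrative assumptions, not part of the original job): pass 1 records, for each file, the total of all earlier files; pass 2 re-reads each file and seeds its running sum with that offset.

import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class RunningSumSketch {
    public static void main(String[] args) throws IOException {
        // Pass 1: sum each file, recording for each one the total of all earlier files.
        List<Path> files = Arrays.asList(Paths.get("0001.txt"), Paths.get("0002.txt")); // assumed local copies of the inputs
        Map<Path, Long> offsets = new LinkedHashMap<>();
        long total = 0;
        for (Path f : files) {
            offsets.put(f, total); // running total *before* this file
            for (String line : Files.readAllLines(f)) {
                total += Long.parseLong(line.trim());
            }
        }
        // Pass 2: re-read each file, seeding its running sum with the recorded offset.
        for (Path f : files) {
            long running = offsets.get(f);
            for (String line : Files.readAllLines(f)) {
                long num = Long.parseLong(line.trim());
                running += num;
                System.out.println(num + "\t" + running);
            }
        }
    }
}

The MapReduce implementation below does the same thing, except that pass 1 is a full job whose single reducer writes the offset table, and pass 2 is a map-only job that loads that table from the distributed cache. The full implementation follows: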
/**
* @author: lpj
* @date: 2018-03-16 19:16:47
* @Description:
*/
package lpj.reduceWork;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SumFielsNumMR {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop"); // run against the cluster
        //----------------- job 1: per-file sums and offsets -----------------
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);
        job.setJarByClass(SumFielsNumMR.class);
        job.setMapperClass(SumFielsNumMR_Mapper.class);
        job.setReducerClass(SumFielsNumMR_Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = new Path("/a/homework10/input");
        Path outputPath = new Path("/a/homework10/output");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        //----------------- job 2: map-only, joins against the offset table -----------------
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(SumFielsNumMR.class);
        job2.setMapperClass(SumFielsNumMR2_Mapper.class);
        job2.setNumReduceTasks(0); // map-only: one output file per input file
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(NullWritable.class);
        URI uri = new URI("/a/homework10/output/part-r-00000"); // offset table written by job 1
        job2.addCacheFile(uri);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        Path inputPath2 = new Path("/a/homework10/input"); // the same multi-file input
        Path outputPath2 = new Path("/a/homework10/output2"); // one part file per input split
        if (fs.exists(outputPath2)) {
            fs.delete(outputPath2, true);
        }
        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);
        //----------------- chain the two jobs: job 2 depends on job 1 -----------------
        ControlledJob aJob = new ControlledJob(job.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());
        aJob.setJob(job);
        bJob.setJob(job2);
        JobControl jc = new JobControl("jc");
        jc.addJob(aJob);
        jc.addJob(bJob);
        bJob.addDependingJob(aJob); // job 2 will not start until job 1 succeeds
        Thread thread = new Thread(jc);
        thread.start();
        while (!jc.allFinished()) {
            Thread.sleep(1000);
        }
        jc.stop();
    }
    //----------------- job 1: per-file sums and starting offsets -----------------
    public static class SumFielsNumMR_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        Text kout = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Key each number by the name of the file it came from.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            kout.set(fileSplit.getPath().getName());
            context.write(kout, value);
        }
    }
    public static class SumFielsNumMR_Reducer extends Reducer<Text, Text, Text, Text> {
        // Running total carried across reduce() calls. This relies on the default
        // single reducer and on the file-name keys arriving in sorted order, so
        // each file sees the sum of all files before it.
        long sumfile = 0;
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (Text text : values) {
                sum += Long.parseLong(text.toString());
            }
            // Output line: fileName \t fileSum \t sumOfAllEarlierFiles
            context.write(key, new Text(sum + "\t" + sumfile));
            sumfile += sum;
        }
    }
    //----------------- job 2: load the offset table into memory, then join -----------------
    public static class SumFielsNumMR2_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text kout = new Text();
        Map<String, Long> fileSumMap = new HashMap<>();
        long running = 0; // running sum, seeded with this file's starting offset
        @SuppressWarnings("deprecation")
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Each line of the cached offset table reads:
            // fileName \t fileSum \t sumOfAllEarlierFiles
            Path[] paths = context.getLocalCacheFiles();
            String str = paths[0].toUri().toString();
            BufferedReader bf = new BufferedReader(new FileReader(new File(str)));
            String readline = null;
            while ((readline = bf.readLine()) != null) {
                String[] split = readline.split("\t");
                fileSumMap.put(split[0], Long.parseLong(split[2]));
            }
            IOUtils.closeStream(bf);
            // Look up the starting offset for the file this map task is processing.
            // (This assumes each input file is small enough to be a single split,
            // so one mapper sees the whole file in order.)
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fname = fileSplit.getPath().getName();
            running = fileSumMap.getOrDefault(fname, 0L);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit each number together with the running sum up to that number.
            long num = Long.parseLong(value.toString());
            running += num;
            kout.set(num + "\t" + running);
            context.write(kout, NullWritable.get());
        }
    }
    // Unused: job 2 runs with zero reduce tasks. See the note after the listing
    // for a variant that funnels everything through one reducer.
    public static class SumFielsNumMR2_Reducer extends Reducer<Text, NullWritable, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        }
    }
}
Run result (excerpt from one of the part files). Because job 2 is map-only, there is one output file per input file; to merge everything into a single result file, add a reduce step, as sketched after the output below.

10	65
20	85
30	115
40	155
50	205
60	265
70	335
80	415
90	505
100	605
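The original post stops at "add a reduce program". As a hedged sketch of that merging step (the class name MergeReducer and the driver changes are the editor's assumptions, not from the original code): give job2 a single pass-through reducer so that all map output lands in one part-r-00000 file. Note the caveat in the comments: the shuffle sorts the Text keys, so this merges the lines but does not preserve their original per-file order.

// Driver changes, replacing the map-only configuration of job2:
//   job2.setReducerClass(MergeReducer.class);
//   job2.setNumReduceTasks(1);                    // one reducer -> one output file
//   job2.setOutputValueClass(NullWritable.class);

// Pass-through reducer: emits every line it receives, once per occurrence.
// Caveat: keys are sorted lexicographically as Text during the shuffle, so the
// per-file line order is lost; preserving it would need a numeric, globally
// ordered key instead of the "num \t runningSum" text line.
public static class MergeReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        for (NullWritable v : values) {
            context.write(key, NullWritable.get());
        }
    }
}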