您的位置:首页 > 其它

分布式缓存法计算矩阵乘法

2016-01-20 20:50 393 查看
1)左矩阵F是.txt格式,右矩阵B是SequenceFile,代码如下:

package matrix;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.filecache.DistributedCache;//分布式缓存库

/**
 * Computes the matrix product F * B with a map-only MapReduce job.
 * The left matrix F (a tab-separated .txt file) is shipped to every mapper via
 * the DistributedCache; the right matrix B arrives as a SequenceFile whose
 * records are (row index, tab-separated column values).
 */
public class matrixProduct {

    /**
     * Mapper that multiplies the cached left matrix F against each incoming
     * row vector of B and emits the resulting row under the same key.
     * Input:  (IntWritable row index, Text tab-separated doubles)
     * Output: (IntWritable row index, Text tab-separated doubles)
     */
    public static class MyMapper extends Mapper<IntWritable, Text, IntWritable, Text> {
        private int leftMatrixRowNum;   // rows of the cached left matrix F
        private int leftMatrixColNum;   // columns of F (must equal length of each input vector)
        private int rightMatrixRowNum;  // capacity of the reusable input-vector buffer
        private double[][] cacheMatrix; // left matrix F, loaded once in setup()
        private double[] valueVector;   // reusable buffer for the current input vector
        // Reused across map() calls; cleared at the start of each call.
        private final StringBuilder result = new StringBuilder();

        @Override
        protected void setup(Mapper<IntWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            leftMatrixRowNum = Integer.parseInt(context.getConfiguration().get("leftMatrixRowNum"));
            leftMatrixColNum = Integer.parseInt(context.getConfiguration().get("leftMatrixColNum"));
            rightMatrixRowNum = Integer.parseInt(context.getConfiguration().get("rightMatrixRowNum"));
            cacheMatrix = new double[leftMatrixRowNum][leftMatrixColNum];
            valueVector = new double[rightMatrixRowNum];

            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (cacheFiles != null && cacheFiles.length > 0) {
                // BUG FIX: the original swallowed every exception here, which would
                // silently leave cacheMatrix all zeros; it also leaked the reader.
                // Now parse errors propagate and the reader is always closed.
                BufferedReader dataReader =
                        new BufferedReader(new FileReader(cacheFiles[0].toString()));
                try {
                    String line;
                    int i = -1;
                    while ((line = dataReader.readLine()) != null) {
                        ++i;
                        String[] eleStrings = line.split("\t");
                        for (int j = 0; j < eleStrings.length; ++j) {
                            cacheMatrix[i][j] = Double.parseDouble(eleStrings[j]);
                        }
                    }
                } finally {
                    dataReader.close();
                }
            }
        }

        @Override
        protected void map(IntWritable key, Text value,
                Mapper<IntWritable, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            // BUG FIX: do NOT call super.map(...) — the default implementation is
            // the identity map and would emit the raw input record in addition to
            // the computed product row.
            String[] valueArray = value.toString().split("\t");
            for (int i = 0; i < valueArray.length; i++) {
                valueVector[i] = Double.parseDouble(valueArray[i]);
            }
            // BUG FIX: clear the shared buffer; previously it accumulated output
            // from every earlier record processed by this mapper.
            result.setLength(0);
            // BUG FIX: use the configured dimensions instead of hard-coded 1043/1042,
            // so the job works for any matrix size supplied on the command line.
            for (int i = 0; i < leftMatrixRowNum; ++i) {
                double temp = 0;
                for (int j = 0; j < leftMatrixColNum; ++j) {
                    temp += cacheMatrix[i][j] * valueVector[j];
                }
                result.append(temp).append(i == leftMatrixRowNum - 1 ? "\n" : "\t");
            }
            context.write(key, new Text(result.toString()));
        }
    }

    /**
     * Configures and submits the map-only multiplication job.
     *
     * @param s1 URI of the left-matrix .txt file to place in the DistributedCache
     * @param s2 input path (SequenceFile holding the right matrix B)
     * @param s3 output path (must not exist)
     * @param leftMatrixRowNum  rows of F
     * @param leftMatrixColNum  columns of F
     * @param rightMatrixRowNum rows of B (length of each input vector)
     * @throws Exception on job-setup or submission failure
     */
    public static void run(String s1, String s2, String s3, String leftMatrixRowNum,
            String leftMatrixColNum, String rightMatrixRowNum) throws Exception {
        URI fileURI = new URI(s1);
        Configuration conf = new Configuration();
        conf.set("leftMatrixRowNum", leftMatrixRowNum);
        conf.set("leftMatrixColNum", leftMatrixColNum);
        conf.set("rightMatrixRowNum", rightMatrixRowNum);
        // BUG FIX: register the cache file BEFORE constructing the Job. The Job
        // constructor copies the Configuration, so the original call made after
        // `new Job(conf, ...)` never reached the submitted job.
        DistributedCache.addCacheFile(fileURI, conf);
        Job job = new Job(conf, "matrix cache memory");
        job.setJarByClass(matrixProduct.class);
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(0); // map-only job: mapper output is the final output
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(s2));
        FileOutputFormat.setOutputPath(job, new Path(s3));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Entry point. Args: cacheFileURI inputPath outputPath
     * leftMatrixRowNum leftMatrixColNum rightMatrixRowNum
     */
    public static void main(String[] args) throws IOException, Exception {
        run(args[0], args[1], args[2], args[3], args[4], args[5]);
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: