您的位置:首页 > 运维架构

Hadoop MapReduce实现矩阵的乘法

2014-05-16 09:42 134 查看
算法的实现思路:

图片和思路来自于http://blog.fens.me/hadoop-mapreduce-matrix/

新建2个矩阵数据文件:m1, m2

m1
1,0,2
-1,3,1


m2
3,1
2,1
1,0


新建启动程序:MainRun.java

新建MR程序:MartrixMultiply.java





MainRun.java

/**
 * Command-line launcher for the matrix multiplication MapReduce job.
 *
 * <p>Builds the map of named paths expected by {@code MartrixMultiply.run} and
 * terminates the JVM with an exit status that reflects whether the call
 * completed without throwing.
 */
public class MainRun {

    /** Base URI of the HDFS name node that all job paths are resolved against. */
    public static final String HDFS = "hdfs://10.103.240.160:9000";

    /** Field separator (tab or comma) for matrix data files; not referenced within this class. */
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    /**
     * Program entry point; delegates to {@link #martrixMultiply()}.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        martrixMultiply();
    }

    /**
     * Assembles the path configuration, runs the job and exits the JVM.
     *
     * <p>Exits with status 0 when {@code MartrixMultiply.run} returns normally and
     * with status 1 when it throws, so that shell scripts and schedulers can
     * detect a failed run.
     */
    public static void martrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/m1.csv"); // local data file for matrix A
        path.put("m2", "logfile/matrix/m2.csv"); // local data file for matrix B
        path.put("input", HDFS + "/usr/hadoop/matrix"); // HDFS input directory
        // NOTE(review): "/usr/usr/" below looks like a typo for "/usr/hadoop/"
        // (compare with "input" above) - confirm before relying on these entries.
        path.put("input1", HDFS + "/usr/usr/matrix/m1");
        path.put("input2", HDFS + "/usr/usr/matrix/m2");
        path.put("output", HDFS + "/usr/hadoop/matrix/output");

        int exitCode = 0;
        try {
            MartrixMultiply.run(path);
        } catch (Exception e) {
            e.printStackTrace();
            exitCode = 1; // report the failure instead of always exiting with 0
        }
        System.exit(exitCode);
    }
}


MartrixMultiply.java

/**
 * MapReduce job computing the integer matrix product C = A x B.
 *
 * <p>Matrix A is read from an input file named {@code m1} and matrix B from an
 * input file named {@code m2}; each line holds one matrix row as
 * comma-separated integers. The mapper emits, for every cell {@code (i,j)} of
 * C, all values of row i of A and of column j of B tagged with their inner
 * index; the reducer pairs values with the same inner index and sums the
 * products.
 */
public class MartrixMultiply {

	/** Configuration key overriding the number of rows of A (default 2). */
	public static final String ROW_NUM_A_KEY = "matrix.a.rows";
	/** Configuration key overriding the number of columns of A (default 3). */
	public static final String COL_NUM_A_KEY = "matrix.a.cols";
	/** Configuration key overriding the number of columns of B (default 2). */
	public static final String COL_NUM_B_KEY = "matrix.b.cols";

	/**
	 * Emits every matrix element once per result cell it contributes to.
	 *
	 * <p>Output key: {@code "row,col"} of the result cell (1-based). Output
	 * value: {@code "A:k,value"} or {@code "B:k,value"}, where k is the 1-based
	 * inner index (column of A / row of B).
	 *
	 * <p>The row number of a line is tracked with a per-mapper counter, so this
	 * mapper assumes each matrix file is consumed from its first line by a
	 * single mapper instance, i.e. is not divided into several input splits.
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		private String flag; // name of the file being read: "m1" or "m2"
		private int rowNumA = 2; // rows of A; needed to fan out the elements of B
		private int colNumA = 3; // columns of A
		private int colNumB = 2; // columns of B; needed to fan out the elements of A
		private int rowIndexA = 1; // 1-based row of A currently being read
		private int rowIndexB = 1; // 1-based row of B currently being read
		// Reused output buffers; kept per instance instead of static so no state is shared.
		private final Text k = new Text();
		private final Text v = new Text();

		/**
		 * Records which matrix file this mapper reads and loads the matrix
		 * dimensions from the job configuration, falling back to the built-in
		 * defaults when a key is absent.
		 */
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			FileSplit split = (FileSplit) context.getInputSplit();
			flag = split.getPath().getName();

			Configuration conf = context.getConfiguration();
			rowNumA = conf.getInt(ROW_NUM_A_KEY, rowNumA);
			colNumA = conf.getInt(COL_NUM_A_KEY, colNumA);
			colNumB = conf.getInt(COL_NUM_B_KEY, colNumB);
		}

		/**
		 * Processes one matrix row.
		 *
		 * @param key byte offset of the line (unused)
		 * @param value one row of the matrix as comma-separated integers
		 * @throws IOException if the row has fewer values than the configured
		 *         column count, or if writing the output fails
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString().trim();
			if (line.isEmpty()) {
				// A blank line is not a matrix row; skipping keeps the row counters aligned.
				return;
			}
			String[] elements = line.split(",");

			if ("m1".equals(flag)) {
				requireColumns(elements, colNumA, line);
				// Element (rowIndexA, i+1) of A contributes to C(rowIndexA, j+1)
				// for every column j of B; i+1 is the inner index.
				for (int i = 0; i < colNumA; i++) {
					for (int j = 0; j < colNumB; j++) {
						k.set(rowIndexA + "," + (j + 1));
						v.set("A" + ":" + (i + 1) + "," + elements[i].trim());
						context.write(k, v);
					}
				}
				rowIndexA++;
			} else if ("m2".equals(flag)) {
				requireColumns(elements, colNumB, line);
				// Element (rowIndexB, i+1) of B contributes to C(j+1, i+1)
				// for every row j of A; rowIndexB is the inner index.
				for (int i = 0; i < colNumB; i++) {
					for (int j = 0; j < rowNumA; j++) {
						k.set((j + 1) + "," + (i + 1));
						v.set("B:" + rowIndexB + "," + elements[i].trim());
						context.write(k, v);
					}
				}
				// Advance only for rows of B (previously incremented for every record).
				rowIndexB++;
			}
		}

		/** Fails with a descriptive message when a row has too few values. */
		private static void requireColumns(String[] elements, int expected,
				String line) throws IOException {
			if (elements.length < expected) {
				throw new IOException("Expected at least " + expected
						+ " comma-separated values but found "
						+ elements.length + " in line: " + line);
			}
		}
	}

	/**
	 * Computes one cell of the result matrix.
	 *
	 * <p>Receives all {@code "A:k,value"} and {@code "B:k,value"} entries for a
	 * result cell, multiplies the A and B values sharing the same inner index k
	 * and writes the sum of those products under the cell's key.
	 */
	public static class MyReducer extends
			Reducer<Text, Text, Text, IntWritable> {
		private final IntWritable v = new IntWritable();

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// Fresh per-key buffers: nothing leaks from a previous key, and the
			// inner dimension is no longer limited to 3. HashMap is referenced
			// by its qualified name so no additional import is required.
			Map<Integer, Integer> a = new java.util.HashMap<Integer, Integer>();
			Map<Integer, Integer> b = new java.util.HashMap<Integer, Integer>();

			for (Text value : values) {
				String[] vs = value.toString().split(":");
				String[] ints = vs[1].split(",");
				Integer innerIndex = Integer.valueOf(ints[0]);
				Integer number = Integer.valueOf(ints[1]);
				if (vs[0].equals("A")) {
					a.put(innerIndex, number);
				} else {
					b.put(innerIndex, number);
				}
			}

			int sum = 0;
			for (Map.Entry<Integer, Integer> entry : a.entrySet()) {
				Integer other = b.get(entry.getKey());
				if (other != null) { // a missing counterpart contributes zero
					sum += entry.getValue() * other;
				}
			}
			v.set(sum);
			context.write(key, v);
		}

	}

	/**
	 * Configures and runs the matrix multiplication job, blocking until it ends.
	 *
	 * <p>Any existing output directory is deleted recursively before the job is
	 * submitted.
	 *
	 * @param path named locations; {@code "input"} (HDFS directory holding the
	 *        files {@code m1} and {@code m2}) and {@code "output"} (HDFS output
	 *        directory) are required, other entries are ignored
	 * @throws IllegalArgumentException if a required entry is missing
	 * @throws IOException if the job finishes unsuccessfully
	 * @throws Exception on any other file-system or job submission error
	 */
	public static void run(Map<String, String> path) throws Exception {
		String input = path.get("input");
		String output = path.get("output");
		if (input == null || output == null) {
			throw new IllegalArgumentException(
					"path must contain both \"input\" and \"output\" entries");
		}

		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(input), conf);
		final Path outPath = new Path(output);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}
		conf.set("hadoop.job.user", "hadoop");
		// Optional: point at an explicit job tracker.
		// conf.set("mapred.job.tracker", "10.103.240.160:9001");

		final Job job = new Job(conf);
		FileInputFormat.setInputPaths(job, input);
		job.setJarByClass(MartrixMultiply.class);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setNumReduceTasks(1); // single reducer, so all cells land in one output file
		FileOutputFormat.setOutputPath(job, outPath);

		// waitForCompletion reports failure through its return value, not an exception.
		if (!job.waitForCompletion(true)) {
			throw new IOException("Matrix multiplication job failed; output path: "
					+ output);
		}
	}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: