您的位置:首页 > 运维架构

自学Hadoop1.0——初识MapReduce基本组件

2015-05-23 15:00 309 查看

初识MapReduce基本组件

编程读写HDFS

   开发一个PutMerge程序,用来合并文件后放入HDFS。命令行工具不支持这个操作,需要使用API编程实现。

   Hadoop文件的API的起点类是FileSystem类。这是一个可以和文件系统进行交互的抽象类,存在不同的具体实现子类用来处理HDFS和本地文件系统。可以通过调用factory方法FileSystem.get(Configuration conf)来得到需要的FileSystem实例。即:Configuration conf=new Configuration();  FileSystem hdfs=FileSystem.get(conf);
。同样,可用factory方法中FileSystem.getLocal(Configuration conf); ,即:FileSystem local = FileSystem.getLocal(conf); 。

   Haddop文件API使用Path对象来编制文件和目录名,使用FileStatus对象来存储文件和目录的元数据。。。关于Hadoop文件API的最新Javadoc在http://apache.org/core/docs/current/api/org/apache/haddop/fs/package-summary.html中可以查阅。

具体代码:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutMerge{
public static void main(String[] args) throws IOException{
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);

Path inputDir = new Path(args[0]);  //设定输入目录语输出文件
path hdfsFile = new Path(args[1]);

try{
FileStatus[] inputFiles =local.listStatus(inputDir);  //得到本地文件列表
FSDataOutputStream out = hdfs.create(hdfsFile);  //生成HDFS输出流
for(int i=0;i<inputFiles.length;i++){  //下边的读本地文件流和写hdfs文件的方式可类比RandomAccessFile(随机访问文件流)对文件的读写操作
System.out.println(inputFiles[i].getPath().getName());
FADataInputStream in = local.open(inputFiles[i].getPath());  //打开本地输入流
byte[] buffer = new byte[256];  //做缓存
int count=0;
while((count = in.read(buffer))>0){
out.write(buffer,0,count);
}
in.close();
}
out.close();
}cath(IOException e){
e.printStackTrace();
}
}
}


简单MapReduce程序

Hadoop数据类型

    在MapReduce框架中难免要说到键/值对,而对于键与值的数据类型是有特别要求的。在MapReduce中框架中提供来一种序列化键/值对的方法。具体而言,实现Writable接口的类可以是值,而实现WritableComparable<T>接口的类既可以是键,也可以是值。注意WritableComparable<T>接口是Witable和java.lang.Comparable<T>接口的组合。对于键来说,这个比较是需要的,因为在Reduce阶段需要进行排序用,而值仅会被简单地传递。

   Hadoop中预定义的用于实现WritableComaprable的类,及面向所有基本数据类型的封装类:

Hadoop中可用于实现WritableComparaable接口的类
描述
BooleanWritable标准布尔变量的封装
ByteWritable单字节数的封装
DoubleWritable双字节数的封装
FloatWritable浮点数的封装
IntWritable整数的封装
LongWirtableLong的封装
Text使用UT8格式的文本封装
NullWritable无键值时的站位符
   在Hadoop中的键/值对数据类型还可以是自定义的实现了Writable(或WritableComparable<T>)接口的数据类型。

实例代码:

public class Edge implements WritableComparable<Edge>{
   private String departureNode;
   private String arrivalNode;

   public String getDepartureNode(){
      return departureNode;
   }

   @Override
   public void readFields(DataInput in) throws IOException{   //说明如何读入数据
      departureNode = in.readUTF();
      arrivalNode = in,readUTF();
   }

   @Override
   public void write(DataOutput out) throws IOException{   //说明如何写出数据
      out.writeUTF(departureNode);
      out.writeUTF(arrivalNode);
   }

   @Override
   public int comparaTo(Edge o){   //定义数据排序
      return (departureNode.compareTo(o.departureNode) != 0)
         ? departureNode.compareTo(o.departureNode)
         : arrivalNode.compareTo(o.arrivalNode);
   }
}

Mapper简述

   一个类要作为mapper,就需要继承MapReduceBase基类并实现Mapper接口。它包含类的构造方法和解构方法。

   void configure(JobConf job)。该函数提取XML配置文件或者应用程序主类中的参数,在数据处理之前调用该函数。

   void close()。作为map任务结束前的最后一个操作,该函数完成所有的结尾工作,如关闭数据连接、打开文件等。

   Mapper接口负责数据处理阶段,采用的形式为Mapper<K1,V1,K2,V2>Java泛型。其中只有一个方法map,用于处理一个单独键/值对。

void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter) throws IOException
该函数处理一个给定的键/值对(K1,V1),生成一个键/值对(K2,V2)的列表。OutputCollector接收这个映射过程的输出,Reportet可提供对mapper相关附加信息的记录,形成任务进度。

Reducer简述

   reducer的实现和mapper一样必须首先在MapReduce基类上扩展,允许配置和清理。

void reduce (K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException当reducer任务接受来自各个mapper的输出时,它按照键/值对中的键对输入数据进行排序并将相同键的值归并,然后调用reduce()函数,并通过迭代处理那些与指定键相关联的值,生成一个列表(K3, V3)。
void reduce (K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException

Partitioner:重定向Mapper输出

    Partition的工作是将mapper的结果输出给不同的reducer。一个定制的partitioner需要继承Partitioner<K2, V2>接口,实现configure()和getPartition()两个方法。前者是将hadoop对作业的配置应用在partitioner上,后者返回一个介于0和reduce任务数之间的整数。

简单代码:

public class EdgePartitioner implements Partitioner<Edge, Writable>{
@Override
public int getPartition(Edge key, Writable value, int numPartitions){
return key.getDepartureNode().hashCode()%numPartitions;
}
@Override
public void configure(JobConf conf){ }
}

Combiner:本地reduce

   在许多MapReduce应用场景中,都由在分发mapper结果之前做一次“本地reduce”,即添加Combiner()方法。这是因为:为了减少map向reduce传送的内容,在map段先用combiner(其实就是map中的小reduce),不过这里面要注意的是:如果是求每组当中最大值,可以用combiner,如果是均值的话,用combiner最后可能会有误差,,(如果2个map中个数相同,那么均值是对的,否则不对)比如:求(10,10,25,15,20)这几个数的平均数为16,如果(10,10),(25,15,20)分别在2个map中他们的平均值分别为10和20,然后再求10和20评价为15和真实16有差距。。。。


  注意:上面这个想法是错误的:其实均值也可以用combiner,在map段直接计算和,和个数,然后在reduce段把和相加除以总数就可以了(这个是在面试的时候教训,不能太死板,,这样不行换一种方法看看能不能解决)

9f45


WordCount例子

/**
* Created by william_jm on 15/5/24.
*/
package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop mapreduce 实例 java