The MapReduce Computing Framework in Hadoop

2017-04-06 21:03

1. Related Classes

1.1 The StringTokenizer Class

StringTokenizer is a utility class for splitting a String into tokens; it belongs to the java.util package.

1.1.1 Constructors

public StringTokenizer(String str)
public StringTokenizer(String str, String delim)
public StringTokenizer(String str, String delim, boolean returnDelims)


The first argument is the String to split, the second is the set of delimiter characters, and the third indicates whether the delimiters themselves should be returned as tokens. If no delimiters are specified, the default set is " \t\n\r\f" (space, tab, newline, carriage return, and form feed).

(1) StringTokenizer(String str): constructs a StringTokenizer to parse str. The default delimiters are the space character, tab ('\t'), newline ('\n'), carriage return ('\r'), and form feed ('\f').

(2) StringTokenizer(String str, String delim): constructs a StringTokenizer to parse str using the specified delimiter characters.

(3) StringTokenizer(String str, String delim, boolean returnDelims): constructs a StringTokenizer to parse str using the specified delimiter characters, and additionally specifies whether the delimiters are returned as tokens.

1.1.2 Core Methods

public boolean hasMoreTokens()
public String nextToken()
public String nextToken(String delim)
public int countTokens()


These are essentially three operations. A delimiter set can also be passed when fetching the next token, and from that point on the most recently specified delimiters are used.

1.1.3 Other Methods

Notes:

1. All methods are public.

2. Signatures are written as: [modifier] <return type> <method name>([parameter list])

For example, static int parseInt(String s) means: the method (parseInt) is a class method (static), its return type is int, and it takes a single String parameter.

1. int countTokens(): returns the number of times nextToken() can still be called before the tokenizer runs out of tokens, i.e. the number of remaining tokens under the current delimiter set.

2. boolean hasMoreTokens(): returns whether more tokens are available.

3. boolean hasMoreElements(): same result as 2.

4. String nextToken(): returns the substring from the current position up to the next delimiter.

5. Object nextElement(): same result as 4.

6. String nextToken(String delim): like 4, but switches to the specified delimiters before returning the next token.

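As a quick illustration (a minimal sketch; the input string is made up for this example), the following snippet splits a sentence on the default whitespace delimiters and walks through the tokens:

import java.util.StringTokenizer;

public class TokenizerDemo {
    public static void main(String[] args) {
        // Default delimiters: space, \t, \n, \r, \f
        StringTokenizer st = new StringTokenizer("hello hadoop map reduce");
        System.out.println("tokens: " + st.countTokens());   // prints 4
        while (st.hasMoreTokens()) {
            System.out.println(st.nextToken());               // hello, hadoop, map, reduce
        }
    }
}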
参考:http://docs.oracle.com/javase/7/docs/api/java/util/StringTokenizer.html

1.2 The IntWritable Class

IntWritable is the Writable wrapper for the Java int primitive (each Java primitive type has such a wrapper). Internally the class holds a member variable value of the corresponding primitive type; get() and set() read and write that variable.

1.2.1 Constructors

IntWritable()
IntWritable(int value)


1.2.2 Other Methods

int compareTo(IntWritable o) //Compares two IntWritables.
boolean equals(Object o)    // Returns true iff o is an IntWritable with the same value.
int get()                  //Return the value of this IntWritable.
int hashCode()
void readFields(DataInput in) //Deserialize the fields of this object from in.
void set(int value)          // Set the value of this IntWritable.
String  toString()
void write(DataOutput out)  //Serialize the fields of this object to out.


The source code of the IntWritable class is shown below:

package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/** A WritableComparable for ints. */
public class IntWritable implements WritableComparable {
  private int value;

  public IntWritable() {}

  public IntWritable(int value) { set(value); }

  /** Set the value of this IntWritable. */
  public void set(int value) { this.value = value; }

  /** Return the value of this IntWritable. */
  public int get() { return value; }

  /** Deserialize the value from the input stream. */
  public void readFields(DataInput in) throws IOException {
    value = in.readInt();
  }

  /** Serialize the value to the output stream. */
  public void write(DataOutput out) throws IOException {
    out.writeInt(value);
  }

  /** Returns true iff o is an IntWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof IntWritable))
      return false;
    IntWritable other = (IntWritable) o;
    return this.value == other.value;
  }

  public int hashCode() {
    return value;
  }

  /** Compares two IntWritables. */
  public int compareTo(Object o) {
    int thisValue = this.value;
    int thatValue = ((IntWritable) o).value;
    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
  }

  public String toString() {
    return Integer.toString(value);
  }

  /** A Comparator optimized for IntWritable: compares the serialized bytes
      directly, without deserializing them into objects. */
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(IntWritable.class);
    }

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int thisValue = readInt(b1, s1);
      int thatValue = readInt(b2, s2);
      return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
    }
  }

  static {                                        // register this raw comparator
    WritableComparator.define(IntWritable.class, new Comparator());
  }
}

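A minimal usage sketch (the values are arbitrary), showing set/get and comparison:

import org.apache.hadoop.io.IntWritable;

public class IntWritableDemo {
    public static void main(String[] args) {
        IntWritable a = new IntWritable(1);
        IntWritable b = new IntWritable();
        b.set(2);
        System.out.println(a.get());                        // 1
        System.out.println(a.compareTo(b));                 // -1, since 1 < 2
        System.out.println(a.equals(new IntWritable(1)));   // true
    }
}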

参考:http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/io/IntWritable.html

1.3 The Text Class

The Text class stores text using standard UTF-8 encoding and belongs to the org.apache.hadoop.io package. From the API documentation:

This class stores text using standard UTF8 encoding. It provides methods to serialize, deserialize, and compare texts at byte level. The type of length is integer and is serialized using zero-compressed format.

In addition, it provides methods for string traversal without converting the byte array to a string.

Also includes utilities for serializing/deserializing a string, coding/decoding a string, checking if a byte array contains valid UTF-8 code, and calculating the length of an encoded string.

1.3.1 Constructors

Text()
Text(byte[] utf8) // Construct from a byte array.
Text(String string)  //  Construct from a string.
Text(Text utf8)  //  Construct from another text.


1.3.2 Other Methods

void set(byte[] utf8)                     // Set to a utf8 byte array
void set(byte[] utf8, int start, int len) //Set the Text to range of bytes
void set(String string)                   //Set to contain the contents of a string.
void set(Text other)         //copy a text.
public String toString()     // Convert text back to string (overrides toString in class Object).

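A short sketch of typical Text usage (the string contents are only illustrative):

import org.apache.hadoop.io.Text;

public class TextDemo {
    public static void main(String[] args) {
        Text t = new Text("hadoop");
        System.out.println(t.toString());    // "hadoop"
        System.out.println(t.getLength());   // 6: number of bytes in the UTF-8 encoding
        t.set("mapreduce");                  // Writable objects are typically reused
        System.out.println(t);
    }
}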

For the remaining methods, see:

http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/io/Text.html

1.4 The Mapper Class

The Mapper class belongs to the org.apache.hadoop.mapreduce package.

public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends Object


Maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.

The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. Mapper implementations can access the Configuration for the job via the JobContext.getConfiguration().

The framework first calls setup(org.apache.hadoop.mapreduce.Mapper.Context), followed by map(Object, Object, Context) for each key/value pair in the InputSplit. Finally cleanup(Context) is called.

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a Reducer to determine the final output. Users can control the sorting and grouping by specifying two key RawComparator classes.

The Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

Applications can specify if and how the intermediate outputs are to be compressed and which CompressionCodecs are to be used via the Configuration.

If the job has zero reduces then the output of the Mapper is directly written to the OutputFormat without sorting by keys.

1.4.1 Methods



void run(org.apache.hadoop.mapreduce.Mapper.Context context)
//Expert users can override this method for more complete control over the execution of the Mapper.
protected  void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
//Called once at the beginning of the task.
protected  void map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)
//Called once for each key/value pair in the input split.
protected  void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
//Called once at the end of the task.

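To make the setup/map/cleanup life cycle concrete, here is a sketch of a WordCount-style mapper along the lines of the classic example (class and field names are illustrative):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the byte offset of the line in the input split, value is the line itself
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);   // emit the intermediate pair <word, 1>
        }
    }
}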

参考:http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapreduce/Mapper.html

1.5 The Reducer Class

The Reducer class belongs to the org.apache.hadoop.mapreduce package.

Direct Known Subclasses: ChainReducer, FieldSelectionReducer, IntSumReducer, LongSumReducer, ValueAggregatorCombiner, ValueAggregatorReducer, WrappedReducer

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends Object


Reduces a set of intermediate values which share a key to a smaller set of values.

Reducer implementations can access the Configuration for the job via the JobContext.getConfiguration() method.

Reducer has 3 primary phases:

1. Shuffle

The Reducer copies the sorted output from each Mapper using HTTP across the network.

2. Sort

The framework merge sorts Reducer inputs by keys (since different Mappers may have output the same key).

The shuffle and sort phases occur simultaneously i.e. while outputs are being fetched they are merged.

To achieve a secondary sort on the values returned by the value iterator, the application should extend the key with the secondary key and define a grouping comparator. The keys will be sorted using the entire key, but will be grouped using the grouping comparator to decide which keys and values are sent in the same call to reduce. The grouping comparator is specified via Job.setGroupingComparatorClass(Class). The sort order is controlled by Job.setSortComparatorClass(Class).

For example, say that you want to find duplicate web pages and tag them all with the URL of the "best" known example. You would extend the map output key with a secondary field, sort on the full key, and group on the primary field alone, so that all duplicates arrive in a single reduce call.

3. Reduce

In this phase the reduce(Object, Iterable, Context) method is called for each <key, (collection of values)> in the sorted inputs.

The output of the reduce task is typically written to a RecordWriter via TaskInputOutputContext.write(Object, Object).

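As a concrete sketch of the reduce phase, a WordCount-style reducer that sums the values associated with each key (class and field names are illustrative):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();            // add up all counts emitted for this word
        }
        result.set(sum);
        context.write(key, result);      // final pair <word, total count>
    }
}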
参考:http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapreduce/Reducer.html

1.6 The Job Class

The Job class belongs to the org.apache.hadoop.mapreduce package.

public class Job
extends org.apache.hadoop.mapreduce.task.JobContextImpl
implements JobContext


The job submitter’s view of the Job.

It allows the user to configure the job, submit it, control its execution, and query the state. The set methods only work until the job is submitted, afterwards they will throw an IllegalStateException.

Normally the user creates the application, describes various facets of the job via Job, then submits the job and monitors its progress.

Here is an example of how to submit a job:

// Create a new Job
Job job = Job.getInstance();
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");

// Input/output paths are set via the helper classes
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat and lib.output.FileOutputFormat
FileInputFormat.addInputPath(job, new Path("in"));
FileOutputFormat.setOutputPath(job, new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);


参考:http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/mapreduce/Job.html

1.7 The GenericOptionsParser Class

GenericOptionsParser is the basic class in the Hadoop framework for parsing command-line arguments. It recognizes the standard generic command-line options, letting an application easily specify the namenode, the jobtracker, and additional configuration resources.

1.7.1 Constructors

/**
 * Create a GenericOptionsParser to parse the given options as well as the
 * generic Hadoop options. The resulting CommandLine object can be obtained
 * via getCommandLine().
 * @param conf the configuration to modify
 * @param options options built by the caller
 * @param args User-specified arguments
 * @throws IOException
 */
public GenericOptionsParser(Configuration conf, Options options, String[] args) throws IOException {
  parseGeneralOptions(options, conf, args);
  this.conf = conf;
}


1.7.2 Other Methods

private String[] parseGeneralOptions(Options opts, Configuration conf, String[] args)


parseGeneralOptions(options, conf, args) parses the user-specified arguments, extracting the generic options and modifying the configuration as needed. It first defines the properties of each generic option, then parses the options and arguments into a CommandLine object, and finally writes the resulting settings into the configuration.

private void processGeneralOptions(Configuration conf, CommandLine line)


processGeneralOptions modifies the configuration using the parsed CommandLine object, whose methods give access to the options, their descriptions, and their values.

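In driver code, GenericOptionsParser is typically used as in the following sketch (the driver class itself is only illustrative): the generic Hadoop options are parsed into the Configuration, and the remaining application-specific arguments are returned.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Parses generic options such as -D key=value, -fs, -jt, -files, -libjars,
        // applies them to conf, and returns the remaining application arguments.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: mydriver <in> <out>");
            System.exit(2);
        }
        // otherArgs[0] and otherArgs[1] can now be used as the input/output paths
    }
}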
2. Using a Combiner

Like the map and reduce functions, a Combiner takes key-value pairs as input and produces key-value pairs as output. It sits between map and reduce: it aggregates map-task output before it is spilled, which reduces the amount of data written to disk on the map side and the amount of data the reduce side has to process. For shuffle-heavy jobs, performance is often dominated by the reduce side, since the reducer must copy data from the map side and merge-sort it before the reduce method even runs; cutting down the map output can therefore have a very large impact on the whole job.

2.1 When can a Combiner be used?

For example, if your job is WordCount, the map output can safely be pre-aggregated by a Combiner first, and only the Combiner's output is then sent to the reduce side, as sketched below.

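For WordCount the combiner can simply be the same class as the reducer, because summing partial counts is associative and commutative. Wiring it up is one extra call on the Job (a sketch that reuses the TokenizerMapper and IntSumReducer classes shown earlier):

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);   // local aggregation on the map side
job.setReducerClass(IntSumReducer.class);
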
2.2 When can a Combiner not be used?

WordCount performs addition on the reduce side. If, however, the reduce logic must compute the average of a large set of numbers, reduce needs to see all of the numbers to produce the correct value; in that case a Combiner cannot be used, because it would change the final result. One further caveat: even when a Combiner is configured it is not guaranteed to run (this is influenced by the parameter min.num.spills.for.combine), so a Combiner should only be used in scenarios where the MapReduce job still produces correct results without it.

Appendix

How FileInputFormat splits input into tasks in Hadoop 2.6.0 (i.e. how to control the number of map tasks created by FileInputFormat): see http://blog.csdn.net/beliefer/article/details/51122009