
Hadoop 2.6.0 Study Notes (6): TextOutputFormat and RecordWriter Explained

2015-12-03 13:04
Lu Chunli's work notes: who says programmers can't have a literary side?

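MapReduce ships with several built-in output formats, such as TextOutputFormat and SequenceFileOutputFormat. In a job with a reduce phase, the number of output files equals the number of reduce tasks.

By default there is one reduce task, so the output is a single file named part-r-00000. For a job like word count, where the reducer writes one record per key, that file has one line per distinct key in the map output. With two reduce tasks there are two files, part-r-00000 and part-r-00001, and so on.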
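When there is more than one reduce task, a partitioner routes each record to one of the part-r-NNNNN files. The sketch below assumes the default HashPartitioner rule, (hash & Integer.MAX_VALUE) % numReduceTasks.

It is plain Java with some simplifications:
- String.hashCode() stands in for Text.hashCode(), so the reducer picked for a given word will not match a real job.
- PartitionSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Sketch of how keys are spread over reduce tasks, assuming the default HashPartitioner rule.
// String.hashCode() stands in for Text.hashCode(), so real assignments will differ.
final class PartitionSketch {

    // Index of the reduce task, and therefore of the part-r-NNNNN file, that receives the key.
    static int partitionFor(String key, int numReduceTasks) {
        // Clearing the sign bit keeps the result non-negative even for negative hash codes.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // One output file per reduce task: part-r-00000, part-r-00001, ...
    static List<String> partFileNames(int numReduceTasks) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            names.add(String.format(Locale.ROOT, "part-r-%05d", i));
        }
        return names;
    }

    public static void main(String[] args) {
        int reducers = 2;
        List<String> files = partFileNames(reducers);
        for (String word : new String[] {"hello", "world", "hadoop"}) {
            System.out.println(word + " -> " + files.get(partitionFor(word, reducers)));
        }
    }
}
```

Masking with Integer.MAX_VALUE, rather than calling Math.abs(), matters because Math.abs(Integer.MIN_VALUE) is still negative.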

TextOutputFormat is MapReduce's default output format. It writes text data to HDFS.
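public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

  // Inner class that does the actual writing: lines end with "\n" and key and value are
  // separated by "\t" by default (configurable through SEPERATOR)
  protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {    // in practice an FSDataOutputStream
      this(out, "\t");
    }

    // Text is written as its raw bytes; any other object via toString() encoded as UTF-8
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    /** The core of the class is two methods: write and close **/
    public synchronized void write(K key, V value) throws IOException {
      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);    // line terminator: newline = "\n".getBytes(utf8)
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }

  // End of the inner class; below is TextOutputFormat's one key method
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    // 1. Check the Configuration for output compression; if enabled, get the codec and its file extension;
    // 2. Get the path of the file to create: getDefaultWorkFile(job, extension);
    // 3. Open an FSDataOutputStream on that file and return a new LineRecordWriter around it.
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = conf.get(SEPERATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {    // with compression, the file extension comes from the codec
      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    // getDefaultWorkFile, implemented in FileOutputFormat, returns the file the output goes to
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);

    // Create the writer
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      DataOutputStream dataOut = new DataOutputStream(codec.createOutputStream(fileOut));
      return new LineRecordWriter<K, V>(dataOut, keyValueSeparator);
    }
  }
}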
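The null-handling rules in LineRecordWriter.write() are easy to misread, so here is a standalone sketch that reproduces them with plain Java objects. It is an illustration, not Hadoop code:
- null stands in for both null and NullWritable.
- Every object is written via toString() in UTF-8, whereas the real writeObject() special-cases Text.
- LineWriteSketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Standalone sketch of the LineRecordWriter.write() rules. Plain objects replace Writables:
// null plays the role of both null and NullWritable, and every object is written via toString().
final class LineWriteSketch {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    private final DataOutputStream out;
    private final byte[] separator;

    LineWriteSketch(DataOutputStream out, String separator) {
        this.out = out;
        this.separator = separator.getBytes(StandardCharsets.UTF_8);
    }

    void write(Object key, Object value) throws IOException {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return;                      // nothing is written, not even a newline
        }
        if (!nullKey) {
            out.write(key.toString().getBytes(StandardCharsets.UTF_8));
        }
        if (!(nullKey || nullValue)) {
            out.write(separator);        // separator only when both sides are present
        }
        if (!nullValue) {
            out.write(value.toString().getBytes(StandardCharsets.UTF_8));
        }
        out.write(NEWLINE);
    }

    // Renders {key, value} pairs into a String, to see what an output file would contain.
    static String render(String separator, Object[][] records) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        LineWriteSketch writer = new LineWriteSketch(new DataOutputStream(bytes), separator);
        for (Object[] record : records) {
            writer.write(record[0], record[1]);
        }
        writer.out.flush();
        return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Object[][] records = {{"hello", 3}, {null, "valueOnly"}, {"keyOnly", null}, {null, null}};
        System.out.print(render("\t", records));
    }
}
```

With "\t" as the separator, these four records produce three lines: "hello", a tab and "3"; then "valueOnly"; then "keyOnly". The (null, null) record writes nothing at all.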
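TextOutputFormat shows that the HDFS location where the data is saved comes from FileOutputFormat's getDefaultWorkFile method. In fact, every output format in MapReduce extends OutputFormat, so let's first look at the OutputFormat class definition.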
/**
 * OutputFormat describes the output specification of a Map-Reduce job, e.g.:
 * 1. validating the output specification, for example that the output directory does not already exist;
 * 2. providing the RecordWriter that writes the job's output to a FileSystem (usually HDFS).
 */
public abstract class OutputFormat<K, V> {
  // Get the RecordWriter for the current task
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException;

  // Called when the job is submitted, to check that its output specification is valid,
  // e.g. that the output directory does not already exist
  public abstract void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException;

  // Get the output committer for this output format.
  // This is responsible for ensuring the output is committed correctly.
  public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException;
}
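To make the checkOutputSpecs() contract concrete, here is a local-filesystem analogue. It is a sketch under these assumptions:
- java.nio replaces Hadoop's FileSystem.
- The method name mirrors Hadoop's, but the exception types are the JDK's, not the ones Hadoop throws.
- OutputSpecCheckSketch is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Local-filesystem analogue of FileOutputFormat.checkOutputSpecs(); java.nio stands in
// for Hadoop's FileSystem, and the exception types are the JDK's, not Hadoop's.
final class OutputSpecCheckSketch {

    static void checkOutputSpecs(String outputDir) throws IOException {
        // 1. An output directory must have been configured.
        if (outputDir == null || outputDir.isEmpty()) {
            throw new IOException("Output directory not set.");
        }
        // 2. It must not exist yet, so an earlier job's results are never overwritten.
        Path out = Paths.get(outputDir);
        if (Files.exists(out)) {
            throw new FileAlreadyExistsException("Output directory " + out + " already exists");
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("job-output");
        checkOutputSpecs(tmp.resolve("new-dir").toString()); // passes: does not exist yet
        try {
            checkOutputSpecs(tmp.toString());                 // rejected: already exists
        } catch (FileAlreadyExistsException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking at submission time means a job fails before any task runs, rather than after it has partly overwritten earlier results.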
TextOutputFormat implements getRecordWriter. TextOutputFormat is a subclass of FileOutputFormat, which in turn is a subclass of OutputFormat.
/** Base class for OutputFormats that write data to files on a FileSystem such as HDFS **/
public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
  /** With several partitions there are several output files; NUMBER_FORMAT formats the
   *  file number, as in part-r-00000, part-r-00001, etc. **/
  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  /** Output file names start with "part" by default; this property sets a different base name **/
  protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
  protected static final String PART = "part";
  static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }

  public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir";

  // Whether and how the job output is compressed is configurable, e.g. in mapred-site.xml
  // Default: false
  public static final String COMPRESS = "mapreduce.output.fileoutputformat.compress";
  // Default: org.apache.hadoop.io.compress.DefaultCodec
  public static final String COMPRESS_CODEC = "mapreduce.output.fileoutputformat.compress.codec";
  // Only used for SequenceFile output. The default, RECORD, compresses each record individually;
  // BLOCK compresses a group of records together.
  public static final String COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";

  private FileOutputCommitter committer = null;

  // Set the output directory of the map-reduce job
  public static void setOutputPath(Job job, Path outputDir) {
    try {
      outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(outputDir);
    } catch (IOException e) {
      // Throw the IOException as a RuntimeException to be compatible with MR1
      throw new RuntimeException(e);
    }
    job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
  }

  // Validate the output specification
  public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
    // 1. Check that an output directory has been set (FileOutputFormat.setOutputPath);
    // 2. Check that the output directory does not exist yet; an existing directory, even an empty one, is rejected.
  }

  // Get the committer for the output. OutputCommitter is pluggable, so users can
  // supply an implementation that suits their needs.
  public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    if (committer == null) {
      Path output = getOutputPath(context);
      committer = new FileOutputCommitter(output, context);
    }
    return committer;
  }

  // Default output path and file name for this output format:
  // <committer work path>/<unique file name>
  public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
    FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
    return new Path(committer.getWorkPath(), getUniqueFile(context, getOutputName(context), extension));
  }

  /**
   * Generate a unique filename, based on the task id, name, and extension
   * (the file name, e.g. part-r-00000, part-r-00001, ...)
   * @param context the task that is calling this
   * @param name the base filename
   * @param extension the filename extension
   * @return a string like $name-[mrsct]-$id$extension
   */
  public synchronized static String getUniqueFile(TaskAttemptContext context, String name, String extension) {
    TaskID taskId = context.getTaskAttemptID().getTaskID();
    int partition = taskId.getId();
    StringBuilder result = new StringBuilder();
    result.append(name);
    result.append('-');
    result.append(TaskID.getRepresentingCharacter(taskId.getTaskType()));
    result.append('-');
    result.append(NUMBER_FORMAT.format(partition));
    result.append(extension);
    return result.toString();
  }
}
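getUniqueFile() is easy to try in isolation. Below is a standalone mirror of its formatting logic; UniqueFileSketch is a hypothetical name. It differs from the Hadoop method in two ways:
- The task-type character and partition number are passed in directly, instead of being read from a TaskAttemptContext.
- Locale.US is pinned so the digits are always ASCII.

```java
import java.text.NumberFormat;
import java.util.Locale;

// Standalone mirror of FileOutputFormat.getUniqueFile(): $name-[mrsct]-$id$extension.
// Task type and partition are parameters here instead of coming from a TaskAttemptContext.
final class UniqueFileSketch {
    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(Locale.US);

    static {
        NUMBER_FORMAT.setMinimumIntegerDigits(5); // pad with zeros to at least 5 digits
        NUMBER_FORMAT.setGroupingUsed(false);     // no thousands separators
    }

    // synchronized because NumberFormat instances are not thread-safe
    static synchronized String uniqueFile(String name, char taskType, int partition, String extension) {
        return name + '-' + taskType + '-' + NUMBER_FORMAT.format(partition) + extension;
    }

    public static void main(String[] args) {
        System.out.println(uniqueFile("part", 'r', 0, ""));
        System.out.println(uniqueFile("part", 'm', 3, ".gz"));
    }
}
```

These calls return part-r-00000 and part-m-00003.gz. setMinimumIntegerDigits only pads, so a partition number above 99999 gets more digits instead of being truncated.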


The task-type character comes from the nested class org.apache.hadoop.mapreduce.TaskID$CharTaskTypeMaps:
static class CharTaskTypeMaps {
  private static EnumMap<TaskType, Character> typeToCharMap =
      new EnumMap<TaskType, Character>(TaskType.class);
  private static Map<Character, TaskType> charToTypeMap =
      new HashMap<Character, TaskType>();
  static String allTaskTypes = "(m|r|s|c|t)";
  static {
    setupTaskTypeToCharMapping();
    setupCharToTaskTypeMapping();
  }

  private static void setupTaskTypeToCharMapping() {
    typeToCharMap.put(TaskType.MAP, 'm');
    typeToCharMap.put(TaskType.REDUCE, 'r');
    typeToCharMap.put(TaskType.JOB_SETUP, 's');
    typeToCharMap.put(TaskType.JOB_CLEANUP, 'c');
    typeToCharMap.put(TaskType.TASK_CLEANUP, 't');
  }

  private static void setupCharToTaskTypeMapping() {
    charToTypeMap.put('m', TaskType.MAP);
    charToTypeMap.put('r', TaskType.REDUCE);
    charToTypeMap.put('s', TaskType.JOB_SETUP);
    charToTypeMap.put('c', TaskType.JOB_CLEANUP);
    charToTypeMap.put('t', TaskType.TASK_CLEANUP);
  }

  // Returns the "r" in the middle of part-r-00000
  static char getRepresentingCharacter(TaskType type) {
    return typeToCharMap.get(type);
  }
}
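The two maps above form a simple bidirectional lookup. The sketch below reproduces the idea without Hadoop on the classpath:
- The nested TaskType enum is a local stand-in for org.apache.hadoop.mapreduce.TaskType.
- One register() call fills both maps, an illustrative choice rather than how Hadoop structures it.
- TaskTypeCharSketch is a hypothetical name.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Two-way lookup in the style of TaskID.CharTaskTypeMaps, without Hadoop on the classpath.
// The nested TaskType enum is a local stand-in for org.apache.hadoop.mapreduce.TaskType.
final class TaskTypeCharSketch {
    enum TaskType { MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP }

    // Same pattern as allTaskTypes: matches any valid task-type character.
    static final String ALL_TASK_TYPES = "(m|r|s|c|t)";

    private static final Map<TaskType, Character> TYPE_TO_CHAR = new EnumMap<>(TaskType.class);
    private static final Map<Character, TaskType> CHAR_TO_TYPE = new HashMap<>();

    static {
        register(TaskType.MAP, 'm');
        register(TaskType.REDUCE, 'r');
        register(TaskType.JOB_SETUP, 's');
        register(TaskType.JOB_CLEANUP, 'c');
        register(TaskType.TASK_CLEANUP, 't');
    }

    // Filling both maps in one call keeps them consistent with each other.
    private static void register(TaskType type, char c) {
        TYPE_TO_CHAR.put(type, c);
        CHAR_TO_TYPE.put(c, type);
    }

    static char charFor(TaskType type) {
        return TYPE_TO_CHAR.get(type);
    }

    // Returns null for characters that do not denote a task type.
    static TaskType typeFor(char c) {
        return CHAR_TO_TYPE.get(c);
    }

    public static void main(String[] args) {
        System.out.println(charFor(TaskType.REDUCE));
        System.out.println(typeFor('m'));
        System.out.println("part-r-00000".matches("part-" + ALL_TASK_TYPES + "-\\d{5}"));
    }
}
```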


Example: write words that start with the same letter into the same file
Input files:
[hadoop@nnode code]$
[hadoop@nnode code]$ hdfs dfs -ls /data
Found 2 items
-rw-r--r--   1 hadoop hadoop         47 2015-06-09 17:59 /data/file1.txt
-rw-r--r--   2 hadoop hadoop         36 2015-06-09 17:59 /data/file2.txt
[hadoop@nnode code]$ hdfs dfs -text /data/file1.txt
hello   world
hello   markhuang
hello   hadoop
[hadoop@nnode code]$ hdfs dfs -text /data/file2.txt
hadoop  ok
hadoop  fail
hadoop  2.3
[hadoop@nnode code]$


Custom OutputFormat:
package com.lucl.hadoop.mapreduce.multiple;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * @author luchunli
 * @description Custom OutputFormat. It extends TextOutputFormat so that it does not have to
 * implement its own OutputCommitter.<br/>
 * In MapReduce, keys must be WritableComparable and values must be Writable.
 */
public class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
    extends TextOutputFormat<K, V> {

  /**
   * The OutputFormat writes data through the Writer it hands out, into files with
   * specific names in the output directory.
   */
  private MultipleRecordWriter writer = null;

  // In TextOutputFormat every map or reduce task has a unique identifier (its TaskID), so the
  // name of its output file is fixed. Each output file has one LineRecordWriter, which writes
  // through that file's output stream (FSDataOutputStream).
  //
  // Here, one task has to write its data into several files whose names are not fixed in
  // advance. So for every record we check whether its target file already has a writer:
  // if it does, that writer is reused; otherwise a new one is created.
  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    if (null == writer) {
      writer = new MultipleRecordWriter(context, this.getTaskOutputPath(context));
    }
    return writer;
  }

  // Get the task's output directory, again from the committer. TaskAttemptContext wraps the
  // task's context; it will be analyzed in a later post.
  // TextOutputFormat gets its file from the parent class's (FileOutputFormat) getDefaultWorkFile,
  // which always produces MapReduce's default file name; custom names need their own code.
  private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
    Path workPath = null;
    OutputCommitter committer = super.getOutputCommitter(context);

    if (committer instanceof FileOutputCommitter) {
      // Get the directory that the task should write results into.
      workPath = ((FileOutputCommitter) committer).getWorkPath();
    } else {
      // Get the {@link Path} to the output directory for the map-reduce job.
      // context.getConfiguration().get(FileOutputFormat.OUTDIR);
      Path outputPath = super.getOutputPath(context);
      if (null == outputPath) {
        throw new IOException("Undefined job output-path.");
      }
      workPath = outputPath;
    }

    return workPath;
  }

  /**
   * @author luchunli
   * @description Custom RecordWriter. TextOutputFormat's LineRecordWriter is also an inner
   * class; this one follows the same approach.
   */
  public class MultipleRecordWriter extends RecordWriter<K, V> {

    /** Cache of RecordWriters, keyed by file name **/
    private HashMap<String, RecordWriter<K, V>> recordWriters = null;

    private TaskAttemptContext context;

    /** Output directory **/
    private Path workPath = null;

    public MultipleRecordWriter() {}

    public MultipleRecordWriter(TaskAttemptContext context, Path path) {
      super();
      this.context = context;
      this.workPath = path;
      this.recordWriters = new HashMap<String, RecordWriter<K, V>>();
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
      String baseName = generateFileNameForKeyValue(key, value, this.context.getConfiguration());
      RecordWriter<K, V> rw = this.recordWriters.get(baseName);
      if (null == rw) {
        rw = this.getBaseRecordWriter(context, baseName);
        this.recordWriters.put(baseName, rw);
      }
      // The actual writing is still done by a LineRecordWriter
      rw.write(key, value);
    }

    // MultipleRecordWriter wraps LineRecordWriter and splits one task's output into several files.
    // By default MapReduce runs a single reduce task (the number of reduces is covered with
    // partitioning), so as in the earlier examples all output would go to part-r-00000.
    // This class bypasses part-r-00000 and splits that reduce task's data into several files.
    // The file names do not contain the task number, so with more than one reduce task, files
    // of the same name written by different tasks may overwrite each other at commit time.
    private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext context, String baseName) throws IOException {
      Configuration conf = context.getConfiguration();

      boolean isCompressed = getCompressOutput(context);
      // In TextOutputFormat the separator is configured like this:
      //   public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
      //   String keyValueSeparator = conf.get(SEPERATOR, "\t");
      // Here a comma is used instead
      String keyValueSeparator = ",";

      RecordWriter<K, V> rw = null;
      if (isCompressed) {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(context, GzipCodec.class);
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
        Path file = new Path(workPath, baseName + codec.getDefaultExtension());
        FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
        // Wrap the file stream with the codec so the data is actually compressed
        DataOutputStream out = new DataOutputStream(codec.createOutputStream(fileOut));
        rw = new LineRecordWriter<>(out, keyValueSeparator);
      } else {
        Path file = new Path(workPath, baseName);
        FSDataOutputStream out = file.getFileSystem(conf).create(file, false);
        rw = new LineRecordWriter<>(out, keyValueSeparator);
      }

      return rw;
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
      Iterator<RecordWriter<K, V>> it = this.recordWriters.values().iterator();
      while (it.hasNext()) {
        RecordWriter<K, V> rw = it.next();
        rw.close(context);
      }
      this.recordWriters.clear();
    }

    /** Returns the name of the file a record goes to, based on the key's first letter **/
    private String generateFileNameForKeyValue(K key, V value, Configuration configuration) {
      char c = key.toString().toLowerCase().charAt(0);
      if (c >= 'a' && c <= 'z') {
        return c + ".txt";
      }
      return "other.txt";
    }
  }
}
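Without the Hadoop types, MultipleRecordWriter is a map from file name to a lazily created writer, with all writers closed together at the end. The sketch below shows just that pattern in plain Java, under these assumptions:
- The Function factory stands in for getBaseRecordWriter().
- StringWriter stands in for an HDFS stream.
- HashMap.computeIfAbsent replaces the original get/put pair, which behaves the same for a single-threaded writer.
- All names are hypothetical.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch of the MultipleRecordWriter pattern: one lazily created writer per
// target file, reused for later records and closed together at the end.
final class RoutingWriterSketch {
    private final Map<String, Writer> writers = new HashMap<>();
    private final Function<String, Writer> factory; // stands in for getBaseRecordWriter()
    private int opened = 0;                          // writers created so far

    RoutingWriterSketch(Function<String, Writer> factory) {
        this.factory = factory;
    }

    // Same rule as generateFileNameForKeyValue(): first letter a-z, otherwise "other.txt".
    static String fileNameFor(String key) {
        char c = key.toLowerCase().charAt(0);
        return (c >= 'a' && c <= 'z') ? c + ".txt" : "other.txt";
    }

    void write(String key, int value) throws IOException {
        Writer w = writers.computeIfAbsent(fileNameFor(key), name -> {
            opened++;
            return factory.apply(name);
        });
        w.write(key + "," + value + "\n");
    }

    void close() throws IOException {
        for (Writer w : writers.values()) {
            w.close();
        }
        writers.clear();
    }

    int openedCount() {
        return opened;
    }

    public static void main(String[] args) throws IOException {
        Map<String, StringWriter> files = new HashMap<>();
        RoutingWriterSketch router = new RoutingWriterSketch(name -> {
            StringWriter sw = new StringWriter();
            files.put(name, sw);
            return sw;
        });
        router.write("hadoop", 4);
        router.write("hello", 3);
        router.write("2.3", 1);
        router.close();
        System.out.println(files);
    }
}
```

Closing every cached writer in close() matters: a writer that is never closed may leave its file incomplete.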


Implementing the Mapper
package com.lucl.hadoop.mapreduce.multiple;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Custom Mapper: splits each line on whitespace and emits (word, 1) for every word
 */
public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable one = new IntWritable(1);
  private Text text = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer token = new StringTokenizer(value.toString());
    while (token.hasMoreTokens()) {
      String word = token.nextToken();
      text.set(word);

      context.write(text, one);
    }
  }
}
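Here is a plain-Java sketch of what TokenizerMapper emits for a single line, using only java.util. StringTokenizer with no explicit delimiters splits on space, tab, newline, carriage return and form feed. So runs of spaces between columns produce no empty words, and "2.3" stays one word. MapperSketch is a hypothetical name, and Map.Entry pairs stand in for (Text, IntWritable).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// What TokenizerMapper emits for one line, using Map.Entry in place of (Text, IntWritable).
final class MapperSketch {
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        // Default delimiters: space, \t, \n, \r and \f; runs of them yield no empty tokens.
        StringTokenizer token = new StringTokenizer(line);
        while (token.hasMoreTokens()) {
            out.add(new SimpleEntry<>(token.nextToken(), 1));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(map("hadoop  2.3"));
    }
}
```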


Implementing the Reducer
package com.lucl.hadoop.mapreduce.multiple;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Custom Reducer: sums the counts for each word
 */
public class TokenizerReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> value, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable intWritable : value) {
      sum += intWritable.get();
    }
    context.write(key, new IntWritable(sum));
  }
}
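Between map and reduce, the framework sorts the (word, 1) pairs and groups them by key; the reducer then only has to sum each group. The sketch below approximates the shuffle plus TokenizerReducer locally. A TreeMap stands in for the framework's sort; String order equals Text's byte order for these ASCII words. ReduceSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Local stand-in for shuffle + TokenizerReducer: TreeMap keeps the words sorted (as the
// framework's sort would for these ASCII keys) and merge() sums the 1s per word.
final class ReduceSketch {
    static TreeMap<String, Integer> wordCount(List<String> lines) {
        TreeMap<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                counts.merge(token.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "hello   world", "hello   markhuang", "hello   hadoop",  // /data/file1.txt
                "hadoop  ok", "hadoop  fail", "hadoop  2.3");         // /data/file2.txt
        System.out.println(wordCount(input));
    }
}
```

Run on the six input lines from /data, this yields 7 distinct words, with hadoop=4 and hello=3. That matches "Reduce output records=7" in the job counters of the run.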


Implementing the Driver
package com.lucl.hadoop.mapreduce.multiple;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author luchunli
 * @description Driver class
 */
public class MultipleWorkCount extends Configured implements Tool {

  public static void main(String[] args) {
    try {
      ToolRunner.run(new MultipleWorkCount(), args);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());

    job.setJarByClass(MultipleWorkCount.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    job.setMapperClass(TokenizerMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setReducerClass(TokenizerReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Use the custom OutputFormat instead of the default TextOutputFormat
    job.setOutputFormatClass(MultipleOutputFormat.class);

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }
}


Running the job
[hadoop@nnode code]$ hadoop jar MultipleMR.jar /data /2015120500010
15/12/05 16:45:54 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 16:45:55 INFO input.FileInputFormat: Total input paths to process : 2
15/12/05 16:45:55 INFO mapreduce.JobSubmitter: number of splits:2
15/12/05 16:45:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0004
15/12/05 16:45:56 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0004
15/12/05 16:45:56 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0004/
15/12/05 16:45:56 INFO mapreduce.Job: Running job: job_1449302623953_0004
15/12/05 16:46:27 INFO mapreduce.Job: Job job_1449302623953_0004 running in uber mode : false
15/12/05 16:46:27 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 16:46:56 INFO mapreduce.Job:  map 50% reduce 0%
15/12/05 16:46:58 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 16:47:16 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 16:47:18 INFO mapreduce.Job: Job job_1449302623953_0004 completed successfully
15/12/05 16:47:18 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=152
FILE: Number of bytes written=323517
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=271
HDFS: Number of bytes written=55
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=7
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=58249
Total time spent by all reduces in occupied slots (ms)=17197
Total time spent by all map tasks (ms)=58249
Total time spent by all reduce tasks (ms)=17197
Total vcore-seconds taken by all map tasks=58249
Total vcore-seconds taken by all reduce tasks=17197
Total megabyte-seconds taken by all map tasks=59646976
Total megabyte-seconds taken by all reduce tasks=17609728
Map-Reduce Framework
Map input records=6
Map output records=12
Map output bytes=122
Map output materialized bytes=158
Input split bytes=188
Combine input records=0
Combine output records=0
Reduce input groups=7
Reduce shuffle bytes=158
Reduce input records=12
Reduce output records=7
Spilled Records=24
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=313
CPU time spent (ms)=4770
Physical memory (bytes) snapshot=511684608
Virtual memory (bytes) snapshot=2545770496
Total committed heap usage (bytes)=257171456
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=83
File Output Format Counters
Bytes Written=55
[hadoop@nnode code]$


Checking the output:
[hadoop@nnode code]$ hdfs dfs -ls /2015120500010
Found 7 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 16:47 /2015120500010/_SUCCESS
-rw-r--r--   2 hadoop hadoop          7 2015-12-05 16:47 /2015120500010/f.txt
-rw-r--r--   2 hadoop hadoop         17 2015-12-05 16:47 /2015120500010/h.txt
-rw-r--r--   2 hadoop hadoop         12 2015-12-05 16:47 /2015120500010/m.txt
-rw-r--r--   2 hadoop hadoop          5 2015-12-05 16:47 /2015120500010/o.txt
-rw-r--r--   2 hadoop hadoop          6 2015-12-05 16:47 /2015120500010/other.txt
-rw-r--r--   2 hadoop hadoop          8 2015-12-05 16:47 /2015120500010/w.txt
[hadoop@nnode code]$ hdfs dfs -text /2015120500010/h.txt
hadoop,4
hello,3
[hadoop@nnode code]$ hdfs dfs -text /2015120500010/o.txt
ok,1
[hadoop@nnode code]$ hdfs dfs -text /2015120500010/other.txt
2.3,1
[hadoop@nnode code]$
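The file sizes in the listing can be cross-checked by hand. The sketch below:
- takes the seven reducer records (word counts derived from the two input files);
- applies the same first-letter rule and "," separator as MultipleOutputFormat;
- measures each resulting file in UTF-8 bytes.

OutputSizeCheck is a hypothetical helper, not part of the job.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Recomputes the per-file byte sizes of the job output from the reducer records,
// using the same file-name rule and "," separator as MultipleOutputFormat.
final class OutputSizeCheck {
    static Map<String, Integer> fileSizes(Map<String, Integer> reducerOutput) {
        Map<String, StringBuilder> files = new TreeMap<>();
        for (Map.Entry<String, Integer> e : reducerOutput.entrySet()) {
            char c = e.getKey().toLowerCase().charAt(0);
            String name = (c >= 'a' && c <= 'z') ? c + ".txt" : "other.txt";
            files.computeIfAbsent(name, n -> new StringBuilder())
                 .append(e.getKey()).append(',').append(e.getValue()).append('\n');
        }
        Map<String, Integer> sizes = new TreeMap<>();
        files.forEach((name, text) ->
                sizes.put(name, text.toString().getBytes(StandardCharsets.UTF_8).length));
        return sizes;
    }

    // Word counts from /data/file1.txt and /data/file2.txt
    static Map<String, Integer> exampleReducerOutput() {
        Map<String, Integer> counts = new TreeMap<>();
        counts.put("2.3", 1);
        counts.put("fail", 1);
        counts.put("hadoop", 4);
        counts.put("hello", 3);
        counts.put("markhuang", 1);
        counts.put("ok", 1);
        counts.put("world", 1);
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> sizes = fileSizes(exampleReducerOutput());
        System.out.println(sizes);
        System.out.println(sizes.values().stream().mapToInt(Integer::intValue).sum());
    }
}
```

The computed sizes match the listing: f.txt 7, h.txt 17, m.txt 12, o.txt 5, other.txt 6 and w.txt 8 bytes. They add up to 55 bytes, which is the "HDFS: Number of bytes written=55" counter.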


Errors encountered:
1. java.lang.RuntimeException: java.lang.InstantiationException
[hadoop@nnode code]$ hadoop jar MultipleMR.jar /data /2015120500001
15/12/05 16:18:19 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:559)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:432)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1296)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1293)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1293)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1314)
at com.lucl.hadoop.mapreduce.multiple.MultipleWorkCount.run(MultipleWorkCount.java:49)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.lucl.hadoop.mapreduce.multiple.MultipleWorkCount.main(MultipleWorkCount.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:129)
... 19 more
[hadoop@nnode code]$
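Cause:

Originally there was also a subclass of MultipleOutputFormat, and the driver set that subclass as the output format. Later the subclass seemed unnecessary and was removed, but MultipleOutputFormat itself was still declared as abstract class MultipleOutputFormat; the abstract modifier had not been removed. As the stack trace shows, JobSubmitter.checkSpecs creates the output format through ReflectionUtils.newInstance at submission time, and an abstract class cannot be instantiated.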
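The first error can be reproduced without Hadoop. Creating an object reflectively through its no-arg constructor fails with InstantiationException when the class is abstract. The newInstance() helper below only approximates ReflectionUtils.newInstance(), and the class names are hypothetical.

```java
import java.lang.reflect.Constructor;

// Reproduces the first error without Hadoop: creating an object reflectively through its
// no-arg constructor fails with InstantiationException when the class is abstract.
// newInstance() only approximates ReflectionUtils.newInstance(); class names are hypothetical.
final class AbstractInstantiationDemo {
    static abstract class AbstractFormat { }               // like "abstract class MultipleOutputFormat"
    static class ConcreteFormat extends AbstractFormat { }  // the same class without "abstract"

    static <T> T newInstance(Class<T> cls) {
        try {
            Constructor<T> ctor = cls.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (Exception e) {
            // Wrapping the checked exception is what produces the
            // "java.lang.RuntimeException: java.lang.InstantiationException" seen in the log
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(newInstance(ConcreteFormat.class).getClass().getSimpleName());
        try {
            newInstance(AbstractFormat.class);
        } catch (RuntimeException e) {
            System.out.println(e);
        }
    }
}
```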

2. Error: java.io.IOException: Unable to initialize any output collector
[hadoop@nnode code]$ hadoop jar MultipleMR.jar /data /2015120500005
15/12/05 16:26:06 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 16:26:07 INFO input.FileInputFormat: Total input paths to process : 2
15/12/05 16:26:07 INFO mapreduce.JobSubmitter: number of splits:2
15/12/05 16:26:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0003
15/12/05 16:26:08 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0003
15/12/05 16:26:08 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0003/
15/12/05 16:26:08 INFO mapreduce.Job: Running job: job_1449302623953_0003
15/12/05 16:26:43 INFO mapreduce.Job: Job job_1449302623953_0003 running in uber mode : false
15/12/05 16:26:43 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 16:27:13 INFO mapreduce.Job: Task Id : attempt_1449302623953_0003_m_000000_0, Status : FAILED
Error: java.io.IOException: Unable to initialize any output collector
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

15/12/05 16:27:13 INFO mapreduce.Job: Task Id : attempt_1449302623953_0003_m_000001_0, Status : FAILED
Error: java.io.IOException: Unable to initialize any output collector
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

^C[hadoop@nnode code]$
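Cause:
Text was imported from the wrong package: com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text
The correct import is org.apache.hadoop.io.Text. Unlike the first error, this one only appears once the map tasks run. It surfaces in MapTask.createSortingCollector, where the map output key class is used to set up the map-side sort buffer.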
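The log alone does not say which check failed. One plausible explanation, an assumption not confirmed by the trace, is this: the framework expects the map output key class to be a WritableComparable so the sort buffer can obtain a key comparator, and the jersey Text class is not one.

A guard like the sketch below, called in the driver before submission, would turn such a mix-up into an immediate, readable error. Two assumptions apply:
- KeyClassGuard is a hypothetical helper.
- java.lang.Comparable stands in for org.apache.hadoop.io.WritableComparable, so the sketch runs without Hadoop.

```java
// Fail-fast key type check, sketched without Hadoop. Class.asSubclass() throws
// ClassCastException when keyClass is not a subtype of required; here java.lang.Comparable
// stands in for org.apache.hadoop.io.WritableComparable.
final class KeyClassGuard {
    static void requireSubtype(Class<?> keyClass, Class<?> required) {
        try {
            keyClass.asSubclass(required);
        } catch (ClassCastException e) {
            throw new IllegalArgumentException(keyClass.getName() + " is not a "
                    + required.getName() + "; check which class was imported", e);
        }
    }

    public static void main(String[] args) {
        requireSubtype(String.class, Comparable.class);      // passes
        try {
            requireSubtype(Object.class, Comparable.class);  // fails fast
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a real driver the call would pass the imported Text class and WritableComparable.class, so a wrong import is reported before the job is submitted.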

Note:
attempt_1449302623953_0003_m_000000_0
The second error message also shows how map task attempts are named:
// TaskAttemptID represents the immutable and unique identifier for a task attempt.
// Each task attempt is one particular instance of a Map or Reduce Task identified by TaskID.
// An example TaskAttemptID is : attempt_200707121733_0003_m_000005_0
// zeroth task attempt for the fifth map task in the third job running at the jobtracker started at 200707121733
public class TaskAttemptID extends org.apache.hadoop.mapred.ID {
protected static final String ATTEMPT = "attempt";
private TaskID taskId;
// ......
}
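Following the layout described in the comment above, an attempt ID can be split into its parts. This is an illustrative parser with a hypothetical name (AttemptIdSketch); the framework itself parses these strings with TaskAttemptID.forName().

```java
// Illustrative parser for task attempt IDs such as attempt_1449302623953_0003_m_000000_0:
//   attempt_<cluster timestamp>_<job number>_<task type>_<task number>_<attempt number>
// Not Hadoop code; the framework uses TaskAttemptID.forName() for this.
final class AttemptIdSketch {
    final String clusterTimestamp; // start time of the ResourceManager (JobTracker in MR1)
    final int jobNumber;
    final char taskType;           // m, r, s, c or t
    final int taskNumber;
    final int attemptNumber;       // 0 for the first attempt, incremented on retries

    private AttemptIdSketch(String clusterTimestamp, int jobNumber, char taskType,
                            int taskNumber, int attemptNumber) {
        this.clusterTimestamp = clusterTimestamp;
        this.jobNumber = jobNumber;
        this.taskType = taskType;
        this.taskNumber = taskNumber;
        this.attemptNumber = attemptNumber;
    }

    static AttemptIdSketch parse(String id) {
        String[] parts = id.split("_");
        if (parts.length != 6 || !parts[0].equals("attempt") || parts[3].length() != 1) {
            throw new IllegalArgumentException("Not a task attempt id: " + id);
        }
        return new AttemptIdSketch(parts[1], Integer.parseInt(parts[2]), parts[3].charAt(0),
                Integer.parseInt(parts[4]), Integer.parseInt(parts[5]));
    }

    @Override
    public String toString() {
        return "job " + jobNumber + ", " + taskType + " task " + taskNumber + ", attempt " + attemptNumber;
    }

    public static void main(String[] args) {
        System.out.println(parse("attempt_1449302623953_0003_m_000000_0"));
    }
}
```

For the failed attempt in the log, this gives job 3, map task 0, attempt 0: the first attempt of the first map task.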


This article originally appeared on the blog “闷葫芦的世界”. Please keep this attribution: http://luchunli.blog.51cto.com/2368057/1719174