您的位置:首页 > 运维架构

Hadoop : 一个目录下的数据只由一个map处理

2015-03-19 08:49 274 查看


Hadoop:一个目录下的数据只由一个map处理

转帖:http://outofmemory.cn/wr/?u=http%3A%2F%2Fwww.rigongyizu.com%2Fhadoop-one-map-process-one-directory%2F

9条评论

有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。
刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的“mapreduce
job让一个文件只由一个map来处理“。
或者是把目录写在文件里面,作为输入:

/path/to/directory1

/path/to/directory2

/path/to/directory3

代码里面按行读取:

1
@Override
2
protected
void
map(LongWritable
key,Textvalue,Contextcontext)
throws
IOException,
InterruptedException{
3
FileSystem
fs=FileSystem.get(context.getConfiguration());
4
for
(FileStatus
status:fs.listStatus(
new
Path(value.toString())))
{
5
//
processfile
6
}
7
}
都不能满足需求,还是自己实现一个OneMapOneDirectoryInputFormat吧,也很简单:

1
import
java.io.IOException;
2
import
java.util.*;
3
4
import
org.apache.commons.logging.Log;
5
import
org.apache.commons.logging.LogFactory;
6
import
org.apache.hadoop.fs.FileStatus;
7
import
org.apache.hadoop.fs.Path;
8
import
org.apache.hadoop.mapreduce.InputSplit;
9
import
org.apache.hadoop.mapreduce.JobContext;
10
import
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
11
import
org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
12
13
/**
14
*
一个map处理一个目录的数据
15
*/
16
public
abstract
class
OneMapOneDirectoryInputFormat<K,
V>
extends
CombineFileInputFormat<K,
V>{
17
18
private
static
final
Log
LOG=LogFactory.getLog(OneMapOneDirectoryInputFormat.
class
);
19
20
@Override
21
protected
boolean
isSplitable(JobContext
context,Pathfile){
22
return
false
;
23
}
24
25
@Override
26
public
List<InputSplit>
getSplits(JobContextjob)
throws
IOException
{
27
//
getallthefilesininputpath
28
List<FileStatus>
stats=listStatus(job);
29
List<InputSplit>
splits=
new
ArrayList<InputSplit>();
30
if
(stats.size()
==
0
)
{
31
return
splits;
32
}
33
34
LOG.info(
"fileNums="
+
stats.size());
35
Map<String,
List<FileStatus>>map=
new
HashMap<String,
List<FileStatus>>();
36
for
(FileStatus
stat:stats){
37
String
directory=stat.getPath().getParent().toString();
38
if
(map.containsKey(directory))
{
39
map.get(directory).add(stat);
40
}
else
{
41
List<FileStatus>
fileList=
new
ArrayList<FileStatus>();
42
fileList.add(stat);
43
map.put(directory,
fileList);
44
}
45
}
46
47
//
设置inputSplit
48
long
currentLen
=
0
;
49
List<Path>
pathLst=
new
ArrayList<Path>();
50
List<Long>
offsetLst=
new
ArrayList<Long>();
51
List<Long>
lengthLst=
new
ArrayList<Long>();
52
Iterator<String>
itr=map.keySet().iterator();
53
while
(itr.hasNext())
{
54
String
dir=itr.next();
55
List<FileStatus>
fileList=map.get(dir);
56
for
(
int
i
=
0
;
i<fileList.size();i++){
57
FileStatus
stat=fileList.get(i);
58
pathLst.add(stat.getPath());
59
offsetLst.add(0L);
60
lengthLst.add(stat.getLen());
61
currentLen
+=stat.getLen();
62
}
63
64
Path[]
pathArray=
new
Path[pathLst.size()];
65
CombineFileSplit
thissplit=
new
CombineFileSplit(pathLst.toArray(pathArray),
66
getLongArray(offsetLst),
getLongArray(lengthLst),
new
String[
0
]);
67
LOG.info(
"combineFileSplit("
+
splits.size()+
")
fileNum("
+
pathLst.size()
68
+
")
length("
+
currentLen+
")"
);
69
for
(
int
i
=
0
;
i<pathArray.length;i++){
70
LOG.info(
"
->path["
+
i+
"]="
+
pathArray[i].toString());
71
}
72
splits.add(thissplit);
73
74
pathLst.clear();
75
offsetLst.clear();
76
lengthLst.clear();
77
currentLen
=
0
;
78
}
79
80
return
splits;
81
}
82
83
private
long
[]
getLongArray(List<Long>lst){
84
long
[]
rst=
new
long
[lst.size()];
85
for
(
int
i
=
0
;
i<lst.size();i++){
86
rst[i]
=lst.get(i);
87
}
88
return
rst;
89
}
90
}
这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程
–根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐