Hadoop: Processing Each Directory's Data with Exactly One Map
Repost: http://outofmemory.cn/wr/?u=http%3A%2F%2Fwww.rigongyizu.com%2Fhadoop-one-map-process-one-directory%2F
Here is the requirement: the data under a given directory must be processed by a single map. If several maps process data from the same directory, the data gets corrupted.
At first I googled around, assuming a ready-made InputFormat already existed somewhere. The answers I found were along the lines of my earlier post "Having a MapReduce job process one file with exactly one map".
Another idea is to write the directories into a file and use that file as the job input:
/path/to/directory1
/path/to/directory2
/path/to/directory3
and then read it line by line inside the map method:
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // each input line is a directory path; list the files underneath it
    FileSystem fs = FileSystem.get(context.getConfiguration());
    for (FileStatus status : fs.listStatus(new Path(value.toString()))) {
        // process each file of the directory here
    }
}
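For the record, making each line of that list go to its own map task requires one input split per line, which Hadoop's stock NLineInputFormat provides. A minimal driver sketch, assuming Hadoop 2.x's Job.getInstance; the class name DirListDriver and the input path are illustrative only, not from the original post:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class DirListDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "one-dir-per-map");
        job.setJarByClass(DirListDriver.class);
        // each line of the input file (a directory path) becomes its own split
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job, 1);
        FileInputFormat.addInputPath(job, new Path("/path/to/dir-list.txt"));
        // ... set mapper, output key/value classes, output path, etc.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}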
Neither approach satisfies the requirement, so I ended up writing my own OneMapOneDirectoryInputFormat. It is quite simple:
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Groups all files that share a parent directory into a single
 * CombineFileSplit, so each directory is handled by exactly one map task.
 */
public abstract class OneMapOneDirectoryInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    private static final Log LOG = LogFactory.getLog(OneMapOneDirectoryInputFormat.class);

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // never split a file; every file must stay within its directory's split
        return false;
    }

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // get all the files in the input path
        List<FileStatus> stats = listStatus(job);
        List<InputSplit> splits = new ArrayList<InputSplit>();
        if (stats.size() == 0) {
            return splits;
        }
        LOG.info("fileNums=" + stats.size());

        // group the files by their parent directory
        Map<String, List<FileStatus>> map = new HashMap<String, List<FileStatus>>();
        for (FileStatus stat : stats) {
            String directory = stat.getPath().getParent().toString();
            if (map.containsKey(directory)) {
                map.get(directory).add(stat);
            } else {
                List<FileStatus> fileList = new ArrayList<FileStatus>();
                fileList.add(stat);
                map.put(directory, fileList);
            }
        }

        // build one CombineFileSplit per directory
        Iterator<String> itr = map.keySet().iterator();
        while (itr.hasNext()) {
            String dir = itr.next();
            List<Path> pathLst = new ArrayList<Path>();
            List<Long> offsetLst = new ArrayList<Long>();
            List<Long> lengthLst = new ArrayList<Long>();
            List<FileStatus> fileList = map.get(dir);
            long currentLen = 0;
            for (int i = 0; i < fileList.size(); i++) {
                FileStatus stat = fileList.get(i);
                pathLst.add(stat.getPath());
                offsetLst.add(0L);
                lengthLst.add(stat.getLen());
                currentLen += stat.getLen();
            }
            Path[] pathArray = new Path[pathLst.size()];
            CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
                    getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
            LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                    + ") length(" + currentLen + ")");
            for (int i = 0; i < pathArray.length; i++) {
                LOG.info("  ->path[" + i + "]=" + pathArray[i].toString());
            }
            splits.add(thissplit);
        }
        return splits;
    }

    private long[] getLongArray(List<Long> lst) {
        long[] rst = new long[lst.size()];
        for (int i = 0; i < lst.size(); i++) {
            rst[i] = lst.get(i);
        }
        return rst;
    }
}
I will not go into the details of how to use this InputFormat; it works much like the MultiFileInputFormat from my earlier post "Optimizing a Hadoop job: implementing a CombineFileInputFormat based on actual file sizes".
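For completeness, here is a minimal sketch of what a concrete subclass might look like; this is my own illustration, not code from the original post. MyDirInputFormat and SingleFileReader are hypothetical names; the reader simply delegates to Hadoop's standard LineRecordReader for each file of the combined split:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// hypothetical concrete subclass for plain text input
public class MyDirInputFormat extends OneMapOneDirectoryInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader runs one SingleFileReader per file in the split
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, context, SingleFileReader.class);
    }

    /** Reads the idx-th file of a CombineFileSplit with a LineRecordReader. */
    public static class SingleFileReader extends RecordReader<LongWritable, Text> {
        private final LineRecordReader reader = new LineRecordReader();
        private final CombineFileSplit split;
        private final int idx;

        // CombineFileRecordReader instantiates this class reflectively
        // with exactly this (CombineFileSplit, TaskAttemptContext, Integer) signature
        public SingleFileReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) {
            this.split = split;
            this.idx = idx;
        }

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException {
            // present the idx-th file of the combined split as an ordinary FileSplit
            FileSplit fileSplit = new FileSplit(split.getPath(idx),
                    split.getOffset(idx), split.getLength(idx), null);
            reader.initialize(fileSplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException { return reader.nextKeyValue(); }
        @Override
        public LongWritable getCurrentKey() { return reader.getCurrentKey(); }
        @Override
        public Text getCurrentValue() { return reader.getCurrentValue(); }
        @Override
        public float getProgress() throws IOException { return reader.getProgress(); }
        @Override
        public void close() throws IOException { reader.close(); }
    }
}

The driver then only needs job.setInputFormatClass(MyDirInputFormat.class) and the usual FileInputFormat.addInputPath calls; each map task will see all the records of exactly one directory and nothing from any other.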