您的位置:首页 > 编程语言

【Hadoop学习笔记】HDFS编程实践——FilterAndMerge

2018-01-06 20:47 465 查看
emm..之前没学过java,hadoop的JAVA API接口用着很难受,边看代码边看hadoop接口说明(http://hadoop.apache.org/docs/stable/api/index.html)

这个代码任务是完成筛选后缀为不为.abc的文件,并将他们的内容合并到另一个文件中

资源说明:file1.txt , file2.txt , file3.abc ,file4.abc , file5.txt 存放在hdfs文件系统的相对路径下tempfile文件夹里 即tempfile/(绝对路径/home/hadoop/tempfile  其中hadoop为用户名),最后合并结果为merge.txt放至tempwork/

有个foreach的输出,不知道为什么只能输出第一个..... 但是合并是成功的

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import java.io.IOException;
import java.io.PrintStream;
class MyPathFilter implements PathFilter{
//PathFilter 接口
String reg =null;

MyPathFilter(String reg){
this.reg=reg;
}//构造函数

@Override
//重载accept函数
public boolean accept(Path path){
if(!(path.toString().matches(reg)))
return true;
return false;
}

}

public class Merge{
Path inputPath = null;
Path outputPath =null;
public Merge(String input,String output)
{
this.inputPath=new Path(input);
this.outputPath=new Path(output);
}

public void doMerge() throws IOException{
Configuration conf1 =new Configuration();
Configuration conf2 =new Configuration();
conf1.set("fs.defaultFS", "hdfs://localhost:9000");
conf1.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
conf2.set("fs.defaultFS", "hdfs://localhost:9000");
conf2.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem fsSource = FileSystem.get(conf1);
FileSystem fsDst = FileSystem.get(conf2);
FileStatus[] sourceStatus = fsSource.listStatus(inputPath,new MyPathFilter(".*\\.abc"));  //设置过滤器
FSDataOutputStream fsdos = fsDst.create(outputPath);  //输出流
for(FileStatus sta : sourceStatus){
System.out.print("路径:"+sta.getPath()+"  文件大小:"+sta.getLen()+"  权限:"+sta.getPermission()+"  内容:");
FSDataInputStream fsdis =fsSource.open(sta.getPath()); //输入流
byte[] data = new byte[1024];
int read =-1;//数据长度
PrintStream ps =new PrintStream(System.out);
while((read=fsdis.read(data))>0){  //FSDataInputStream.read(buff[]);将Path位置指定的文件读入buff[],并返回字节数
ps.write(data, 0, read);  //输出内容
fsdos.write(data, 0, read);  //合并到outputPath
}
fsdis.close();
ps.close();

}
fsdos.close();
}

public static void main(String[] args)throws IOException{
Merge merge =new Merge("tempfile/","tempwork/merge.txt");  //conf.set()已经设置,使用相对路径即可
merge.doMerge();
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Hadoop学习笔记