
Developing an HDFS WordCount Program: A Framework-Style Approach

2019-05-26 18:12

HdfsWordCount

The driver class. It loads job.properties, instantiates the configured Mapper implementation by reflection, streams every input file from HDFS line by line, and finally writes the aggregated counts back to HDFS.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class HdfsWordCount {

    public static void main(String[] args) throws Exception {

        /**
         * =================== 1. Initialization ===================
         */
        Properties props = new Properties();
        props.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("job.properties"));

        Path input = new Path(props.getProperty("INPUT_PATH"));
        Path output = new Path(props.getProperty("OUTPUT_PATH"));

        // Instantiate the configured Mapper implementation by reflection
        Class<?> mapperClass = Class.forName(props.getProperty("MAPPER_CLASS"));
        Mapper mapper = (Mapper) mapperClass.newInstance();
        Context context = new Context();

        /**
         * =================== 2. Business logic ===================
         */
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root");
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(input, false);
        while (iter.hasNext()) {
            LocatedFileStatus file = iter.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            // 1. Read the file from HDFS, line by line
            String line = null;
            while ((line = br.readLine()) != null) {
                // 2. Hand each line to the Mapper for business processing
                mapper.map(line, context);
            }
            br.close();
            in.close();
        }

        /**
         * =================== 3. Output the results ===================
         */
        HashMap<Object, Object> contextMap = context.getContextMap();

        if (fs.exists(output)) {
            throw new RuntimeException("The specified output directory already exists, please choose another one!");
        }
        FSDataOutputStream out = fs.create(new Path(output, "res.dat"));
        Set<Map.Entry<Object, Object>> entrySet = contextMap.entrySet();
        // 4. Flush the cached results to the HDFS result file
        for (Map.Entry<Object, Object> entry : entrySet) {
            out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes());
        }

        out.close();
        fs.close();
        System.out.println("Congratulations! The counting job is done.");
    }
}
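The Context object used above is a simple in-memory cache. The original post does not show its source, so the following is a minimal sketch reconstructed from how the driver and the mappers call it (the write, get, and getContextMap signatures are inferred from that usage, not taken from the original):

import java.util.HashMap;

public class Context {

    // In-memory cache holding the accumulated word counts
    private HashMap<Object, Object> contextMap = new HashMap<>();

    // Put (or overwrite) a key/value pair in the cache
    public void write(Object key, Object value) {
        contextMap.put(key, value);
    }

    // Look up the current value for a key, or null if absent
    public Object get(Object key) {
        return contextMap.get(key);
    }

    // Expose the whole cache so the driver can dump it to HDFS
    public HashMap<Object, Object> getContextMap() {
        return contextMap;
    }
}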

The Mapper interface, which implementation classes extend with their own per-line business logic; the driver stays fixed while the processing strategy is swapped via configuration.

public interface Mapper {

    // Process one line of input, accumulating results into the context cache
    public void map(String line, Context context);
}

Mapper implementation: WordCountMapper, which splits each line on spaces and counts the words

public class WordCountMapper implements Mapper {

    @Override
    public void map(String line, Context context) {

        String[] words = line.split(" ");
        // 3. Put this line's results into the cache
        for (String word : words) {
            Object value = context.get(word);
            if (null == value) {
                context.write(word, 1);
            } else {
                context.write(word, (Integer) value + 1);
            }
        }
    }
}
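A quick local smoke test of the mapper, without touching HDFS (a hypothetical snippet assuming the Context sketch above; the sample input is illustrative and not from the original post):

// Count the words of a single sample line in memory
Context context = new Context();
new WordCountMapper().map("hello world hello", context);
System.out.println(context.getContextMap()); // e.g. {world=1, hello=2} (HashMap order is not guaranteed)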

Mapper implementation: CaseIgnoreWordCountMapper, which counts words case-insensitively by uppercasing each line first

public class CaseIgnoreWordCountMapper implements Mapper {

    @Override
    public void map(String line, Context context) {
        String[] words = line.toUpperCase().split(" ");
        // 3. Put this line's results into the cache
        for (String word : words) {
            Object value = context.get(word);
            if (null == value) {
                context.write(word, 1);
            } else {
                context.write(word, (Integer) value + 1);
            }
        }
    }
}

The job.properties configuration file

MAPPER_CLASS lets you plug in whichever implementation class you need:

MAPPER_CLASS=com.xuyu.datacollection.CaseIgnoreWordCountMapper
INPUT_PATH=/wordcount/input
OUTPUT_PATH=/wordcount/output4
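To switch back to case-sensitive counting, point MAPPER_CLASS at the other implementation instead, e.g. MAPPER_CLASS=com.xuyu.datacollection.WordCountMapper (assuming it lives in the same package as the class above).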

Result

Running the driver writes the aggregated counts to res.dat in the output directory, one tab-separated word/count pair per line.
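For illustration (hypothetical input, not the original screenshot), feeding the single line hello World HELLO through CaseIgnoreWordCountMapper would produce a res.dat along these lines:

HELLO	2
WORLD	1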

 
