Developing a WordCount Program on HDFS: A Framework-Style Approach
2019-05-26 18:12
The HdfsWordCount driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class HdfsWordCount {

    public static void main(String[] args) throws Exception {
        /**
         * =================== 1. Initialization ===================
         */
        Properties props = new Properties();
        props.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("job.properties"));
        Path input = new Path(props.getProperty("INPUT_PATH"));
        Path output = new Path(props.getProperty("OUTPUT_PATH"));
        // Instantiate the Mapper implementation named in job.properties via reflection
        Class<?> mapperClass = Class.forName(props.getProperty("MAPPER_CLASS"));
        Mapper mapper = (Mapper) mapperClass.newInstance();
        Context context = new Context();

        /**
         * =================== 2. Business logic ===================
         */
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"), new Configuration(), "root");
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(input, false);
        while (iter.hasNext()) {
            LocatedFileStatus file = iter.next();
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            // 1. Read each file in the HDFS input directory line by line
            String line = null;
            while ((line = br.readLine()) != null) {
                // 2. Hand every line to the pluggable mapper for processing
                mapper.map(line, context);
            }
            br.close();
            in.close();
        }

        /**
         * =================== 3. Output the results ===================
         */
        HashMap<Object, Object> contextMap = context.getContextMap();
        if (fs.exists(output)) {
            throw new RuntimeException("The specified output directory already exists; please choose another one!");
        }
        FSDataOutputStream out = fs.create(new Path(output, "res.dat"));
        Set<Map.Entry<Object, Object>> entrySet = contextMap.entrySet();
        // 4. Write the cached counts to the HDFS result file
        for (Map.Entry<Object, Object> entry : entrySet) {
            out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes());
        }
        out.close();
        fs.close();
        System.out.println("Congratulations! The word count has finished.");
    }
}
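Both the driver and the mappers depend on a Context object that the post never shows. A minimal sketch, inferred purely from how it is used above (get, write, getContextMap), might look like this:

import java.util.HashMap;

// Hypothetical reconstruction: a thin wrapper around an in-memory
// HashMap that the mappers use as their result cache.
public class Context {

    private final HashMap<Object, Object> contextMap = new HashMap<>();

    // Store (or overwrite) a key/value pair in the cache
    public void write(Object key, Object value) {
        contextMap.put(key, value);
    }

    // Look up the current value for a key, or null if absent
    public Object get(Object key) {
        return contextMap.get(key);
    }

    // Expose the whole cache so the driver can write it out
    public HashMap<Object, Object> getContextMap() {
        return contextMap;
    }
}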
The Mapper interface, which implementations use to plug in their own business logic
public interface Mapper {

    void map(String line, Context context);
}
Mapper implementation: WordCountMapper — counts words split on spaces
public class WordCountMapper implements Mapper {

    public void map(String line, Context context) {
        String[] words = line.split(" ");
        // 3. Put this line's results into the shared cache
        for (String word : words) {
            Object value = context.get(word);
            if (null == value) {
                context.write(word, 1);
            } else {
                context.write(word, (Integer) value + 1);
            }
        }
    }
}
Mapper implementation: CaseIgnoreWordCountMapper — counts words ignoring case
public class CaseIgnoreWordCountMapper implements Mapper {

    public void map(String line, Context context) {
        // Upper-case the whole line so counting ignores case
        String[] words = line.toUpperCase().split(" ");
        // 3. Put this line's results into the shared cache
        for (String word : words) {
            Object value = context.get(word);
            if (null == value) {
                context.write(word, 1);
            } else {
                context.write(word, (Integer) value + 1);
            }
        }
    }
}
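Because the driver only depends on the Mapper interface, any other parsing rule slots in the same way. For illustration only (not part of the original post), a hypothetical CommaSplitMapper that splits lines on commas instead of spaces:

// Hypothetical example: count comma-separated tokens instead of words
public class CommaSplitMapper implements Mapper {

    public void map(String line, Context context) {
        String[] words = line.split(",");
        for (String word : words) {
            Object value = context.get(word);
            if (null == value) {
                context.write(word, 1);
            } else {
                context.write(word, (Integer) value + 1);
            }
        }
    }
}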
The job.properties configuration file
MAPPER_CLASS lets you choose which Mapper implementation the job should use.
MAPPER_CLASS=com.xuyu.datacollection.CaseIgnoreWordCountMapper
INPUT_PATH=/wordcount/input
OUTPUT_PATH=/wordcount/output4
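Switching behavior is just an edit to this file. For example, to go back to the case-sensitive counter (assuming WordCountMapper lives in the same package as the other mappers), you would change only the first line:

MAPPER_CLASS=com.xuyu.datacollection.WordCountMapper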
Result

Running the driver writes the counts to res.dat under the configured output directory (here /wordcount/output4), one word and its count per line, separated by a tab.