大数据之hadoop伪集群搭建与MapReduce编程入门
2015-05-22 08:39
543 查看
一、理论知识预热一句话介绍hadoop: Hadoop的核心由分布式文件系统HDFS与Map/Reduce计算模型组成。(1)HDFS分布式文件系统HDFS由三个角色构成:1)NameNode2)DataNode:文件存储的基本单元,它将文件块block存储在本地文件系统中3)Client:需要获取分布式文件系统文件的应用程序文件写入:client向NameNode发起文件写入请求,NameNode分配合适的DataNode地址给Client,Client将文件划分为多个Block后,根据DataNode地址顺序写入数据文件读取:client向NameNode发起文件读取请求,NameNode返回文件存储的DataNode信息,Client读取文件信息一个Block会有三份备份,一份放在NameNode指定的DataNode上,一份放在与指定DataNode不在同一台机器的DataNode上,最后一份放在与指定DataNode同一Rack的DataNode上(2)Map/Reduce计算模型由一个单独运行在主节点的JobTracker和运行在每个集群从节点的TaskTracker共同构成。一个Map/Reduce作业job通常会把输入的数据集切分为若干独立的数据块,由Map任务task以完全并行的方式处理它们。框架会先对Map的输出进行排序,然后把结果输入给Reduce任务。mapper->combine->partition->reduce在hadoop上运行的作业需要指明程序的输入/输出位置,并通过实现合适的接口或抽象类提供Map和Reduce函数。同时还需要指定作业的其他参数,构成作业配置Job Configuration。在Hadoop的JobClient提交作业(JAR包/可执行程序等)和配置信息给JobTracker之后,JobTracker会负责分发这些软件和配置信息给slave及调度任务,并监控它们的执行,同时提供状态和诊断信息给JobClient关键术语:NameNode\DataNode:JobTracker\TaskTrackermapper->combine->partition->reduce二、开发环境搭建ubuntu 64位为例第一步:安装java第二步:安装ssh,rsyncapt-get install ssh rsync第三步:下载hadoop稳定版本,解压缩惯例,了解关键文件bin/ hadoop, hdfs, mapred,yarn等可执行文件sbin/ start-dfs.sh start-yarn.sh stop-dfs.sh stop-yarn.sh等可执行文件etc/hadoop env和site等配置文件libexec/ hadoop-config.sh hdfs-config.sh mapred-confg.sh yarn-config.sh等用于配置的可执行文件logs/ 日志文件share 文档与jar包,可以认真浏览一下这些jar包,Map-Reduce编程接口就靠它include/ 头文件 lib/ 动态链接库第四步:hadoop配置cd hadoop-2.6.0/(1)修改脚本要用的环境变量,以运行hadoopvim etc/hadoop/hadoop-env.sh 编辑export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 [替换JDK所在目录](2)配置HDFS和MapReduce常用的I/O设置vim etc/hadoop/core-site.xml 编辑<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> #配置HDFS的地址及端口号 </property></configuration> (3)Hadoop守护进程的配置项,包括namenode,辅助namenode和datanodevim etc/hadoop/hdfs-site.xml 编辑<configuration> <property> <name>dfs.replication</name> #配置备份方式,默认为3.在单机版里需要改为1 <value>1</value> </property></configuration>(4) MapReduce守护进程的配置项,包括jobtracker和tasktracker cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xmlvim etc/hadoop/mapred-site.xml 编辑<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property></configuration> vim etc/hadoop/yarn-site.xml编辑<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property></configuration>第五步:配置SSH服务免密码登陆ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsacat ~/.ssh/id_dsa.pub >>~/.ssh/authorized_keys检查是否配置成功ssh localhost第六步:启动hadoop1. 格式化HDFS bin/hdfs namenode -format2. 启动进程启动NameNode与DataNode守护进程sbin/start-dfs.sh 启动ResourceManager 和NodeManager守护进程sbin/start-yarn.sh NameNode的web接口 http://localhost:50070/Resourcemanager的web接口 http://localhost:8088/第七步: 体验执行MapReduce job, 对文件内容进行字符匹配第1)步:创建MapReduce job执行目录bin/hdfs dfs -mkdir /userbin/hdfs dfs -mkdir /user/root第2)步:拷贝hadoop配置文件到hadoop input目录下bin/hdfs dfs -put etc/hadoop input很不幸,报错了15/05/08 13:54:30 WARN hdfs.DFSClient: DataStreamer Exceptionorg.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/xxxxx._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) areexcluded in this operation.定位原因:查看logs目录下datanode日志【在google之前先看日志,是非常良好的习惯】发现以下提示 Lock on /tmp/hadoop-root/dfs/data/in_use.lock acquired by关闭hdfs后手动删除该目录sbin/stop-dfs.shrm -rf /tmp/hadoop-root/dfs/data/*重新启动sbin/start-dfs.sh查看DataNode是否启动jps输出如下22848 Jps20870 DataNode20478 NameNode31294 FsShell19474 Elasticsearch21294 SecondaryNameNode确认ok了,重新copy文件bin/hdfs dfs -put etc/hadoop input出错原因:反复执行了 bin/hdfs namenode -format ,但没有手动清除文件锁定第3)步:运行mapreduce示例bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar grep input output 'dfs[a-z.]+' 我们可以在Resourcemanager的web接口 http://localhost:8088/ 看到任务运行情况 第4)步:查看结果bin/hdfs dfs -cat output/*
6 dfs.audit.logger 4 dfs.class 3 dfs.server.namenode. 2 dfs.period 2 dfs.audit.log.maxfilesize 2 dfs.audit.log.maxbackupindex 1 dfsmetrics.log 1 dfsadmin 1 dfs.servers 1 dfs.replication 1 dfs.file
我们也可以把结果merge到单个文件中来看bin/hdfs dfs -getmerge output/ output
三、MapReduce编程入门
环境IDE:eclipse
导入hadoop核心包 hadoop-core-1.2.1.jar
导出:我们自己的fat jar包
示例代码放在我的git上 https://github.com/tanjiti/mapreduceExample
HelloWorld版本的MapReduce
仍然用单词切割的例子,单词切割就是HelloWorld版本的MapReduce
文件结构如下
├── bin 存放编译后的字节文件├── lib 存放依赖jar包,来自hadoop安装文件share/hadoop/│?? ├── commons-cli-1.2.jar│?? ├── hadoop-common-2.6.0.jar│?? └── hadoop-mapreduce-client-core-2.6.0.jar├── mymainfest 配置文件,指定classpath└── src 源文件└── mapreduceExample└── WordCount.java我一般在eclipse里编码(可视化便利性),然后采用命令行编译打包(命令行高效性)
第一步:源码编辑vim src/mapreduceExample/WordCount.java 编辑package mapreduceExample;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();@Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();@Overridepublic void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: mapreduceExample.WordCount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class); //设置mapper类job.setCombinerClass(IntSumReducer.class); //设置combiner类,该类的作用是合并map结果,减少网络I/Ojob.setReducerClass(IntSumReducer.class);//设置reducer类job.setOutputKeyClass(Text.class);//设置reduce结果输出 key的类型job.setOutputValueClass(IntWritable.class); //设置reduce结果输出 value的类型FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//设置输入数据文件路径FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//设置输出数据文件路径System.exit(job.waitForCompletion(true) ? 0 : 1);}}
第二步:编译打包
编译javac -d bin/ -sourcepath src/ -cp lib/hadoop-common-2.6.0.jar:lib/hadoop-mapreduce-client-core-2.6.0.jar:lib/commons-cli-1.2.jar src/mapreduceExample/WordCount.java
编写配置文件vim mymainfest 编辑Class-Path: lib/hadoop-common-2.7.0.jar lib/hadoop-mapreduce-client-common-2.7.0.jar lib/commons-cli-1.2.jar
注:
mainfest 配置详见 https://docs.oracle.com/javase/tutorial/deployment/jar/manifestindex.html
其他重要配置项
Main-Class: 指定入口class,因为一个mapreduce项目,往往会有多种入口,因此不配置该项
打包jar cvfm mapreduceExample.jar mymainfest lib/* src/* -C bin .
查看jar包内容jar tf mapreduceExample.jar META-INF/META-INF/MANIFEST.MFlib/commons-cli-1.2.jarlib/hadoop-common-2.6.0.jarlib/hadoop-mapreduce-client-core-2.6.0.jarsrc/mapreduceExample/src/mapreduceExample/WordCount.javamapreduceExample/mapreduceExample/WordCount.classmapreduceExample/WordCount$TokenizerMapper.classmapreduceExample/WordCount$IntSumReducer.class第三步: 运行
查看Usagebin/hadoop jar /home/tanjiti/mapreduceExample/mapreduceExample.jar mapreduceExample.WordCount显示Usage: just4test.WordCount <in> <out>
当然,这是个很简陋的使用说明,但起码知道了参数是哪些,好了,正式运行bin/hadoop jar /home/tanjiti/mapreduceExample/mapreduceExample.jar[jar所在路径] mapreduceExample.WordCount[main Class,如果打包jar包时在manifest文件中指定了就不需要指定该参数] /in[输入文件路径,在执行任务前必须存在] /out[结果输出路径,在执行任务前不应该存在该路径]先合并结果再查看bin/hdfs dfs -getmerge /out wordcount_resulttail wordcount_result 部分结果如下"/?app=vote&controller=vote&action=total&contentid=7%20and%201=2%20union%20select%20group_concat(md5(7));%23" 1
MapReduce的思想其实非常简单
一句话来描述这个过程,就是开启一个MapReducer Job,设置好相应的Configuration,指定输入输出数据源的路径与格式,指定数据流K,V的格式(很多时候需要自定义格式,继承Writable),指定处理过程(map,combine,partition,sort,reduce)。
四、更多
再实现这些基本功能后,我们下一步会考虑如何共享数据,是读写HDFS文件,还是采用Configuration配置,还是使用DistributedCache;如何何处理多个mapreducejob,是线性的方式,还是ControlledJob,还是ChainMapper、ChainReducer。
等功能都搞定了,我们再考虑性能优化的问题,例如数据预处理(合并小文件,过滤杂音、设置InputSplit的大小),是否启用压缩方式,设置Map和Reduce任务的数量等job属性,全靠实战来填坑。
相关文章推荐
- 大数据hadoop入门学习之集群环境搭建集合
- Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九)
- 大数据 hadoop2.6.0 高可用集群搭建(HA集群搭建)--亲测可用,入门必备
- Hadoop MapReduce编程 API入门系列之自定义多种输入格式数据类型和排序多种输出格式(十一)
- Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)
- Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(十)
- 入门级Hadoop集群搭建详细教程(九):MySql的安装
- 【Hadoop入门学习系列之五】MapReduce 2.0编程实战
- 3,数据挖掘环境搭建-Hadoop集群搭建与测试
- Hadoop入门第四篇:手动搭建自己的hadoop小集群
- CDH版本大数据集群下搭建pig(hadoop-2.6.0-cdh5.5.4.gz + hive-1.1.0-cdh5.5.4.tar.gz + pig-0.12.0-cdh5.5.4.tar.gz的搭建)
- 大数据1-hadoop、zookeeper、hbase、spark集群环境搭建
- Linux上搭建Hadoop2.6.3集群以及WIN7通过Eclipse开发MapReduce的demo
- Hadoop MapReduce编程 API入门系列之网页流量版本1(二十二)
- 入门级Hadoop集群搭建详细教程(六):yum本地仓库与远程仓库配置
- Hadoop系列-MapReduce编程入门案例(八)
- 大数据入门第五天——离线计算之hadoop(上)概述与集群安装
- hive(01)、基于hadoop集群的数据仓库Hive搭建实践
- 教你玩转Hadoop分布式集群搭建,进击大数据
- Java+大数据开发——Hadoop集群环境搭建(一)