您的位置:首页 > 大数据

大数据学习笔记:编写Mapreduce代码并运行

2017-08-06 23:16 525 查看
代码使用eclipse编写,首先新建Java项目(这有个问题,为什么新建项目之后没有弹出新界面,我是莫名奇妙的打开那个项目界面的),在资源管理器中新建一个库目录:



在windows中找到hadoop-3.0的安装目录,将其下除了后缀为sources.jar与tests.jar之外所有的.jar包都粘贴到项目的lib文件夹下,并添加到构建路径中去:



为了以后方便地多次导入需要的jar包以及关联相关类的源码,在Hadoop的安装目录下分别新建_lib与_source文件夹,将所有需要的jar包放到_lib下,而将所有的源码包(souces.jar)放入_source下:





mapper类

在源码文件夹src下新建一个mapper类:



它的父类(Superclass)为Mapper类:



可以看到其有四个参数,一个输入键值对与一个输出键值对:



先来看一下需要处理的数据样本(气温数据):



原始数据是以(key,value)的方式导入map的,每一行表示一个数据,key由程序自动生成,而value则由一长串原始数据填入。因为数据条目很多很多(大数据),所以KEYIN的格式设置为Long,而VALUEIN的格式设置为Text。

对于输出格式,我们想要数据以(年份,气温)的格式输出,所以KEYOUT设置为Text格式(年份保存为文本格式),VALUEOUT设为Int格式。

将hadoop中的相对应的格式填入参数框:



在代码中导入需要的库:



在代码中添加一个map方法:





可以看到由IDE自动补充的map方法中有一个参数:Mapper<LongWritable, Text, Text, IntWritable>.Context  context,因为在代码中已经导入了Mapper类,所以这一参数可以写成:Context  context。

接下来根据处理数据的需求,按照IDE提供的map方法模版来写代码:

package hadoopDemo_class;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
* Mapper类
*/

public class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
private static final int MISSING = 9999;	//若气温值为9999则表示数据缺失

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();		//类型转换
String year = line.substring(15, 19);	//数据字符串中标号15-18的数字为年份
int airTemperature;
//数据中标号87的正负号代表温度正负值
if (line.charAt(87) == '+')
{
airTemperature = Integer.parseInt(line.substring(88, 92));		//提取标号88-91的数字即为气温
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));		//若为负,则连负号一起提取出来
}
String quality = line.substring(92, 93);	//气温值后紧跟以为质量值
if (airTemperature != MISSING && quality.matches("[01459]")) 		//正则匹配,若质量值为0、1、4、5、9中任何一值
{
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
给出一条数据样本以供代码分析:

0043010450999991973010106004+69833+021900FM-12+000599999V0202901N00921009001CN0150001N9+00501+00401999999ADDAY121999GF108991081081004501999999MW1851

reducer类

同样新建一个reducer类:



导入必须的库:



导入方法的模版:



先看一下mapreduce的处理过程图。在map与reduce中间有一个shuffle过程,它将具有相同key的value值全部聚合起来:



我们的需求是输出全年的最高气温,所以reducer的任务就是在一个(key,value1,value2,...)中找到最大的value并输出。其代码如下:

package hadoopDemo_class;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
* Reducer类
*/
public class MaxTempReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
@Override
protected void reduce(Text keyin, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException
{
//最大气温的初始值设定为最小的
int maxValue = Integer.MIN_VALUE;
//对于输入的每一个气温值,将其与maxValue对比,保存最大的
for (IntWritable value : values)
{
maxValue = Math.max(maxValue, value.get());
}
//输出(年份,最大温度值)
context.write(keyin, new IntWritable(maxValue));
}
}

入口函数

新建普通类:



代码如下:

package hadoopDemo_class;  
  
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  
/** 
 * APP类 
 */  
public class MaxTempProgram   
{  
    public static void main(String[] args) throws Exception   
    {  
    	//作业进行之前删掉存在的输出目录与临时文件目录  
        Configuration conf=new Configuration();
        FileSystem fs=FileSystem.get(conf);  
        Path outputDir=new Path("/user/hadoopDemo/output");  
        if(fs.exists(outputDir))  
            fs.delete(outputDir, true);     //递归删除  
        Path tmpputDir=new Path("/tmp");  
        if(fs.exists(tmpputDir))  
            fs.delete(tmpputDir, true);     //递归删除
   
    	Job job = Job.getInstance();  
  
        job.setJarByClass(MaxTempProgram.class);  
        job.setJobName("Max temperature");  
        //设置IO路径  
        FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径可以添加多个,可以是文件或文件夹  
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径只有一个,会自动创建文件夹,若文件夹已存在则会报错  
        //设置mapper与reducer类  
        job.setMapperClass(MaxTempMapper.class);  
        job.setReducerClass(MaxTempReducer.class);  
        //设置输出键值对的格式  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
          
        job.waitForCompletion(true);  
        System.out.println(job.waitForCompletion(true));  
    }  
} 


编写好代码之后,把项目导出成jar包(注意设置主类的名称):







在windows上独立运行:

首先将windows上的hadoop配置为独立模式,在hadoop\etc目录下找到core-site.xml,hdfs-site.xml,mapred-site.xml与yarn-site.xml,将其中写入的配置内容删除。

将需要分析的数据也移动到此目录下,打开命令行窗口,进入到jar包所在目录,设置临时环境变量:

set HADOOP_CLASSPATH=hadoopDemo.jar
然后运行:

hadoop  hadoopDemo_class.MaxTempProgram file:///F:\Projects\Eclipse\hadoopDemo\010470-99999-1973.gz file:///F:\Projects\Eclipse\hadoopDemo\output
有几点需要注意:1.在指定IO路径时,”file:///“表示本地文件系统;2.输出路径的output文件夹不能存在,需要由mapreduce自己来创建。

运行成功后会在output文件夹生成一系列文件,其中名为_SUCCESS的标识文件代表成功,part-r-00000则为输出文件。程序处理的结果为1973年最高温度为230。



在Ubuntu上的集群运行:

将之前配置好的四台虚拟机启动(第四台未配置辅助节点),将jar包复制到第一台主机中,并启动hadoop:

hdfs namenode -format;start-all.sh
此时可用命令查看HDFS中是没有任何文件的:

hadoop fs -ls /
在HDFS中创建相关目录:

hadoop fs -mkdir -p /user/hadoopDemo/temperature
将温度数据上传到HDFS(数据是以冗余、均匀分布的方式保存在datanode中的):

hadoop fs -put ~/Downloads/temperature/*.gz /user/hadoopDemo/temperature
执行jar包,格式同windows下的类似(这里上传了一组gz数据):

hadoop jar hadoopDemo.jar /user/hadoopDemo/temperature/*.gz /user/hadoopDemo/output

此句命令执行时遇到Could not find or load main class org.apache.hadoop.mapreduce.v2.app.MRAppMaster错误。

往mapred-site.xml添加路径即可解决此问题:

<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>
/usr/software/hadoop-3.0.0-alpha4/share/hadoop/mapreduce/*
</value>
</property>
</configuration>
参考博客地址:http://blog.sina.com.cn/s/blog_77952e290102wx3a.html,在这里表示感谢。

然而这里又出现一个新问题:container is running beyond virtual memory limits,原因在于yarn框架设置的默认虚拟内存小于处理数据需要的内存,解决办法是在yarn-site.xml中增加一项虚拟内存即可解决此问题:



恩,电脑没装固态,这条命令执行时花了好久。卡到生活不能自理,机械硬盘的亲们执行前慎重考虑下,不然ctrl+C就没意义了。



成功之后,与windows类似,可在HDFS中查看输出结果:



感觉这速度太慢了,有点不正常,66个数据包,大小总计才5M不到,竟然跑了半个多小时!难道是该换硬盘了?
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: