您的位置:首页 > 运维架构 > Linux

MapReduce程序的3种集群提交运行模式详解---基于Windows与Linux两种开发环境

2016-07-05 20:57 1091 查看
继上一篇博客—-Hadoop本地运行模式深入理解,本篇文章将详细介绍在基于Windows与Linux两种开发环境下,MapReduce程序的3种集群运行方式。在通篇文章中,仍然以经典的WordCount程序为例进行说明,以提高文章的易读性,下面进入文章的正题。

(1)MapReduce程序的集群运行模式1—将工程打成jar包,上传到服务器,然后用hadoop命令hadoop jar xxx.jar 将jar包分发到集群中运行。

①Linux环境下WordCount程序实例:



wordcount代码:

package MapReduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WordCount
{
public static String path1 = "hdfs://hadoop20:9000/word.txt";//读取HDFS中的数据
public static String path2 = "hdfs://hadoop20:9000/dir1/";//数据输出到HDFS中
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);

if(fileSystem.exists(new Path(path2)))
{
fileSystem.delete(new Path(path2), true);
}

Job job = Job.getInstance(conf,"wordcount");

job.setJarByClass(WordCount.class);

FileInputFormat.setInputPaths(job, new Path(path1));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

job.setNumReduceTasks(1);
job.setPartitionerClass(HashPartitioner.class);

job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(path2));
job.waitForCompletion(true);
}
public  static  class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>
{
protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
{
String[] splited = v1.toString().split("\t");
for (String string : splited)
{
context.write(new Text(string),new LongWritable(1L));
}
}
}
public  static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>
{
protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context)throws IOException, InterruptedException
{
long sum = 0L;
for (LongWritable v2 : v2s)
{
sum += v2.get();
}
context.write(k2,new LongWritable(sum));
}
}
}


打成jar包后在集群中运行:

[root@hadoop20 local]# hadoop jar wordcount.jar


集群运行模式的标志日志:(摘取部分)

16/07/05 04:56:38 INFO client.RMProxy: Connecting to ResourceManager at hadoop20/192.168.80.100:8032(集群是提交给Yarn平台运行,本地模式中是提交给本地单个JVM运行)
……
16/07/05 04:56:40 INFO impl.YarnClientImpl: Submitted application application_1467703658206_0017(本地运行模式的话是提交给本地LocalJobRunner运行,但是集群运行模式是提交给Yarn平台运行)
……
16/07/05 04:56:40 INFO mapreduce.Job: Running job: job_1467703658206_0017(本地运行模式的话jobId的前面含有local字样,集群模式是没有local字样的)


查看运行结果:

[root@hadoop20 local]# hadoop fs -cat /dir/part-r-00000
hello   2
me      1
you     1


②Windows环境下WordCount程序实例(同上,步骤是相同的–打完jar包之后通过hadoop jar xxx.jar运行即可)

(2)MapReduce程序的集群运行模式2—在linux的eclipse中直接运行main方法,进而将程序提交到集群中去运行,但是必须采取以下措施:

①在工程src目录下加入 mapred-site.xml 和 yarn-site.xml 这两个配置文件或者在代码中加入:

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop20");


② 同时将工程打成jar包(xxx.jar),并在main方法中添加一个conf的配置参数 conf.set(“mapreduce.job.jar”,”路径/xxx.jar”);

还是以WordCount程序为例:

先导出个jar包:





查看jar包:

[root@hadoop20 local]# ll
total 415748
drwxrwxr-x.  8 root  root       4096 Jul  5 01:22 eclipse
-rw-r--r--.  1 root  root  262311443 Jul  3 06:26 eclipse-committers-neon-R-linux-gtk.tar.gz
drwxr-xr-x. 11 67974 users      4096 Jul  1 01:47 hadoop
-rw-r--r--.  1 root  root       3620 Jul  2 05:55 hadoop-env.cmd
drwxr-xr-x.  8 uucp    143      4096 Jun 16  2014 jdk
-rw-r--r--.  1 root  root  159958812 Jul  3 19:17 jdk-8u11-linux-i586.tar.gz
-rw-r--r--.  1 root  root    3400818 Apr  7 05:28 Skin_NonSkin.txt
-rw-r--r--.  1 root  root       6573 Jul  5 01:33 wc.jar
-rw-r--r--.  1 root  root       5230 Jul  5 05:31 wordcount2.jar  //这是导出的jar包wordcount2.jar
-rw-r--r--.  1 root  root       5106 Jul  5 04:56 wordcount.jar
-rw-r--r--.  1 root  root         19 Jun 24 00:15 word.txt
[root@hadoop20 local]# pwd
/usr/local


接下来在linux的eclipse代码中添加配置文件:

package MapReduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WordCount
{
public static String path1 = "hdfs://hadoop20:9000/word.txt";//读取HDFS中的数据
public static String path2 = "hdfs://hadoop20:9000/dir2/";//数据输出到HDFS中
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("mapreduce.job.jar","/usr/local/wordcount2.jar");  //change
conf.set("fs.defaultFS", "hdfs://hadoop20:9000/");          //change
conf.set("mapreduce.framework.name", "yarn");               //change
conf.set("yarn.resourcemanager.hostname", "hadoop20");      //change
FileSystem fileSystem = FileSystem.get(conf);

if(fileSystem.exists(new Path(path2)))
{
fileSystem.delete(new Path(path2), true);
}

Job job = Job.getInstance(conf,"wordcount");

job.setJarByClass(WordCount.class);

FileInputFormat.setInputPaths(job, new Path(path1));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

job.setNumReduceTasks(1);
job.setPartitionerClass(HashPartitioner.class);

job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(path2));
job.waitForCompletion(true);
}
public  static  class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>
{
protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
{
String[] splited = v1.toString().split("\t");
for (String string : splited)
{
context.write(new Text(string),new LongWritable(1L));
}
}
}
public  static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>
{
protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context)throws IOException, InterruptedException
{
long sum = 0L;
for (LongWritable v2 : v2s)
{
sum += v2.get();
}
context.write(k2,new LongWritable(sum));
}
}
}


eclipse控制台下集群运行标志日志:(截取部分)

2016-07-05 05:36:52,213 INFO  [main] client.RMProxy (RMProxy.java:createRMProxy(92)) - Connecting to ResourceManager at hadoop20/192.168.80.100:8032    //集群运行标志
2016-07-05 05:36:52,703 WARN  [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2016-07-05 05:36:53,041 INFO  [main] input.FileInputFormat (FileInputFormat.java:listStatus(280)) - Total input paths to process : 1
2016-07-05 05:36:53,130 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:1
2016-07-05 05:36:53,291 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_1467703658206_0019
2016-07-05 05:36:53,522 INFO  [main] impl.YarnClientImpl (YarnClientImpl.java:submitApplication(204)) - Submitted application application_1467703658206_0019   //集群运行标志
2016-07-05 05:36:53,572 INFO  [main] mapreduce.Job (Job.java:submit(1289)) - The url to track the job: http://hadoop20:8088/proxy/application_1467703658206_0019/ 2016-07-05 05:36:53,572 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1334)) - Running job: job_1467703658206_0019  //集群运行标志,不含local字样
2016-07-05 05:37:01,033 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_1467703658206_0019 running in uber mode : false
2016-07-05 05:37:01,034 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) -  map 0% reduce 0%
2016-07-05 05:37:08,485 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) -  map 100% reduce 0%
2016-07-05 05:37:20,637 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) -  map 100% reduce 100%
2016-07-05 05:37:21,649 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_1467703658206_0019 completed successfully
2016-07-05 05:37:21,801 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 49


接下来我们通过shell命令查看运行结果:

[root@hadoop20 local]# hadoop fs -cat /dir2/part-r-00000
hello   2
me      1
you     1


(3)MapReduce程序的集群运行模式3—在windows的eclipse中直接运行main方法,进而将程序提交给集群运行,但是因为平台不兼容,需要做很多的设置修改:

①要在windows中存放一份hadoop的安装包(解压好的)

②要将其中的lib和bin目录替换成根据你的windows版本重新编译出的文件(lib和bin都是和本地平台相关的东西)

③再要配置系统环境变量 HADOOP_HOME 和 PATH

④修改YarnRunner这个类的源码

从上面的介绍可以看出:在windows的eclipse里面跑集群模式非常不方便,没有任何实际意义—不推荐!!!!

那么,在现实的开发中,我们具体应该怎么做呢?

第一种方式:首先在windows中通过本地模式对编写出的程序进行调试,确定程序无误后,将工程打成jar包上传到服务器,然后用hadoop命令—hadoop jar xxx.jar 将jar包分发到集群中运行。

当然如果仅仅是简单的逻辑测试,用本地模式就可以了,不必到集群中运行。

第二种方式:因为linux对hadoop具有很好的平台兼容性,所以我们可以直接在linux的eclipse中进行集群模式的运行。

对于上面的介绍,如有问题,欢迎留言指正。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: