您的位置:首页 > 编程语言 > Java开发

用maven管理spark应用程序,提交到spark on yarn 集群上运行

2016-03-10 23:46 423 查看
maven管理项目的优势是:不用到处的拷贝jar包,只要将项目依赖的其他项目在pom.xml中以添加依赖的方式加进去进行了。所以我用maven管理spark应用,打包后提交给spark集群。本篇文章介绍本地调试运行spark应用,再打包,提交到spark集群运行,及最后结果的展示。

我采用的例子是WordCount(对hdfs中某个文件的单词个数进行统计)

(1)在eclipse中新建maven项目,将spark依赖加入到pom.xml中

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
</dependency>


(2)编写spark应用程序

package com.test.three_maven;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class WordCount {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {

if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}

SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

public Iterable<String> call(String s) {
return Arrays.asList(SPACE.split(s));
}
});

JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {

public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

//counts.saveAsTextFile(args[1]);

List<Tuple2<String, Integer>> output = counts.collect();

for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());

}
ctx.stop();
}
}


(3)本地运行,注意:在程序中加入setMaster("local"),本程序需要输入参数(需要统计单词的文件,我从hdfs中读文件)

          在run  configuration中设置arguments为hdfs://hdfs服务ip:端口号/文件路径     我的参数设置为 hdfs://192.168.119.174:8020/ascv/a.ascv



运行结果输出到控制台中:



(4)将setMaster("local")去掉后,用maven打包成jar。用maven的run as --maven  install命令,就会在项目的target文件夹下生成jar包。



(5)将jar包上传到spark集群的客户端机器,在spark的bin目录下执行以下的命令

spark-submit    --class   spark应用的main函数所在类的全类名    --master    yarn-cluster       jar包路径及jar包      参数

例如我的jar包放在了spark中,在客户端使用spark用户提交程序的,当然spark-submit的参数不止这些,我只是简单地做个示范而已。

spark-submit    --class   com.test.first_maven.WordCount       --master    yarn-cluster     /home/spark/WordCount.jar         hdfs://192.168.119.174:8020/ascv/a.ascv   



运行成功后,可以在控制台的输出日志中找到查看集群运行情况的url,想要看输出的结果,查看yarn的日志即可。



总结:到此为止,一个简单的用java开发的spark应用就完成了,这几天配置和部署环境给我最大的启发是:多看官方文档,即便它是英文的。希望我的这篇文章能帮助到大家。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息