将java开发的wordcount程序部署到spark集群上运行
2017-07-02 17:58
435 查看
package cn.spark.study.core; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 将java开发的wordcount程序部署到spark集群上运行 * @author Administrator * */ public class WordCountCluster { public static void main(String[] args) { // 如果要在spark集群上运行,需要修改的,只有两个地方 // 第一,将SparkConf的setMaster()方法给删掉,默认它自己会去连接 // 第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件 // 实际执行步骤: // 1、将spark.txt文件上传到hdfs上去 // 2、使用我们最早在pom.xml里配置的maven插件,对spark工程进行打包 // 3、将打包后的spark工程jar包,上传到机器上执行 // 4、编写spark-submit脚本 // 5、执行spark-submit脚本,提交spark应用到集群执行 SparkConf conf = new SparkConf() .setAppName("WordCountCluster"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> lines = sc.textFile("hdfs://hadoop:9000/test/spark.txt"); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairRDD<String, Integer> wordsCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordsCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> tuple) throws Exception { System.out.println(tuple._1+":"+tuple._2); } }); sc.close(); } }
spark-submit脚本
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
集群环境下要加 --master spark://192.168.1.107:7077
相关文章推荐
- 将java开发的wordcount程序部署到spark集群上运行
- 将java开发的wordcount程序提交到spark集群上运行
- JDK8+Scala2.11+spark-2.0.0+Intellij2017.3.4开发wordcount程序并在集群中运行
- spark集群搭建与集群上运行wordcount程序
- Spark wordcount开发并提交到集群运行
- hadoop学习之HDFS(2.1):linux下eclipse中配置hadoop-mapreduce开发环境并运行WordCount.java程序
- spark集群上运行helloworld程序--WordCount
- 分别用Java、Scala、spark-shell开发wordcount程序及测试代码
- Spark教程-构建Spark集群-配置Hadoop单机模式并运行Wordcount(2)
- 【Spark亚太研究院系列丛书】Spark实战高手之路-第一章 构建Spark集群-配置Hadoop伪分布模式并运行Wordcount示例(1)
- 使用Eclipse基于Maven使用Java开发WordCount程序项目
- 第8课:彻底实战详解使用IDE开发Spark程序--集群模式运行
- 联想ThinkPad S3-S440虚拟机安装,ubuntu安装,Hadoop(2.7.1)详解及WordCount运行,spark集群搭建
- Local模式下开发第一个Spark程序并运行于集群环境
- 在Spark上运行WordCount程序
- Spark Streaming开发入门——WordCount(Java&Scala)
- ubuntu系统下eclipse配置hadoop开发环境并运行wordcount程序
- 【Spark亚太研究院系列丛书】Spark实战高手之路-第一章 构建Spark集群-配置Hadoop单机模式并运行Wordcount(1)
- 【Spark亚太研究院系列丛书】Spark实战高手之路-第一章 构建Spark集群-配置Hadoop-伪分布模式并运行Wordcount(2)
- win7(64位)平台下Cygwin+Eclipse搭建Hadoop单机开发环境 (四) 导入Hadoop源码+wordcount程序+运行