Spark series 1: developing a WordCount program in Java, Scala, and spark-shell
2020-01-13 13:55
Spark study notes; hopefully I can keep this up.
Environment: CentOS 6.5 + Spark 1.3 + Hadoop 2.4.1
1. Code development
Java version:
package cn.spark.study.core;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WorldCountLocal {
    public static void main(String[] args) {
        // Local mode: the master is set in code, so no --master flag is needed at submit time
        SparkConf conf = new SparkConf()
            .setAppName("WorldCountLocal")
            .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the input file as an RDD of lines
        JavaRDD<String> lines = sc.textFile("C://Users//hlz//Desktop/test.txt");

        // Split each line into words
        // (Note: on Spark 2.x, FlatMapFunction.call returns Iterator<String> instead of Iterable<String>)
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print each (word, count) pair
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " appears " + wordCount._2 + " times.");
            }
        });

        sc.close();
    }
}
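As a quick sanity check: if test.txt hypothetically contained the two lines "hello spark" and "hello hadoop", the program would print the following (output order is not guaranteed):

hello appears 2 times.
spark appears 1 times.
hadoop appears 1 times.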
Scala version:
package com.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
  def main(args: Array[String]) {
    // No setMaster here: the master is supplied with --master at submit time
    val conf = new SparkConf()
      .setAppName("WordCount")
    val sc = new SparkContext(conf)
    // Read from HDFS with a single partition
    val lines = sc.textFile("hdfs://master:9000/test.txt", 1)
    val words = lines.flatMap { line => line.split(" ") }
    val pairs = words.map { word => (word, 1) }
    val wordCounts = pairs.reduceByKey { _ + _ }
    wordCounts.foreach(wordCount => println(wordCount._1 + " appears " + wordCount._2 + " times."))
  }
}
spark-shell version:
Not listed separately here: the only difference from the Scala version is that spark-shell has already initialized the SparkContext for us as sc, so it can be used directly.
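For reference, a minimal sketch of the same job typed into spark-shell, reusing the HDFS path from the Scala version (sc comes pre-initialized by the shell):

val lines = sc.textFile("hdfs://master:9000/test.txt")
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.foreach(wordCount => println(wordCount._1 + " appears " + wordCount._2 + " times."))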
2. Running the code: local testing vs. cluster testing
For a local run, set the master in code via setMaster("local") (as the Java version above does), or submit through a script; with no --master flag, spark-submit defaults to local mode:
/usr/local/src/spark/bin/spark-submit \
--class com.spark.study.core.WordCount \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/usr/local/src/spark-scala/WordCount.jar
To run on the cluster instead, also add --master spark://192.168.220.51:7077 to specify the master node to connect to (and leave setMaster out of the code, as the Scala version does); the script becomes:
/usr/local/src/spark/bin/spark-submit \
--class com.spark.study.core.WordCount \
--master spark://192.168.220.51:7077 \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/usr/local/src/spark-scala/WordCount.jar
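Either way, the jar must be built first. Here is a minimal sketch of an sbt build for the Scala version, assuming Spark 1.3 on Scala 2.10 (the build file and layout below are my assumption, not from the original post); spark-core is marked provided because the cluster supplies it at runtime:

// build.sbt (hypothetical minimal build for the Scala WordCount)
name := "WordCount"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"

Running sbt package then leaves the jar under target/scala-2.10/, which can be copied to the path used in the scripts above.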
Source: ITPUB blog, http://blog.itpub.net/30541278/viewspace-2153358/. Please credit the source when reposting.