
Spark local / standalone / yarn / remote debugging: running WordCount

2017-06-11 19:36

local

Start spark-shell directly:

./spark-shell --master local[*]


Write the Scala code:

sc.textFile("/input/file01.txt")   // in the shell the result is bound to res0
res0.cache()                       // mark the RDD for in-memory caching
res0.count                         // an action: triggers the read and materializes the cache
val wd = res0.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wd.collect.foreach(println)


Check the WordCount result in the output log.

You can also check the Spark monitoring web UI; the green circle indicates that the RDD is cached in memory.
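A quick way to confirm the caching from the shell itself is to inspect the RDD's storage level (a minimal sketch, assuming the same res0 session as above):

res0.getStorageLevel     // MEMORY_ONLY after the cache() and count above, NONE if nothing was cached
sc.getPersistentRDDs     // map of RDD id -> RDD for everything currently marked as persistent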







standalone

Add the following to spark-env.sh:

export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=3g
export JAVA_HOME=/usr/local/java/jdk1.7.0_75


Add the worker hosts to slaves:

master
slave1
slave2


Start the Spark cluster from the sbin directory:

./start-all.sh


Verify that the master is now listening on port 7077.
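A quick check from a shell on the master node (assuming jps and netstat are available):

jps                          # should list the Master and Worker processes
netstat -tlnp | grep 7077    # the standalone master listens on this port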



Then start spark-shell against the standalone master and run the earlier code to watch how it is processed; you can add a few more statements to see the effect, for example the sketch below.
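A minimal sketch, assuming the master host is reachable as master and the same input file as before:

./spark-shell --master spark://master:7077

val lines = sc.textFile("/input/file01.txt")
lines.cache()
val wd = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wd.sortBy(_._2, ascending = false).take(10).foreach(println)   // top 10 words by count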





yarn

Submit the jar to YARN in yarn-client mode:

./spark-submit --master yarn-client /usr/local/spark/examples/spark1-1.0-SNAPSHOT.jar
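If the jar's manifest does not declare a main class, spark-submit also needs the --class option; a sketch assuming the WordCount class shown later in this post, packaged in the default package:

./spark-submit --master yarn-client --class WordCount /usr/local/spark/examples/spark1-1.0-SNAPSHOT.jar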


Check the job on the YARN monitoring page.



Debugging the program through a debug listening port

When the data volume is large, run the program on the cluster this way and set breakpoints locally to debug it.

run.sh

/usr/local/spark/bin/spark-submit \
--master yarn-client \
--driver-cores 8 \
--driver-memory 1G \
--num-executors 2 \
--executor-memory 1G \
--executor-cores 4 \
--driver-java-options '-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=9887' \
/usr/local/spark/examples/spark1-1.0-SNAPSHOT.jar


Run run.sh.
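Because the options use server=y,suspend=y, the driver JVM blocks after startup until a debugger attaches to port 9887. For a quick check before configuring the IDE, a command-line debugger can attach to the same port (a sketch, assuming the driver runs on a host reachable as master):

jdb -connect com.sun.jdi.SocketAttach:hostname=master,port=9887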



In IDEA, add a Remote run configuration with the debug host name and port (the ones configured in run.sh, e.g. 9887 above), then start debugging.





Java implementation of WordCount on Spark

The SparkUtil utility class provides a method to obtain a JavaSparkContext and a method to read an RDD from external storage (file input):

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkUtil {
    /**
     * Obtain a JavaSparkContext (the logLevel parameter is accepted but not used here).
     */
    public static JavaSparkContext getJavaSparkContext(String appName, String logLevel) {
        SparkConf conf = new SparkConf().setAppName(appName);
        // conf.setMaster("local[*]");   // uncomment to run locally without spark-submit
        SparkContext sc = new SparkContext(conf); // this step can be omitted: new JavaSparkContext(conf) works directly
        JavaSparkContext jsc = new JavaSparkContext(sc);
        return jsc;
    }

    /**
     * Read an RDD from external storage (file input).
     */
    public static JavaRDD<String> getRddExternal(JavaSparkContext jsc, String filePath) {
        if (null == jsc)
            return null;
        return jsc.textFile(filePath);
    }
}


Create a JavaSparkContext and operate on the RDD to implement word count:

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.LoggerFactory;

import scala.Tuple2;

public class WordCount {

    private static org.slf4j.Logger logger = LoggerFactory.getLogger(WordCount.class);

    public static void main(String[] args) {
        JavaSparkContext jsc = SparkUtil.getJavaSparkContext("WordCount", "WARN");
        JavaRDD<String> wordData = SparkUtil.getRddExternal(jsc, "/input/file01.txt");
        wordCount(wordData);
    }

    public static void wordCount(JavaRDD<String> wordData) {
        // split each line into words
        JavaRDD<String> wordRdd = wordData.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> wordMapToPair = wordRdd.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // sum the counts for each word
        JavaPairRDD<String, Integer> wordReduceByKey = wordMapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer integer1, Integer integer2) throws Exception {
                return integer1.intValue() + integer2.intValue();
            }
        });

        // sort by word and print; note that foreach runs on the executors,
        // so in cluster mode the output goes to the executor logs, not the driver console
        wordReduceByKey.sortByKey().foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1 + "=" + stringIntegerTuple2._2);
                logger.info(stringIntegerTuple2._1 + "=" + stringIntegerTuple2._2);
            }
        });
    }
}
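For a quick local test of this class without a cluster, you can either uncomment setMaster("local[*]") in SparkUtil or pass the master on the command line; a sketch, assuming the class ends up in the default package of the snapshot jar used above:

./spark-submit --master "local[*]" --class WordCount /usr/local/spark/examples/spark1-1.0-SNAPSHOT.jar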