
Spark programming guide

Developing Spark programs with the Java API

1. Configure the Maven dependencies

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.0.1
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
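In pom.xml form this looks roughly like the following (leave the hadoop-client version as whatever matches your HDFS deployment):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version><!-- your-hdfs-version --></version>
</dependency>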


2. Create a JavaSparkContext

// setMaster expects a Spark master URL, not a bare IP (7077 is the standalone default port)
SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("spark://10.200.200.56:7077");
JavaSparkContext sc = new JavaSparkContext(conf);
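For local testing you can point the same code at a local master instead; a minimal sketch:

// Run locally with 2 worker threads; no cluster needed
SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);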


3. Create a parallelized collection

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(list);
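Once created, distData can be operated on in parallel like any other RDD. For example, summing its elements with the reduce action (a minimal sketch):

int sum = distData.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) throws Exception {
        return a + b; // 1+2+3+4+5 = 15
    }
});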


4. Load files
JavaRDD<String> file = sc.textFile("hdfs://...");
// Sequence-file keys and values should be Hadoop Writable types, e.g. Text and IntWritable
JavaPairRDD<Text, IntWritable> seqFile = sc.sequenceFile("hdfs://...", Text.class, IntWritable.class);
// TextInputFormat here is the old-API org.apache.hadoop.mapred.TextInputFormat,
// whose key/value types are LongWritable and Text
JavaPairRDD<LongWritable, Text> hadoopFile = sc.hadoopRDD(new JobConf(), TextInputFormat.class, LongWritable.class, Text.class);


With the new Hadoop API:
// Here TextInputFormat is org.apache.hadoop.mapreduce.lib.input.TextInputFormat,
// again with LongWritable keys and Text values
JavaPairRDD<LongWritable, Text> newHadoopRDD = sc.newAPIHadoopRDD(new Configuration(), TextInputFormat.class, LongWritable.class, Text.class);
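Neither Hadoop snippet above names an input path; with the old API the path is carried on the JobConf. A sketch (the path is a placeholder):

JobConf jobConf = new JobConf();
// old-API helper: org.apache.hadoop.mapred.FileInputFormat
FileInputFormat.setInputPaths(jobConf, new Path("hdfs://..."));
// same call as above, now with an input path configured
JavaPairRDD<LongWritable, Text> hadoopWithPath = sc.hadoopRDD(jobConf, TextInputFormat.class, LongWritable.class, Text.class);
// With the new API the path goes into the Configuration instead,
// e.g. conf.set("mapreduce.input.fileinputformat.inputdir", "hdfs://...") under Hadoop 2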


JavaRDD.saveAsObjectFile and JavaSparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
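A round trip might look like this (paths are placeholders; distData is the parallelized RDD from section 3):

distData.saveAsObjectFile("hdfs://.../ints"); // writes serialized Java objects
JavaRDD<Integer> restored = sc.objectFile("hdfs://.../ints"); // reads them back as an RDD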

5. RDD operations supported by Spark

Transformations

All transformations in Spark are lazy: submitting a transformation by itself does not trigger any computation; the work is performed only when an action is submitted.
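A quick illustration of this laziness, reusing file from section 4 (a sketch):

// Declaring the transformation does no work yet
JavaRDD<String> errors = file.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) throws Exception {
        return s.contains("error");
    }
});
// Only this action triggers the actual computation
long numErrors = errors.count();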

WordCount code snippet

// flatMap maps each line to a sequence of words and flattens all the
// sequences into a single RDD of words
// (Java has no implicit conversions, so the function type must be spelled out)
JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) throws Exception {
        return Arrays.asList(s.split(" "));
    }
});
// mapToPair applies the function to every element, producing one (word, 1)
// pair per word; note the Java API uses mapToPair, not map, to build a JavaPairRDD
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<String, Integer>(s, 1);
    }
});
// reduceByKey merges all values belonging to the same key with the given
// function, yielding one (word, count) pair per distinct word
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
    }
});
// An equivalent (but usually less efficient) alternative: group all values
// per key, then sum each group; in Spark 1.0 groupByKey yields Iterable values
JavaPairRDD<String, Integer> countsViaGroup = pairs.groupByKey().mapValues(new Function<Iterable<Integer>, Integer>() {
    @Override
    public Integer call(Iterable<Integer> values) throws Exception {
        int sum = 0;
        for (Integer v : values) {
            sum += v;
        }
        return sum;
    }
});
counts.saveAsTextFile("hdfs://...");


6. Keep data in memory

lineLengths.persist(StorageLevel.MEMORY_ONLY()); // the Java API takes an explicit org.apache.spark.storage.StorageLevel
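lineLengths is not defined earlier in this post; a minimal sketch of where it might come from, reusing file from section 4 (variable names are illustrative):

// Compute the length of each line of the text file loaded in section 4
JavaRDD<Integer> lineLengths = file.map(new Function<String, Integer>() {
    @Override
    public Integer call(String s) throws Exception {
        return s.length();
    }
});
// Cache the lengths in memory so later actions reuse them instead of recomputing
lineLengths.persist(StorageLevel.MEMORY_ONLY());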