
Spark: Stratified Sampling and Finding Max/Min Values

2017-08-21 16:05
See the comments in the code for details.
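As a quick illustration before the full job, here is how sampleByKey behaves on a toy RDD. A minimal sketch, assuming a local SparkContext named sc; the keys, values, and fractions are made up for demonstration:

// Toy stratified sample: keep all of key 1, about half of key 2, none of key 3.
val toy = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c"), (2, "d"), (3, "e")))
val frac = Map(1 -> 1.0, 2 -> 0.5, 3 -> 0.0)
toy.sampleByKey(withReplacement = false, frac, seed = 42).collect().foreach(println)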


package com.beagledata.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Stratified sampling
 *
 * Created by drguo on 2017/8/21.
 * blog.csdn.net/dr_guo
 */

object PCSStratifiedSampling {
  val conf = new SparkConf().setAppName("pcs_sampling")
    .set("spark.jars.packages", "io.netty:netty-common:4.1.8.Final")
    .set("spark.jars.exclude", "io.netty:netty-common")
    //.setMaster("local")

  val sc = new SparkContext(conf)

  def main(args: Array[String]): Unit = {

    //val pcs = sc.textFile("src/main/resources/part-00000")
    //val pcs = sc.textFile("hdfs://xxxx:8020/data1/Data/NewPCSData")
    val pcs = sc.textFile(args(0))

    // Sampling fractions: the Double is the fraction to draw from each stratum.
    // Every class label must be listed; set the fraction to 0 for classes
    // that should not be sampled.
    val fractions: Map[Int, Double] = List((-1, 1.0), (5, 1.0), (6, 1.0), (13, 1.0), (0, 0.066046),
      (3, 0.382914), (7, 0.357202), (8, 0.043421), (11, 0.69958),
      (1, 0.0), (2, 0.0), (4, 0.0), (-2, 0.0), (14, 0.0)).toMap

    val pcsKV = pcs.map(_.split(";")).map { x =>
      val key = x(0).toInt
      val value = x(1) + ";" + x(2) + ";" + x(3) + ";" + x(4)
      (key, value)
    }

    // The last argument is the seed; reusing the same seed reproduces the same sample.
    val pcsSample = pcsKV.sampleByKey(withReplacement = false, fractions, 0)
      .map(x => x._1 + ";" + x._2)
    pcsSample.take(5).foreach(println)

    //pcsSample.saveAsTextFile("/data1/Data/PCSSample")
    pcsSample.saveAsTextFile(args(1))

  }
}
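To sanity-check that the drawn sample matches the target fractions, the per-key counts before and after sampling can be compared. A minimal sketch, assuming pcsKV and fractions as defined above; note that sampleByKey only approximates the fractions in a single pass, while sampleByKeyExact hits them exactly at the cost of extra passes over the data:

// Compare stratum sizes before and after sampling (assumes pcsKV and fractions above).
val before = pcsKV.countByKey()
val after = pcsKV.sampleByKey(withReplacement = false, fractions, 0).countByKey()
before.keys.toSeq.sorted.foreach { k =>
  println(s"key=$k  total=${before(k)}  sampled=${after.getOrElse(k, 0L)}  target=${fractions.getOrElse(k, 0.0)}")
}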


References:
http://www.cnblogs.com/skyEva/p/5554130.html
http://blog.csdn.net/xubo245/article/details/51485443
https://github.com/endymecy/spark-ml-source-analysis/blob/master/%E5%9F%BA%E6%9C%AC%E7%BB%9F%E8%AE%A1/tratified-sampling.md


package com.beagledata.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
 * PCS data statistics: per class label, compute x max, x min, y max, y min,
 * x count, and y count.
 *
 * Created by drguo on 2017/8/21.
 * blog.csdn.net/dr_guo
 */

object PCSDataCount {
  val conf = new SparkConf().setAppName("pcscount")
    .set("spark.jars.packages", "io.netty:netty-common:4.1.8.Final")
    .set("spark.jars.exclude", "io.netty:netty-common")
    //.setMaster("local")

  val sc = new SparkContext(conf)

  def main(args: Array[String]): Unit = {

    //val pcs = sc.textFile("src/main/resources/part-00000")
    //val pcs = sc.textFile("hdfs://xxxx:8020/data1/Data/NewPCSData")
    val pcs = sc.textFile(args(0))

    // Per class label: x max, x min, y max, y min, x count, y count.
    // Parse the comma-separated values as Double so that max/min compare
    // numerically rather than lexicographically (as strings, "9" > "10").
    val labelxy = pcs.map(_.split(";")).map { x =>
      val xs = x(1).split(",").map(_.toDouble)
      val ys = x(2).split(",").map(_.toDouble)
      "label:" + x(0) + "\tx_max:" + xs.max + "\tx_min:" + xs.min +
        "\ty_max:" + ys.max + "\ty_min:" + ys.min +
        "\tx_length:" + xs.length + "\ty_length:" + ys.length
    }

    labelxy.saveAsTextFile("/data1/Data/PCSDataCount")

    // Count how many records fall into each class (essentially a word count).
    pcs.map(_.split(";")).map(x => x(0)).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

  }
}
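The max/min above is computed per record; if a single global min/max over all x values were needed instead, Spark's DoubleRDDFunctions cover it in one pass. A minimal sketch, assuming the same pcs RDD and the ";"/"," layout used above:

// Global statistics over every x value in the dataset (one pass, via StatCounter).
val allX = pcs.map(_.split(";")).flatMap(x => x(1).split(",")).map(_.toDouble)
val s = allX.stats() // count, mean, stdev, max, min
println(s"x_count=${s.count}  x_max=${s.max}  x_min=${s.min}")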
Tags: spark