您的位置:首页 > 大数据

[大数据]连载No6之Spark(RDD特性,算子)+第一行代码Hello WordCount

2018-02-13 11:09 495 查看
本次总结图如下



什么是spark?
1:快速的通用引擎,处理大规模数据
2:开源分布式计算框架,使数据分析更加快速
3:不但分析程序快速,写程序也快速

Spark RDD(Resilient distributed Dataset) 弹性的分布式数据集
5大特性

1)有一系类分区集合组成 (a list of partitions)
个人理解:计算的数据到达T级别以上时,hadoop会将这些数据切割成许多的block块,对于要计算的这个数据集Add,包含多个block块,对于spark,就是一系列的分区集合,

2) 提供一系类算子,用作在分区上面计算 (a function for computing each partition)
个人理解:数据集ADD提供很多API(算子),如map,collect,count..., 通过这些api(算子),可以对我们的大规模数计算,大数据存储和计算思想都是像数据移动的,其实这些api底层的计算还是落地到了每个分区(partiton)上面,在统一合并计算结果

3) 依赖很多其他的RDD
个人理解:在大数据处理过程中,对于处理的数据,要经过不断的数据过滤,清洗,计算,聚合,分组,等不同操作,每次处理的结果都是一个ADD,那么,下次add的操作,都需要依赖上一次计算结果ADD,彼此是互相依赖的,对于相互依赖,有一个好处就是容错,当rdd某一个partition的计算出现故障时,它可以基于父rdd重新计算

4)一个分区器是作用在key-value格式的RDD上面(shuffle的时候) (a Partitioner for key-value RDDs)
key-value格式的RDD:Rdd中存储的元素是二元组类型

5)一系列最佳计算位置来计算
个人理解:在分发task对rdd进行计算时,可以找到rdd的分区所以,分配对应的task选择最近节点,避免数据网络传输

见图spark-301



Driver作用
1:负责分发task(任务)到Worker节点(对于没有数据的节点,是不会分发task)
2:负责收集worker节点计算的结果

算子种类
1:Transformations(延迟算子)
例如:map,filter,join,union,sort,sample(随机抽样)...
特点:1:不会立即执行,出发条件是碰到actions算子
        2:返回rdd
2:Actions
例如:count,collect,lookup
特点:会立即执行

map与mapPartition
map:遍历rdd中每一个元素,单位是每条记录
mapPartition:遍历单位是一个partition,遍历之前会将一个partition的数据加载到内存,比map效率高,如果partiton数据大,可能会造成OOM,
   解决办法,使用repartiton增加分区数,使用场景:一般将一个RDD的j计算结果写入到数据库,可以避免过多的数据库连接

repartition和coalesce
1:如果想要增加RDD的分区数,那么在分区过程中必须的发生shuffle,也就是说得使用repartition这个算子或者coalesce(numberpartiton,true)
2:如果想要减少rDd的分区数,那么在分区过程中可以有shuffle,或者没有

总结:一般在公司中,增加rdd分区数使用repartiton,如果减少RDD分区数,得使用coalesce(numberpartiton,false)
reduceByKey和countByKey
相同点:都是根据key分组
不同点:1:reduceByKey会根据用户传入的聚合逻辑进行数据聚合,countByKey直接对组内数据进行统计记录数
       2:reduceBykey是Transformations算子,countByKey是actions算子

代码统计单词出现的次数
/**
* 大数据计算 第一个行代码 Hello Word Count
*/
object WordCount {
def main(args: Array[String]): Unit = {

/**
* Sparkconf是spark的配置对象
* 功能:
* 1:可以设置Application 所需要的资源(mem,core)情况
* 2:设置application名称
* 3:设置spark运行模式 yarn,mesos standalone(spark自带的资源调度框架), local(本地测试用)
*
*/
val conf = new SparkConf()
conf.setAppName("wordcount")
conf.setMaster("local")

/**
* Sparkcontext对象是通往集群通道
*/
val sc = new SparkContext(conf)

/**
* Step1 : 读取计算数据到RDD
* lineRDD中每一条记录就是对应的wc文件的每一行
*/
val lineRDD = sc.textFile("wc")
val rdd2 = lineRDD.map((str: String) => {
println("=====:" + str)
})

//统计数据条目数
var count = rdd2.count()
println(count)

/**
* Step2: flatMap算子,拆分单词
*/
val wordRDD = lineRDD.flatMap((word: String) => {
word.split(" ")
})
count = wordRDD.count()
println(count)

/**
* Step3:将wordRDD 变成KV格式的RDD
* pairRDD 每一行记录就是一个二元组类型的值(hello,1)
*/
val pairRDD = wordRDD.map((word: String) => (word, 1))

/**
* Step4:按照key来分组,然后将每一组内的数据进行聚集
* hello [1,1,1,1,1,1,1]
* 每一行数据
* (hello,count)
* (dilireba,count)
*
*/
val rdd5 = pairRDD.reduceByKey((x: Int, y: Int) => x + y)

/**
* Step5:foreach算子,出发执行,打印结果
*/
rdd5.foreach((x: (String, Int)) => {
println("word:" + x._1 + "\t count:" + x._2)

})

/**
* 释放资源
*/
sc.stop()

}

}结果如下

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spark 大数据