
Spark Learning Notes 2: RDD

2016-02-17 16:47
Judging by the name, a Resilient Distributed Dataset is a kind of dataset (although, as we will see below, that is not the whole story). Each RDD is automatically split into partitions, which Spark distributes across the nodes of the cluster. RDDs are kept in memory, which makes operations on them fast, and because Spark distributes and manages the data across the cluster automatically, all of this is transparent to the program.

Creating RDDs

There are three ways to create an RDD:

1. Load an external dataset.

lines = sc.textFile("README.md")

2. Create an RDD from an in-memory collection (e.g. a list).

lines = sc.parallelize(["pandas", "i like pandas"])



data = [1, 2, 3, 4, 5]

distData = sc.parallelize(data)

3. Create a new RDD from an existing RDD.

lines = sc.textFile("README.md")

pythonLines = lines.filter(lambda line: "Python" in line)

Note that creating pythonLines does not modify the existing RDD lines. Spark records the lineage of every RDD in a lineage graph. When a job runs, Spark uses the lineage graph to compute each RDD, and if an RDD is lost or corrupted because of a failure, Spark uses the lineage graph to rebuild it.
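As a rough sketch of how lineage shows up in practice (assuming sc is an existing SparkContext and README.md is present in the working directory), PySpark's RDD.toDebugString() prints the chain of RDDs that Spark has recorded:

# A minimal sketch, assuming `sc` is an existing SparkContext and
# README.md exists in the working directory.
lines = sc.textFile("README.md")
pythonLines = lines.filter(lambda line: "Python" in line)

# filter() did not modify `lines`; it produced a new RDD whose lineage
# points back at `lines`. toDebugString() prints that recorded chain.
print(pythonLines.toDebugString())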

Operating on RDDs

There are two kinds of operations on an RDD: transformations and actions.

Transformations construct a new RDD from a previous one.

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

It is easy to tell whether a given function, such as filter() or count(), is a transformation or an action: look at what it returns. Transformations return RDDs, whereas actions return some other data type.
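A small sketch of this rule of thumb (assuming sc is an existing SparkContext): filter() hands back another RDD, while count() hands back an ordinary Python integer.

# Sketch: transformations return RDDs, actions return plain values.
lines = sc.parallelize(["pandas", "i like pandas", "Python is fun"])

filtered = lines.filter(lambda line: "pandas" in line)
print(type(filtered))    # an RDD subclass -> filter() is a transformation

n = filtered.count()     # returns a number to the driver -> count() is an action
print(type(n), n)        # <class 'int'> 2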

The difference between transformations and actions is due to the way Spark computes RDDs. Although you can define new RDDs any time, Spark only computes them in a lazy fashion, the first time they are used in an action. The reason is that if you will not reuse the RDD, there is no reason to waste storage space when Spark could instead stream through the data once and just compute the result.

So rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data, built up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are, so when we call sc.textFile() the data is not loaded until it is necessary.
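A minimal sketch of that laziness (again assuming sc exists and README.md is present): defining the RDD and the transformation returns immediately, and Spark only touches the file when an action such as first() runs, at which point it can stop after the first matching line.

# Sketch: nothing is read until an action forces the computation.
lines = sc.textFile("README.md")                           # no data loaded yet
pythonLines = lines.filter(lambda line: "Python" in line)  # still nothing computed

# Only now does Spark scan the file; it can stop at the first matching line
# rather than reading and filtering the entire file.
print(pythonLines.first())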

It is especially important to note that, by default, every time we call an action the RDDs in the lineage graph are recomputed. For example, calling count() and then take() on the same RDD (both are actions) makes Spark start from the first RDD in the lineage graph and compute the chain twice, once for each result. To avoid this waste, persist the intermediate result: if an RDD will be used by multiple actions, persist it in memory or on disk.
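A minimal sketch of persisting a reused RDD (assuming sc exists); note that persist() is itself lazy, so the partitions are cached the first time an action materializes them and are then reused by later actions.

from pyspark import StorageLevel

lines = sc.textFile("README.md")
pythonLines = lines.filter(lambda line: "Python" in line)

# Keep the filtered partitions in memory (pythonLines.cache() is shorthand
# for MEMORY_ONLY persistence).
pythonLines.persist(StorageLevel.MEMORY_ONLY)

print(pythonLines.count())   # first action: computes the lineage and caches the result
print(pythonLines.take(3))   # second action: served from the cached partitions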

In summary, a Spark program boils down to the following four steps (a complete sketch follows the list):

1. Create some input RDDs from external data.

2. Transform them to define new RDDs using transformations like filter().

3. Ask Spark to persist() any intermediate RDDs that will need to be reused.

4. Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.
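Putting the four steps together, a minimal end-to-end sketch might look like the following (the SparkContext setup and the local master are assumptions for a standalone script; README.md is assumed to exist in the working directory).

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("rdd-notes").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 1. Create an input RDD from external data.
lines = sc.textFile("README.md")

# 2. Transform it to define a new RDD.
pythonLines = lines.filter(lambda line: "Python" in line)

# 3. Persist the intermediate RDD that will be reused.
pythonLines.cache()

# 4. Launch actions to kick off the computation.
print(pythonLines.count())
print(pythonLines.first())

sc.stop()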