您的位置:首页 > 其它

数据科学家如何优雅的运行R在spark内存计算引擎上

2017-08-14 00:13 579 查看
来源:http://www.ppvke.com/Blog/archives/46156

R在数据科学中超过10,000包,是主要的编程语言之一。R是开源软件,作为 统计学和计算机科学课程的一部分,在专科和本科大学广泛传授。R使用DateFrame作为API可以方便操控数据。R还有很强大的基础设施,让数据分析师可以很有效的解释数据。

使用R分析数据会被单机的可用内存量限制,并且R是单线程,在大型数据集上使用R分析不太现实。为了解决R扩展性的问题, Spark社区用R用户熟悉的语法规则使数据处理结构化,基于分布式数据框架开发了SparkR包。Spark提供分布式处理引擎、数据源、off-memory数据结构。R提供了动态环境, 交互式, 打包, 可视化等,而 SparkR 结合了Spark和 R两者的优点。

在下一环节中, 我们将证明如何集成SparkR和 R,然后用它从传统R用户的视角解决一些典型的数据科学问题。


SparkR 架构

如下图所示,SparkR架构包含两种主要的组件:把R—JVM绑定在驱动程序上,这样子R程序就可以把工作提交到Spark集群上并且支持R在Spark执行器上运行。



在SparkR DataFrames上自动执行操作将所有可用节点分布到Spark集群上。

我们使用socket-based API调用R上JVM的函数。这些函数支持两种语言跨平台使用,而且不使用任何外部库。由于大部分被传递的信息都是控制信息, 所以使用sockets的成本比使用进程内通信方法低。


数据科学的工作流

数据科学是一科激动人心的学科, 你可以将原始数据转换成知识,对于R用户来说,典型的数据项目就像下图所示:



首先,您必须将数据导入R。这通常意味着你首先要把数据存储在文件、数据库、云存储或web API,然后将其加载到R的一个数据框架中。
当数据导入完成之后,最好是将它井然有序的整理。在实际生活中的数据集包括冗余行/列和消失的值,这就需要我们增加步骤来解决这些问题。
一旦数据整理完成,操作者需要做一个常见的转换步骤select/filter/arrange/join/groupBy。同时, 数据整理和转换被称作数据清洗,因为要将数据转换成合适使用的格式,往往会像一场战斗一样!
一旦你需要数据以行/列的形式有理化以后,就会生成两个主要的知识引擎:可视化和建模。这些使得他们的优缺点互补了,因此这当中任何真实的分析都会迭代很多次。

随着数据量持续增长, 数据分析师使用类似的工作流来解决问题, 但是需要添加革命性的工具, 例如 SparkR。然而SparkR 不可能具备所有的R功能, 也没必要, 毕竟不是所有的功能都需要可扩展性,也不是每个数据集都很大。例如,你有10亿条记录,并且你的模型具有有几十个功能的逻辑回归一样简单,你可能就不用训练整个数据集。但是随机森林分类器却有上千个功能,它就可以利用更多的数据。我们应该在正确的地方使用正确的工具,在下一节中, 我们将用一个典型的案例证明 SparkR和 R集成的最佳运作方法。


将SPARKR + R用于数据科学工作流

使用SparkR可以克服单机R主要的可扩展性问题并加速数据科学工作流,


BIG DATA, SMALL LEARNING



用户通常以存储在HDFS, AWS S3, 或者 RDBMS等系统上、并且存储为 JSON, CSV, ORC或者 parquet等文件格式的大数据集开始。特别是基于云的大数据基础设施不断发展。 数据科学开始联结需要的数据集,然后:
执行数据清理操作删除无效的行或者列。
选择具体的行/列。

以下这些用户一般通过聚合或者采样数据这些步骤来降低数据集的大小。我们通常称这些步骤为数据清洗,数据清洗涉及了数据集操控,并且SparkR是最适合处理这种工作负载的工具。

预处理后的数据集合在本地,用于建模或者在单机R上执行其他的统计任务。通常数据科学家应该都很了解单机R能从成千上万的CRAN包中获取的利益。

作为数据科学家, 我们通常执行探索性数据分析,并提前导入dplyr 包。dplyr为单机时代方便处理数据提供了很多很棒,容易操作的功能。而在大数据时代,SparkR支持和API接口基本相同的功能,用于处理更大规模的数据集, 对于传统的R用户, 这种功能可以非常顺滑的迁移。

我们将使用著名航空公司的数据集(http://stat-computing.org/dataexpo/2009/)来证明这篇文章的所描述的观点。

首先我们使用SparkR DataFrame API的功能做一些预处理输入。作为预处理的一部分,我们决定采用 dropna().删除空行值。
airlines <- read.df(path=”/data/2008.csv”, source=”csv”,header=”true”, inferSchema=”true”)
> planes <- read.df(path=”/data/plane-data.csv”, source=”csv”, header=”true”, inferSchema=”true”)
> joined <- join(airlines, planes, airlines$TailNum ==planes$tailnum)
> df1 <- select(joined, “aircraft_type”, “Distance”, “ArrDelay”, “DepDelay”)
> df2 <- dropna(df1)
> df3 <- sample(df2, TRUE, 0.1)
head(df3)




数据清洗之后, 用户通常会聚合或者采样数据, 这一步就可以缩小数据集的大小。

当然,可能出现的问题是采样显著降低模型的准确性。大部分情况下是可以接受的, 对于其他情况, 我们将在大型机器学习这一环节讨论如何处理这种情况。

SparkR数据框架在任务调度、代码生成、存储管理等方面继承了计算引擎所有的优化项。例如,这张表格对比了在单机上的R,Python和 Scala 各1000万整数对集合running group运行时间的性能。下图中, 我们可以看出 SparkR 的性能类似 Scala / Python。




分区聚合

用户自定义的功能(UFDs)
l    Dapply
l    Gapply

并行执行功能
l  spark.lapply

分区聚合工作流在一些统计应用程序方面很有用, 例如综合学习, 参数调整, 或者引导聚合。在这些情况下,用户通常有一种特殊函数,需要跨输入数据集的不同分区并行执行,将每个分区的结果结合在一起去使用聚合功能。。SparkR提供UDFs处理这种工作负载。

SparkR UDFs在每个分区分布完DataFrame之后返回到本地R生成列式 DataFrame,这种本地R列式DataFrame稍后将在JVM中转换成相应的格式。

和传统的R相比,使用这种 API 要求用dapply()改变一些常规代码。

以下示例中展示的是使用 SparkR UDFs 如何在几秒内添加 “起飞 ”延迟时间新的列,并单独最大化实际延迟时间。
schema <-structType(structField(“aircraft_type”, “string”), structField(“Distance”, “integer”),   
structField(“ArrDelay”, “integer”),   structField(“DepDelay”, “integer”),   structField(“DepDelayS”, “integer”))
> df4 <- dapply(df3,function(x) { x <- cbind(x, x$DepDelay * 60L) }, schema)
> head(df4)




schema <-structType(structField(“Distance”, “integer”),   structField(“MaxActualDelay”, “integer”))
> df5 <- gapply(df3, “Distance”,function(key, x) { y <- data.frame(key, max(x$ArrDelay-x$DepDelay)) },schema)
>head(df5)




最大实际延迟的距离

此外,在一些情况下, 输入数据量很小,但是相同的数据却被评估出大量的参数值, 有点类似传统R并行处理元素表。

为了技术上支持这种工作流,我们提供了一个并行执行的API,并将这个API放入一个本地列表中,然后在整个集群的某个核中为每一个本地元素表执行并运行这个API.

例如,我们要结合最好的glmnet 模型评估不同的参数。为了自定义最佳模型, 我们在本次案例中,我们将使用更大的AUC值,这就意味模型会更好。传统的方式我们可以使用两个循环串联参数组合和训练模型。
or (lambda in c(0.5, 1.5)) {
 for (alpha in c(0.1, 0.5, 1.0)) {
  model <- glmnet(A, b, lambda=lambda, alpha=alpha)
  c <- predit(model, A)
  c(coef(model), auc(c, b)) 
 }
}


然而,使用SparkR我们可以结合执行器和train parallelly分布参数。



values <- c(c(0.5, 0.1),c(0.5, 0.5), c(0.5, 1.0), c(1.5, 0.1), c(1.5, 0.5), c(1.5, 1.0))
train <- function(value) {
 lambda <- value[1]
 alpha <- value[2]
 model <- glmnet(A, b, lambda=lambda,alpha=alpha)
 c(coef(model), auc(c, b))
}
models <- spark.lapply(values,train)


SPARKR的虚拟环境



SparkR通过spark.lapply提供了分布式计算能力,在很多案例中,分布式计算能力大小取决于第三方R包,就像之前案例中 glmnet 包需要在所有的worker节点中运行 Spark executors。这意味着用集群上的Hadoop管理者部署这些R包在每一个executors/workers节点上是非常灵活的。退出工作之后, 请您注意清理工作环境。因为 SparkR 是交互式分析工具,用户可以在整个环节中加载很多本地R包,但是在启动SparkR之前安装完成所有需要的包几乎不可能。

我们使SparkR支持Spark2.1版本的虚拟环境,当你在YARN上部署Spark,每个SparkR工作都有自己的库目录,绑定在执行器上的YARN容器。所有需要的第三方库都会存储在这个目录里, 为了不影响其他工作环境, 当Spark工作完成之后,目录会被立刻删除。
spark.addFile(“/path/to/glmnet”)
values <- c(c(0.5, 0.1),c(0.5, 0.5), c(0.5, 1.0), c(1.5, 0.1), c(1.5, 0.5), c(1.5, 1.0))
train <- function(value) {
 path <- spark.getSparkFiles(“glmnet”)
 install.packages(path, repos = NULL, type = “source”)
 library(“glmnet”)
 lambda <- value[1]
 alpha <- value[2]
 model <- glmnet(A, b, lambda=lambda,alpha=alpha)
 c(coef(model), auc(c, b))
} 
models <- spark.lapply(values,train)


大型机器学习

R支持的功能包括:大量的机器学习算法,默认统计包,其他可以选择的软件包 glmnet, randomForest, gbm,等。 大量的数据统计用这些软件包在Kaggle比赛获得不错的成绩。机器学习算法通常直接在DataFrame上运行, 使用C或者Fortranlinkages 有效执行。

近年来,在各种应用程序中很显然,更多的训练数据和更大的模型往往能生成更高的精度。在这种案例中,数据通常具有预处理生成数据的功能,训练功能, 为了适应模型 ,给出的标签被输入到机器学习算法中。适用的模型尺寸通常比输入的数据和用于预测服务输入的数据模型更小。



Spark 1.5通过 SparkR DataFrame添加initia用以支持分布式机器学习。为了给R用户提供一个直观的界面,SparkR 沿用R的本地用法使用大型机器学习的MLlib,去适应和评估模型。MLlib由标准学习算法的快速实施和可扩展实施组成。而常见的学习算法则包括,分类,回归,协同过滤,集群,梯度下降等组成。

Spark 2.1中,我们为用户研发了一个采用大量MLlib算法的界面。大量在单节点机器学习包排名前20的算法在Spark中都能相应的实施可扩展性。我们正着手于为SparkR用户发布API并迁移更多可扩展性的机器学习到MLlib。

下一步就是使用MLlib调用glm公式指定的各类模型。如果我们指定Gaussian family就意味着我们要执行的是线性回归。MLlib在分布式数据集上缓存输入的DataFrame并且为适应模型发起了一系列的Spark任务。因此,基于这些功能,我们对根据航班延误时间,飞机类型,和飞行距离来预测航班到达时延迟的时间很感兴趣。

正如R的本地模型一样,coefficients(系数表)可以用汇总函数检索,需要注意的是 aircraft_type 是分类功能。采用hood,SparkR可以自动执行这些功能的one-hot 编码,这样就无需手动完成了。
> model <- glm(ArrDelay ~DepDelay + Distance + aircraft_type, family = “gaussian”, data = df3)
> summary(model)
Deviance Residuals:
(Note: These areapproximate quantiles with relative error <= 0.01)
   Min         1Q   Median       3Q      Max
-74.937   -7.786  -1.812    5.202  276.409


Coefficients:(系数表)





(gaussian family 参数的分布是195.5971)

Null偏差: 52481026 on 34524 自由度

剩余偏差:  6751817  on 34519 自由度

AIC: 280142

Fisher Scoring 重复次数: 1

SparkR现在提供了DataFrame的无缝集合和常见的R模型的功能。使得R用户在利用 MLlib的分布式机器学习的算法越来越简单。


未来方向

Spark社区和我们正共同努力着手改善SparkR collect/createDataFrame的性能,为 SparkR 用户提供更好的R公式支持,给MLlib 添加更多可扩展的机器学习算法,改善SparkR UDF的性能。让我们一起期待未来更有用的功能的发布吧!!!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: