您的位置:首页 > 其它

Spark机器学习管道--中文翻译

2016-09-28 08:18 483 查看
在这一节里,我们将介绍ML Pipelines的概念,ML Pipelines提供了一个构建于DataFrames之上的统一的

统一的高级API的集合,用来帮助用户创建与调优实际的机器学习管道。

内容列表:

. 管道主要概念
。DataFrame
。管道组件
。转换器
。评估器
。流水线组件的属性
。管道
。怎么工作
。细节
。参数
。保存与加载管道
。代码例子
。例子:评估器、转换器和参数
。例子:管道
。模型选择(高级调优)

管道(Pipelines)主要概念
MLlib机器学习算法标准APIs让在一条流水线或工作流里绑定多个算法变得非常容易,这节将介绍流水线API引入的关键概念,这些概念主要受项目scikit-learn有启发
。DataFrame 机器学习库API用Spark SQL里的DataFrame做为机器学习的一个数据集,DataFrame能够包含多种数据类型,比如,一个DataFrame能够有不同的列分别存储文本,特征向量,正确的标签和预测结果。Transformer 一个转换器就是一个算法,它能够转换一个DataFrame到另一个DataFrame,比如,一个机器学习模型就是一个转换器,它能转换带有特征向量的DataFrame成带有预测结果的DataFrame。Estimator
一个评估器是一个算法,它能够在一个DataFrame是装配生成一个转换器,比如,一个学习算法是一个转换器它能够训练一个DataFrame以生成一个模型
。Pipeline 一个流水线链接多个转换器与评估器来指定一个机器学习工作流
。parameter 所有转换器与评估器为指定的参数共享通用的API

DataFrame
机器学习能够应用到广泛的数据类型,比如:向量、文本、图片和结构化数据,机器学习API采用来自于Spark SQL的DataFrame就是为了支持不同种类的数据类型DataFrame支持基本的结构化的类型,查阅Spark SQL datatype reference得到一个支持的类型列表,除了Spark SQL指南中列出的类型,DataFrame也能用机器学习库的向量类型.DataFrame能够从普通的RDD隐式或显示的创建,DataFrame的列是命名的,下面的代码例子命名列比如"text"
"features" 和 "label"

管道组件
Transformer
一个转换器是一个抽象,它包含了特征转换器与学习模型,从技术上讲,一个转换器实现了方法transform(),转换一个DataFrame到另一个DataFrame,一般而言通过追加一列或者多列
比如:
一个特征转换器可以应用到一个DataFrame,读一列(e.g., text),映射它到一个新的列 (e.g., feature vectors),输出一个带有新列的DataFrame
一个学习模型可以应用到一个DataFrame,读包含特征向量的列,为每一个特征向量预测标签,输出一个带有预测标签新列的DataFrame

Estimator
评估器是用来适配与训练数据的一个或多个机器学习算法的抽象概念,从技术上讲,一个评估器实现了方法fit(),方法用来接收一个DataFrame产生一个模型,产生的模型是一个转换器,比如,一个机器学习算法如逻辑回归(LogisticRegression )是一个评估器,通过调用fit()训练一个逻辑回归模型,因为它是一个模型所以也是一个转换器

流水线组件的属性
Transformer.transform()和Estimator.fit()都是无状态的,将来,带状态的算法可能被支持,转换器与评估器的每个实例有唯一的ID,在具体的参数中非常有用

管道
机器学习中,运行一系列的算法来处理与学习数据是很常见的,比如,一个简单的文本文档处理流可能包含下列步骤:
。分割文档文本为单词集合
。转换每一个文档单词为数字向量特征
。用特征向量与标签学习一个预测模型
机器学习库用流水线来表示这么一个工作流,它包含一系列的按指定顺序运行的流水线步骤(转换器与评估器)

怎么工作
一个流水线指定一系列的步骤,每一步是一个转换器或者评估器,这些步骤按顺序运行,输入的DataFrame通过每一步时转换,转换器步骤,在DataFrame上调用transform()方法,评估器步骤,fit()方法被调用用来产生一个转换器(它成为流水线模型的一部分),并且这个转换器的方法transform()将在DataFrame上调用

我们用简单的文本文档工作流来说明,



上面,顶行表示有三个步骤的流水线,前面的二个步骤(Tokenizer and HashingTF)是转换器(蓝色),第三个步骤(LogisticRegression)是评估器(红色),底行代表通过流水线的数据流,圆柱体代表DataFrames,方法Pipeline.fit()在原始的DataFrame上调用,它有原始的文本文档与标签,方法 Tokenizer.transform()分割原始的文本文档为单词集,给DataFrame添加新的words列,方法HashingTF.transform()转换words列为特征向量,把特征向量做为新的列添加进DataFrame,现在,因为LogisticRegression是一个评估器,流水线首先调用LogisticRegression.fit()方法生生逻辑回归模型,如果流水线有更多的步骤,在把DataFrame传输到下一个步骤前,它将在DataFrame调用方法LogisticRegressionModel’s
transform()
 
流水线是一个评估器,因此在Pipeline’s fit
4000
()方法运行后,生成一个流水线模型(PipelineModel),它是一个转换器,
这个PipelineModel用来测试,下图说明了这个用途:



 

细节
DAG流水线:一个流水线的阶段用一个顺序数组指定,这里给出的例子全部是线性流水线,每一个步骤用前一个步骤产生的数据,创建非线性的流水线是可能的,只要数据流图形成一个有向无环图,这个图一般是基于每个步骤输入输出的列名隐式指定的(一般做为参数指定),如果流水线形成一个DAG,每一个步骤必须按拓扑顺序指定。
运行时检查:因为流水线能够操作具有不同类型数据的DataFrames,它不能使用编译时类型检查,流水线与流水线模型在真正实际运行流水线前做运行时检查,通过DataFrame的结构做类型检查,DataFrame的结构是列的数据类型的描述唯一的流水步骤:流水线的步骤必须是唯一的实例,比如,因为流水线的步骤必须有唯一的IDs,所以myHashingTF的同一个实例不能插入流水线二次,然而,不同的实例myHashingTF1和myHashingTF2能放进同一个流水线,因为不同的实例将用不同的ID创建

参数
机器学习库里的评估器与转换器用统一的API指定参数,参数是包含独立文档的命名参数,ParamMap是(parameter, value)对的集合,
有二个主要的方法传参数给一个算法:
1.为一个实例设置参数,比如:lr是一个逻辑回归对象实例,可以通过调用lr.setMaxIter(10)来使lr.fit()最多迭代10次
2.传递一个ParamMap给方法fit()或者transform(),在ParamMap里的任务参数将覆盖通过set方法指定的参数

保存与加载管道
通常保存流水线与模型到磁盘以备将来使用是值得的,在Spark 1.6模型输入输出功能被加入到流水线API,大多数转换器与一些基本的机器学习模型都支持

代码例子

Example: Estimator, Transformer, and Param

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
.put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extrac
10d21
tParamMap)

// Prepare test data.
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}


Example: Pipeline

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spark ML 机器学习 管道