spark mllib源码分析之逻辑回归弹性网络ElasticNet(一)
2017-08-03 19:19
531 查看
相关文章
spark mllib源码分析之逻辑回归弹性网络ElasticNet(二)
spark源码分析之L-BFGS
spark mllib源码分析之OWLQN
spark中的online均值/方差统计
spark源码分析之二分类逻辑回归evaluation
spark正则化
spark在ml包中将逻辑回归封装了下,同时在算法中引入了L1和L2正则化,通过elasticNetParam来调节两种正则化的系数,同时根据选择的正则化,决定使用L-BFGS还是OWLQN优化,是谓Elastic Net。
distinctMap的key是label,类型为Long,value是个tuple,第一个元素是label出现的次数,第二维是weight的和,
∑wil=l∗∑wi
l是label,weight为1的时候,这里相当于label的数量。
因为这个累积器主要用在treeAggregator中,重要的是两个函数,add用于累积样本,merge用于两个MultiClassSummarizer的合并
返回统计到的class数,默认从0开始,所以是最大label+1
返回weight累积和
对比numClasses,可以看到这里result实现是有点问题的,必须要求class从0到k-1全部出现了,否则会丢失部分的class的统计。
可以看到二分类与多分类是分开处理的,其原理是不同的
从score函数可以看到,这里是将margin带入了sigmoid函数,我们看margin函数
就是将特征与系数相乘,再加上截距。
二分类中还实现了一些低级API,用在evaluate model,分别计算margin,预测值,预测label
调用了raw2prediction函数
可以看到,如果没有设置thresholds数组(一般不会设置),直接返回了入参rawPrediction向量中最大元素所在的位置(index),举例来说rawPrediction如果是[2.3, 1.2, 5.1, 3.4],则返回2(最大值5.1)。rawPrediction来自于predictRaw函数
直接调用了margins函数
代码比较简单,系数矩阵分别于特征向量相乘,再与截距向量相加。
metadata中除了训练参数,还保存了训练时的环境,官方demo的训练参数保存结果
δδθjJ(θ)=−1m∑i=1m(hθ(xi)−yi)xi,jJ(θ)=−1m∑i=1m[yiloghθ(xi)+(1−yi)log(1−hθ(xi))]
这里的m是样本数,对于单个样本m=1,h是sigmoid函数,y是label,整理可得
margin=−x⃗ ⋅β⃗ ⋯(1)multiplier=wi∗(11+emargin−label)⋯(2)∂ℓ(β,x⃗ i,wi)∂β=xi,j∗multiplier⋯(3)whenlabel=1ℓ(β,x⃗ i,wi)whenlabel=0ℓ(β,x⃗ i,wi)=−yiloghθ(xi)=log(1+emargin)⋯(4)=−(1−yi)log(1−hθ(xi))=−logemargin1+emargin=log(1+emargin)−margin⋯(5)
P(yi=0|x⃗ i,β)=ex⃗ Tiβ⃗ 0∑K−1k=0ex⃗ Tiβ⃗ kP(yi=1|x⃗ i,β)=ex⃗ Tiβ⃗ 1∑K−1k=0ex⃗ Tiβ⃗ k⋯⋯P(yi=K−1|x⃗ i,β)=ex⃗ Tiβ⃗ K−1∑K−1k=0ex⃗ Tiβ⃗ k
模型的系数组成一个K(classNum)乘N(特征数,如果有截距就是N+1)的矩阵。对比有pivot class的方式,这种方式其实更加简洁优雅,但是其实我们对所有P都分子分母同时除以ex⃗ Tiβ⃗ 0,就是pivot class方式的表述,而且这种方式带来一个问题,就是从形式上看当截距变化时,概率p是不随其改变的
ex⃗ Ti(β⃗ 0+c⃗ )∑K−1k=0ex⃗ Ti(β⃗ k+c⃗ )=ex⃗ Tiβ⃗ 0ex⃗ Tic⃗ ex⃗ Tic⃗ ∑K−1k=0ex⃗ Tiβ⃗ k=ex⃗ Tiβ⃗ 0∑K−1k=0ex⃗ Tiβ⃗ k
但是如果加入正则化,我们则只有一组系数矩阵可以最小化正则项,则这个系数矩阵就是具有区分度的(或者说是唯一的)。对于单个样本,其loss(忽略正则项)可写作
ℓ(β,xi)=−logP(yi|x⃗ i,β)=log(∑k=0K−1ex⃗ Tiβ⃗ k)−x⃗ Tiβ⃗ y=log(∑k=0K−1emarginsk)−marginsy⋯(8)wheremarginsk=x⃗ Tiβ⃗ k
优化求导可得
∂ℓ(β,x⃗ i,wi)∂βj,k=xi,j⋅wi⋅(ex⃗ i⋅β⃗ k∑K−1k′=0ex⃗ i⋅β⃗ k′−Iy=k)=xi,j⋅wi⋅multiplierk⋯(6)Iy=k={10y=kelsemultiplierk=⎛⎝ex⃗ i⋅β⃗ k∑K−1k=0ex⃗ i⋅β⃗ k−Iy=k⎞⎠⋯(7)
这里的I的含义是对于class k的样本,我们计算所有class的梯度向量时,只有当k==label时,I为1,其他时候为0。wi是样本权重。
类似于我们在L-BFGS文章中的讨论,这里的指数超过709.78时,有溢出的风险,类似处理
ℓ(β,x)=log(∑k=0K−1emarginsk−maxMargin)−marginsy+maxMargin⋯(9)
梯度也是做类似处理
multiplierk=ex⃗ i⋅β⃗ k−maxMargin∑K−1k′=0ex⃗ i⋅β⃗ k′−maxMargin−Iy=k
1.4.2.1.1. 二分类
add直接调用binaryUpdateInPlace函数
1.4.2.1.2. 多分类
add调用multinomialUpdateInPlace,对应上述算法,源码实现
梯度与系数矩阵是对应的,在迭代中是当成一维的向量存储,按维度展开有两种展开方式,如下图
结合梯度更新的代码,我们可以看出梯度向量在迭代中的存储格式是图中的第一种,先存特征0在各class的梯度,再存特征1,以此类推。对应到上面的DenseMatrix,其行是numCoefficientSets,列是numFeaturesPlusIntercept,是一个K*N的矩阵,取元素(i,j)(从0开始)则是i+j∗numCoefficientSets,例如我们要取class1,特征2对应的梯度值,应该是1+2k,对号对应上图第一种f2的第2个位置,对应代码
calculate用于计算每轮迭代时的loss和gradient
这里的predictions是样本经过模型预测,增加了预测值。
计算评价指标只需要预测值与label两列,用来初始化BinaryClassificationMetrics类,参见 spark源码分析之二分类逻辑回归evaluation。这里返回的评估指标其实都来自于BinaryClassificationMetrics中,只不过在其返回的数据中加入了列名,构造成DataFrame,包括ROC曲线,AUC值,pr曲线,threshold-fMeasure曲线,threshold-precision曲线,threshold-recall曲线,比较简单,不再赘述。
spark mllib源码分析之逻辑回归弹性网络ElasticNet(二)
spark源码分析之L-BFGS
spark mllib源码分析之OWLQN
spark中的online均值/方差统计
spark源码分析之二分类逻辑回归evaluation
spark正则化
spark在ml包中将逻辑回归封装了下,同时在算法中引入了L1和L2正则化,通过elasticNetParam来调节两种正则化的系数,同时根据选择的正则化,决定使用L-BFGS还是OWLQN优化,是谓Elastic Net。
1. 辅助类
我们首先介绍下模型训练和预测,评价中使用到的一些类。1.1. MultiClassSummarizer
主要用在样本的训练过程中,统计数据中各种label出现的次数及其weight,这里引入了样本weight,可以用在unbalance的数据中,通过惩罚数量大的class达到样本均衡,默认为1class MultiClassSummarizer extends Serializable { private val distinctMap = new mutable.HashMap[Int, (Long, Double)] private var totalInvalidCnt: Long = 0L
distinctMap的key是label,类型为Long,value是个tuple,第一个元素是label出现的次数,第二维是weight的和,
∑wil=l∗∑wi
l是label,weight为1的时候,这里相当于label的数量。
因为这个累积器主要用在treeAggregator中,重要的是两个函数,add用于累积样本,merge用于两个MultiClassSummarizer的合并
/** * Add a new label into this MultilabelSummarizer, and update the distinct map. * * @param label The label for this data point. * @param weight The weight of this instances. * @return This MultilabelSummarizer */ def add(label: Double, weight: Double = 1.0): this.type = { require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") if (weight == 0.0) return this //这里要求label必须为整数,否则认为非法 if (label - label.toInt != 0.0 || label < 0) { totalInvalidCnt += 1 this }else { val (counts: Long, weightSum: Double) = distinctMap.getOrElse(label.toInt, (0L, 0.0)) //累加样本次数及weight distinctMap.put(label.toInt, (counts + 1L, weightSum + weight)) this } } /** * Merge another MultilabelSummarizer, and update the distinct map. * (Note that it will merge the smaller distinct map into the larger one using in-place * merging, so either `this` or `other` object will be modified and returned.) * * @param other The other MultilabelSummarizer to be merged. * @return Merged MultilabelSummarizer object. */ def merge(other: MultiClassSummarizer): MultiClassSummarizer = { //将size小的并入大的,性能 val (largeMap, smallMap) = if (this.distinctMap.size > other.distinctMap.size) { (this, other) } else { (other, this) } smallMap.distinctMap.foreach { case (key, value) => val (counts: Long, weightSum: Double) = largeMap.distinctMap.getOrElse(key, (0L, 0.0)) //直接累加 largeMap.distinctMap.put(key, (counts + value._1, weightSum + value._2)) } largeMap.totalInvalidCnt += smallMap.totalInvalidCnt largeMap }
返回统计到的class数,默认从0开始,所以是最大label+1
def numClasses: Int = if (distinctMap.isEmpty) 0 else distinctMap.keySet.max + 1
返回weight累积和
def histogram: Array[Double] = { val result = Array.ofDim[Double](numClasses) var i = 0 //应该是val len = numClasses val len = result.length //这里要求class从0到k-1 while (i < len) { result(i) = distinctMap.getOrElse(i, (0L, 0.0))._2 i += 1 } result }
对比numClasses,可以看到这里result实现是有点问题的,必须要求class从0到k-1全部出现了,否则会丢失部分的class的统计。
1.2. MultivariateOnlineSummarizer
在spark中的online均值/方差统计中已有介绍,计算样本集的方差,用于归一化。1.3 LogisticRegressionModel
逻辑回归model,放着训练得到的系数矩阵,矩阵,class数,是否多分类等参数。1.3.1. 预测
override protected def predict(features: Vector): Double = if (isMultinomial) { super.predict(features) } else { // Note: We should use getThreshold instead of $(threshold) since getThreshold is overridden. if (score(features) > getThreshold) 1 else 0 }
可以看到二分类与多分类是分开处理的,其原理是不同的
1.3.1.1. 二分类
从上面可以看到二分类的预测是通过计算特征得分,与threshold比较,大于为1,否则0,score函数代码private val score: Vector => Double = (features) => { val m = margin(features) 1.0 / (1.0 + math.exp(-m)) }
从score函数可以看到,这里是将margin带入了sigmoid函数,我们看margin函数
private val margin: Vector => Double = (features) => { BLAS.dot(features, _coefficients) + _intercept }
就是将特征与系数相乘,再加上截距。
二分类中还实现了一些低级API,用在evaluate model,分别计算margin,预测值,预测label
//计算二分类的margin,返回DenseVector override protected def predictRaw(features: Vector): Vector = { val m = margin(features) Vectors.dense(-m, m) } //由margin计算原始的预测值,也就是经过sigmoid函数的值 override protected def raw2probabilityInPlace(rawPrediction: Vector): Vector = { rawPrediction match { case dv: DenseVector => var i = 0 val size = dv.size while (i < size) { dv.values(i) = 1.0 / (1.0 + math.exp(-dv.values(i))) i += 1 } dv case sv: SparseVector => throw new RuntimeException("Unexpected error in LogisticRegressionModel:" + " raw2probabilitiesInPlace encountered SparseVector") } } //由原始的预测值,预测label,从上面可知vector(1)为实际的预测值,用来预测label override protected def raw2prediction(rawPrediction: Vector): Double = { // Note: We should use getThreshold instead of $(threshold) since getThreshold is overridden. val t = getThreshold val rawThreshold = if (t == 0.0) { Double.NegativeInfinity } else if (t == 1.0) { Double.PositiveInfinity } else { math.log(t / (1.0 - t)) } if (rawPrediction(1) > rawThreshold) 1 else 0 }
1.3.1.2. 多分类
多分类时,其调用了父类的predict函数override protected def predict(features: FeaturesType): Double = { raw2prediction(predictRaw(features)) }
调用了raw2prediction函数
override protected def raw2prediction(rawPrediction: Vector): Double = { if (!isDefined(thresholds)) { rawPrediction.argmax } else { probability2prediction(raw2probability(rawPrediction)) } }
可以看到,如果没有设置thresholds数组(一般不会设置),直接返回了入参rawPrediction向量中最大元素所在的位置(index),举例来说rawPrediction如果是[2.3, 1.2, 5.1, 3.4],则返回2(最大值5.1)。rawPrediction来自于predictRaw函数
override protected def predictRaw(features: Vector): Vector = { if (isMultinomial) { margins(features) } else { val m = margin(features) Vectors.dense(-m, m) } }
直接调用了margins函数
private val margins: Vector => Vector = (features) => { val m = interceptVector.toDense.copy //m = alpha * coefficientMatrix * features + beta * m BLAS.gemv(1.0, coefficientMatrix, features, 1.0, m) m }
代码比较简单,系数矩阵分别于特征向量相乘,再与截距向量相加。
1.3.2. save model
使用LogisticRegressionModelWriter将训练的参数和得到的系数矩阵写入hdfsclass LogisticRegressionModelWriter(instance: LogisticRegressionModel) extends MLWriter with Logging { private case class Data( numClasses: Int, numFeatures: Int, interceptVector: Vector, coefficientMatrix: Matrix, isMultinomial: Boolean) override protected def saveImpl(path: String): Unit = { //训练时的参数 DefaultParamsWriter.saveMetadata(instance, path, sc) //保存训练结果 val data = Data(instance.numClasses, instance.numFeatures, instance.interceptVector, instance.coefficientMatrix, instance.isMultinomial) val dataPath = new Path(path, "data").toString sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) } }
metadata中除了训练参数,还保存了训练时的环境,官方demo的训练参数保存结果
{ "class": "org.apache.spark.ml.classification.LogisticRegressionModel", "timestamp": 1500886361787, "sparkVersion": "2.0.2", "uid": "logreg_ea57ce7dcde4", "paramMap": { "fitIntercept": true, "rawPredictionCol": "rawPrediction", "predictionCol": "prediction", "tol": 0.000001, "labelCol": "label", "standardization": true, "regParam": 0.3, "probabilityCol": "probability", "featuresCol": "features", "maxIter": 10, "elasticNetParam": 0.8, "threshold": 0.5 } }
1.3.3. load model
使用LogisticRegressionModelReader将save保存的模型读取回来,metadata使用json解析回来,解析parquet获取系数矩阵,截距等,比较简单。1.4. LogisticAggregator
LogisticAggregator用于训练过程中,计算每轮迭代的梯度和loss,需要分布式计算,类似于上面的summarizer,也是用在treeAggregator中。1.4.1. 算法
用于训练过程中计算梯度与loss,在前面介绍L-BFGS时说过其训练结果返回的系数向量只有k-1维,预测时则默认class 0的margin是0,这种是带pivot class,二分类属于这种;这里的多分类不使用这种方法,而是训练得到k个class分别对应的系数。1.4.1. 1. 二分类
如前文所述,二分类是有pivot,一般二分类的梯度δδθjJ(θ)=−1m∑i=1m(hθ(xi)−yi)xi,jJ(θ)=−1m∑i=1m[yiloghθ(xi)+(1−yi)log(1−hθ(xi))]
这里的m是样本数,对于单个样本m=1,h是sigmoid函数,y是label,整理可得
margin=−x⃗ ⋅β⃗ ⋯(1)multiplier=wi∗(11+emargin−label)⋯(2)∂ℓ(β,x⃗ i,wi)∂β=xi,j∗multiplier⋯(3)whenlabel=1ℓ(β,x⃗ i,wi)whenlabel=0ℓ(β,x⃗ i,wi)=−yiloghθ(xi)=log(1+emargin)⋯(4)=−(1−yi)log(1−hθ(xi))=−logemargin1+emargin=log(1+emargin)−margin⋯(5)
1.4.1. 2. 多分类
多分类时,P(yi=0|x⃗ i,β)=ex⃗ Tiβ⃗ 0∑K−1k=0ex⃗ Tiβ⃗ kP(yi=1|x⃗ i,β)=ex⃗ Tiβ⃗ 1∑K−1k=0ex⃗ Tiβ⃗ k⋯⋯P(yi=K−1|x⃗ i,β)=ex⃗ Tiβ⃗ K−1∑K−1k=0ex⃗ Tiβ⃗ k
模型的系数组成一个K(classNum)乘N(特征数,如果有截距就是N+1)的矩阵。对比有pivot class的方式,这种方式其实更加简洁优雅,但是其实我们对所有P都分子分母同时除以ex⃗ Tiβ⃗ 0,就是pivot class方式的表述,而且这种方式带来一个问题,就是从形式上看当截距变化时,概率p是不随其改变的
ex⃗ Ti(β⃗ 0+c⃗ )∑K−1k=0ex⃗ Ti(β⃗ k+c⃗ )=ex⃗ Tiβ⃗ 0ex⃗ Tic⃗ ex⃗ Tic⃗ ∑K−1k=0ex⃗ Tiβ⃗ k=ex⃗ Tiβ⃗ 0∑K−1k=0ex⃗ Tiβ⃗ k
但是如果加入正则化,我们则只有一组系数矩阵可以最小化正则项,则这个系数矩阵就是具有区分度的(或者说是唯一的)。对于单个样本,其loss(忽略正则项)可写作
ℓ(β,xi)=−logP(yi|x⃗ i,β)=log(∑k=0K−1ex⃗ Tiβ⃗ k)−x⃗ Tiβ⃗ y=log(∑k=0K−1emarginsk)−marginsy⋯(8)wheremarginsk=x⃗ Tiβ⃗ k
优化求导可得
∂ℓ(β,x⃗ i,wi)∂βj,k=xi,j⋅wi⋅(ex⃗ i⋅β⃗ k∑K−1k′=0ex⃗ i⋅β⃗ k′−Iy=k)=xi,j⋅wi⋅multiplierk⋯(6)Iy=k={10y=kelsemultiplierk=⎛⎝ex⃗ i⋅β⃗ k∑K−1k=0ex⃗ i⋅β⃗ k−Iy=k⎞⎠⋯(7)
这里的I的含义是对于class k的样本,我们计算所有class的梯度向量时,只有当k==label时,I为1,其他时候为0。wi是样本权重。
类似于我们在L-BFGS文章中的讨论,这里的指数超过709.78时,有溢出的风险,类似处理
ℓ(β,x)=log(∑k=0K−1emarginsk−maxMargin)−marginsy+maxMargin⋯(9)
梯度也是做类似处理
multiplierk=ex⃗ i⋅β⃗ k−maxMargin∑K−1k′=0ex⃗ i⋅β⃗ k′−maxMargin−Iy=k
1.4.2. 实现
梯度和loss的计算支持分布式计算,add函数用于计算样本,merge用户累积器的合并。1.4.2.1. add
add的入参是特征向量features,样本weight,label。1.4.2.1.1. 二分类
add直接调用binaryUpdateInPlace函数
private def binaryUpdateInPlace( features: Vector, weight: Double, label: Double): Unit = { val localFeaturesStd = bcFeaturesStd.value val localCoefficients = bcCoefficients.value val localGradientArray = gradientSumArray //指数部分,式(1) val margin = - { var sum = 0.0 features.foreachActive { (index, value) => if (localFeaturesStd(index) != 0.0 && value != 0.0) { //归一化 sum += localCoefficients(index) * value / localFeaturesStd(index) } } //截距 if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) sum } //式(2) val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) //式(3),更新梯度 features.foreachActive { (index, value) => if (localFeaturesStd(index) != 0.0 && value != 0.0) { //归一化 localGradientArray(index) += multiplier * value / localFeaturesStd(index) } } if (fitIntercept) { localGradientArray(numFeaturesPlusIntercept - 1) += multiplier } //loss if (label > 0) { // 式(4) lossSum += weight * MLUtils.log1pExp(margin) } else { //式(5) lossSum += weight * (MLUtils.log1pExp(margin) - margin) }
1.4.2.1.2. 多分类
add调用multinomialUpdateInPlace,对应上述算法,源码实现
private def multinomialUpdateInPlace( features: Vector, weight: Double, label: Double): Unit = { // TODO: use level 2 BLAS operations /* Note: this can still be used when numClasses = 2 for binary logistic regression without pivoting. */ val localFeaturesStd = bcFeaturesStd.value val localCoefficients = bcCoefficients.value val localGradientArray = gradientSumArray // marginOfLabel is margins(label) in the formula var marginOfLabel = 0.0 var maxMargin = Double.NegativeInfinity //计算每个class的margin val margins = new Array[Double](numClasses) //计算系数与特征部分 features.foreachActive { (index, value) => val stdValue = value / localFeaturesStd(index) var j = 0 while (j < numClasses) { margins(j) += localCoefficients(index * numClasses + j) * stdValue j += 1 } } //加截距 var i = 0 while (i < numClasses) { if (fitIntercept) { margins(i) += localCoefficients(numClasses * numFeatures + i) } //记录label对应的margin,用于loss计算 if (i == label.toInt) marginOfLabel = margins(i) //记录最大的margin,看是否需要额外处理 if (margins(i) > maxMargin) { maxMargin = margins(i) } i += 1 } /** * When maxMargin is greater than 0, the original formula could cause overflow. * We address this by subtracting maxMargin from all the margins, so it's guaranteed * that all of the new margins will be smaller than zero to prevent arithmetic overflow. */ val multipliers = new Array[Double](numClasses) //式(7)的分母,所有class的margin和 val sum = { var temp = 0.0 var i = 0 while (i < numClasses) { //最大margin大于0,先减去max if (maxMargin > 0) margins(i) -= maxMargin val exp = math.exp(margins(i)) temp += exp multipliers(i) = exp i += 1 } temp } //式(7) margins.indices.foreach { i => //label对应的margin,I=1,否则I=0 multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) } features.foreachActive { (index, value) => if (localFeaturesStd(index) != 0.0 && value != 0.0) { val stdValue = value / localFeaturesStd(index) var j = 0 //式(6),更新梯度 while (j < numClasses) { localGradientArray(index * numClasses + j) += weight * multipliers(j) * stdValue j += 1 } } } //截距当做特征值全为1的一维特征,更新方法可类比于正常特征 if (fitIntercept) { var i = 0 while (i < numClasses) { localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i) i += 1 } } val loss = if (maxMargin > 0) { //式(8) math.log(sum) - marginOfLabel + maxMargin } else { //式(9) math.log(sum) - marginOfLabel } lossSum += weight * loss }
1.4.2.2. merge
merge处理累积器之间的合并,loss和梯度都是直接累加即可,这里不再赘述1.4.2.3. 结果返回
merge之后的结果需要对weight(如样本weight为1,这里相当于m)平均def loss: Double = { require(weightSum > 0.0, s"The effective number of instances should be " + s"greater than 0.0, but $weightSum.") lossSum / weightSum } def gradient: Matrix = { require(weightSum > 0.0, s"The effective number of instances should be " + s"greater than 0.0, but $weightSum.") val result = Vectors.dense(gradientSumArray.clone()) scal(1.0 / weightSum, result) new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray) }
梯度与系数矩阵是对应的,在迭代中是当成一维的向量存储,按维度展开有两种展开方式,如下图
结合梯度更新的代码,我们可以看出梯度向量在迭代中的存储格式是图中的第一种,先存特征0在各class的梯度,再存特征1,以此类推。对应到上面的DenseMatrix,其行是numCoefficientSets,列是numFeaturesPlusIntercept,是一个K*N的矩阵,取元素(i,j)(从0开始)则是i+j∗numCoefficientSets,例如我们要取class1,特征2对应的梯度值,应该是1+2k,对号对应上图第一种f2的第2个位置,对应代码
private[ml] def index(i: Int, j: Int): Int = { require(i >= 0 && i < numRows, s"Expected 0 <= i < $numRows, got i = $i.") require(j >= 0 && j < numCols, s"Expected 0 <= j < $numCols, got j = $j.") //本例isTransposed=false if (!isTransposed) i + numRows * j else j + numCols * i }
1.5. LogisticCostFun
逻辑回归的损失函数,用于每轮迭代中计算所有样本的loss和gradient,对所有样本累积的时候会使用LogisticAggregator,然后再加上正则项,返回本次更新的梯度。类成员private class LogisticCostFun( instances: RDD[Instance], //样本集 numClasses: Int, //分类数 fitIntercept: Boolean, //是否拟合截距 standardization: Boolean, //是否归一化 bcFeaturesStd: Broadcast[Array[Double]], //各维特征的标准差 regParamL2: Double, //L2正则化系数 multinomial: Boolean, //是否是多分类 //累积层数,从样本逐层累积,类似于树 aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
calculate用于计算每轮迭代时的loss和gradient
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { val coeffs = Vectors.fromBreeze(coefficients) val bcCoeffs = instances.context.broadcast(coeffs) val featuresStd = bcFeaturesStd.value val numFeatures = featuresStd.length val numCoefficientSets = if (multinomial) numClasses else 1 val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures //所有样本,计算loss和gradient,参见LogisticAggregator的add和merge val logisticAggregator = { val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance) val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2) instances.treeAggregate( new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept, multinomial) )(seqOp, combOp, aggregationDepth) } //正则项 val totalGradientMatrix = logisticAggregator.gradient val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray) // regVal is the sum of coefficients squares excluding intercept for L2 regularization. val regVal = if (regParamL2 == 0.0) { 0.0 } else { var sum = 0.0 coefMatrix.foreachActive { case (classIndex, featureIndex, value) => // We do not apply regularization to the intercepts val isIntercept = fitIntercept && (featureIndex == numFeatures) if (!isIntercept) { //计算带正则项的梯度和loss,更新梯度矩阵 sum += { if (standardization) { val gradValue = totalGradientMatrix(classIndex, featureIndex) totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value) value * value } else { if (featuresStd(featureIndex) != 0.0) { //设置不使用归一化,但是在计算梯度时使用了归一化,这里正则项需要反归一化,使得优化函数与无归一化等效 val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex)) val gradValue = totalGradientMatrix(classIndex, featureIndex) totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp) value * temp } else { 0.0 } } } } } 0.5 * regParamL2 * sum } bcCoeffs.destroy(blocking = false) //更新loss和梯度 (logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray)) }
1.6. BinaryLogisticRegressionSummary
计算二分类逻辑回归的模型评估指标,如AUC,F-measure等class BinaryLogisticRegressionSummary private[classification] ( //样本集 @Since("1.5.0") @transient override val predictions: DataFrame, //预测值score的类名,用于DataFrame select @Since("1.5.0") override val probabilityCol: String, //label列名,用于DataFrame select @Since("1.5.0") override val labelCol: String, //特征向量列名,用于DataFrame select @Since("1.6.0") override val featuresCol: String)
这里的predictions是样本经过模型预测,增加了预测值。
private val binaryMetrics = new BinaryClassificationMetrics( predictions.select(col(probabilityCol), col(labelCol).cast(DoubleType)).rdd.map { case Row(score: Vector, label: Double) => (score(1), label) }, 100 )
计算评价指标只需要预测值与label两列,用来初始化BinaryClassificationMetrics类,参见 spark源码分析之二分类逻辑回归evaluation。这里返回的评估指标其实都来自于BinaryClassificationMetrics中,只不过在其返回的数据中加入了列名,构造成DataFrame,包括ROC曲线,AUC值,pr曲线,threshold-fMeasure曲线,threshold-precision曲线,threshold-recall曲线,比较简单,不再赘述。
相关文章推荐
- spark mllib源码分析之逻辑回归弹性网络ElasticNet
- spark mllib源码分析之逻辑回归弹性网络ElasticNet(二)
- spark mllib源码分析之二分类逻辑回归evaluation
- 弹性网络( Elastic Net) 多任务 Lasso回归 MultiTaskLasso
- 高性能网络I/O框架-netmap源码分析(5) http://blog.chinaunix.net/uid-23629988-id-3693204.html
- <转>Spark Mllib逻辑回归算法分析
- 高性能网络I/O框架-netmap源码分析(6) http://blog.chinaunix.net/uid-23629988-id-3803045.html
- <转>Spark Mllib逻辑回归算法分析
- Spark-mllib源码分析之逻辑回归(Logistic Regression)
- Redis源码分析(二十一)--- anet网络通信的封装
- 4.弹性网络( Elastic Net)
- Spark MLlib之线性回归源码分析
- [深度学习]Python/Theano实现逻辑回归网络的代码分析
- Net的网络层的构建(源码分析)
- 【机器学习经典算法源码分析系列】-- 逻辑回归
- 机器学习算法与Python实践(9) - 弹性网络(Elastic Net)
- contiki 源码分析之网络层(三)(core / net)
- 机器学习与算法(11)--弹性网络(Elastic Net)
- 4.弹性网络( Elastic Net)
- redis anet网络通信的源码分析