
Spark MLlib LinearRegression: Linear Regression Algorithm Source Code Analysis

2018-03-07 18:10

Linear Regression

Simple (univariate) linear regression

$h_\theta(x) = \theta_0 + \theta_1 x$ —— (1)

Multivariate linear regression

$h_\theta(x) = \sum_{i=1}^{m} \theta_i x_i = \theta^T X$ —— (2)

Loss function

$J(\theta) = \frac{1}{2} \sum_{i=1}^{m} \left(h_\theta(x_i) - y_i\right)^2$ —— (3)

The factor 1/2 makes the coefficient 1 after differentiation; the term inside the square is the difference between the predicted value and the true value.

Our goal is to minimize this function.

The closed-form least-squares (normal equation) approach has fairly strict requirements: computing the matrix inverse is slow, and it requires X to be full rank.
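For reference, the closed-form solution being referred to is the normal equation $\theta = (X^T X)^{-1} X^T y$: it needs the inverse of $X^T X$, which is expensive to compute for high-dimensional data and only exists when $X$ has full column rank.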

Gradient Descent


Our goal is to find the minimum of J(θ).

The gradient direction is determined by the partial derivative of J(θ) with respect to θ. Since we are looking for a minimum, we move in the direction opposite to the gradient; if J(θ) is convex on the interval, this means that when updating θ, θ is increased wherever the gradient is negative (and decreased wherever it is positive).

$\theta_j := \theta_j - \alpha \frac{\partial}{\partial \theta_j} J(\theta)$ —— (4)

where α is the learning rate: if it is too large the update may overshoot the minimum; if it is too small, many iterations are needed and convergence is slow.

If there is only a single training sample:

$\frac{\partial}{\partial \theta_j} J(\theta) = (h_\theta(x) - y)\,\frac{\partial}{\partial \theta_j}\left(\sum_{i=0}^{m} \theta_i x_i - y\right) = (h_\theta(x) - y)\, x_j$ —— (5)

When there is more than one sample, applying (5) to the partial derivative of (3), each parameter θ_j moves along the gradient direction as follows:

$\theta_j := \theta_j + \alpha \sum_{i=0}^{m} \left(y_i - h_\theta(x_i)\right) x_{i,j}$ —— (6)

Here m denotes the full set of training samples.
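To make the batch update in (6) concrete, here is a minimal plain-Scala sketch on a tiny made-up dataset (all names and numbers are illustrative, not taken from MLlib):

// Batch gradient descent for equation (6): every sample contributes to the
// gradient before the weights are updated once per iteration.
val alpha = 0.05
val xs = Array(Array(1.0, 1.0), Array(1.0, 2.0), Array(1.0, 3.0)) // first column is the bias term x_0 = 1
val ys = Array(2.0, 3.0, 4.0)                                      // generated from y = 1 + x
var theta = Array(0.0, 0.0)

for (_ <- 1 to 1000) {
  val grad = Array.fill(theta.length)(0.0)
  for ((x, y) <- xs.zip(ys)) {
    val h = x.zip(theta).map { case (xi, ti) => xi * ti }.sum      // hθ(x)
    for (j <- theta.indices) grad(j) += (y - h) * x(j)             // (y_i - hθ(x_i)) * x_ij
  }
  theta = theta.zip(grad).map { case (t, g) => t + alpha * g }     // θ_j := θ_j + α Σ ...
}
println(theta.mkString(", "))  // approaches 1.0, 1.0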

Stochastic Gradient Descent

Batch gradient descent processes all samples before each weight update, so every iteration has complexity O(mn). When the number of samples is very large, we use stochastic gradient descent instead.

Each time a single sample is read, θ^T is updated.

Although this is fast, the weights oscillate around the (local) minimum, so the iteration may never converge exactly.

To reduce the computational cost, a threshold (convergence tolerance) is set on how much θ^T changes between iterations.
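A matching sketch of the stochastic variant (again illustrative code, reusing xs and ys from the sketch above): the weights are updated after each individual sample, and iteration stops once the change in θ falls below the threshold.

import scala.util.Random

val alphaSgd = 0.05
val tol = 1e-4
var w = Array(0.0, 0.0)
var epoch = 0
var converged = false

while (!converged && epoch < 1000) {
  val wOld = w.clone()
  for (_ <- xs.indices) {
    val k = Random.nextInt(xs.length)                              // read one sample
    val h = xs(k).zip(w).map { case (xi, wi) => xi * wi }.sum      // hθ(x)
    w = w.zip(xs(k)).map { case (wi, xi) => wi + alphaSgd * (ys(k) - h) * xi }
  }
  val diff = math.sqrt(w.zip(wOld).map { case (a, b) => (a - b) * (a - b) }.sum)
  converged = diff < tol                                           // threshold on the change in θ
  epoch += 1
}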

Source Code Analysis


Constructing the linear regression model

org/apache/spark/mllib/regression/LinearRegression.scala

object LinearRegressionWithSGD // companion object of the LinearRegressionWithSGD class (effectively a singleton)
// It mainly defines the train methods, which pass the training parameters and the RDD
// to run(); it is the entry point to the LinearRegressionWithSGD class.

// Linear regression model trained with stochastic gradient descent.
// Loss function: f(weights) = 1/n ||A weights - y||^2
class LinearRegressionWithSGD private[mllib] (   // primary constructor visible only inside the mllib package
    private var stepSize: Double,                // step size for each iteration
    private var numIterations: Int,              // number of iterations
    private var regParam: Double,                // regularization parameter
    private var miniBatchFraction: Double)       // fraction of samples used in each iteration
  extends GeneralizedLinearAlgorithm[LinearRegressionModel] with Serializable {

  // gradient of the least-squares loss function
  private val gradient = new LeastSquaresGradient()
  // simple gradient update, no regularization
  private val updater = new SimpleUpdater()
  // build the gradient descent optimizer
  @Since("0.8.0")
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
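
For context, a minimal usage sketch might look like the following (it assumes an existing SparkContext named sc; the data is purely illustrative). train() constructs the class above with the given parameters and then calls the run method discussed next.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

val training = sc.parallelize(Seq(
  LabeledPoint(2.0, Vectors.dense(1.0)),
  LabeledPoint(3.0, Vectors.dense(2.0)),
  LabeledPoint(4.0, Vectors.dense(3.0))
)).cache()

// numIterations and stepSize map onto the constructor parameters shown above
val model = LinearRegressionWithSGD.train(training, numIterations = 100, stepSize = 0.1)
println(s"weights = ${model.weights}, intercept = ${model.intercept}")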


The run method used for training

org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala

/**
* Run the algorithm with the configured parameters on an input RDD
* of LabeledPoint entries starting from the initial weights provided.
*
*/
@Since("1.0.0")
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
//number of features (feature dimension)
//taken from the size of the features vector of the first record in the RDD
if (numFeatures < 0) {
numFeatures = input.map(_.features.size).first()
}
//check whether the input data is cached
if (input.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data is not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}

// Check the data properties before running the optimizer
if (validateData && !validators.forall(func => func(input))) {
throw new SparkException("Input validation failed.")
}

/**
Feature scaling (standardization): during optimization, the convergence rate depends on
the condition number of the training data, and scaling the features improves convergence.
Currently this is only applied in logistic regression.
*/
val scaler = if (useFeatureScaling) {
new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
} else {
null
}

// Prepend an extra variable consisting of all 1.0's for the intercept.
//append the bias (intercept) term: the constant θ0
// TODO: Apply feature scaling to the weight vector instead of input data.
val data =
if (addIntercept) {
if (useFeatureScaling) {
input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
} else {
//append the bias after the features and cache in memory
input.map(lp => (lp.label, appendBias(lp.features))).cache()
}
} else {
if (useFeatureScaling) {
input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
} else {
input.map(lp => (lp.label, lp.features))
}
}

/**
* TODO: For better convergence, in logistic regression, the intercepts should be computed
* from the prior probability distribution of the outcomes; for linear regression,
* the intercept should be set as the average of response.
* Initial weights, including the appended intercept term.
*/
val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
appendBias(initialWeights)
} else {
/** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
initialWeights
}
//optimize the weights via gradient descent and return the best weights found
val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
//extract the intercept
val intercept = if (addIntercept && numOfLinearPredictor == 1) {
weightsWithIntercept(weightsWithIntercept.size - 1)
} else {
0.0
}
//extract the weights
var weights = if (addIntercept && numOfLinearPredictor == 1) {
Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
} else {
weightsWithIntercept
}

/**
* The weights and intercept are trained in the scaled space; we're converting them back to
* the original scale.
* When feature scaling is used, the weights must be transformed back to the original scale.
* Math shows that if we only perform standardization without subtracting means, the intercept
* will not be changed. w_i = w_i' / v_i where w_i' is the coefficient in the scaled space, w_i
* is the coefficient in the original space, and v_i is the variance of the column i.
*/
if (useFeatureScaling) {
if (numOfLinearPredictor == 1) {
weights = scaler.transform(weights)
} else {
/**
* For `numOfLinearPredictor > 1`, we have to transform the weights back to the original
* scale for each set of linear predictor. Note that the intercepts have to be explicitly
* excluded when `addIntercept == true` since the intercepts are part of weights now.
*/
var i = 0
val n = weights.size / numOfLinearPredictor
val weightsArray = weights.toArray
while (i < numOfLinearPredictor) {
val start = i * n
val end = (i + 1) * n - { if (addIntercept) 1 else 0 }

val partialWeightsArray = scaler.transform(
Vectors.dense(weightsArray.slice(start, end))).toArray

System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.length)
i += 1
}
weights = Vectors.dense(weightsArray)
}
}

// Warn at the end of the run as well, for increased visibility.
if (input.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data was not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}

// Unpersist cached data
if (data.getStorageLevel != StorageLevel.NONE) {
data.unpersist(false)
}
//return the trained model
createModel(weights, intercept)
}
}
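
A quick illustration of appendBias (from org.apache.spark.mllib.util.MLUtils), which is what adds the intercept column in the code above: it appends a constant 1.0 feature, so the last weight learned by the optimizer plays the role of θ0.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils.appendBias

val features = Vectors.dense(2.0, 3.0)
println(appendBias(features))  // [2.0,3.0,1.0] -- the trailing 1.0 is the bias feature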


Solving for the weights with gradient descent

org/apache/spark/mllib/optimization/GradientDescent.scala

optimizer.optimize->GradientDescent.optimize->GradientDescent.runMiniBatchSGD

def runMiniBatchSGD(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
stepSize: Double,
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Vector,
convergenceTol: Double): (Vector, Array[Double]) = {
//array holding the loss history across iterations
val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
// Record previous weight and current one to calculate solution vector difference

var previousWeights: Option[Vector] = None
var currentWeights: Option[Vector] = None

//number of training samples m
val numExamples = data.count()

// if no data, return initial weights to avoid NaNs
if (numExamples == 0) {
logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
return (initialWeights, stochasticLossHistory.toArray)
}

if (numExamples * miniBatchFraction < 1) {
logWarning("The miniBatchFraction is too small")
}

// Initialize weights as a column vector
var weights = Vectors.dense(initialWeights.toArray)
val n = weights.size

/**
* For the first iteration, the regVal will be initialized as sum of weight squares
* if it's L2 updater; for L1 updater, the same logic is followed.
*/
//this compute call (with a zero gradient) only initializes regVal; for the L2 updater it is the weighted sum of the squared weights
var regVal = updater.compute(
weights, Vectors.zeros(weights.size), 0, 1, regParam)._2

var converged = false // indicates whether converged based on convergenceTol
var i = 1
//iterative weight updates
while (!converged && i <= numIterations) {
//broadcast the weight vector so every executor receives the current weights
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// Randomly sample a subset of the data (fraction miniBatchFraction) and aggregate it with the RDD treeAggregate method:
//compute the gradient and loss for each sampled point, then sum them over all points (this is one map-reduce pass)
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))( // initial value: (zero gradient vector, zero loss, zero count)
//fold one sample v into the accumulator c
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
//compute() accumulates this sample's gradient into c._1 in place and returns its loss
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
//merge two accumulators
combOp = (c1, c2) => {
// c: (grad, loss, count)
//
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})
bcWeights.destroy(blocking = false)

if (miniBatchSize > 0) {
/**
* lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
//record the loss, then update the weights
stochasticLossHistory += lossSum / miniBatchSize + regVal
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
weights = update._1
regVal = update._2

previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
converged = isConverged(previousWeights.get,
currentWeights.get, convergenceTol)
}
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
}
i += 1
}

logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))
//return the final weights and the loss history
(weights, stochasticLossHistory.toArray)

}

//check for convergence
private def isConverged(
previousWeights: Vector,
currentWeights: Vector,
convergenceTol: Double): Boolean = {
// To compare with convergence tolerance.
val previousBDV = previousWeights.asBreeze.toDenseVector
val currentBDV = currentWeights.asBreeze.toDenseVector

// This represents the difference of updated weights in the iteration.
val solutionVecDiff: Double = norm(previousBDV - currentBDV)

solutionVecDiff < convergenceTol * Math.max(norm(currentBDV), 1.0)
}

}
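
Since optimizer is a public val on LinearRegressionWithSGD, these parameters, including the convergence tolerance checked by isConverged, can be tuned before training. A hedged sketch (the settings are illustrative; training is the RDD[LabeledPoint] from the earlier usage sketch):

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val lr = new LinearRegressionWithSGD()  // no-arg constructor: stepSize = 1.0, numIterations = 100, miniBatchFraction = 1.0
lr.optimizer
  .setNumIterations(200)
  .setStepSize(0.1)
  .setMiniBatchFraction(0.5)
  .setConvergenceTol(0.001)
val tunedModel = lr.run(training)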


Gradient computation

gradient.compute calculates the gradient and loss for each sample; linear regression uses LeastSquaresGradient (least squares).

org/apache/spark/mllib/optimization/Gradient.scala

/**
* :: DeveloperApi ::
* Compute gradient and loss for a Least-squared loss function, as used in linear regression.
* This is correct for the averaged least squares loss function (mean squared error)
*              L = 1/2n ||A weights-y||^2
* See also the documentation for the precise formulation.
*/
@DeveloperApi
class LeastSquaresGradient extends Gradient {
override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
//hθ(x) - y
val diff = dot(data, weights) - label

val loss = diff * diff / 2.0
val gradient = data.copy
scal(diff, gradient) //gradient = (hθ(x) - y) * x
(gradient, loss)
}

override def compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
val diff = dot(data, weights) - label //hθ(x) - y
axpy(diff, data, cumGradient) //cumGradient += (hθ(x) - y) * x
diff * diff / 2.0
}
}
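
To sanity-check the comments above, here is a tiny hand computation (plain Scala, illustrative numbers) that mirrors what compute does for a single sample:

// diff = w·x - y, loss = diff² / 2, gradient = diff * x
val xv = Array(1.0, 2.0)   // features
val wv = Array(0.5, 0.5)   // current weights
val yv = 2.0               // label
val diff = xv.zip(wv).map { case (a, b) => a * b }.sum - yv  // 1.5 - 2.0 = -0.5
val loss = diff * diff / 2.0                                 // 0.125
val grad = xv.map(_ * diff)                                  // Array(-0.5, -1.0)
println(s"loss = $loss, gradient = ${grad.mkString(", ")}")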


Weight update

/**
* :: DeveloperApi ::
* A simple updater for gradient descent *without* any regularization.
* Uses a step-size decreasing with the square root of the number of iterations.
*/
@DeveloperApi
class SimpleUpdater extends Updater {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
//the step size decays with the inverse square root of the current iteration number
val thisIterStepSize = stepSize / math.sqrt(iter)
val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
brzAxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)

(Vectors.fromBreeze(brzWeights), 0)
}
}
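
Numerically, the effective step size decays as stepSize / √iter; a tiny illustration:

val stepSize = 1.0
(1 to 5).foreach { iter =>
  println(f"iter = $iter%d, effective step size = ${stepSize / math.sqrt(iter)}%.3f")
}
// prints 1.000, 0.707, 0.577, 0.500, 0.447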
Tags: Spark MLlib