
Spark MLlib NaiveBayes: The Naive Bayes Classifier

2015-04-29 11:28

1.1 The Naive Bayes Formula

Bayes' theorem:

$$P(B|A) = \frac{P(A|B)\,P(B)}{P(A)}$$

where A is an event and B is a class; P(B|A) is the probability that an item belongs to class B given event A.

The formal definition of naive Bayes classification is as follows:

1. Let $x = \{a_1, a_2, \ldots, a_m\}$ be an item to classify, where each $a_j$ is a feature attribute of $x$.
2. There is a set of classes $C = \{y_1, y_2, \ldots, y_n\}$.
3. Compute $P(y_1|x), P(y_2|x), \ldots, P(y_n|x)$.
4. If $P(y_k|x) = \max\{P(y_1|x), P(y_2|x), \ldots, P(y_n|x)\}$, then $x \in y_k$.

The key question is therefore how to compute the conditional probabilities in step 3:

1. Find a set of items whose classes are already known; this set is called the training set.
2. From the training set, estimate the conditional probability of each feature attribute under each class, i.e.
$$P(a_1|y_1), \ldots, P(a_m|y_1);\ \ldots;\ P(a_1|y_n), \ldots, P(a_m|y_n)$$
3. If the feature attributes are conditionally independent given the class, Bayes' theorem gives:
$$P(y_i|x) = \frac{P(x|y_i)\,P(y_i)}{P(x)}$$

Because the denominator is the same constant for every class, we only need to maximize the numerator. And because the feature attributes are conditionally independent:

$$P(x|y_i)\,P(y_i) = P(a_1|y_i)\,P(a_2|y_i)\cdots P(a_m|y_i)\,P(y_i) = P(y_i)\prod_{j=1}^{m} P(a_j|y_i)$$
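The decision rule above can be sketched in a few lines of Scala. This is a minimal illustration of the log-space rule (not the MLlib implementation itself): with log priors and per-class log conditional probabilities, the predicted class maximizes $\log P(y_i) + \sum_j x_j \log P(a_j|y_i)$. The object name and shape of the inputs are illustrative assumptions.

```scala
// Minimal sketch of the naive Bayes decision rule in log space (illustrative, not MLlib code).
// logPi(i) = log P(y_i); logTheta(i)(j) = log P(a_j | y_i); x(j) = feature value of the sample.
object NaiveBayesRule {
  def predict(logPi: Array[Double], logTheta: Array[Array[Double]], x: Array[Double]): Int = {
    val scores = logPi.indices.map { i =>
      // log posterior, up to the constant log P(x): log P(y_i) + sum_j x_j * log P(a_j | y_i)
      logPi(i) + logTheta(i).zip(x).map { case (lp, xj) => lp * xj }.sum
    }
    scores.indexOf(scores.max)
  }
}
```

Applied to the pi/theta values printed in section 1.2 with the test vector [1.0, 0.0, 0.1], this rule picks index 1 (label 0.0), matching the worked example in that section.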

1.2 NaiveBayesModel Source Code Analysis

1. The main member variables of NaiveBayesModel:
1) labels: the class labels
scala> labels
res56: Array[Double] = Array(2.0, 0.0, 1.0)
2) pi: the log prior probability of each label
scala> pi
res57: Array[Double] = Array(-1.1631508098056809, -0.9808292530117262, -1.1631508098056809)
3) theta: the log conditional probability of each feature under each class
scala> theta
res58: Array[Array[Double]] = Array(Array(-2.032921526044943, -1.8658674413817768, -0.33647223662121295), Array(-0.2451224580329847, -2.179982770901713, -2.26002547857525), Array(-1.9676501356917193, -0.28410425110389714, -2.2300144001592104))
4) lambda: the smoothing factor
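Note that pi and theta hold log probabilities, which is why all the printed values are negative. The pi values can be reproduced from Laplace-smoothed counts, pi(i) = log(n_i + λ) − log(N + L·λ), where n_i is the number of training samples of class i, N the total, and L the number of classes. A small sketch (the per-class counts 4, 5, 4 are inferred from the printed values, not stated in the source):

```scala
// Reproduce the smoothed log priors printed above.
// Assumed per-class sample counts (inferred, not from the source): 4, 5, 4; lambda = 1.0.
val lambda = 1.0
val counts = Array(4L, 5L, 4L)                // samples per class, in label order (2.0, 0.0, 1.0)
val numDocuments = counts.sum                 // 13
val piLogDenom = math.log(numDocuments + counts.length * lambda)
val pi = counts.map(n => math.log(n + lambda) - piLogDenom)
// pi(1) = log(6/16) ≈ -0.9808; pi(0) = pi(2) = log(5/16) ≈ -1.1632
```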


2. NaiveBayesModel code

1) train

/**
 * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
 *
 * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
 */
def run(data: RDD[LabeledPoint]) = {
  // requireNonnegativeValues: extracts the feature values of a sample (stored as a
  // vector) and checks that every feature value is nonnegative.
  val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
    val values = v match {
      case SparseVector(size, indices, values) =>
        values
      case DenseVector(values) =>
        values
    }
    if (!values.forall(_ >= 0.0)) {
      throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
    }
  }


  // Aggregates term frequencies per label.
  // TODO: Calling combineByKey and collect creates two stages, we can implement something
  // TODO: similar to reduceByKeyLocally to save one stage.
  // aggregated: aggregates all samples, keyed by label, combining the features of each label;
  // createCombiner: turns a sample value V into a combiner C,
  //   (v: Vector) -> (c: (Long, BDV[Double]));
  // mergeValue: merges the next sample value into an existing combiner,
  //   (c: (Long, BDV[Double]), v: Vector) -> (c: (Long, BDV[Double]));
  // mergeCombiners: merges the combiners produced for the same key,
  //   (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) -> (c: (Long, BDV[Double]));
  val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
    createCombiner = (v: Vector) => {
      requireNonnegativeValues(v)
      (1L, v.toBreeze.toDenseVector)
    },
    mergeValue = (c: (Long, BDV[Double]), v: Vector) => {
      requireNonnegativeValues(v)
      (c._1 + 1L, c._2 += v.toBreeze)
    },
    mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
      (c1._1 + c2._1, c1._2 += c2._2)
  ).collect()

  val numLabels = aggregated.length
  var numDocuments = 0L
  aggregated.foreach { case (_, (n, _)) =>
    numDocuments += n
  }
  val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
  val labels = new Array[Double](numLabels)
  val pi = new Array[Double](numLabels)
  val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
  val piLogDenom = math.log(numDocuments + numLabels * lambda)
  var i = 0
  // labels: stores the class labels
  // pi: computes the log prior probability of each class
  // theta: computes the log probability of each feature under each class
  aggregated.foreach { case (label, (n, sumTermFreqs)) =>
    labels(i) = label
    val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
    pi(i) = math.log(n + lambda) - piLogDenom
    var j = 0
    while (j < numFeatures) {
      theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
      j += 1
    }
    i += 1
  }
  // return the model
  new NaiveBayesModel(labels, pi, theta)
}
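The combineByKey step in run can be understood as, per label, counting the samples and summing their feature vectors. A pure-Scala sketch of the same aggregation (no Spark; the toy samples are made up for illustration):

```scala
// Pure-Scala equivalent of the combineByKey step: per label -> (sample count, feature sum).
val samples: Seq[(Double, Array[Double])] = Seq(
  (0.0, Array(1.0, 0.0, 0.0)),
  (0.0, Array(2.0, 0.0, 0.2)),
  (1.0, Array(0.0, 1.0, 0.1))
)
val aggregated: Map[Double, (Long, Array[Double])] =
  samples.groupBy(_._1).map { case (label, rows) =>
    val count = rows.size.toLong
    // element-wise sum of all feature vectors of this label
    val sum = rows.map(_._2).reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    label -> (count, sum)
  }
// aggregated(0.0) == (2L, Array(3.0, 0.0, 0.2))
```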



2) predict

// brzPi: the vector of class log priors; brzTheta: the matrix of per-class feature log probabilities
private val brzPi = new BDV[Double](pi)
private val brzTheta = new BDM[Double](theta.length, theta(0).length)

{
  // Need to put an extra pair of braces to prevent Scala treating `i` as a member.
  var i = 0
  while (i < theta.length) {
    var j = 0
    while (j < theta(i).length) {
      brzTheta(i, j) = theta(i)(j)
      j += 1
    }
    i += 1
  }
}

// Predicts over an RDD of samples, applying the single-vector prediction to each row,
// using the class prior vector and the per-class feature probability matrix.
override def predict(testData: RDD[Vector]): RDD[Double] = {
  val bcModel = testData.context.broadcast(this)
  testData.mapPartitions { iter =>
    val model = bcModel.value
    iter.map(model.predict)
  }
}

// Predicts a single test vector using the class prior vector and the
// per-class feature probability matrix.
override def predict(testData: Vector): Double = {
  labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
}
The computation proceeds as follows.

Test sample: 0.0,[1.0,0.0,0.1]

scala> labels

res87: Array[Double] = Array(2.0, 0.0, 1.0)



scala> pi
res76: Array[Double] = Array(-1.1631508098056809, -0.9808292530117262, -1.1631508098056809)

scala> theta
res77: Array[Array[Double]] = Array(Array(-2.032921526044943, -1.8658674413817768, -0.33647223662121295), Array(-0.2451224580329847, -2.179982770901713, -2.26002547857525), Array(-1.9676501356917193, -0.28410425110389714,
-2.2300144001592104))

scala> brzPi
res78: breeze.linalg.DenseVector[Double] = DenseVector(-1.1631508098056809, -0.9808292530117262, -1.1631508098056809)

scala> brzTheta
res79: breeze.linalg.DenseMatrix[Double] =
-2.032921526044943 -1.8658674413817768 -0.33647223662121295
-0.2451224580329847 -2.179982770901713 -2.26002547857525
-1.9676501356917193 -0.28410425110389714 -2.2300144001592104

scala> val testData = new BDV[Double](Array(1.0,0.0,0.1))
testData: breeze.linalg.DenseVector[Double] = DenseVector(1.0, 0.0, 0.1)

scala> brzPi + brzTheta * testData
res86: breeze.linalg.DenseVector[Double] = DenseVector(-3.2297195595127457, -1.4519542589022358, -3.3538023855133217)

scala> labels(brzArgmax(brzPi + brzTheta * testData))
res88: Double = 0.0
The result is correct: the predicted label 0.0 matches the sample's label.
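The winning score can also be checked by hand, without breeze: the log posterior of the class at index i is pi(i) plus the dot product of row i of theta with the test vector. A quick sketch for index 1 (label 0.0):

```scala
// Recompute one entry of brzPi + brzTheta * testData by hand, for the class at index 1 (label 0.0).
val pi1 = -0.9808292530117262
val theta1 = Array(-0.2451224580329847, -2.179982770901713, -2.26002547857525)
val x = Array(1.0, 0.0, 0.1)
val score = pi1 + theta1.zip(x).map { case (t, xi) => t * xi }.sum
// score ≈ -1.4519542589022358, matching the second DenseVector entry above
```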

1.3 A NaiveBayesModel Example

1. Data
The data format is: label,feature1 feature2 feature3
0,1 0 0
0,2 0 0
0,1 0 0.1
0,2 0 0.2
0,1 0.1 0
0,2 0.2 0
1,0 1 0.1
1,0 2 0.2
1,0.1 1 0
1,0.2 2 0
1,0 1 0
1,0 2 0
2,0.1 0 1
2,0.2 0 2
2,0 0.1 1
2,0 0.2 2
2,0 0 1
2,0 0 2
2. Code
// path to the data
val data_path = "user/tmp/sample_naive_bayes_data.txt"
// read the data and convert each line to a LabeledPoint
val examples = sc.textFile(data_path).map { line =>
  val items = line.split(',')
  val label = items(0).toDouble
  val value = items(1).split(' ').map(f => f.toDouble)
  LabeledPoint(label, Vectors.dense(value))
}
examples.cache()
// split the samples: 80% for training, 20% for testing
val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0)
val test = splits(1)
val numTraining = training.count()
val numTest = test.count()
println(s"numTraining = $numTraining, numTest = $numTest.")
// train on the samples to produce the classification model
val model = new NaiveBayes().setLambda(1.0).run(training)
// use the model to predict the test data and compute the test accuracy
val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))
val accuracy = predictionAndLabel.filter(x => x._1 == x._2).count().toDouble / numTest
println(s"Test accuracy = $accuracy.")