您的位置:首页 > 运维架构 > Apache

Scala的Apache Spark机器学习示例

2020-06-29 04:33 926 查看

在此Apache Spark机器学习示例中,将介绍Spark MLlib并审查Scala源代码。 这篇文章和随附的截屏视频将演示自定义Spark MLlib Spark驱动程序应用程序。 然后,将检查Spark MLLib Scala源代码。 将显示和解释许多主题,但首先,让我们描述一些机器学习概念。

机器学习关键概念

什么是机器学习?

机器学习正在创建和使用从数据中学习的模型。 您可能还会听到机器学习,称为预测建模或数据挖掘。

机器学习的三个例子是什么?

  • 垃圾邮件预测
  • 欺诈性信用卡交易预测
  • 产品或广告推荐引擎

机器学习模型有两种类型:有监督的和无监督的。 监督模型包含一组带有正确答案的数据,而非监督模型则不包含标签。

监督机器学习模型的示例

  • k近邻–例如:预测一个人如何投票(如果您知道他们的邻居如何投票)
  • 朴素的贝叶斯–示例:确定传入的电子邮件是否为垃圾邮件
  • 线性回归–尝试确定两个变量是否相关
  • 决策树–使用一种结构来表示许多可能的决策路径以及每个路径的结果

无监督机器学习模型的示例

  • 群集–处理未标记的数据并尝试对其进行“群集”。 例如,显示百万富翁居住地的数据集可能在比佛利山庄和曼哈顿等地聚集
  • 潜在狄利克雷分析(LDA)–自然语言处理,通常用于识别文本或一组文档中的常见主题
  • 神经网络–示例:手写识别和面部图像检测

在构建用于进行预测的模型时,我们经常根据现有数据集训练模型。 随着越来越多的训练数据集变得可用,可以对模型进行多次训练。 例如,随着我们更多地了解导致产品销售或目标参与度指标的事件,我们将基于协作过滤对推荐引擎进行重新培训。

Apache Spark机器学习示例(与Scala一起使用)

让我们展示一个Apache Spark机器学习程序的演示。 在下面的演示中,我们将从训练k均值聚类模型开始,然后使用该训练的模型来预测Slack传入文本流的语言。

此示例基于先前的Apache Spark流媒体教程构建,该教程可从Slack团队站点流式传输数据。 这些视频及更多内容也可以在我们的Spark培训课程中找到。 请参阅下面的资源部分以获取链接。

但是,让我们继续进行演示:

Apache Spark机器学习Scala源代码审查

好的,现在我们已经有了演示了,让我们回顾一下Spark MLLib的相关代码。 同样,可以在下面的参考资料小节中找到源代码的链接。 让我们从进入我们的Spark Machine Learning示例以及在演示SlackMLApp的spark-submit部署期间调用的内容开始:

Scala的Spark机器学习示例

object SlackMLApp {

object Config {
@Parameter(names = Array("-st", "--slackToken"))
var slackToken: String = null
@Parameter(names = Array("-nc", "--numClusters"))
var numClusters: Int = 4
@Parameter(names = Array("-po", "--predictOutput"))
var predictOutput: String = null
@Parameter(names = Array("-td", "--trainData"))
var trainData: String = null
@Parameter(names = Array("-ml", "--modelLocation"))
var modelLocation: String = null
}

def main(args: Array[String]) {
new JCommander(Config, args.toArray: _*)
val conf = new SparkConf().setAppName("SlackStreamingWithML")
val sparkContext = new SparkContext(conf)

// optain existing or create new model
val clusters: KMeansModel =
if (Config.trainData != null) {
KMeanTrainTask.train(sparkContext, Config.trainData, Config.numClusters, Config.modelLocation)
} else {
if (Config.modelLocation != null) {
new KMeansModel(sparkContext.objectFile[Vector](Config.modelLocation).collect())
} else {
throw new IllegalArgumentException("Either modelLocation or trainData should be specified")
}
}

if (Config.slackToken != null) {
SlackStreamingTask.run(sparkContext, Config.slackToken, clusters, Config.predictOutput)
}

}

}

上面的代码包含了

main
方法,以及从
spark-submit
调用的代码。 正如您希望看到的那样,我们将在运行SlackStreamingTask时训练新模型或使用现有模型。 它取决于传入的命令行参数,例如
trainData
modelLocation
slackToken

在此Spark机器学习示例源代码分析和回顾中,我们接下来将重点关注1)用于在KMeanTrainTask中训练模型的代码,以及2)使用模型在SlackStreamingTask中进行预测。

首先,让我们打开KMeanTrainTask的相关部分

def train(sparkContext: SparkContext, trainData: String, numClusters: Int, modelLocation: String): KMeansModel = {

if (new File(modelLocation).exists) removePrevious(modelLocation)

val trainRdd = sparkContext.textFile(trainData)

val parsedData = trainRdd.map(Utils.featurize).cache()
// if we had a really large data set to train on, we'd want to call an action to trigger cache.

val model = KMeans.train(parsedData, numClusters, numIterations)

sparkContext.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(modelLocation)

val example = trainRdd.sample(withReplacement = false, 0.1).map(s => (s, model.predict(Utils.featurize(s)))).collect()
println("Prediction examples:")
example.foreach(println)

model
}

调用

train
我们将尝试删除
removePrevious
中任何以前保存的模型。 (未显示
removePrevious
因为它与我们对使用Apache Spark进行机器学习的关注无关。)因此,让我们设置一个名为
trainRdd
的新RDD。 由于
textFile
接受目录的String参数,因此它将读取我们用“输入”调用的目录中包含的所有文件。

接下来,我们必须将RDD中的元素(文本行)转换为适合KMeans的格式。 我们通过调用

Utils.featurize
来做到这一点:

object Utils {

val NUM_DEMENSIONS: Int = 1000

val tf = new HashingTF(NUM_DEMENSIONS)

/**
* This uses min hash algorithm https://en.wikipedia.org/wiki/MinHash to transform
* string to vector of double, which is required for k-means
*/
def featurize(s: String): Vector = {
tf.transform(s.sliding(2).toSeq)
}

}

现在,如果我们返回到KMeansTrain任务对象,则可以使用带有

parsedData
numClusters
numIterations
KMeans.train
函数来训练模型。 后来,我们保存模型并派几个
example
通过遍历例子,发送到集群控制台的预测
println

现在我们已经训练了模型,让我们看看SlackStreamingTask

bject SlackStreamingTask {

def run(sparkContext: SparkContext, slackToken: String, clusters: KMeansModel, predictOutput: String) {
val ssc = new StreamingContext(sparkContext, Seconds(5))
val dStream = ssc.receiverStream(new SlackReceiver(slackToken))

val stream = dStream //create stream of events from the Slack... but filter and marshall to JSON stream data
.filter(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("type") == "message") // get only message events
.map(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("text")) // extract message text from the event

val kmeanStream = kMean(stream, clusters) // create K-mean model
kmeanStream.print() // print k-mean results. It is pairs (k, m), where k - is a message text, m - is a cluster number to which message relates

if (predictOutput != null) {
kmeanStream.saveAsTextFiles(predictOutput) // save to results to the file, if file name specified
}

ssc.start() // run spark streaming application
ssc.awaitTermination() // wait the end of the application
}

/**
* transform stream of strings to stream of (string, vector) pairs and set this stream as input data for prediction
*/
def kMean(dStream: DStream[String], clusters: KMeansModel): DStream[(String, Int)] = {
dStream.map(s => (s, Utils.featurize(s))).map(p => (p._1, clusters.predict(p._2)))
}

}

使用先前保存的模型进行聚类预测的Spark MLlib代码为

clusters.predict
。 在调用它之前,我们在DStream上进行映射,并再次使用
featurize
以便与
predict
一起使用。 我们将返回一个DStream,其中包含从Slack和预测的集群接收的原始文本。

如果已使用

predictOutput
值调用了Spark驱动程序,则输出将另存为文本文件。

这是另一个截屏视频,我将更详细地描述代码。

资源资源

源代码: https : //github.com/tmcgrath/spark-course/tree/master/spark-ml

机器学习背景: http : //www.slideshare.net/ToddMcGrath1/machine-learning-with-apache-spark-62310284

Spark MLlib: http ://www.slideshare.net/ToddMcGrath1/machine-learning-with-spark-mllib

学习Spark总结中的Spark机器学习一章

Spark与Scala培训课程

翻译自: https://www.javacodegeeks.com/2016/05/apache-spark-machine-learning-example-scala.html

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: