Scala的Apache Spark机器学习示例
在此Apache Spark机器学习示例中,将介绍Spark MLlib并审查Scala源代码。 这篇文章和随附的截屏视频将演示自定义Spark MLlib Spark驱动程序应用程序。 然后,将检查Spark MLLib Scala源代码。 将显示和解释许多主题,但首先,让我们描述一些机器学习概念。
机器学习关键概念
什么是机器学习?
机器学习正在创建和使用从数据中学习的模型。 您可能还会听到机器学习,称为预测建模或数据挖掘。
机器学习的三个例子是什么?
- 垃圾邮件预测
- 欺诈性信用卡交易预测
- 产品或广告推荐引擎
机器学习模型有两种类型:有监督的和无监督的。 监督模型包含一组带有正确答案的数据,而非监督模型则不包含标签。
监督机器学习模型的示例
- k近邻–例如:预测一个人如何投票(如果您知道他们的邻居如何投票)
- 朴素的贝叶斯–示例:确定传入的电子邮件是否为垃圾邮件
- 线性回归–尝试确定两个变量是否相关
- 决策树–使用一种结构来表示许多可能的决策路径以及每个路径的结果
无监督机器学习模型的示例
- 群集–处理未标记的数据并尝试对其进行“群集”。 例如,显示百万富翁居住地的数据集可能在比佛利山庄和曼哈顿等地聚集
- 潜在狄利克雷分析(LDA)–自然语言处理,通常用于识别文本或一组文档中的常见主题
- 神经网络–示例:手写识别和面部图像检测
在构建用于进行预测的模型时,我们经常根据现有数据集训练模型。 随着越来越多的训练数据集变得可用,可以对模型进行多次训练。 例如,随着我们更多地了解导致产品销售或目标参与度指标的事件,我们将基于协作过滤对推荐引擎进行重新培训。
Apache Spark机器学习示例(与Scala一起使用)
让我们展示一个Apache Spark机器学习程序的演示。 在下面的演示中,我们将从训练k均值聚类模型开始,然后使用该训练的模型来预测Slack传入文本流的语言。
此示例基于先前的Apache Spark流媒体教程构建,该教程可从Slack团队站点流式传输数据。 这些视频及更多内容也可以在我们的Spark培训课程中找到。 请参阅下面的资源部分以获取链接。
但是,让我们继续进行演示:
Apache Spark机器学习Scala源代码审查
好的,现在我们已经有了演示了,让我们回顾一下Spark MLLib的相关代码。 同样,可以在下面的参考资料小节中找到源代码的链接。 让我们从进入我们的Spark Machine Learning示例以及在演示SlackMLApp的spark-submit部署期间调用的内容开始:
Scala的Spark机器学习示例
object SlackMLApp { object Config { @Parameter(names = Array("-st", "--slackToken")) var slackToken: String = null @Parameter(names = Array("-nc", "--numClusters")) var numClusters: Int = 4 @Parameter(names = Array("-po", "--predictOutput")) var predictOutput: String = null @Parameter(names = Array("-td", "--trainData")) var trainData: String = null @Parameter(names = Array("-ml", "--modelLocation")) var modelLocation: String = null } def main(args: Array[String]) { new JCommander(Config, args.toArray: _*) val conf = new SparkConf().setAppName("SlackStreamingWithML") val sparkContext = new SparkContext(conf) // optain existing or create new model val clusters: KMeansModel = if (Config.trainData != null) { KMeanTrainTask.train(sparkContext, Config.trainData, Config.numClusters, Config.modelLocation) } else { if (Config.modelLocation != null) { new KMeansModel(sparkContext.objectFile[Vector](Config.modelLocation).collect()) } else { throw new IllegalArgumentException("Either modelLocation or trainData should be specified") } } if (Config.slackToken != null) { SlackStreamingTask.run(sparkContext, Config.slackToken, clusters, Config.predictOutput) } } }
上面的代码包含了
main方法,以及从
spark-submit调用的代码。 正如您希望看到的那样,我们将在运行SlackStreamingTask时训练新模型或使用现有模型。 它取决于传入的命令行参数,例如
trainData,
modelLocation和
slackToken。
在此Spark机器学习示例源代码分析和回顾中,我们接下来将重点关注1)用于在KMeanTrainTask中训练模型的代码,以及2)使用模型在SlackStreamingTask中进行预测。
首先,让我们打开KMeanTrainTask的相关部分
def train(sparkContext: SparkContext, trainData: String, numClusters: Int, modelLocation: String): KMeansModel = { if (new File(modelLocation).exists) removePrevious(modelLocation) val trainRdd = sparkContext.textFile(trainData) val parsedData = trainRdd.map(Utils.featurize).cache() // if we had a really large data set to train on, we'd want to call an action to trigger cache. val model = KMeans.train(parsedData, numClusters, numIterations) sparkContext.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(modelLocation) val example = trainRdd.sample(withReplacement = false, 0.1).map(s => (s, model.predict(Utils.featurize(s)))).collect() println("Prediction examples:") example.foreach(println) model }
调用
train我们将尝试删除
removePrevious中任何以前保存的模型。 (未显示
removePrevious因为它与我们对使用Apache Spark进行机器学习的关注无关。)因此,让我们设置一个名为
trainRdd的新RDD。 由于
textFile接受目录的String参数,因此它将读取我们用“输入”调用的目录中包含的所有文件。
接下来,我们必须将RDD中的元素(文本行)转换为适合KMeans的格式。 我们通过调用
Utils.featurize来做到这一点:
object Utils { val NUM_DEMENSIONS: Int = 1000 val tf = new HashingTF(NUM_DEMENSIONS) /** * This uses min hash algorithm https://en.wikipedia.org/wiki/MinHash to transform * string to vector of double, which is required for k-means */ def featurize(s: String): Vector = { tf.transform(s.sliding(2).toSeq) } }
现在,如果我们返回到KMeansTrain任务对象,则可以使用带有
parsedData,
numClusters和
numIterations
KMeans.train函数来训练模型。 后来,我们保存模型并派几个
example通过遍历例子,发送到集群控制台的预测
println。
现在我们已经训练了模型,让我们看看SlackStreamingTask
bject SlackStreamingTask { def run(sparkContext: SparkContext, slackToken: String, clusters: KMeansModel, predictOutput: String) { val ssc = new StreamingContext(sparkContext, Seconds(5)) val dStream = ssc.receiverStream(new SlackReceiver(slackToken)) val stream = dStream //create stream of events from the Slack... but filter and marshall to JSON stream data .filter(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("type") == "message") // get only message events .map(JSON.parseFull(_).get.asInstanceOf[Map[String, String]]("text")) // extract message text from the event val kmeanStream = kMean(stream, clusters) // create K-mean model kmeanStream.print() // print k-mean results. It is pairs (k, m), where k - is a message text, m - is a cluster number to which message relates if (predictOutput != null) { kmeanStream.saveAsTextFiles(predictOutput) // save to results to the file, if file name specified } ssc.start() // run spark streaming application ssc.awaitTermination() // wait the end of the application } /** * transform stream of strings to stream of (string, vector) pairs and set this stream as input data for prediction */ def kMean(dStream: DStream[String], clusters: KMeansModel): DStream[(String, Int)] = { dStream.map(s => (s, Utils.featurize(s))).map(p => (p._1, clusters.predict(p._2))) } }
使用先前保存的模型进行聚类预测的Spark MLlib代码为
clusters.predict。 在调用它之前,我们在DStream上进行映射,并再次使用
featurize以便与
predict一起使用。 我们将返回一个DStream,其中包含从Slack和预测的集群接收的原始文本。
如果已使用
predictOutput值调用了Spark驱动程序,则输出将另存为文本文件。
这是另一个截屏视频,我将更详细地描述代码。
资源资源
源代码: https : //github.com/tmcgrath/spark-course/tree/master/spark-ml
机器学习背景: http : //www.slideshare.net/ToddMcGrath1/machine-learning-with-apache-spark-62310284
Spark MLlib: http ://www.slideshare.net/ToddMcGrath1/machine-learning-with-spark-mllib
翻译自: https://www.javacodegeeks.com/2016/05/apache-spark-machine-learning-example-scala.html
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战007-DataStream与MySql自定义sink和source(Scala版)002
- Apache OpenJPA 示例代码演示
- 导入文章“Apache Spark学习:将Spark部署到Hadoop 2.2.0上”中给出的 assembly/target/scala-2.9.3/目录下的spark-assembly-0.8.
- Kylin系列-分布式大数据多维分析(OLAP)引擎Apache Kylin安装配置及使用示例
- apache 301重定向配置示例
- Spark机器学习--矩阵的定义——scala版本
- Apache Curator Lock 简单示例
- Apache Commons CLI 开发命令行工具示例
- 完整微服务化示例:使用 Apache ServiceComb (incubating) 进行微服务开发、容器化、弹性伸缩
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink高级特性与高级应用023-Flink中OutFormat设置(Scala版)003
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战008-DataStream与MySql自定义sink和source(Scala版)003
- scala使用多线程示例
- Spark机器学习1·编程入门(scala/java/python)
- Apache Hadoop Zookeeper示例
- 1.集群安装 - apache原生版[spark2.1 + hadoop2.6 + scala2.11.8 + jdk1.8 + flume1.6 + zookeeper3.4.9 + kafka0.
- scala 代码示例
- Apache dbutils使用示例
- Pipeline详解及Spark MLlib使用示例(Scala/Java/Python)
- Apache shiro示例
- Apache Commons fileUpload文件上传多个示例分享