【Data Algorithms: Recipes for Scaling Up with Hadoop and Spark】Chapter 12. K-Means Clustering
2016-04-05 19:43
The k-means implementation from the Spark examples:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples

import breeze.linalg.{Vector, DenseVector, squaredDistance}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/**
 * K-means clustering.
 *
 * This is an example implementation for learning how to use Spark. For more conventional use,
 * please refer to org.apache.spark.mllib.clustering.KMeans
 */
object SparkKMeans {

  def parseVector(line: String): Vector[Double] = {
    DenseVector(line.split(' ').map(_.toDouble))
  }

  def closestPoint(p: Vector[Double], centers: Array[Vector[Double]]): Int = {
    var bestIndex = 0
    var closest = Double.PositiveInfinity

    for (i <- 0 until centers.length) {
      val tempDist = squaredDistance(p, centers(i))
      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }

    bestIndex
  }

  def showWarning() {
    System.err.println(
      """WARN: This is a naive implementation of KMeans Clustering and is given as an example!
        |Please use the KMeans method found in org.apache.spark.mllib.clustering
        |for more conventional use.
      """.stripMargin)
  }

  def main(args: Array[String]) {

    if (args.length < 3) {
      System.err.println("Usage: SparkKMeans <file> <k> <convergeDist>")
      System.exit(1)
    }

    showWarning()

    val sparkConf = new SparkConf().setAppName("SparkKMeans")
    val sc = new SparkContext(sparkConf)
    val lines = sc.textFile(args(0))
    val data = lines.map(parseVector _).cache()
    val K = args(1).toInt
    val convergeDist = args(2).toDouble

    // Pick K initial centers by sampling the data (fixed seed 42 for reproducibility).
    val kPoints = data.takeSample(withReplacement = false, K, 42).toArray
    var tempDist = 1.0

    while (tempDist > convergeDist) {
      // Assign each point to its nearest center; pair it with a count of 1.
      val closest = data.map(p => (closestPoint(p, kPoints), (p, 1)))

      // Sum the points and counts per cluster.
      val pointStats = closest.reduceByKey { case ((p1, c1), (p2, c2)) =>
        (p1 + p2, c1 + c2)
      }

      // New center = per-cluster mean (sum divided by count).
      val newPoints = pointStats.map { pair =>
        (pair._1, pair._2._1 * (1.0 / pair._2._2))
      }.collectAsMap()

      // Total movement of the centers this iteration.
      tempDist = 0.0
      for (i <- 0 until K) {
        tempDist += squaredDistance(kPoints(i), newPoints(i))
      }

      for (newP <- newPoints) {
        kPoints(newP._1) = newP._2
      }
      println("Finished iteration (delta = " + tempDist + ")")
    }

    println("Final centers:")
    kPoints.foreach(println)
    sc.stop()
  }
}
// scalastyle:on println