Spark浅显了解
2015-09-21 20:52
274 查看
Spark是基于内存计算的集群计算系统,非常适合于迭代运算的机器学习方法实现。作为一个数据挖掘的专业人员,不容错过此等神器,下面我们就来简单地体验一下Spark。
(2)Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。对熟悉Hive和HiveQL的人,Spark可以拿来就用。
(3)Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming允许程序能够像普通RDD一样处理实时数据。
(4)MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库Mahout,将会转到Spark,并在未来实现。
(5)GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。
http://www.oschina.net/translate/spark-standalone?cmp
http://www.bubuko.com/infodetail-971596.html
IPython在spark上的使用。
http://blog.jobbole.com/86232/
上面的这些简单的例子,还远远不够满足我们的需求,但是可以让大家简单地了解Spark用Python编程的基本流程和语法,深入了解还需要大家查阅手册。
什么是RDD
RDD(弹性分布式数据集)是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的,RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。Spark核心组件
(1)Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。(2)Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。对熟悉Hive和HiveQL的人,Spark可以拿来就用。
(3)Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming允许程序能够像普通RDD一样处理实时数据。
(4)MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库Mahout,将会转到Spark,并在未来实现。
(5)GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。
Spark安装
这里给出几个安装的网址,大家自己安装一下。http://www.oschina.net/translate/spark-standalone?cmp
http://www.bubuko.com/infodetail-971596.html
IPython在spark上的使用。
http://blog.jobbole.com/86232/
Spark简单例子
这里,我安装的是单机模式,用Python语言在IPython下编程。我在代码里加了很好地注释,可以很方便地看懂## http://127.0.0.1:8880/ ## spark 在ipython Notebook上编程 import os import sys from operator import add from pyspark import SparkContext, SparkConf #sys.path #conf = SparkConf().setAppName("abc").setMaster("spark://8080") #conf.set("es.index.auto.create", "true") #sc = SparkContext(conf=conf) ## 并行化集合 data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) print distData.collect() print distData.first() print distData.take(3) ## 由外部文件产生并行化集合 lines = sc.textFile("/home/yujianmin/spark-1.4.0-bin-hadoop2.6/data/mllib/mydata.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b) print totalLength # 如果想以后再用lineLenghts,将其保存到内存中 # #lineLengths.persist() #map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 #任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 print distData.collect(); y = distData.map(lambda x: x*2) print y.collect() z = y.reduce(lambda a, b: a+b) print z # 将function对象传给Spark # #object MyFunctions { # def func1(s: String): String = { ... } #} #myRdd.map(MyFunctions.func1) lines = sc.textFile("/home/yujianmin/spark-1.4.0-bin-hadoop2.6/data/mllib/mydata1.txt") # method (1)直接传递函数 def myFunc(s): words = s.split(" ") return len(words) lineLength2 = lines.map(myFunc) print lineLength2.collect() totalLength2 = lineLength2.reduce(lambda a, b: a+b) print totalLength2 # method (2) 传递类 class functionPool(): def __init__(self): self.para = "hell" def __del__(self): pass def say(self): print self.para def wordcount(self, s): words = s.split(" ") return len(words) def SPWordCount(self, rdd): return rdd.map(self.wordcount()) #functionPool a #lineLength2 = lines.map(a.wordcount()) #print lineLength2.collect() #totalLength2 = lineLength2.reduce(lambda a, b: a+b)
上面的这些简单的例子,还远远不够满足我们的需求,但是可以让大家简单地了解Spark用Python编程的基本流程和语法,深入了解还需要大家查阅手册。
相关文章推荐
- 象限覆盖 CodeChef Lighthouses
- SpringMVC 返回数据的缺省格式
- 图像编码基本分类
- 5条原则助你改善产品UX设计
- 怎样的配色能让你的作品高端!上档次呢?
- 自定义 EditText背景 选中和默认
- oracle 数据库 sqlplus的一些要点
- C#软件开发实例.私人订制自己的屏幕截图工具(三)托盘图标及菜单的实现
- 关于字符串的trim()和截取空格(2015年9月19日)
- 最大熵模型总结
- SOHO设计师如何保障自己的权益
- 响应式布局的9项基本原则
- 在linux中文件或文件夹名字中不要有空格
- Linux中tty框架与uart框架之间的调用关系剖析
- String split 分隔字符串,多个分隔符、正则表达式学习示例
- Objective-C-----用NSArray显示一年中所有的月份
- 比较三个数大小
- 常见的HTTP状态码
- WebServie 基础
- HDU 4635 Strongly connected(强连通分量+缩点)