您的位置:首页 > 其它

Spark浅显了解

2015-09-21 20:52 274 查看
  Spark是基于内存计算的集群计算系统,非常适合于迭代运算的机器学习方法实现。作为一个数据挖掘的专业人员,不容错过此等神器,下面我们就来简单地体验一下Spark。

什么是RDD

  RDD(弹性分布式数据集)是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的,RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。

Spark核心组件

  (1)Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。

  (2)Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。对熟悉Hive和HiveQL的人,Spark可以拿来就用。

  (3)Spark Streaming:允许对实时数据流进行处理和控制。很多实时数据库(如Apache Store)可以处理实时数据。Spark Streaming允许程序能够像普通RDD一样处理实时数据。

  (4)MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。之前可选的大数据机器学习库Mahout,将会转到Spark,并在未来实现。

  (5)GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。

Spark安装

  这里给出几个安装的网址,大家自己安装一下。

  http://www.oschina.net/translate/spark-standalone?cmp

  http://www.bubuko.com/infodetail-971596.html

  IPython在spark上的使用。

  http://blog.jobbole.com/86232/

Spark简单例子

  这里,我安装的是单机模式,用Python语言在IPython下编程。我在代码里加了很好地注释,可以很方便地看懂

## http://127.0.0.1:8880/ ## spark 在ipython Notebook上编程
import os
import sys
from operator import add
from pyspark import SparkContext, SparkConf
#sys.path
#conf = SparkConf().setAppName("abc").setMaster("spark://8080")
#conf.set("es.index.auto.create", "true")
#sc = SparkContext(conf=conf)

## 并行化集合
data = [1, 2, 3, 4, 5]  
distData = sc.parallelize(data)
print distData.collect()
print distData.first()
print distData.take(3)

## 由外部文件产生并行化集合
lines = sc.textFile("/home/yujianmin/spark-1.4.0-bin-hadoop2.6/data/mllib/mydata.txt")  
lineLengths = lines.map(lambda s: len(s))  
totalLength = lineLengths.reduce(lambda a, b: a + b)  
print totalLength
# 如果想以后再用lineLenghts,将其保存到内存中 #
#lineLengths.persist()

#map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。
#任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
print distData.collect();
y = distData.map(lambda x: x*2)
print y.collect()
z = y.reduce(lambda a, b: a+b)
print z

# 将function对象传给Spark #
#object MyFunctions {
#    def func1(s: String): String = { ... }
#}
#myRdd.map(MyFunctions.func1)
lines = sc.textFile("/home/yujianmin/spark-1.4.0-bin-hadoop2.6/data/mllib/mydata1.txt")  
# method (1)直接传递函数
def myFunc(s):
    words = s.split(" ")
    return len(words)
lineLength2 = lines.map(myFunc)
print lineLength2.collect()
totalLength2 = lineLength2.reduce(lambda a, b: a+b)
print totalLength2
# method (2) 传递类
class functionPool():
    def __init__(self):
        self.para = "hell"
    def __del__(self):
        pass
    def say(self):
        print self.para
    def wordcount(self, s):
        words = s.split(" ")
        return len(words)
    def SPWordCount(self, rdd):
        return rdd.map(self.wordcount())
#functionPool a
#lineLength2 = lines.map(a.wordcount())
#print lineLength2.collect()
#totalLength2 = lineLength2.reduce(lambda a, b: a+b)


  上面的这些简单的例子,还远远不够满足我们的需求,但是可以让大家简单地了解Spark用Python编程的基本流程和语法,深入了解还需要大家查阅手册。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: