您的位置:首页 > 大数据

spark快速大数据分析笔记-2

2018-01-21 17:06 267 查看
Spark快速大数据分析第三章-笔记

本章介绍Spark对数据的核心抽象--弹性分布式数据集RDD,其实就是分布式的元素集合。在Spark中,对数据的所有操作不外呼创建RDD,转化已有RDD以及调用RDD操作进行求值。Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。

RDD基础

Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同点上。RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象。

创建RDD的两种方法:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比方说list和set)。

RDD支持的两种类型的操作:转化操作(transformation)和行动操作(action)。转化操作会由一个RDD生成一个新的RDD;行动操作会对RDD计算出一个结果,并把结果返回到驱动程序中,或者把结果存储到外部存储系统(如HDFS)中。

转化操作和行动操作的区别在于Spark计算RDD的方式不同,虽然你可以在任何时候定义新的RDD,但是Spark只会惰性计算这些RDD。也就是说,当真的需要计算的时候才会计算。

默认情况下,Spark的RDD会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist()让Spark把这个RDD缓存下来。在实际操作中,你会经常用persist()来把数据的一部分读取到内存中,并反复查询这部分数控。

总的来说,每个Spark程序或shell会话都按如下的方式工作。

(1)从外部数据创建输入RDD

(2)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD.

(3)告诉Spark对需要被重用的中间结果RDD执行persist()操作。

(4)使用行动操作(例如count()和first()等)来触发一次并行计算,Spark会对计算进行优化后再执行。

cache()与使用默认存储级别调用persist()是一样的。

创建RDD。

创建RDD最简单的方式就是把程序中一个已有的集合传给SparkContext的parallelize()方法。比方说

Python中的parallelize()方法

line=sc.parallelize(["pandas","i like pandas"])

JAVA中的parallelize()方法

JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas","i like pandas"));

不过,更常用的方式是从外部存储中读取数据来创建RDD。比方说使用方法textFile().

Python版本的textFile()方法

line = sc.textFile("/path/to/README.md")

Java版本的textFile()方法

JavaRDD<String> lines = sc.textFile("/path/to/READ.md")

RDD操作

转化操作:会返回一个新的RDD作为操作的结果。假设我们想从日志文件log.txt中找出其中的错误消息。

Python实现filter()的转化操作

inputRDD = sc.textFile("log.txt")

errorsRDD= inputRDD.filter(lambda x:"error" in x)

JAVA实现filter()的转化操作

JavaRDD<String> inputRDD = sc.textFile("log.txt");

JavaRDD<String> errorsRDD = inputRDD.filter(new Function<String,Boolean>(){

  public Boolean call(String x){

  return x.contains("error");

  }

});

注意:filter()操作会返回一个全新的RDD,不会改变已有的inputRDD中的数据。inputRDD在后面的程序中还可以继续使用。

Python版本的union()转化操作

errorsRDD = inputRDD.filter(lambda x: "error" in x)

warningsRDD = inputRDD.filter(lambda x:"warning" in x)

badLinesRDD = errorsRDD.union(warningsRDD)

转化操作可以操作任意数量的输入RDD。

从已有的RDD中派生出新的RDD,Spark会使用谱系图(lineage graph)来记录这些不同RDD之间的依赖关系。也可以靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。
如图Spark3-1.png所示:



行动操作

行动操作会把最终求得的结果返回到驱动程序,或者写入外部存储系统中。如我们可能想输出关于badLinesRDD的一些信息。为此需要使用两个行动操作来实现。用count()来返回计算结果,用take()来收集RDD中的一些元素。

Python版本的对错误进行计数

print "input had"+badLinesRDD.count()+" concerning lines"

print "Here are 10 examples:"

for line in badLinesRDD.take(10):

  print line

Java版本中对错误进行计数

System.out.println("Input had "+badLinesRDD.count()+" concerning lines")

System.out.println("Here are 10 examples:")

for(String line:badLinesRDD.take(10)){

  System.out.println(line);

}

RDD中还有一个collect()函数,可以用来获取整个RDD中的数据。如果你的程序把RDD筛选到一个很小的规模,并且你想在本地处理这些数据,就可以使用它。但是注意collect()不能在大规模数据集中使用。另外每当我们调用一个新的行动操作时,整个RDD都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化。

惰性求值

惰性求值意味着当我们对RDD调用转化操作时,操作不会立即执行,相反,Spark会在内部激励下所要求执行的操作的相关信息。把数据读取到RDD的操作也同样是惰性的。因此当我们调用sc.textFile()时,数据并没有读取进来,而是在必要时才会读取,和转化操作一样的是,读取数据的操作也有可能会多次执行。

虽然转化操作是惰性求值的,但你还是可以随时通过运行一个行动操作来强制Spark执行RDD的转化操作比如使用count().这是一种对你所写的程序进行部分测试的简单方法。

向Spark传递函数

Python版本

三种方式来把函数传递给Spark.传递比较短的函数时,可以使用lambda表达式传递。

如:

word=rdd.filter(lambda s:"error" in s)

def containsError(s):

    return "error" in s

word = rdd.filter(containsError)

除了lambda表达式,我们也可以传递顶层函数或是定义的局部函数。

传递函数的注意事项:Python会在你不经意间把函数所在的对象也序列化传递出去。当你传递的对象是某个对象的成员,或者包含了某个对象中一个字段的引用时,spark就会把整个对象发到工作节点上。如果遇到无法序列化的对象,也会导致你的程序失败。替代方案时,把你需要传递的字段单独放到一个局部变量中,然后传递着个局部变量。

JAVA版本
在java中,函数需要作为实现了Spark的org.apache.spark.api.java.function包中的任一函数接口的对象来传递。根据不同的返回类型,有不同的接口。最基本的接口如图spark3-2.png所示:



可以把我们的函数类内联定义为使用匿名内部类,也可以创建一个具名类:

例如在Java中使用匿名内部类进行函数传递

RDD<String> errors = lines.filter(new Function<String,Boolean>(){

  public Boolean call(String x){

  return x.contains("error");

  }

});

在Java中使用具名类进行函数传递

class ContainsError implements Function<String,Boolean>(){

  public Boolean call(String x){

  return x.contains("error");

  }

}

RDD<String> errors = line.filter(new ContainsError());

例如带参数的Java函数类

class Contains implements Function<Sting,Boolean>(){

  private String query;

  public Contains(String query){

  this.query = query;

  }

  public Boolean call(String x){

  return x.contains(query);

  }

}

RDD<String> errors = line.filter(new Contains("error"));

JAVA8的lambda表达式进行传递函数:

RDD<String> errors =line.filter(s->s.contains("error"));

常见的转换操作和行动操作

1,针对各个元素的转化操作

最常用的就是map()和filter().map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值,而filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。

比方说使用map()函数来对RDD中的所有数求平方。

Python版本:

nums = sc.parallelize([1,2,3,4])

squared=nums.map(lambda x:x*x).collect();

for num in squared:

    print "%i " %(num)

java版本:

SparkConf conf = new SparkConf().setMaster("local").setAppName("squared");

 JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4));

JavaRDD<Integer> result = rdd.map(new Function<Integer,Integer>(){

  public Integer call(Integer x){

  return x*x;

  }

});

System.out.println(StringUtils.join(result.collect(),","));

如果你想对每个输入元素生成多个输出元素,实现该功能的操作叫作flatmap().我们提供给flatmap()的函数被分别应用到了输入RDD的每个元素上。不过返回的不是一个元素,而是返回值序列的迭代器。我们得到的是一个包含各个迭代器可访问的所有元素的RDD。flatmap()的一个简单用途是把输入的字符串切分为单词:

Python版本

lines = sc.parallelize(["hello world","hi"])

words = lines.flatMap(lambda line:line.split(" "))

words.first() #返回“hello"

Java版本

JavaRDD<String> lines = sc.parallelize(Arrays.asList("Hello world","hi"));

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){

  public Iterable<String> call(String line){

  return Arrays.asList(line.split(" "));

  }

});

words.first();//返回Hello
图Spark3-3阐释了flatMap()和map()的区别



伪集合的操作

我们的RDD中最常缺失的集合属性是元素的唯一性,因为常常有重复元素,如果只要唯一的元素,可以通过使用RDD.distinct()转化操作来生成一个只包含不同元素的新RDD。
图Spark3-4总结了常见的RDD转化操作



行动操作

最常见的行动操作是reduce(),它接收一个函数作为参赛,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。

Python中的reduce()

sum = rdd.reduce(lambda x,y:x+y)

Java中的reduce()

Integer sum = rdd.reduce(new Function2<Integer,Integer,Integer>(){

  public Integer call(Integer x,Integer y){

  return x+y;

  }

})

fold()和reduce()类似,接收一个与reduce()接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果。

我们可以用aggregate()来计算RDD的平均值,代替map()后面接fold()的方式。

Python中的aggregate()

sumCount=nums.aggregate((0,0),(lambda acc,value:(acc[0]+value,acc[1]+1),(lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))))

return sumCount[0]/float(sumCount[1])

Java中的aggregate()

class AvgCount implements Serializable{

  public AvgCount(int total,int num){

  this.total=total;

  this.num=num;

  }

  public int num;

  public int total;

  public double avg(){

  return total/(double)num;

  }

}

Function2<AvgCount,Integer,AvgCount> addAndCount = new Function2<AvgCount,Integer,AvgCount>(){

  public AvgCount call(AvgCount a,Integer x){

  a.total+=x;

  a.num+=1;

  return a;

  }

};

Function2<AvgCount,AvgCount,AvgCount> combine = new Function2<AvgCount,AvgCount,AvgCount>(){

  public AvgCount call(AvgCount a,AvgCount b){

  a.total+=b.total;

  a.num+=b.num;

  return a;

  }

};

AvgCount initial = new AvgCount(0,0);

AvgCount result = rdd.aggregate(initial,addAndCount,combine);

System.out.println(result.avg());

collect()把数据返回驱动程序中。

take(n)返回RDD中的n个元素。

top()从RDD中获取前几个元素。
图Spark3-5.png展示了基本的RDD行动操作:



在不同RDD类型间转换

有些函数只能用于特定类型的RDD,比如mean()和variance()只能用在数值RDD上。而join()只能用在键值对RDD上。

在java中,各种RDD的特殊类型间的转换更为明确。Java中有两个专门的类,JavaDoubleRDD和JavaPairRDD,来处理特殊的RDD。要构建出这些特殊类型到的RDD,需要使用特殊版本的类来代替一般使用的Function类,如果要从T类型的RDD创建出一个DoubleRDD,我们应该使用DoubleFunction<T>来代替Function<T,Double>.
Java中针对专门类型的函数接口如图Spark3-6.png



用Java创建DoubleRDD

JavaDoubleRDD result = rdd.mapToDouble(

new DoubleFunction<Integer>(){

  public double call(Integer x){

  return (double) x*x;

  }

});

System.out.println(result.mean());

持久化(缓存)

为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发送故障,Spark会在需要用到缓存的数据时重算丢失的数据分区。在java中,默认情况下persist()会把数据以序列化的形式缓存在jvm的堆空间中。RDD中还有一个方法unpersist(),调用该方法可以手动把持久化的RDD从缓存中移除。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息