您的位置:首页 > 其它

Spark RDD API扩展开发

2015-11-13 16:20 387 查看

原文链接:

SparkRDDAPI扩展开发(1)

SparkRDDAPI扩展开发(2):自定义RDD

我们都知道,ApacheSpark内置了很多操作数据的API。但是很多时候,当我们在现实中开发应用程序的时候,我们需要解决现实中遇到的问题,而这些问题可能在Spark中没有相应的API提供,这时候,我们就需要通过扩展SparkAPI来实现我们自己的方法。
我们可以通过两种方法来扩展SparkAPI,(1)、其中一种就是在现有的RDD中添加自定义的方法;(2)、第二种就是创建属于我们自己的RDD。在这篇文章中,我将对这两种方法进行阐述,并赋予代码。下面我就开始介绍第一种方法。

  假如我们中有一些商品的销售数据,数据的格式是CSV的。为了简单起见,假如每行数据都是由id,customerId,itemId以及itemValue四个字段组成,我们用SalesRecord来表示:

1
class
SalesRecord(
val
id
:
String,
2
val
customerId
:
String,
3
val
itemId
:
String,
4
val
itemValue
:
Double)
extends
Comparable[SalesRecord]
5
with
Serializable
  所以我们可以将商品的销售数据进行解析,并存储到RDD[SalesRecord]中:

01
/**
02
*User:过往记忆
03
*Date:15-03-31
04
*Time:上午00:24
05
*bolg:http://www.iteblog.com
06
*本文地址:http://www.iteblog.com/archives/1298
07
*过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08
*过往记忆博客微信公共帐号:iteblog_hadoop
09
*/
10
11
val
sc
=
new
SparkContext(args(
0
),
"iteblogRDDExtending"
)
12
val
dataRDD
=
sc.textFile(
"file:///www/iteblog.csv"
)
13
val
salesRecordRDD
=
dataRDD.map(row
=
>{
14
val
colValues
=
row.split(
","
)
15
new
SalesRecord(colValues(
0
),colValues(
1
),
16
colValues(
2
),colValues(
3
).toDouble)
17
})
  如果我们想计算出这些商品的总销售额,我们会这么来写:

1
salesRecordRDD.map(
_
.itemValue).sum
  虽然这看起来很简洁,但是理解起来却有点困难。但是如果我们可以这么来写,可能会很好理解:

1
salesRecordRDD.totalSales
  在上面的代码片段中,totalSales方法让我们感觉就是Spark内置的操作一样,但是Spark是不提供这个方法的,我们需要在现有的RDD中实现我们自定义的操作。

  下面我就来介绍一些如何在现有的RDD中添加我们自定义的方法。

  一、定义一个工具类,来存放我们所有自定义的操作

  当然,你完全没必要自定义一个类类添加我们自定义的方法,但是为了管理,还是建议你这么做。下面我们来定义IteblogCustomFunctions类,它存储所有我们自定义的方法。它是专门用来处理RDD[SalesRecord],所以这个类中提供的操作全部是用来处理销售数据的:

1
class
IteblogCustomFunctions(rdd
:
RDD[SalesRecord]){
2
def
totalSales
=
rdd.map(
_
.itemValue).sum
3
}
  二、隐形转换来实现在RDD中添加方法

  我们定义了隐形的addIteblogCustomFunctions函数,这可以将所有操作销售数据的方法作用于RDD[SalesRecord]上:

01
/**
02
*User:过往记忆
03
*Date:15-03-31
04
*Time:上午00:24
05
*bolg:http://www.iteblog.com
06
*本文地址:http://www.iteblog.com/archives/1298
07
*过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08
*过往记忆博客微信公共帐号:iteblog_hadoop
09
*/
10
11
object
IteblogCustomFunctions{
12
implicit
def
addIteblogCustomFunctions(rdd
:
RDD[SalesRecord])
=
new
13
IteblogCustomFunctions(rdd)
14
}
  三、使用自定义的方法

  下面方法通过导入IteblogCustomFunctions中的相应方法来实现使用我们自定义的方法:

1
import
IteblogCustomFunctions.
_
2
println(salesRecordRDD.totalSales)
  通过上面三步我们就可以在现有的RDD中添加我们自定义的方法。

自定义一个RDD类

在上文中我介绍了如何在现有的RDD中添加自定义的函数。本文将介绍如何自定义一个RDD类,假如我们想对没见商品进行打折,我们想用Action操作来实现这个操作,下面我将定义IteblogDiscountRDD类来计算商品的打折,步骤如下:

  一、创建IteblogDiscountRDD类

  自定义RDD类需要继承Spark中的RDD类,并实现其中的方法:

01
/**
02
*User:过往记忆
03
*Date:15-04-01
04
*Time:上午00:59
05
*bolg:http://www.iteblog.com
06
*本文地址:http://www.iteblog.com/archives/1299
07
*过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08
*过往记忆博客微信公共帐号:iteblog_hadoop
09
*/
10
class
IteblogDiscountRDD(prev
:
RDD[SalesRecord],xxxxx
:
Double)
11
extends
RDD[SalesRecord](prev){
12
13
//继承compute方法
14
override
def
compute(split
:
Partition,context
:
TaskContext)
:
Iterator[SalesRecord]
=
{
15
firstParent[SalesRecord].iterator(split,context).map(salesRecord
=
>{
16
val
discount
=
salesRecord.itemValue*discountPercentage
17
new
SalesRecord(salesRecord.id,
18
salesRecord.customerId,salesRecord.itemId,discount)
19
})}
20
21
//继承getPartitions方法
22
override
protected
def
getPartitions
:
Array[Partition]
=
23
firstParent[SalesRecord].partitions
24
}
  上面代码中,我创建了一个IteblogDiscountRDD类,这个RDD只操纵销售数据,当我们继承RDD类时,我们必须重载两个方法:
  compute

  这个函数是用来计算RDD中每个的分区的数据,在我代码中,我们输入了销售数据,并对其中的数据计算打折计算。

  getPartitions
  
  getPartitions函数允许开发者为RDD定义新的分区,在我们的代码中,并没有改变RDD的分区,重用了父RDD的分区。

  定义IteblogDiscountRDD的时候将类型写死了(SalesRecord),它只能用来处理SalesRecord数据。如果我们想定义一个通用的RDD,只需要类似下面写即可

01
class
IteblogRDD(prev
:
RDD[T],XXXX
:
C)
02
extends
RDD[T](prev){
03
04
//继承compute方法
05
override
def
compute(split
:
Partition,context
:
TaskContext)
:
Iterator[T]
=
{
06
................................
07
}
08
09
//继承getPartitions方法
10
override
protected
def
getPartitions
:
Array[Partition]
=
11
................................
12
}
  二、自定义discount函数

  我们自定义discount函数,该函数可以创建一个IteblogDiscountRDD:

1
def
discount(discountPercentage
:
Double)
=
new
IteblogDiscountRDD(rdd,discountPercentage)
  三、使用IteblogDiscountRDD

  使用IteblogDiscountRDD也是非常简单的,我们可以像使用内置的RDD一样来使用:

1
import
IteblogCustomFunctions.
_
2
3
val
discountRDD
=
salesRecordRDD.discount(
0.1
)
4
println(discountRDD.collect().toList)
  自此,我们已经学会了如何在现有的RDD中定义方法和自定义自己的RDD。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: