spark源码action系列-foreach与foreachPartition
2016-01-29 15:18
821 查看
RDD.foreachPartition/foreach的操作
在这个action的操作中:这两个action主要用于对每个partition中的iterator时行迭代的处理.通过用户传入的function对iterator进行内容的处理.
首先我们先看看foreach的操作:
在fureach中,传入一个function,这个函数的传入参数就是每个partition中,每次的foreach得到的一个rdd的kv实例,也就是具体的内容,这种处理你并不知道这个iterator的foreach什么时候结果,只能是foreach的过程中,你得到一条数据,就处理一条数据.
由下面的红色部分可以看出,foreach操作是直接调用了partition中数据的foreach操作.
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
示例说明:
val list = new ArrayBuffer()
Rdd.foreach(record => {
list += record
If (list.size >= 10000) {
list.flush....
}
})
上面这段示例代码中,如果这么使用就会存在一个问题,
迭代的最后,list的结果可能还没有达到10000条,这个时候,你在内部的处理的flush部分就不会执行,也就是迭代的最后如果没有达到10000的数据就会丢失.
所以在foreach中,一般就是拿到一条数据进行下处理Rdd.foreach(record => {record._1 == a return})
然后接下来看看foreachPartition:
这个函数也是根据传入的function进行处理,但不同处在于,这里function的传入参数是一个partition对应数据的iterator.而不是直接使用iterator的foreach,
这种情况下,如果是上面foreach的示例代码中list这个片段在这个action中就能够正常的去处理.
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
示例代码:
Val list = new ArrayBuffer
rdd.foreachPartition(it => {
It.foreach(r => {
List += r
If (list.size > 10000) flush
})
If (list.size > 0) flush
})
最后说下这两个action的区别:
Foreach与foreachPartition都是在每个partition中对iterator进行操作,
不同的是,foreach是直接在每个partition中直接对iterator执行foreach操作,而传入的function只是在foreach内部使用,
而foreachPartition是在每个partition中把iterator给传入的function,让function自己对iterator进行处理.
相关文章推荐
- 迁移 Windows 上 Oracle 11.2.0.3.0 到 Linux 上 Oracle 11.2.0.3.0
- VMware虚拟机克隆CentOS后网卡修改方法
- 十七、联合国的性质、宗旨、原则、机构
- Android高效加载大图
- SVN和Git 介绍,区别,优缺点,适用范围总结
- SQL Server 2012应用实践.pdf
- C++ 异常
- 开始架设kbengine
- 学电子信息工程,出路在哪里?
- Unity3D 画线插件 Vectrosity_Simple2DLine
- winsecs .net 开发参考手册
- 使用Spring,Mabatis框架update返回值问题?
- 数学之路-python计算实战(14)-机器视觉-图像增强(直方图均衡化)
- 输入框内格式化金额、银行卡号
- mac下安装mysql,myeclipse for retina
- quartz2d基本绘图
- bower的使用
- Android 如何新建继承Activity的类
- 十六、经济全球化及其重要历史意义
- 前端ui框架