第18课:Spark Streaming中空RDD处理及流处理程序优雅的停止
2016-06-01 19:26
369 查看
第18课:Spark Streaming中空RDD处理及流处理程序优雅的停止
/* 王家林老师授课http://weibo.com/ilovepains 每天晚上20:00YY频道现场授课频道 68917580*/
1 Spark Streaming中空RDD的处理
2 Spark Streaming程序优雅的停止
跟51cto和csdn课堂合作,最最重要的是如何贡献社会,祝福每个人拥有美好的人生。
机器学习在spark 2.x基础上授课。
空RDD没做什么事情又要消耗计算资源cpu cores,虽然什么都干,这个必须进行处理。
判断RDD有没有元素。
1、if(rdd.count()){} //不好的地方,count会触发一个job,不要这个方式。
2、if (!rdd.isEmpty){}// 目前有效的方式
3、if (rdd.partitions.isEmpty)
4、if(rdd.partitions > 0){
rdd.partitions.map(_)
//rdd.iterator //这个在executor中执行的,现在我们在driver中,无法使用的
}
5、rdd.partitions.isEmpty //这个不太对 ,不行的
课外资料
http://stackoverflow.com/questions/28454357/spark-efficient-way-to-test-if-an-rdd-is-empty
def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
val start = System.currentTimeMillis()
val rdd = sc.parallelize(1L to n, numSlices = 100)
val result = f(rdd)
printf("Time: " + (System.currentTimeMillis() - start) + " Result: " + result)
}
time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)
time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)
time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
On my local machine with 3 worker cores I got these results
/* 王家林老师授课http://weibo.com/ilovepains 每天晚上20:00YY频道现场授课频道 68917580*/
1 Spark Streaming中空RDD的处理
2 Spark Streaming程序优雅的停止
跟51cto和csdn课堂合作,最最重要的是如何贡献社会,祝福每个人拥有美好的人生。
机器学习在spark 2.x基础上授课。
空RDD没做什么事情又要消耗计算资源cpu cores,虽然什么都干,这个必须进行处理。
判断RDD有没有元素。
1、if(rdd.count()){} //不好的地方,count会触发一个job,不要这个方式。
2、if (!rdd.isEmpty){}// 目前有效的方式
3、if (rdd.partitions.isEmpty)
4、if(rdd.partitions > 0){
rdd.partitions.map(_)
//rdd.iterator //这个在executor中执行的,现在我们在driver中,无法使用的
}
5、rdd.partitions.isEmpty //这个不太对 ,不行的
/** * An RDD that has no partitions and no elements. */ private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) { override def getPartitions: Array[Partition] = Array.empty override def compute(split: Partition, context: TaskContext): Iterator[T] = { throw new UnsupportedOperationException("empty RDD") } }
课外资料
http://stackoverflow.com/questions/28454357/spark-efficient-way-to-test-if-an-rdd-is-empty
def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
val start = System.currentTimeMillis()
val rdd = sc.parallelize(1L to n, numSlices = 100)
val result = f(rdd)
printf("Time: " + (System.currentTimeMillis() - start) + " Result: " + result)
}
time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)
time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)
time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
On my local machine with 3 worker cores I got these results
Time: 21 Result: false Time: 75 Result: false Time: 8664 Result: false Time: 18266 Result: false Time: 23836 Result: false Time: 113 Result: false Time: 101 Result: false Time: 68 Result: false Time: 221 Result: false Time: 46 Result: false Time: 79 Result: true Time: 93 Result: true Time: 79 Result: true Time: 100 Result: true Time: 64 Result: true
相关文章推荐
- android使用代码生成LayerDrawable的方法、源码分析和注意事项
- Android 中的线程调度
- Linux操作系统基础解析之(七)——Bash(Shell)基础知识(1)
- Android系统权限那些事
- bitmap.setPixels()方法及自己理解
- zzulioj-1877 蛤玮打扫教室
- win10 应用商店无法联网(0x80072EFD)解决方案
- MySQL-MMM实现MySQL高可用读写分离
- 子查询
- Do Evil Things with gopher:// , Ricter · 2016/06/01 9:32
- Parcelable序列化实现方法
- 跟踪程序执行流程,在某个模块的入口
- 三种页面置换算法C实现
- JDK升级到1.7后 com.sun.image.codec.jpeg JPEGImageEncoder不存在
- redis持久化之rdb
- 火拼泡泡龙技术
- DOM---点亮星
- Android从零开始之工欲善其事必先利其器--环境搭建与配置
- 获取手机通讯录
- codevs p4633