spark 使用kryo
2016-05-23 22:36
127 查看
kryo vs nokryo
背景
我今天突发奇想,说测一下kryo如果没有注册会咋样,结果发现没有注册的Kryo性能低于直接使用java.io.Serializable
读取速度
源数据: 42903237条记录 907Mformat | time / s |
---|---|
kryo | 134.341 |
java.io.Serializable | 150.878 |
kryoWithNoRegister | 165.701 |
存储成本
spark task : 122format | task/no space |
---|---|
kryo | all-pass |
java.io.Serializable | 47 |
KryoWithNoRegister | 15 |
如何使用kryo
需要序列化的类继承java.io.Serializable
注册类继承
KryoRegistrator并且注册那些需要序列化的类
在
sparkConf中设置
spark.serializer和
spark.kryo.registrator
测试数据源:
1 \N 男 \N 2 \N 女 \N 3 \N 女 \N 4 \N 男 \N 5 \N 保密 \N
对应的
case class:
/** * Created by todd.chen on 5/23/16. * email : todd.chen@ximalaya.com */ case class UserInfo(id:Long,name:String,gender:String,thirdParty:String) extends java.io.Serializable
性能测试代码
package com.ctao.test.kryo import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * Created by todd.chen on 5/23/16. * email : todd.chen@ximalaya.com */ object KryoTest { def main(args: Array[String]) { val start = System.currentTimeMillis() val conf = new SparkConf().setMaster("local[*]").setAppName("kryoTest") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", "com.ctao.test.kryo.MyRegisterKryo") val sc = new SparkContext(conf) val userInfo = sc.textFile("/Users/cjuexuan/data/user_info").map(line ⇒ line.split("\t")).map(user ⇒ UserInfo(user(0).toLong, user(1), user(2), user(3))) userInfo.persist(StorageLevel.MEMORY_AND_DISK_SER) println(userInfo.count()) println(System.currentTimeMillis() - start) sc.stop() } }
package com.ctao.test.kryo import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator /** * Created by todd.chen on 5/23/16. * email : todd.chen@ximalaya.com */ class MyRegisterKryo extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(classOf[UserInfo]) } }
日志整理类:
import org.apache.spark.{SparkConf, SparkContext} /** * Created by todd.chen on 5/23/16. * email : todd.chen@ximalaya.com */ object FilterLog { def main(args: Array[String]) { val input = args(0) val conf = new SparkConf().setMaster("local[*]").setAppName("filterLog") val sc = new SparkContext(conf) sc.textFile(input) .map(_.split(" ")) .filter(_.length > 3) .filter(x ⇒ x(3) == "MemoryStore:" || x(3) == "BlockManagerInfo:").map(_.mkString(" ")) .collect().foreach(println) sc.stop() } }
参考资料:浅谈Spark Kryo serialization
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- 选定虚拟主机 性能凸显优势
- 修改一行代码提升 Postgres 性能 100 倍
- redis的hGetAll函数的性能问题(记Redis那坑人的HGETALL)
- 推荐Sql server一些常见性能问题的解决方法
- SQL Server误区30日谈 第9天 数据库文件收缩不会影响性能
- 和表值函数连接引发的性能问题分析
- SQLServer 2000 升级到 SQLServer 2008 性能之需要注意的地方之一
- 数据库性能优化三:程序操作优化提升性能
- VBS中的字符串连接的性能问题
- mysql 性能的检查和调优方法
- 数据库性能优化二:数据库表优化提升性能
- SQL语句性能优化(续)
- SQL语句优化提高数据库性能
- 如何用分表存储来提高性能 推荐
- ASP中使用FileSystemObject时提高性能的方法
- 如何改进javascript代码的性能