您的位置:首页 > 其它

spark 使用kryo

2016-05-23 22:36 127 查看

kryo vs nokryo

背景

我今天突发奇想,说测一下kryo如果没有注册会咋样,结果发现没有注册的Kryo性能低于直接使用
java.io.Serializable


读取速度

源数据: 42903237条记录 907M

formattime / s
kryo134.341
java.io.Serializable150.878
kryoWithNoRegister165.701

存储成本

spark task : 122

formattask/no space
kryoall-pass
java.io.Serializable47
KryoWithNoRegister15
在这个测试类一共有122个task,用kryo在全程都没出现空间不足,如果没使用注册,不到20个task就出现了内存不足,在完全不使用kryo的情况下大概会在50个task的时候出现

如何使用kryo

需要序列化的类继承
java.io.Serializable


注册类继承
KryoRegistrator
并且注册那些需要序列化的类

sparkConf
中设置
spark.serializer
spark.kryo.registrator


测试数据源:

1   \N  男   \N
2 \N    女   \N
3   \N  女   \N
4   \N  男   \N
5   \N  保密  \N


对应的
case class
:

/**
* Created by todd.chen on 5/23/16.
* email : todd.chen@ximalaya.com
*/
case  class UserInfo(id:Long,name:String,gender:String,thirdParty:String) extends java.io.Serializable


性能测试代码

package com.ctao.test.kryo

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by todd.chen on 5/23/16.
* email : todd.chen@ximalaya.com
*/
object KryoTest {

def main(args: Array[String]) {
val start = System.currentTimeMillis()
val conf = new SparkConf().setMaster("local[*]").setAppName("kryoTest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "com.ctao.test.kryo.MyRegisterKryo")
val sc = new SparkContext(conf)
val userInfo = sc.textFile("/Users/cjuexuan/data/user_info").map(line ⇒
line.split("\t")).map(user ⇒ UserInfo(user(0).toLong, user(1), user(2), user(3)))
userInfo.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(userInfo.count())
println(System.currentTimeMillis() - start)
sc.stop()
}

}


package com.ctao.test.kryo

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

/**
* Created by todd.chen on 5/23/16.
* email : todd.chen@ximalaya.com
*/
class MyRegisterKryo extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[UserInfo])
}
}


日志整理类:

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by todd.chen on 5/23/16.
* email : todd.chen@ximalaya.com
*/
object FilterLog {
def main(args: Array[String]) {
val input = args(0)
val conf = new SparkConf().setMaster("local[*]").setAppName("filterLog")
val sc = new SparkContext(conf)
sc.textFile(input)
.map(_.split(" "))
.filter(_.length > 3)
.filter(x ⇒ x(3) == "MemoryStore:" || x(3) == "BlockManagerInfo:").map(_.mkString(" "))
.collect().foreach(println)
sc.stop()
}

}


参考资料:浅谈Spark Kryo serialization
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 性能