在Spark中自定义Kryo序列化输入输出API
2016-05-23 09:46
423 查看
在Spark中内置支持两种系列化格式:(1)、Java serialization;(2)、Kryo serialization。在默认情况下,Spark使用的是Java的ObjectOutputStream系列化框架,它支持所有继承java.io.Serializable的类系列化,虽然Java系列化非常灵活,但是它的性能不佳。然而我们可以使用Kryo 库来系列化,它相比Java serialization系列化高效,速度很快(通常比Java快10x),但是它不支持所有的系列化对象,而且要求用户注册类。
在Spark中,使用Kryo系列化比使用Java系列化更明智。在shuffling和caching大量数据的情况下,使用 Kryo系列化就变得非常重要。
虽然Kryo支持对RDD的cache和shuffle,但是在Spark中不是内置就显示提供使用Kryo将数据系列化到磁盘中的输入输出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法仅仅支持使用Java系列化。所以如果我们可以使用Kryo系列化将会变得很棒!
在这篇文章中,我将讨论如何自定义Kryo系列化输出输出相关API来将数据进行读写到磁盘中。
如何使用
下面例子使用上面介绍的两个方法来系列化和反序列化Person对象:
在Spark中,使用Kryo系列化比使用Java系列化更明智。在shuffling和caching大量数据的情况下,使用 Kryo系列化就变得非常重要。
虽然Kryo支持对RDD的cache和shuffle,但是在Spark中不是内置就显示提供使用Kryo将数据系列化到磁盘中的输入输出API,RDD中的saveAsObjectFile和SparkContext中的objectFile方法仅仅支持使用Java系列化。所以如果我们可以使用Kryo系列化将会变得很棒!
在这篇文章中,我将讨论如何自定义Kryo系列化输出输出相关API来将数据进行读写到磁盘中。
写数据
通常,我们使用rdd.saveAsObjectFile API将已经系列化的对象写入到磁盘中。下面的代码将展示如何使用我们自定义的saveAsObjectFile方法将已经使用kryo系列化的对象写入到磁盘中:def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String)这个函数中参数rdd就是我们需要写的数据;path是数据保存的路径。
val kryoSerializer = new KryoSerializer(rdd.context.getConf)KryoSerializer是Spark内部提供的用于提供操作Kryo的类。在上述代码中,我们创建了KryoSerializer对象,并从rdd.context.getConf中获取传进来的缓存大小。
rdd.mapPartitions(iter => iter.grouped(10) .map(_.toArray)) .map(splitArray => {}所有的objectFile 将会在HDFS上保存,我们对RDD中的每个分片进行遍历,然后将他们转换成Byte数组。
val kryo = kryoSerializer.newKryo()对每个splitArray,我们首先创建了kryo实例,kryo是线程不安全的,所以我们在每个map操作中单独创建。当我们调用 kryoSerializer.newKryo()来创建新的kryo实例,他也会调用我们自定义的KryoRegistrator。
//create output stream and plug it to the kryo output val bao = new ByteArrayOutputStream() val output = kryoSerializer.newKryoOutput() output.setOutputStream(bao) kryo.writeClassAndObject(output, splitArray) output.close()一旦我们拥有kryo实例,我们就可以创建kryo输出对象,然后我们将类信息和对象写入到那个输出对象中。
val byteWritable = new BytesWritable(bao.toByteArray) (NullWritable.get(), byteWritable) }).saveAsSequenceFile(path)我们在创建byteWritable的时候,包装了bytearray,然后保存成Sequence文件。使用那些代码我们就可以将Kryo对象写入到磁盘中。完整代码如下:
def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf)
rdd.mapPartitions(iter => iter.grouped(10)
.map(_.toArray))
.map(splitArray => {
//initializes kyro and calls your registrator class
val kryo = kryoSerializer.newKryo()
//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray) (NullWritable.get(), byteWritable) }).saveAsSequenceFile(path)
}
读
光有写没有读对我们来说仍然不完美。通常我们使用sparkContext中的objectFile API从磁盘中读取数据,这里我们使用自定义的objectFile API来读取Kryo对象文件。def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1) (implicit ct: ClassTag[T]) = { val kryoSerializer = new KryoSerializer(sc.getConf) sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) .flatMap(x => { val kryo = kryoSerializer.newKryo() val input = new Input() input.setBuffer(x._2.getBytes) val data = kryo.readClassAndObject(input) val dataObject = data.asInstanceOf[Array[T]] dataObject }) }上面的步骤和写的步骤很类似,只不过这里我们使用的是input,而不是output。我们从BytesWritable中读取bytes数据,然后使用readClassAndObject API反序列化数据。
如何使用
下面例子使用上面介绍的两个方法来系列化和反序列化Person对象:
// user defined class that need to serialized class Person(val name: String) def main(args: Array[String]) { if (args.length < 1) { println("Please provide output path") return } val outputPath = args(0) val conf = new SparkConf().setMaster("local").setAppName("kryoexample") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val sc = new SparkContext(conf) //create some dummy data val personList = 1 to 10000 map (value => new Person(value + "")) val personRDD = sc.makeRDD(personList) saveAsObjectFile(personRDD, outputPath) val rdd = objectFile[Person](sc, outputPath) println(rdd.map(person => person.name).collect().toList) }
相关文章推荐
- 在Tachyon运行Spark应用程序
- Spark中parallelize函数和makeRDD函数的区别
- 一步步学spark之一scala的懒加载机制(lazy)1.6
- js 正则表达式 判断车牌号
- MySQL运行状态show status详解
- 动态添加子视图 UIView 的正确方法
- 针对苹果最新审核要求为应用兼容IPv6
- Map的四种遍历
- 基于linphone的视频语音模块的设计
- delphi 线程同步的基本方法
- 手表连接状态
- 设置颜色时初始化一张图片的方法
- 分享两种最简单的Android打渠道包的方法
- Jquery引入src js 使用Chart.js
- Java经典类库-Guava中的函数式编程讲解
- 布局 固定大小的布局
- Oracle数据库—— 存储过程与函数的创建
- PHP书写格式详解(必看)
- 【android】WebView使用Post请求和设置浏览器弹框
- QChar类