Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
2015-11-03 11:24
525 查看
saveAsNewAPIHadoopFile
def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unitdef saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。
用法基本同saveAsHadoopFile。
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
saveAsNewAPIHadoopDataset
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。
以写入HBase为例:
HBase建表:
create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
完整的Spark应用程序:
package com.lxw1234.test import org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put object Test { def main(args : Array[String]) { val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com") val sc = new SparkContext(sparkConf); var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7))) sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3") sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase") sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234") var job = new Job(sc.hadoopConfiguration) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) rdd1.map( x => { var put = new Put(Bytes.toBytes(x._1)) put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)) (new ImmutableBytesWritable,put) } ).saveAsNewAPIHadoopDataset(job.getConfiguration) sc.stop() } }
注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
可参考:http://lxw1234.com/archives/2015/07/332.htm
相关文章推荐
- 【转】使用PowerISO软件制作Centos7启动U盘
- 服务器部署网站遇到的问题
- linux mysql
- Linux 查找指定名称的进程并显示进程详细信息
- Apache 防盗链设置
- OpenStreetMap/Google/百度/Bing瓦片地图服务(TMS)
- Linux进程间通信——信号集函数
- Linux下搭建FTP服务器
- linux Shell编程入门
- $(srctree) is not clean, please run 'make mrproper'
- java 操作tomcat启动,停止,重启,检查操作
- 【SHELL】使用ps如何准确地打印出某个用户的进程
- Keepalived的相关应用,使用keepalived实现nginx和lvs的高可用负载均衡器
- SELinux/SEAndroid -- 基础知识介绍
- 一些可以学习的网站
- Windows ssh ubuntu / Linux
- linux下I2C驱动架构全面分析
- Open-Closed Principle,OCP
- Docker的学习--命令使用详解
- warez世界顶级压缩作品网站