SparkSql之RDD和DataFrame的相互转换
2018-01-07 22:44
441 查看
package com.lyzx.day18 import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.Row; import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}; /** * Spark SQL * RDD和DataFrame的相互转换 */ class T4 { /* 通过反射的方式把RDD[User]转换为DataFrame */ def f1(sc:SparkContext): Unit ={ val sqlCtx = new SQLContext(sc) // 读取文件并转换为RDD[User] val rdd = sc.textFile("User.txt") val userRdd = rdd.map(item=>item.split(",")) .map(item=>User(item(0).toInt,item(1),item(2).toInt,item(3).toInt)) // 引入隐式转换的函数 import sqlCtx.implicits._ // 把RDD[User]转换为DataFrame,这里数据的列名不能指定,因为使用方法了反射,所以列名就是User的属性名 val df = userRdd.toDF() // 把DataFrame注册为一个临时表 即把df里面的数据"放入"一张临时表里面并起一个名字 df.registerTempTable("user") // 通过SQLContext的实例写SQL并返回包含结果集的DataFrame的对象 val result = sqlCtx.sql("select id,name,age,height from user where id >=2") // 遍历结果集 result.foreach(println) } /* 动态得把RDD转换为DataFrame 可以动态的指定Schema(这是Spark里面的称呼,其实就是列名+类型+是否为空,不知道spark为什么把这些东西叫Schema) */ def f2(sc:SparkContext): Unit ={ val sqlCtx = new SQLContext(sc) val rdd = sc.textFile("./User.txt") val mapRdd = rdd.map(item=>item.split(",")) .map(item=>Row(item(0),item(1),item(2),item(3))) def getSchema2(columnName:String): StructType ={ StructType(columnName.split(",").map(item=>StructField(item,StringType,true))) } //这就是schema即列名+类型+是否为空 val schema = getSchema2("id_x,name_y,age_z,height_m") //通过sqlContext的实例创建DataFrame val df = sqlCtx.createDataFrame(mapRdd,schema) df.registerTempTable("user") val result = sqlCtx.sql("select id_x,name_y,age_z,height_m from user where id_x >=3") result.foreach(println) } def f3(sc:SparkContext): Unit ={ val sqlCtx = new SQLContext(sc) val userRdd = sc.textFile("./User.txt") .map(x=>x.split(",")) .map(x=>Row(x(0),x(1),x(2),x(3))) def getSchema2(columnName:String): StructType ={ StructType(columnName.split(",").map(item=>StructField(item,StringType,true))) } val userSchema = getSchema2("id,name,age,height") val userDf = sqlCtx.createDataFrame(userRdd,userSchema) userDf.registerTempTable("user") val goodsRdd = sc.textFile("./goods.txt") .map(x=>x.split(",")) .map(x=>Row(x(0),x(1),x(2),x(3))) val goodsSchema = getSchema2("userId,goodsName,goodsPrice,goodsCount") val goodsDf = sqlCtx.createDataFrame(goodsRdd,goodsSchema) goodsDf.registerTempTable("goods") val result = sqlCtx.sql("select a.id as userId,a.name as userName,b.goodsName,b.goodsPrice from user a left join goods b on a.id=b.userId") result.foreach(println) } /* json数据源 sqlContext可以直接读取json格式的文本文件 */ def f4(sc:SparkContext): Unit ={ val sqlCtx = new SQLContext(sc) val jsonRdd = sqlCtx.read.json("./json.txt") jsonRdd.printSchema() jsonRdd.registerTempTable("person") val df = sqlCtx.sql("select * from person where age > 10") df.foreach(println) } } object T4{ def main(args: Array[String]) { val conf = new SparkConf().setAppName("day18").setMaster("local") val sc = new SparkContext(conf) val t = new T4 // t.f1(sc) // t.f2(sc) // t.f3(sc) t.f4(sc) sc.stop() } } case class User(id:Int,name:String,age:Int,height:Int){ private val _id = id; private val _name = name; private val _age = age; private val _height = height; override def toString(): String ={ "[id="+_id+" name="+_name+" age="+_age+" height="+_height+"]" } }
相关文章推荐
- spark下rdd和dataframe以及sqlcontext之间相互转换
- spark: RDD与DataFrame之间的相互转换方法
- spark: RDD与DataFrame之间的相互转换
- Spark SQL RDD与DataFrames相互转换
- 将RDD转换成DataFrame
- pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换
- SPARK 使用Java 在IDE中实战RDD和DataFrame动态转换操作
- [2.3]Spark DataFrame操作(二)之通过编程动态完成RDD与DataFrame的转换
- Spark 之DataFrame与RDD 转换
- 如何将dataframe转换为rdd类型
- spark-DataFrame之RDD和DataFrame之间的转换
- RDD与DataFrame之间的转换
- numpy中的ndarray与pandas的Series和DataFrame之间的相互转换
- pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换实例
- 第59课:使用Java和Scala在IDE中实战RDD和DataFrame转换操作’学习笔记
- pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换
- pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换
- 第59课:使用Java实战RDD和DataFrame转换操作
- [2.2]Spark DataFrame操作(二)之通过反射实现RDD与DataFrame的转换
- 第60课:使用Java和Scala在IDE中实战RDD和DataFrame动态转换操作学习笔记