flatMap功能不只是wordcount,不知不觉用flatmap实现了hive的自带函数explode功能
2016-10-17 11:24
686 查看
不知不觉用flatmap实现了hive的自带函数explode功能。 import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.RowFactory import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import scala.annotation.meta.field import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import scala.annotation.meta.field import org.apache.spark.sql.hive.HiveContext object flatmapOption { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[3]").setAppName("flatmapOption") val sc = new SparkContext(conf) val aa = Array("this_1|2|3|4") val bb = sc.parallelize(aa.toSeq, 1) bb.flatMap { line => { val prfix = line.split("_")(0) val back = line.split("_")(1).split("\\|") var str = ""; for(item <- back){ println("------" +item) str += (prfix+"_"+item +"|") } str.substring(0, str.length() -1).split("\\|") } }.map { line => { (line.split("_")(0),line.split("_")(1)) } }.foreach(println)
觉得这可能没什么但是有更实用的代码:其中message_content格式AA=>aa,BB=>bb,CC=>cc ,原来是没办法进行join操作的,转换后就可以进行join操作。
val status = hiveContext.sql("select obd_id , message_content ,create_time from device_status_log where message_id = '31' " + " and create_time>='" + startTime + "' create_time>='" + endTime + "'") val flatedRDDRow = status.flatMap { row => { val obd_id = row.getAs[String ]("obd_id") val message_content = row.getAs[String ]("message_content") val create_time = row.getAs[String ]("create_time") val items = message_content.substring(message_content.indexOf("[") + 1, message_content.indexOf("]")).split(",") var a = scala.collection.immutable.List[Row]() // 方式三 for(item <- items){ val code = descCode(item) // 过滤出正常的情况 if (! "P0000".equalsIgnoreCase(code)) { <span style="white-space:pre"> </span>a = a.::(RowFactory.create(obd_id ,descCode(item),create_time)) } } a.toArray[Row] } } // 不过在hive中提供了 函数可以进行类似的操作DataFrame.explode
df.explode("words", "word"){words: String => words.split(" ")} // words 是原始的字段,word为切分后的字段,当然这里split可以根据实际情况进行操作,只要返回数组就行。 注意:explode函数中得到的数据可能会有某些数值是不想要的,在返回的数组中直接去掉就行了,看来还是explode比价简单实用。
val sqlContext = new HiveContext(sc) // val sqlContext = new SQLContext(sc) val df = sqlContext.read.text("breakdowntable") val RDDRow = df.map { row => { val line = row.getAs[String](0) val split = line.split("\\|") Row(split(0), split(1),split(2),split(3),split(4)) } } val fields = "type,code,level,descrption,remind".split("\\,"). map( filed => StructField(filed, StringType, true) ) val schema = StructType(fields) val flatedDF = sqlContext.createDataFrame(RDDRow , schema) flatedDF.registerTempTable("breakdown_table") // 不能执行本地运行 因为hive的metastore // sqlContext.sql("select * ,row_number() over( partition by code order by type desc ) seq from breakdown_table").show(2) println("----------schema --------") // df.printSchema() // 打印如下 // root // |-- value: string (nullable = true) flatedDF.printSchema() // 打印如下 // root // |-- type: string (nullable = true) // |-- code: string (nullable = true) // |-- level: string (nullable = true) // |-- descrption: string (nullable = true) // |-- remind: string (nullable = true) println("----------schema --------") flatedDF.show() println("--------show ----------") // val result = df } } val aa = Array("this_1|2|3|4") val bb = sc.parallelize(aa.toSeq, 1) bb.flatMap { line => { val prfix = line.split("_")(0) val back = line.split("_")(1).split("\\|") var str = ""; for(item <- back){ println("------" +item) str += (prfix+"_"+item +"|") } str.substring(0, str.length() -1).split("\\|") } }.map { line => { (line.split("_")(0),line.split("_")(1)) } }.foreach(println)
相关文章推荐
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- 使用hive、java api两种方式实现wordcount功能、及个人感悟
- Map-Reduce下使用Streaming实现 WordCount
- 使用SAS实现HADOOP Map/Reduce程序-wordcount