您的位置:首页 > 其它

flatMap功能不只是wordcount,不知不觉用flatmap实现了hive的自带函数explode功能

2016-10-17 11:24 686 查看
不知不觉用flatmap实现了hive的自带函数explode功能。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import scala.annotation.meta.field
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import scala.annotation.meta.field
import org.apache.spark.sql.hive.HiveContext

object flatmapOption {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[3]").setAppName("flatmapOption")

val sc = new SparkContext(conf)

val  aa = Array("this_1|2|3|4")
val bb = sc.parallelize(aa.toSeq, 1)

bb.flatMap { line => {
val prfix = line.split("_")(0)

val back = line.split("_")(1).split("\\|")

var str = "";
for(item <- back){
println("------" +item)
str += (prfix+"_"+item +"|")

}
str.substring(0, str.length() -1).split("\\|")
}
}.map { line => {

(line.split("_")(0),line.split("_")(1))
} }.foreach(println)
觉得这可能没什么但是有更实用的代码:其中message_content格式AA=>aa,BB=>bb,CC=>cc ,原来是没办法进行join操作的,转换后就可以进行join操作。
 val status = hiveContext.sql("select obd_id , message_content ,create_time from device_status_log where message_id = '31' " +
          " and create_time>='" + startTime + "' create_time>='" + endTime + "'")
      val flatedRDDRow = status.flatMap { row => {

          val obd_id = row.getAs[String ]("obd_id")
          val message_content = row.getAs[String ]("message_content")
          val create_time = row.getAs[String ]("create_time")
          val items = message_content.substring(message_content.indexOf("[") + 1, message_content.indexOf("]")).split(",")
          var a =  scala.collection.immutable.List[Row]()  // 方式三
          for(item <- items){
            val code = descCode(item)
            //  过滤出正常的情况
            if (! "P0000".equalsIgnoreCase(code)) {
            <span style="white-space:pre">	</span>a = a.::(RowFactory.create(obd_id ,descCode(item),create_time))
            }
          }
          a.toArray[Row]
        }
      }    
// 不过在hive中提供了 函数可以进行类似的操作DataFrame.explode
df.explode("words", "word"){words: String => words.split(" ")} // words 是原始的字段,word为切分后的字段,当然这里split可以根据实际情况进行操作,只要返回数组就行。 注意:explode函数中得到的数据可能会有某些数值是不想要的,在返回的数组中直接去掉就行了,看来还是explode比价简单实用。


val sqlContext = new HiveContext(sc)
//    val sqlContext = new SQLContext(sc)

val df = sqlContext.read.text("breakdowntable")
val RDDRow = df.map { row => {
val line = row.getAs[String](0)
val split = line.split("\\|")
Row(split(0), split(1),split(2),split(3),split(4))
}
}
val  fields  = "type,code,level,descrption,remind".split("\\,").
map( filed => StructField(filed, StringType, true) )
val schema = StructType(fields)
val flatedDF = sqlContext.createDataFrame(RDDRow , schema)
flatedDF.registerTempTable("breakdown_table")
//    不能执行本地运行 因为hive的metastore
//    sqlContext.sql("select * ,row_number() over( partition by code order by type desc ) seq from breakdown_table").show(2)

println("----------schema --------")
//    df.printSchema()
//  打印如下
//    root
// |-- value: string (nullable = true)

flatedDF.printSchema()
//    打印如下
//    root
// |-- type: string (nullable = true)
// |-- code: string (nullable = true)
// |-- level: string (nullable = true)
// |-- descrption: string (nullable = true)
// |-- remind: string (nullable = true)
println("----------schema --------")
flatedDF.show()
println("--------show ----------")
//    val result = df

}
}
val  aa = Array("this_1|2|3|4")
val bb = sc.parallelize(aa.toSeq, 1)

bb.flatMap { line => {
val prfix = line.split("_")(0)

val back = line.split("_")(1).split("\\|")

var str = "";
for(item <- back){
println("------" +item)
str += (prfix+"_"+item +"|")

}
str.substring(0, str.length() -1).split("\\|")
}
}.map { line => {

(line.split("_")(0),line.split("_")(1))
} }.foreach(println)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: