您的位置:首页 > 数据库

Spark SQL DataFrame 操作

2016-11-10 16:59 501 查看
package com.xmgps.yfzx.sparkhadoop

import com.xmgps.yfzx.sparkhadoop.TraceData.Trace
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark SQL DataFrame example.
  *
  * Reads a comma-separated text file, converts every line into a [[Trace]],
  * turns the resulting RDD into a DataFrame and prints the number of distinct
  * (`carNo`, `carColor`) pairs.
  */
object SparkHadoopFileDF {

  /** Input file used when no path is supplied on the command line. */
  private val DefaultPath = "hdfs://mycluster/LINYUN/TAXIDATA/2016/11/09/TAXI20161109"

  /**
    * Entry point.
    *
    * @param args optional; `args(0)`, when present, overrides the default input path
    */
  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse(DefaultPath)

    val conf = new SparkConf().setAppName("spark sql dataframe").setMaster("local[1]")
    val sc = SparkContext.getOrCreate(conf)

    try {
      val sqlContext = new SQLContext(sc)

      // Brings the RDD -> DataFrame conversion (`toDF`) into scope.
      import sqlContext.implicits._

      val traceDF = sc.textFile(path)
        // A negative limit keeps trailing empty fields, so a record that ends in
        // ",," still yields every column without appending a sentinel to the line
        // (which would end up inside the last column's value).
        .map(_.split(",", -1))
        .map(toTrace)
        .toDF()

      val result = traceDF.select("carNo", "carColor").distinct().count()
      println(result)
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }

  /**
    * Builds a [[Trace]] from the split fields of one input line.
    *
    * Fields are consumed positionally (indices 0 to 19). A line with fewer
    * fields, or with a non-numeric value in a numeric position, makes this
    * throw and therefore fails the job rather than silently dropping the row.
    *
    * @param p the fields of one line, in file order
    * @return the populated record
    */
  private def toTrace(p: Array[String]): Trace =
    Trace(
      p(0), p(1), p(2).toLong, p(3).toLong, p(4), p(5),
      p(6).toLong, p(7).toLong, p(8).toInt, p(9).toDouble,
      p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19)
    )
}


package com.xmgps.yfzx.sparkhadoop

/**
  * Created by JiKai Lin on 11/9/2016.
  *
  * Namespace object holding the [[TraceData.Trace]] record type.
  */
object TraceData {

  /**
    * A single trace record made of 20 positional fields.
    *
    * The field names double as DataFrame column names once an RDD of `Trace`
    * is converted with `toDF()`, so they are kept exactly as declared
    * (including the spelling of `latidude`).
    * NOTE(review): units and encodings of the individual fields are not
    * visible here - confirm against the data source before relying on them.
    */
  case class Trace(
    carNo: String,
    carColor: String,
    gpsTime: Long,
    sysTime: Long,
    vehicleStatus: String,
    alarmStatus: String,
    longitude: Long,
    latidude: Long,
    height: Int,
    speed: Double,
    direction: String,
    totalKMS: String,
    attacheds: String,
    enterPriseId: String,
    isResend: String,
    lineId: String,
    preStationOrder: String,
    preStationDist: String,
    vec2: String,
    gnssCenterId: String
  )
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: