您的位置:首页 > 其它

[置顶] spark数据导入、处理实例

2017-05-24 22:58 183 查看
当项目中遇到所要分析的数据量较大情况时,本地python直接处理或导入数据库等普通的处理方式显然并不合适,不仅效率低下,且容易引起数据库崩溃。用spark将本地数据上传hdfs,写入hive,会更加高效。

下面实现简单的spark下数据处理方式,,语言为scala,供大家参考。 

import com.databricks.spark.csv

import org.apache.spark._

import org.apache.spark.sql.hive.HiveContext

import scala.xml._

import org.apache.hadoop.fs.FileSystem

import org.apache.hadoop.fs.Path

import org.apache.hadoop.conf.Configuration

class CDataProcess{

    def dataProcess(){

    //initialize spark

    val conf = new SparkConf()

    val sc = new SparkContext(conf)

            val sqlContext = new HiveContext(sc)

    //initialize fs

    val fs = FileSystem.get(new Configuration)

    //get config

    val config = XML.loadFile("./config.xml")

    val nodes = config \ "FILE"

    nodes.foreach { n =>

        val database = (n \ "HIVE").text

val tableName = (n \ "TABLE").text

sqlContext.sql(s"use $database")

//import to hive

if( (n \\ "IMPORT TO HIVE").text.toBoolean ){

    val filepath = ( n \\ "FILEPATH").text

    //update file to hdfs

    fs.copyFromLocationFile(false,new Path(filePath),new Path("/user/spark"))

    val hdfsPath = "user/spark" + filePath.split("/").last

    try{

        val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema","true").load(hdfsPath)

df.show()

df.write.mode("overwrite").saveAsTable(tableName)

    }

    catch case e:Exception => e.printStackTrace()

    finally fs.delete(new Path(hdfsPath),false)

}

        }

    }

}

以上代码为将本地文件上传hdfs,在写入hive,如有不当之处,欢迎指正。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: