[置顶] spark数据导入、处理实例
2017-05-24 22:58
183 查看
当项目中遇到所要分析的数据量较大情况时,本地python直接处理或导入数据库等普通的处理方式显然并不合适,不仅效率低下,且容易引起数据库崩溃。用spark将本地数据上传hdfs,写入hive,会更加高效。
下面实现简单的spark下数据处理方式,,语言为scala,供大家参考。
import com.databricks.spark.csv
import org.apache.spark._
import org.apache.spark.sql.hive.HiveContext
import scala.xml._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
class CDataProcess{
def dataProcess(){
//initialize spark
val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
//initialize fs
val fs = FileSystem.get(new Configuration)
//get config
val config = XML.loadFile("./config.xml")
val nodes = config \ "FILE"
nodes.foreach { n =>
val database = (n \ "HIVE").text
val tableName = (n \ "TABLE").text
sqlContext.sql(s"use $database")
//import to hive
if( (n \\ "IMPORT TO HIVE").text.toBoolean ){
val filepath = ( n \\ "FILEPATH").text
//update file to hdfs
fs.copyFromLocationFile(false,new Path(filePath),new Path("/user/spark"))
val hdfsPath = "user/spark" + filePath.split("/").last
try{
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema","true").load(hdfsPath)
df.show()
df.write.mode("overwrite").saveAsTable(tableName)
}
catch case e:Exception => e.printStackTrace()
finally fs.delete(new Path(hdfsPath),false)
}
}
}
}
以上代码为将本地文件上传hdfs,在写入hive,如有不当之处,欢迎指正。
下面实现简单的spark下数据处理方式,,语言为scala,供大家参考。
import com.databricks.spark.csv
import org.apache.spark._
import org.apache.spark.sql.hive.HiveContext
import scala.xml._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
class CDataProcess{
def dataProcess(){
//initialize spark
val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
//initialize fs
val fs = FileSystem.get(new Configuration)
//get config
val config = XML.loadFile("./config.xml")
val nodes = config \ "FILE"
nodes.foreach { n =>
val database = (n \ "HIVE").text
val tableName = (n \ "TABLE").text
sqlContext.sql(s"use $database")
//import to hive
if( (n \\ "IMPORT TO HIVE").text.toBoolean ){
val filepath = ( n \\ "FILEPATH").text
//update file to hdfs
fs.copyFromLocationFile(false,new Path(filePath),new Path("/user/spark"))
val hdfsPath = "user/spark" + filePath.split("/").last
try{
val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema","true").load(hdfsPath)
df.show()
df.write.mode("overwrite").saveAsTable(tableName)
}
catch case e:Exception => e.printStackTrace()
finally fs.delete(new Path(hdfsPath),false)
}
}
}
}
以上代码为将本地文件上传hdfs,在写入hive,如有不当之处,欢迎指正。
相关文章推荐
- phpexcel导入excel处理大数据(实例讲解)
- 基因数据处理101之SparkBWA本地运行配置和实例
- 基因数据处理102之SparkBWA本地运行100万条paired-reads实例
- spark处理大数据的几个实例介绍
- ThinkPHP使用PHPExcel实现Excel数据导入导出完整实例
- [置顶] python语言处理get类型请求,调试模式获取数据代码
- 大数据处理为何选择Spark,而不是Hadoop
- ajax(二) ajax处理返回数据格式xml 实例
- c#处理3种json数据的实例
- 大数据IMF传奇行动绝密课程第116课:Spark Streaming性能优化:如何在毫秒内处理大吞吐量和数据波动比较大的流计算
- Spark——为数据分析处理提供更为灵活的赋能
- oracle 数据泵导入导出实例
- kafka->spark->streaming->mysql(scala)实时数据处理示例
- 即时通讯下数据粘包、断包处理实例(基于CocoaAsyncSocket)
- Kafka 和 Spark Streaming 构建实时数据处理系统
- 转自美团技术:机器学习中的数据清洗与特征处理综述&实例详解机器学习如何解决问题
- SQL SERVER2000教程-第五章 处理数据 第十四节 数据导入导出
- 详解 Java中日期数据类型的处理之格式转换的实例
- SSIS处理导入数据时, 存在的更新, 不存在的插入