基于Spark DataFrame的数据仓库框架
2015-11-30 10:55
731 查看
数据存储的多样性,对数据分析、挖掘带来众多不变。应用瓶颈表现在两个方面:
1. 传统数据库mysql等的数据处理能力有限,随着数据量的增加,join、groupby、orderby等操作出现速度极慢,甚至将机器资源耗尽、不能运行的情况;另一方面,将数据存储转移到分布式系统比如hdfs的代价太大。
2. 不能进行跨数据源的访问。比如对hive table、htable、mysql中的数据进行混合查询。以前的普遍做法是将一个数据源的数据导入另一数据源。这其中的技术应用包括sqoop、jdbc、hive外表等。此种方法,处理周期长,且sqoop对特殊字符的处理有些问题。
利用spark dataframe,针对以上两个问题,解决方案:
1. 针对传统数据库,比如mysql,将计算压力转移到hadoop(spark-on-yarn)集群中;针对是否有实时性要求的数据分析需求,分为两条线处理:
1) 实时数据:利用下图中的sparkquery system,将mysql tables转换为dataframe,执行对dataframe的sql运算;
2) 非实时数据:此为大部分需求,一般分析截止到前一天的数据,此种情况下,利用图中spark loading system,每天定时将数据导入hdfs,以parquet表(列式存储)格式存储。
此种情况下,mysql只有导表的压力,此压力相当于执行select * from table limit $spark_partition_num,其中spark_partition_num是spark在导入mysql数据时,根据表情况进行的分片大小,用户可设定。
2. 针对跨表操作,通过spark query system将各类数据源转换为dataframe并利用spark sql运行跨表的sql运算。
Sql parser: 提取需要导入的表名,按需导入。Key code:
def getTables(sql: String)= {
tables_name=""
valsqlparser =new
TGSqlParser(EDbVendor.dbvmysql)
sqlparser.sqltext = sql
valret =sqlparser.parse();
if(ret==0)
{
valstm_size =sqlparser.sqlstatements.size()
for(i <-0
to stm_size -1) {
valstm =sqlparser.sqlstatements.get(i)
analyzeStmt(stm)
}
} else{
System.out.println("Failed to parse sql:"+ sql)
System.out.println(sqlparser.getErrormessage())
tables_name=""
}
tables_name.replaceFirst(",","")
}
def analyzeStmt(stm: TCustomSqlStatement) {
valtables = stm.tables
valtables_size =tables.size()
for(j <-0
to tables_size -1) {
if(tables.getTable(j).isBaseTable())
{
valcurrent_table =tables.getTable(j).getFullName
if (!tables_name.split(",").contains(current_table)){
tables_name =tables_name.+(","
+tables.getTable(j).getFullName.replace("`",""))
}
}
}
for(i <-0
tostm.getStatements().size() -1) {
analyzeStmt(stm.getStatements().get(i))
}
}
Spark query system:根据不同数据源选择不同读表模式,目前支持hbase、parquet、mysql。最大优势在于运行在spark-on-yarn带来的速度提升,及跨数据源操作。读表key code:
Parquet表:
sqlContext.parquetFile(base_dir+
"/"+ table).toDF().registerTempTable(table)
Mysql表:
LoadMysqlTables.register_mysql_table(sqlContext,url,user,password,mysql_table,"id")
HBase表:
//Need to transfer htable toJavaRDD<Row>
sqlContext.createDataFrame(JavaRDD<Row>,htable_schema).toDF().registerTempTable(table)
//JavaRDD<row> could be get from following toRowRDD function
Spark loading system:针对传统数据库比如mysql的数据,每天定时。导入速度比较快,在9台测试集群(spark-on-yarn)上,3亿条数据量的表大约需要2分钟。导入模式按需求不同,串行导入不满足时间需求的情况下,可使用多表并行导入;或者进行每天(月)更新导入数据,比如只导入前一天(月)更新的数据,并按照天(月)做分区进行存储。
varoptions: HashMap[String,String] =newHashMap
options.put("driver","com.mysql.jdbc.Driver")
options.put("url",url)//mysql
url withusername/password and others like tinyInt1isBit
options.put("dbtable",table)
//"select * from " + table + " where "+id+" >="+lower_bound+" and "+id+" <= "+upper_bound
options.put("lowerBound",lower_bound.toString())
options.put("upperBound",upper_bound.toString())
options.put("numPartitions",partitions.toString())
options.put("partitionColumn",id);
sqlContext.load("jdbc",options).save(output, SaveMode)
1. 传统数据库mysql等的数据处理能力有限,随着数据量的增加,join、groupby、orderby等操作出现速度极慢,甚至将机器资源耗尽、不能运行的情况;另一方面,将数据存储转移到分布式系统比如hdfs的代价太大。
2. 不能进行跨数据源的访问。比如对hive table、htable、mysql中的数据进行混合查询。以前的普遍做法是将一个数据源的数据导入另一数据源。这其中的技术应用包括sqoop、jdbc、hive外表等。此种方法,处理周期长,且sqoop对特殊字符的处理有些问题。
利用spark dataframe,针对以上两个问题,解决方案:
1. 针对传统数据库,比如mysql,将计算压力转移到hadoop(spark-on-yarn)集群中;针对是否有实时性要求的数据分析需求,分为两条线处理:
1) 实时数据:利用下图中的sparkquery system,将mysql tables转换为dataframe,执行对dataframe的sql运算;
2) 非实时数据:此为大部分需求,一般分析截止到前一天的数据,此种情况下,利用图中spark loading system,每天定时将数据导入hdfs,以parquet表(列式存储)格式存储。
此种情况下,mysql只有导表的压力,此压力相当于执行select * from table limit $spark_partition_num,其中spark_partition_num是spark在导入mysql数据时,根据表情况进行的分片大小,用户可设定。
2. 针对跨表操作,通过spark query system将各类数据源转换为dataframe并利用spark sql运行跨表的sql运算。
Sql parser: 提取需要导入的表名,按需导入。Key code:
def getTables(sql: String)= {
tables_name=""
valsqlparser =new
TGSqlParser(EDbVendor.dbvmysql)
sqlparser.sqltext = sql
valret =sqlparser.parse();
if(ret==0)
{
valstm_size =sqlparser.sqlstatements.size()
for(i <-0
to stm_size -1) {
valstm =sqlparser.sqlstatements.get(i)
analyzeStmt(stm)
}
} else{
System.out.println("Failed to parse sql:"+ sql)
System.out.println(sqlparser.getErrormessage())
tables_name=""
}
tables_name.replaceFirst(",","")
}
def analyzeStmt(stm: TCustomSqlStatement) {
valtables = stm.tables
valtables_size =tables.size()
for(j <-0
to tables_size -1) {
if(tables.getTable(j).isBaseTable())
{
valcurrent_table =tables.getTable(j).getFullName
if (!tables_name.split(",").contains(current_table)){
tables_name =tables_name.+(","
+tables.getTable(j).getFullName.replace("`",""))
}
}
}
for(i <-0
tostm.getStatements().size() -1) {
analyzeStmt(stm.getStatements().get(i))
}
}
Spark query system:根据不同数据源选择不同读表模式,目前支持hbase、parquet、mysql。最大优势在于运行在spark-on-yarn带来的速度提升,及跨数据源操作。读表key code:
Parquet表:
sqlContext.parquetFile(base_dir+
"/"+ table).toDF().registerTempTable(table)
Mysql表:
LoadMysqlTables.register_mysql_table(sqlContext,url,user,password,mysql_table,"id")
HBase表:
//Need to transfer htable toJavaRDD<Row>
sqlContext.createDataFrame(JavaRDD<Row>,htable_schema).toDF().registerTempTable(table)
//JavaRDD<row> could be get from following toRowRDD function
Spark loading system:针对传统数据库比如mysql的数据,每天定时。导入速度比较快,在9台测试集群(spark-on-yarn)上,3亿条数据量的表大约需要2分钟。导入模式按需求不同,串行导入不满足时间需求的情况下,可使用多表并行导入;或者进行每天(月)更新导入数据,比如只导入前一天(月)更新的数据,并按照天(月)做分区进行存储。
varoptions: HashMap[String,String] =newHashMap
options.put("driver","com.mysql.jdbc.Driver")
options.put("url",url)//mysql
url withusername/password and others like tinyInt1isBit
options.put("dbtable",table)
//"select * from " + table + " where "+id+" >="+lower_bound+" and "+id+" <= "+upper_bound
options.put("lowerBound",lower_bound.toString())
options.put("upperBound",upper_bound.toString())
options.put("numPartitions",partitions.toString())
options.put("partitionColumn",id);
sqlContext.load("jdbc",options).save(output, SaveMode)
相关文章推荐
- MySQL中的integer 数据类型
- MySQL存储过程
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- mysql中int、bigint、smallint 和 tinyint的区别与长度
- mysql load data 导出、导入 csv
- source命令执行SQL脚本文件
- MySQL创建用户及权限控制
- MySQL管理数据表
- linux下mysql添加用户
- mysql procedure
- mysql触发器
- MySQL 备份和恢复策略
- Spark随谈——开发指南(译)
- mac下安装mysql(转载)
- mysql 修改编码 Linux/Mac/Unix/通用(杜绝修改后无法启动的情况!)
- MySQL数据的导出、导入(mysql内部命令:mysqldump、mysql)
- mysql数据行转列
- Linux下修改MySQL编码的方法