Learning Spark——Spark连接Mysql、mapPartitions高效连接HBase
2017-05-20 16:12
1516 查看
执行Spark任务免不了从多个数据源拿数据,除了从HDFS获取数据以外,我们还经常从Mysql和HBase中拿数据,今天讲一下如何使用Spark查询Mysql和HBase
还有一种更方便的方法就是直接将这个jar包放到Spark放jar包的目录下面,我的目录是
然后给出Spark读Mysql时的标准代码:
解释一下这段代码:
1、其中spark就是SparkSession
2、如果是读操作,就是
3、
4、最后的结果是一个DataFrame,可以很方便地使用SparkSql继续其他操作
mapPartitions函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器,返回的结果也是每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection
下面上栗子:RDD中的内容是字符串,这段字符串是HBase主键rowKey的一部分,现在要根据这条字符串查出HBase中的一条信息。例如HBase的主键是aaabbb111222,RDD中存的内容是aaa,我们要查出HBase主键前缀是aaa的所有记录的主键、包名列表和时间
代码如下:
最后存的结果是:
上面的完整代码见我的github:
https://github.com/Trigl/SparkLearning/blob/master/src/main/scala/com/trigl/spark/main/JDBC2Mysql.scala
https://github.com/Trigl/SparkLearning/blob/master/src/main/scala/com/trigl/spark/main/HBaseDemo.scala
Refer:
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
http://lxw1234.com/archives/2015/07/348.htm
http://blog.csdn.net/lsshlsw/article/details/48627737
1. Spark查询Mysql
首先,Spark连接Mysql当然需要有Mysql的驱动包,你可以在启动时加上如下命令:bin/spark-shell --driver-class-path /home/hadoop/jars/mysql-connector-java-5.1.34.jar --jars /home/hadoop/jars/mysql-connector-java-5.1.34.jar
还有一种更方便的方法就是直接将这个jar包放到Spark放jar包的目录下面,我的目录是
/data/install/spark-2.0.0-bin-hadoop2.7/jars,这样Spark就可以直接找到Mysql驱动包了
然后给出Spark读Mysql时的标准代码:
val imeis = spark.read.format("jdbc").options( Map("url" -> DbUtil.IMEI_DB_URL, // "dbtable" -> "(SELECT id,imei,imeiid FROM t_imei_all) a", "dbtable" -> DbUtil.IMEI_ALL_TABLE, "user" -> DbUtil.IMEI_DB_USERNAME, "password" -> DbUtil.IMEI_DB_PASSWORD, "driver" -> "com.mysql.jdbc.Driver", // "fetchSize" -> "1000", "partitionColumn" -> "id", "lowerBound" -> "1", "upperBound" -> "15509195", "numPartitions" -> "20")).load()
解释一下这段代码:
1、其中spark就是SparkSession
2、如果是读操作,就是
spark.read,如果是写操作,就是
spark.write
3、
options里面就是我们需要查询的一些具体信息,有关配置如下:
配置项 | 含义 |
---|---|
url | 数据库连接地址,如:jdbc:mysql://localhost:3306/IMEI?useUnicode=true&characterEncoding=UTF8 |
dbtable | 要查询的表,这里有两种书写方式,一种可以直接写一个表名,如:t_test;另一种是写一条查询语句,但是注意要给一个别名,如:(SELECT id,imei,imeiid FROM t_imei_all) a。建议使用第一种,第二种查询会很慢 |
driver | 数据库驱动,如Mysql的是:com.mysql.jdbc.Driver |
user | 数据库用户名 |
password | 数据库密码 |
partitionColumn, lowerBound, upperBound, numPartitions | 这几个参数用来指定用哪个列来分区,当我们查询的量很大时,例如超过千万的数据量,如果Spark不分区查询的话很快就会报OOM异常了。而且这几个参数只要指定其中一个,其他的就也要指定,partitionColumn是要分区的列,必须是整数类型;lowerBound和upperBound是分区的上下限;numPartitions是分区数 |
fetchsize | 用于读操作,每次读取多少条记录 |
batchsize | 用于写操作,每次写入多少条记录 |
isolationLevel | 用于写操作,数据库的隔离级别 |
truncate | 用于写操作,当Spark要执行覆盖表操作时,即启用了SaveMode.Overwrite,使用truncate比使用drop或者recreate操作更高效,默认是false |
createTableOptions | 用于写操作,可以指定建表的语句,如: CREATE TABLE t (name string) ENGINE=InnoDB |
2. 如何在RDD中高效连接HBase
连接HBase直接用HBase的API就好了,我们这里重点讲的是在RDD中连接HBase,大家都知道Spark处理的都是很大的数据量,而RDD连接HBase的时候势必会产生很多与HBase的连接,这样很快就会用光连接数,这里我们使用一个算子mapPartitions来解决这个问题mapPartitions函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器,返回的结果也是每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection
下面上栗子:RDD中的内容是字符串,这段字符串是HBase主键rowKey的一部分,现在要根据这条字符串查出HBase中的一条信息。例如HBase的主键是aaabbb111222,RDD中存的内容是aaa,我们要查出HBase主键前缀是aaa的所有记录的主键、包名列表和时间
代码如下:
package com.trigl.spark.main import com.trigl.spark.util.HbaseUtil import org.apache.hadoop.hbase.TableName import org.apache.hadoop.hbase.client.{Result, Scan} import org.apache.hadoop.hbase.util.Bytes import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} /** * 高效连接HBase示例 * created by Trigl at 2017-05-20 15:34 */ object HBaseDemo { def main(args: Array[String]) { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val sparkConf = new SparkConf().setAppName("HBaseDemo") val sc = new SparkContext(sparkConf) val data = sc.textFile("/test/imei.txt").mapPartitions(getHBaseInfo) // 结果以 主键|包列表|时间 的格式存入HDFS data.map(l => { val rowKey = l._2._1 val pkgList = l._2._2 val time = l._1 rowKey + "|" + pkgList + "|" + time // 用"|"分隔 }).repartition(1).saveAsTextFile("/test/fenxi/cpz") sc.stop() } /** * 从HBase查询 * * @param iter mapPartion算子的参数是Iterator * @return 返回的也是Iterator */ def getHBaseInfo(iter: Iterator[String]): Iterator[(String, (String, String))] = { var pkgList = List[(String, (String, String))]() // 结果格式为(日期,(主键,包名集合)) // 建立连接查询表 val conn = HbaseUtil.getConnection(HbaseUtil.TABLE_NAME_CPZ_APP) val table = conn.getTable(TableName.valueOf(HbaseUtil.TABLE_NAME_CPZ_APP)) // 新建Scan用于指定查询内容 val scan = new Scan() scan.setCaching(10000) scan.setCacheBlocks(false) // 要查询的列 scan.addColumn(HbaseUtil.COLUMN_FAMILY.getBytes, "packagelist".getBytes) scan.addColumn(HbaseUtil.COLUMN_FAMILY.getBytes, "cdate".getBytes) while (iter.hasNext) { // 要查询的前缀 val imei = iter.next() // HBase前缀查询 scan.setRowPrefixFilter(imei.getBytes) // 查询结果 val resultScanner = table.getScanner(scan) val it = resultScanner.iterator() if (it.hasNext) { val result: Result = it.next() // 主键 val key = Bytes.toString(result.getRow) // 日期 val cdate = Bytes.toString(result.getValue(HbaseUtil.COLUMN_FAMILY.getBytes, "cdate".getBytes)) // 包列表 val packagelist = Bytes.toString(result.getValue(HbaseUtil.COLUMN_FAMILY.getBytes, "packagelist".getBytes)) // 添加到集合中 pkgList.::=(cdate, (key, packagelist)) } } // 关闭HBase连接 table.close() conn.close() // 结果返回iterator pkgList.iterator } }
最后存的结果是:
8643960350683910864396035068383020170421134920XHt3IGTdqIKtV5Y|2627,com.sogou.activity.src,fc8dbdce14c111859fd0111b03e80cd7,0;2856,com.qihoo.appstore,aa90bca1ab548eadd44a0c1d8c34cbda,0|2017-04-21 13:49:18
上面的完整代码见我的github:
https://github.com/Trigl/SparkLearning/blob/master/src/main/scala/com/trigl/spark/main/JDBC2Mysql.scala
https://github.com/Trigl/SparkLearning/blob/master/src/main/scala/com/trigl/spark/main/HBaseDemo.scala
Refer:
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
http://lxw1234.com/archives/2015/07/348.htm
http://blog.csdn.net/lsshlsw/article/details/48627737
相关文章推荐
- spark任务中报连接不到hbase的错误
- Spark踩坑记——数据库(Hbase+Mysql)
- MySQL高效处理大量并发的数据库连接方法
- Spark web Framework 结合sql2o连接MySQL
- Scala Spark 连接 HBase ( IDEA) 调试Spark Standalone
- Spark踩坑记——数据库(Hbase+Mysql)
- spark 连接hbase
- SparkSQL JDBC连接 mysql
- spark 操作hbase及mysql
- Spark连接需Kerberos认证的HBase
- Spark 1.4连接mysql诡异的问题及解决
- sqoop连接hbase以及spark sql使用
- Spark如何写入HBase/Redis/MySQL/Kafka
- Spark连接mysql
- Spark踩坑记——数据库(Hbase+Mysql)
- sparkStreaming连接数据库(mysql)
- CDH5.8手动安装spark2.1的运行错误整合 spark访问Hbase 数据导入mysql
- How-to: use spark to suport query across mysql tables and hbase tables
- Spark 连接mysql 及MongoDB
- Spark学习(SparkSQL连接oralce,MySQL)