Spark数据存储到mysql中
2015-07-01 19:13
645 查看
import org.apache.spark.SparkContext import org.bson.{BSONObject, BasicBSONObject} /** * Created by ha1 on 6/30/15. */ object DFMain { case class Person(name: String, age: Double, t:String) def main (args: Array[String]): Unit = { val sc = new SparkContext("local", "Scala Word Count") val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val bsonRDD = sc.parallelize(("foo",1,"female")::("bar",2,"male")::("baz",-1,"female")::Nil).map(tuple=>{ var bson = new BasicBSONObject() bson.put("name","bfoo") bson.put("value",0.1) bson.put("t","female") (null,bson) }) val tDf = bsonRDD.map(_._2).map( f=> Person(f.get("name").toString, f.get("value").toString.toDouble, f.get("t").toString)).toDF() tDf.limit(1).show() val MYSQL_USRENAME = "root" val MYSQL_PWD = "root" val MYSQL_CONNECTION_URL = "jdbc:mysql://localhost/test?user="+MYSQL_USRENAME+"&password="+MYSQL_PWD tDf.createJDBCTable(MYSQL_CONNECTION_URL,"stest",true) } }
需要引入的包,mysql-connector-java-xxx.jar,我选择的版本是5.1.15
spark-1.3.1
大家可能会觉得我的bsonRDD比较奇怪,其实不是,因为我需要将结果存入到mongodb和mysql, bsonRDD是存入mongodb的格式,大家重点关注tDf就可以了。
这个创建一个新的表的方式,另外一种是插入的方式
tDf.insertIntoJDBC(MYSQL_CONNECTION_URL, "users", false);
这里需要提醒大家一句,这个函数在spark-1.4.0里是不被提倡或者被抛弃的,已经改用函数save()了,详情可以参考官方doc文件,
另外需要提醒大家的是,这个与数据库的连接用完之后是没有被释放的,也就是说,如果你的程序一直调用mysqlDao插入数据,那么很快就会达到你mysql数据库设计的最大连接数而导致出错,最好的办法是使用scala写一个类似于java的mysqlDao。
scala连接mysql 博客教程
参考资料:
Save apache spark dataframe to database
Spark SQL and DataFrame Guide
相关文章推荐
- mysql5.6.24更改character-set-server的字符集
- 使用c3p0与DBCP连接池,造成的MySql 8小时问题解决方案
- MySQL PLSQL Demo - 003.静态游标
- MySQL PLSQL Demo - 002.变量定义、赋值
- MySQL PLSQL Demo - 001.创建、调用、删除过程
- nodejs之generic-pool数据库连接池(mysql)
- nodejs解决mysql和连接池(pool)自动断开问题
- 就这么简单:秒杀应用的MySQL数据库优化
- 不能链接云服务器mysql
- MySQL更改数据库文件的目录
- 图说mysql查询执行流程
- MySQL 1045登录失败
- 优化MySQL的21个建议
- mysql 中文乱码问题
- MySQL数据库负载很高连接数处理方法
- 几百万的数据,mysql快速高效创建索引
- 如何查看MySQL的当前存储引擎?
- 根据mysqlbin恢复丛库数据
- 根据mysqlbin恢复丛库数据
- 如何让mysql数据库支持超大图片