Spark 1.6.2:用 Java 实现读取 txt 文件并插入 MySQL 数据库的代码
2016-07-28 14:09
666 查看
package com.gosun.spark1;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class MySqlWriter {
public static void main(String[] args) throws ClassNotFoundException {
long current = System.currentTimeMillis();
//String master = "spark://192.168.31.34:7077";
String localmaster = "local[5]";
SparkConf sparkConf = new SparkConf().setAppName("mysqlJdbc").setMaster(localmaster);
// spark应用的上下对象
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
String url = "jdbc:mysql://192.168.31.16:3306/db?useUnicode=true&characterEncoding=utf-8";
String table = "tb_user";
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "mysql");
connectionProperties.put("driver", "com.mysql.jdbc.Driver");
// 加载数据库中的数据
JavaRDD<String> lines = sc.textFile( "F:/BaiduYunDownload/data/class9/user1.txt" );
JavaRDD<Row> personRDD = lines.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
public Row call( String line )
throws Exception {
String[] split = line.split("\t");
return RowFactory.create(String.valueOf(split[0]),String.valueOf(split[1]));
}
});
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField( "id", DataTypes.StringType, false ));
structFields.add(DataTypes.createStructField( "name", DataTypes.StringType, true ));
StructType structType = DataTypes.createStructType( structFields );
DataFrame usersDf = sqlContext.createDataFrame( personRDD, structType);
usersDf.write().mode(SaveMode.Append).mode(SaveMode.Overwrite).jdbc(url, table, connectionProperties);
System.out.println((System.currentTimeMillis() - current) / 1000 + "s");
}
}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class MySqlWriter {
public static void main(String[] args) throws ClassNotFoundException {
long current = System.currentTimeMillis();
//String master = "spark://192.168.31.34:7077";
String localmaster = "local[5]";
SparkConf sparkConf = new SparkConf().setAppName("mysqlJdbc").setMaster(localmaster);
// spark应用的上下对象
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
String url = "jdbc:mysql://192.168.31.16:3306/db?useUnicode=true&characterEncoding=utf-8";
String table = "tb_user";
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root");
connectionProperties.put("password", "mysql");
connectionProperties.put("driver", "com.mysql.jdbc.Driver");
// 加载数据库中的数据
JavaRDD<String> lines = sc.textFile( "F:/BaiduYunDownload/data/class9/user1.txt" );
JavaRDD<Row> personRDD = lines.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
public Row call( String line )
throws Exception {
String[] split = line.split("\t");
return RowFactory.create(String.valueOf(split[0]),String.valueOf(split[1]));
}
});
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField( "id", DataTypes.StringType, false ));
structFields.add(DataTypes.createStructField( "name", DataTypes.StringType, true ));
StructType structType = DataTypes.createStructType( structFields );
DataFrame usersDf = sqlContext.createDataFrame( personRDD, structType);
usersDf.write().mode(SaveMode.Append).mode(SaveMode.Overwrite).jdbc(url, table, connectionProperties);
System.out.println((System.currentTimeMillis() - current) / 1000 + "s");
}
}
相关文章推荐
- error: 'Can't connect to local MySQL server through socket '/var/run/mysqld/mysqld.sock' (2)'
- mysql 1449 : The user specified as a definer ('root'@'%') does not exist 解决方法
- Mysql日常开发注意要点
- mysql连接超时与jndi数据源配置
- MySQL查询结果保存到本地
- MySQL处理千万级数据查询、分页
- 记录mysql的具体操作明细
- mac下使用brew安装mysql
- MySQL索引总结
- MySQL与PostgreSQL:该选择哪个开源数据库?哪一个更好?
- mysql日志
- MySQL基础(四)——操作数据表
- 关于mysql的join(图)
- mysql 的一些基本操作
- Mybatis插入一条数据后返回刚插入数据主键
- mysql索引优化 btree rtree hash full-text
- SHOW ENGINE INNODB STATUS详细介绍
- Mysql自学笔记五(存储过程)
- MySQL备份之分库分表备份脚本
- mysql深入之视图和索引