
Spark SQL: accessing MySQL via JDBC

2017-03-17 17:51
Maven dependencies (pom.xml):

<!-- Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<!-- Google Guava utilities -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>
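
The code below also loads com.mysql.jdbc.Driver, so the MySQL JDBC driver must be on the classpath as well; a sketch of the extra dependency (the exact version is an assumption, pick one matching your MySQL server):

<!-- MySQL JDBC driver (version here is an assumption) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>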

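The program joins two source tables on student name and writes qualifying rows to a third table. A minimal sketch of the assumed schemas, inferred from the code (column names and types are assumptions):

CREATE TABLE student_infos      (name VARCHAR(32), age   INT);
CREATE TABLE student_scores     (name VARCHAR(32), score INT);
CREATE TABLE good_student_infos (name VARCHAR(32), age INT, score INT);
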
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class JDBCDataSource {
    public static void main(String[] args) {
        /**
         * Set up passwordless MySQL access for the anonymous user on the host
         * spark1 (spark1 is the server hostname):
         *   grant all on testdb.* to ''@'spark1' with grant option;
         *   flush privileges;
         */
        SparkConf conf = new SparkConf().setAppName("JDBCDataSource").setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        Map<String, String> options = Maps.newHashMap();
options.put("url","jdbc:mysql://spark1:3306/testdb");
options.put("dbtable","student_infos");
DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();

options.put("dbtable","student_scores");
DataFrame studentScoresDF = sqlContext.read().format("jdbc").options(options).load();

        // Key both tables by student name, then join: (name, (age, score))
        JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD = studentInfosDF.javaRDD()
                .mapToPair(new PairFunction<Row, String, Integer>() {
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(row.getString(0),
                                Integer.valueOf(String.valueOf(row.get(1))));
                    }
                })
                .join(studentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(row.getString(0),
                                Integer.valueOf(String.valueOf(row.get(1))));
                    }
                }));
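        // Note: with Spark 1.4+ the same inner join can also be done at the
        // DataFrame level, e.g. studentInfosDF.join(studentScoresDF, "name"),
        // assuming both tables expose the join key as a column named "name".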
        // Convert the JavaPairRDD back to a JavaRDD<Row>
        JavaRDD<Row> studentsRowRDD = studentsRDD.map(
                new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
                    public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception {
                        return RowFactory.create(v1._1, v1._2._1, v1._2._2);
                    }
                });
        // Keep only students with a score above 80
        JavaRDD<Row> filterStudentsRowRDD = studentsRowRDD.filter(new Function<Row, Boolean>() {
            public Boolean call(Row v1) throws Exception {
                return v1.getInt(2) > 80;
            }
        });
        // Build a DataFrame from the filtered rows
        ArrayList<StructField> fields = Lists.newArrayList();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame studentsDF = sqlContext.createDataFrame(filterStudentsRowRDD, structType);
        // Save the DataFrame contents to a MySQL table, one JDBC connection per row
        studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
            public void call(Row row) throws Exception {
                String sql = "insert into good_student_infos values(?, ?, ?)";
                Class.forName("com.mysql.jdbc.Driver");
                Connection conn = null;
                PreparedStatement stmt = null;
                try {
                    conn = DriverManager.getConnection("jdbc:mysql://spark1:3306/testdb", "", "");
                    stmt = conn.prepareStatement(sql);
                    stmt.setString(1, String.valueOf(row.get(0)));
                    stmt.setInt(2, Integer.valueOf(String.valueOf(row.get(1))));
                    stmt.setInt(3, Integer.valueOf(String.valueOf(row.get(2))));
                    stmt.executeUpdate();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (stmt != null) stmt.close();
                    if (conn != null) conn.close();
                }
            }
        });

        sc.close();
    }
}
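
Opening one JDBC connection per row, as in the foreach above, is expensive. From Spark 1.4 the DataFrameWriter can push a whole DataFrame to MySQL in a single call; a minimal sketch that could replace the foreach (the credentials and save mode are assumptions):

// Alternative: write the whole DataFrame over JDBC in one call (Spark 1.4+).
// Assumes the same passwordless account as above; fill in real credentials if needed.
java.util.Properties props = new java.util.Properties();
props.setProperty("user", "");
props.setProperty("password", "");
studentsDF.write()
        .mode(org.apache.spark.sql.SaveMode.Append)  // append to the existing table
        .jdbc("jdbc:mysql://spark1:3306/testdb", "good_student_infos", props);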