sparkSQL用jdbc访问mysql
2017-03-17 17:51
274 查看
maven:
<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<!-- Google Guava utilities -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>
public class JDBCDataSource { public static void main(String[] args) { /** * 设置mysql 无密访问 ,spark1为服务器hostname * grant all on testdb " to "@'spark1' with grant option; * flush privileges; */ SparkConf conf = new SparkConf().setAppName("DataFrameDynamic").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); Map<String, String> 4000 ; options = Maps.newHashMap(); options.put("url","jdbc:mysql://spark1:3306/testdb"); options.put("dbtable","student_infos"); DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load(); options.put("dbtable","student_scores"); DataFrame studentScoresDF = sqlContext.read().format("jdbc").options(options).load(); JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD = studentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf(String.valueOf(row.get(1)))); } }).join(studentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(row.getString(0), Integer.valueOf(String.valueOf(row.get(1)))); } })); //将JavaPairRDD转换为JavaRDD<Row> JavaRDD<Row> studentsRowRDD = studentsRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() { public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception { return RowFactory.create(v1._1, v1._2._1, v1._2._2); } }); JavaRDD<Row> filterStudentsRowRDD = studentsRowRDD.filter(new Function<Row, Boolean>() { public Boolean call(Row v1) throws Exception { if (v1.getInt(2) > 80) { return true; } return false; } }); //转化为DataFrame ArrayList<StructField> fields = Lists.newArrayList(); fields.add(DataTypes.createStructField("name",DataTypes.StringType,true)); fields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true)); 
fields.add(DataTypes.createStructField("scores",DataTypes.IntegerType,true)); StructType structType = DataTypes.createStructType(fields); DataFrame studentsDF = sqlContext.createDataFrame(filterStudentsRowRDD, structType); //将DataFrame数据保存到mysql表中 studentsDF.javaRDD().foreach(new VoidFunction<Row>() { public void call(Row row) throws Exception { String sql = "insert into good_student_infos value('"+String.valueOf(row.get(0))+"','"+ Integer.valueOf(String.valueOf(row.get(1)))+"','"+ Integer.valueOf(String.valueOf(row.get(2)))+"')"; Class.forName("com.mysql.jdbc.Driver"); Connection conn = null; Statement stmt = null; try { conn = DriverManager.getConnection("jdbc:mysql://spark1:3306/testdb","",""); stmt = conn.createStatement(); stmt.executeUpdate(sql); }catch (Exception e){ e.printStackTrace(); }finally { stmt.close(); conn.close(); } } }); sc.close(); } }
相关文章推荐
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- sparkSQL用jdbc访问mysql
- 在Eclipse中新建web工程访问MySQL出现 java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
- JDBC访问Mysql进行读写分离测试