[2.6] Notes on Using Spark SQL with Various Data Sources
2016-06-01 17:10
References
The Spark official documentation and DT大数据梦工厂
The data flow when Spark SQL works against any data source:
input from the data source => RDD(lines) => RDD(Row) => DataFrame (registered as a temp table) => analysis and filtering (SQL operations, machine learning, etc.) => RDD(Row) => output in any format
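A minimal Scala sketch of that pipeline (the input path, column names, and output path below are hypothetical placeholders, not part of the original examples):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType, StringType}

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("PipelineSketch"))
    val sqlContext = new SQLContext(sc)

    // input from a data source => RDD(lines)
    val lines = sc.textFile("file:///tmp/people.txt")            // hypothetical input path
    // RDD(lines) => RDD(Row)
    val rows = lines.map(_.split(",")).map(a => Row(a(0), a(1)))
    // RDD(Row) => DataFrame, registered as a temp table
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("city", StringType, true)))                    // hypothetical columns
    sqlContext.createDataFrame(rows, schema).registerTempTable("people")
    // analysis (SQL) => RDD(Row) => output
    sqlContext.sql("SELECT city, count(*) AS cnt FROM people GROUP BY city")
      .rdd.saveAsTextFile("file:///tmp/people_out")               // hypothetical output path
  }
}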
Scenario
How Spark SQL works with various data sources: JSON files, Hive, MySQL, HBase, and so on.
Taking the agg operation of Spark SQL's built-in functions as an example, the code below walks through the SQL data flow.
package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._

/**
 * Analyze data with Spark SQL built-in functions.
 * A built-in function returns a Column.
 * Categories:
 * 1. aggregate functions  2. collection functions  3. date/time functions  4. math functions
 * 5. window functions     6. string functions      7. others
 */
object sqlagg {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkinnerfunctions")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val sqlContext = new SQLContext(sc)

    /*
     * 1. Get the data source - RDD(line)
     */
    val userData = Array(
      "2016-5-29,001,http://spark.apache.org/,1000",
      "2016-5-20,001,http://spark.apache.org/,1090",
      "2016-5-20,001,http://spark.apache.org/,1060",
      "2016-5-30,002,http://spark.apache.org/,1000",
      "2016-5-30,003,http://spark.apache.org/,1000",
      "2016-5-10,003,http://spark.apache.org/,1020",
      "2016-5-10,003,http://spark.apache.org/,1020",
      "2016-5-10,003,http://spark.apache.org/,1000"
    )
    val dataRDD = sc.parallelize(userData)

    /*
     * 2. Convert to RDD(Row)
     */
    val rowRDD = dataRDD.map(line => {
      val splited = line.split(",")
      Row(splited(0), splited(1), splited(2), splited(3))
    })

    /*
     * 3. Declare the Row schema and build the DataFrame
     */
    val structTypes = StructType(Array(
      StructField("time", StringType, true),
      StructField("userid", StringType, true),
      StructField("url", StringType, true),
      StructField("amount", StringType, true)
    ))
    val userDataDF = sqlContext.createDataFrame(rowRDD, structTypes)

    /*
     * 4. Operate on the DataFrame with Spark SQL built-in functions
     *    (the relevant implicits must be imported): built-in functions produce Column objects
     */
    import sqlContext.implicits._
    // Group by date, then aggregate: count distinct userid, and sum the daily sales amount
    userDataDF.groupBy("time").agg('time, countDistinct('userid))
      .map(row => Row(row(1), row(2))).collect().foreach(println)
    userDataDF.groupBy("time").agg('time, sum('amount)).show()
  }
}
Execution result
[2016-5-10,1]
[2016-5-20,1]
[2016-5-29,1]
[2016-5-30,2]

+---------+---------+-----------+
|     time|     time|sum(amount)|
+---------+---------+-----------+
|2016-5-10|2016-5-10|     3040.0|
|2016-5-20|2016-5-20|     2150.0|
|2016-5-29|2016-5-29|     1000.0|
|2016-5-30|2016-5-30|     2000.0|
+---------+---------+-----------+
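The time column shows up twice in the second table because 'time is passed to agg in addition to groupBy("time"). A minimal alternative sketch, assuming the userDataDF, the functions._ import, and sqlContext.implicits._ from the example above, that keeps a single time column and computes both aggregates in one pass:

userDataDF.groupBy("time")
  .agg(countDistinct('userid).as("distinct_users"), sum('amount).as("total_amount"))
  .show()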
1. JSON
Code
package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/**
 * Spark SQL on a local JSON file
 */
object DataFrameOps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrame Ops")
    val sqlContext = new SQLContext(new SparkContext(conf))
    val df = sqlContext.read.json("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
    df.show
    df.printSchema
    df.select("name").show
    df.select("name", "age").show
    df.select(df("name"), df("age") + 10).show
    df.filter(df("age") > 10).show
  }
}
Execution result
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+
|name|
+----+
|Andy|
+----+

+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+

+----+----------+
|name|(age + 10)|
+----+----------+
|Andy|        40|
+----+----------+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
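Once the JSON DataFrame is registered as a temporary table it can also be queried with plain SQL, matching the data flow described at the top of these notes; a minimal sketch, assuming the df and sqlContext from the example above:

df.registerTempTable("people")
sqlContext.sql("SELECT name, age FROM people WHERE age > 10").show()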
2. Hive
Code
package cool.pengych.spark.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Spark SQL operations on Hive
 * @author pengyucheng
 */
public class SparkSQL2Hive
{
    public static void main(String[] args)
    {
        SparkConf config = new SparkConf().setAppName("SparkSQL2Hive");
        SparkContext sc = new SparkContext(config);
        HiveContext hiveContext = new HiveContext(sc);

        hiveContext.sql("use hive");
        hiveContext.sql("DROP TABLE IF EXISTS people");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS people(name STRING,age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");
        /*
         * LOAD DATA LOCAL INPATH loads local data into the Hive warehouse (a copy actually happens behind the scenes);
         * LOAD DATA INPATH can instead pull data already on HDFS into Hive (in that case the data is moved, not copied).
         */
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/pengyucheng/resource/people.txt' INTO TABLE people");

        hiveContext.sql("DROP TABLE IF EXISTS peoplescores");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS peoplescores(name STRING,score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/pengyucheng/resource/peoplescores.txt' INTO TABLE peoplescores");

        /*
         * Join the two Hive tables directly through HiveContext.
         */
        DataFrame resultDF = hiveContext.sql("SELECT pi.name,pi.age,ps.score FROM people pi JOIN peoplescores ps ON pi.name = ps.name WHERE ps.score > 75");

        /*
         * saveAsTable creates a Hive managed table: both the metadata and the data location are managed by the
         * Hive warehouse, so when the table is dropped its data is deleted as well (it no longer exists on disk).
         */
        hiveContext.sql("DROP TABLE IF EXISTS peopleinformationresult");
        resultDF.write().saveAsTable("peopleinformationresult");

        /*
         * HiveContext.table reads a table from the Hive warehouse directly into a DataFrame, which can then be
         * fed into machine learning, graph computation, or other complex ETL operations.
         */
        DataFrame dataFromHive = hiveContext.table("peopleinformationresult");
        dataFromHive.show();
    }
}
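For comparison, the same filtered join can be expressed with the DataFrame API instead of HiveQL; a minimal Scala sketch, assuming a HiveContext and the people / peoplescores tables created above:

val people = hiveContext.table("people")
val scores = hiveContext.table("peoplescores")
people.join(scores, people("name") === scores("name"))
  .filter(scores("score") > 75)
  .select(people("name"), people("age"), scores("score"))
  .show()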
3. SparkSQL JDBC 2 MySQL
package cool.pengych.spark.sql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

/**
 * Spark SQL operating on a MySQL database through JDBC
 * @author pengyucheng
 */
public class SparkSQLJDBC2MySQL
{
    public static void main(String[] args)
    {
        SparkConf config = new SparkConf().setMaster("local[*]").setAppName("SparkSQLJDBC2MySQL");
        SparkContext sc = new SparkContext(config);
        sc.addJar("/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.39-bin.jar");
        SQLContext sqlContext = new SQLContext(sc);

        /*
         * 1. Connect to the database: format("jdbc") tells Spark SQL that the data source is reached
         *    through JDBC, which usually means a database such as MySQL; pass the DB connection options.
         */
        DataFrameReader reader = sqlContext.read().format("jdbc");
        reader.option("url", "jdbc:mysql://112.74.21.122:3306/hive");
        reader.option("driver", "com.mysql.jdbc.Driver");
        reader.option("dbtable", "spark");
        reader.option("user", "hive");
        reader.option("password", "hive");

        /*
         * 2. Load the data.
         */
        DataFrame sparkDF = reader.load();
        reader.option("dbtable", "hadoop");
        DataFrame hadoopDF = reader.load();

        /*
         * 3. Organize the data with Spark Core: here the two DataFrames are converted to RDDs and joined.
         */
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = sparkDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>()
        {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception
            {
                return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")), (Integer) row.getAs("age"));
            }
        }).join(hadoopDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>()
        {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception
            {
                return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")), (Integer) row.getAs("score"));
            }
        }));

        /*
         * 4. Hand the organized data back to a DataFrame for business processing - Spark SQL, Core, ML, etc.
         *    can then be applied for arbitrarily complex operations.
         */
        // Build the JavaRDD<Row>
        JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>()
        {
            private static final long serialVersionUID = 1L;
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception
            {
                return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
            }
        });

        // Build the StructType describing the metadata of the final DataFrame
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        StructType type = DataTypes.createStructType(fields);
        DataFrame personsDF = sqlContext.createDataFrame(resultRowRDD, type);

        // Actual business processing (ML, graph computation, etc.) - here we simply show the result
        System.out.println("==== start of business processing ====");
        personsDF.show();
        System.out.println("==== end of business processing ====");

        /*
         * 5. Persist the processed data: it can be saved to Hive, a database, or another data store.
         */
        personsDF.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>()
        {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Iterator<Row> iterator) throws SQLException
            {
                if (!iterator.hasNext()) return; // nothing to insert for an empty partition

                // Build one multi-row INSERT per partition; string values are quoted and rows comma-separated
                StringBuilder sql = new StringBuilder("INSERT INTO dfperson VALUES ");
                while (iterator.hasNext())
                {
                    Row row = iterator.next();
                    sql.append("('").append(String.valueOf(row.getAs("name"))).append("',")
                       .append(row.getInt(1)).append(",").append(row.getInt(2)).append(")");
                    if (iterator.hasNext()) sql.append(",");
                }

                Connection conn = null;
                try
                {
                    conn = DriverManager.getConnection("jdbc:mysql://112.74.21.122:3306/hive", "hive", "hive");
                    conn.createStatement().execute(sql.toString());
                }
                catch (SQLException e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    if (null != conn) conn.close();
                }
            }
        });
    }
}
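Instead of hand-building INSERT statements per partition, Spark 1.6's DataFrameWriter can also write a DataFrame to a JDBC table directly. A minimal Scala sketch, assuming the same connection settings as above and an existing target table dfperson:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "hive")
props.setProperty("password", "hive")
props.setProperty("driver", "com.mysql.jdbc.Driver")
personsDF.write.mode("append").jdbc("jdbc:mysql://112.74.21.122:3306/hive", "dfperson", props)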
4. SparkSQL JDBC 2 ThriftServer
package cool.pengych.spark.sql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Access the Thrift Server over JDBC, which in turn talks to Spark SQL and then to Hive.
 * @author pengyucheng
 */
public class SparkSQLJDBC2ThriftServer
{
    public static void main(String[] args) throws ClassNotFoundException, SQLException
    {
        String sql = "select name from people where age = ?";
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice", "root", "");
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        preparedStatement.setInt(1, 27); // JDBC parameter indices are 1-based
        ResultSet rs = preparedStatement.executeQuery();
        while (rs.next())
        {
            System.out.println(rs.getString(1));
        }
        conn.close();
    }
}
5. SparkSQL 2 Parquet
package cool.pengych.spark.sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * Spark SQL on a Parquet file
 * @author pengyucheng
 */
public class SparkSQLParquet
{
    public static void main(String[] args)
    {
        /*
         * Create the SQLContext
         */
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLParquet");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        /*
         * Register the DataFrame as a temporary table for subsequent SQL queries
         */
        DataFrame df = sqlContext.read().parquet("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet");
        df.registerTempTable("users");

        /*
         * Multi-dimensional analysis of the data
         */
        DataFrame result = sqlContext.sql("select name from users");
        JavaRDD<String> strs = result.javaRDD().map(new Function<Row, String>()
        {
            @Override
            public String call(Row row) throws Exception
            {
                return "The name is :" + row.getAs("name");
            }
        });

        /*
         * Handle the result
         */
        List<String> listRow = strs.collect();
        for (String row : listRow)
        {
            System.out.println(row);
        }
    }
}
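Reading Parquet is shown above; writing a DataFrame back out as Parquet is symmetrical. A minimal Scala sketch, assuming an existing DataFrame df, a sqlContext, and a hypothetical output path:

df.write.parquet("file:///tmp/users_out.parquet")   // hypothetical output path
// the written data can be read back and registered just like the original file:
sqlContext.read.parquet("file:///tmp/users_out.parquet").registerTempTable("users_out")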
Summary
Overcome the tragic side of human nature with exuberant vitality!