您的位置:首页 > 数据库

SparkSQL DataFrames操作

2015-03-03 15:41 113 查看
Hive中已经存在emp和dept表:

select * from emp;
+--------+---------+------------+-------+-------------+---------+---------+---------+
| empno  |  ename  |    job     |  mgr  |  hiredate   |   sal   |  comm   | deptno  |
+--------+---------+------------+-------+-------------+---------+---------+---------+
| 7369   | SMITH   | CLERK      | 7902  | 1980-12-17  | 800.0   | NULL    | 20      |
| 7499   | ALLEN   | SALESMAN   | 7698  | 1981-2-20   | 1600.0  | 300.0   | 30      |
| 7521   | WARD    | SALESMAN   | 7698  | 1981-2-22   | 1250.0  | 500.0   | 30      |
| 7566   | JONES   | MANAGER    | 7839  | 1981-4-2    | 2975.0  | NULL    | 20      |
| 7654   | MARTIN  | SALESMAN   | 7698  | 1981-9-28   | 1250.0  | 1400.0  | 30      |
| 7698   | BLAKE   | MANAGER    | 7839  | 1981-5-1    | 2850.0  | NULL    | 30      |
| 7782   | CLARK   | MANAGER    | 7839  | 1981-6-9    | 2450.0  | NULL    | 10      |
| 7788   | SCOTT   | ANALYST    | 7566  | 1987-4-19   | 3000.0  | NULL    | 20      |
| 7839   | KING    | PRESIDENT  | NULL  | 1981-11-17  | 5000.0  | NULL    | 10      |
| 7844   | TURNER  | SALESMAN   | 7698  | 1981-9-8    | 1500.0  | 0.0     | 30      |
| 7876   | ADAMS   | CLERK      | 7788  | 1987-5-23   | 1100.0  | NULL    | 20      |
| 7900   | JAMES   | CLERK      | 7698  | 1981-12-3   | 950.0   | NULL    | 30      |
| 7902   | FORD    | ANALYST    | 7566  | 1981-12-3   | 3000.0  | NULL    | 20      |
| 7934   | MILLER  | CLERK      | 7782  | 1982-1-23   | 1300.0  | NULL    | 10      |
+--------+---------+------------+-------+-------------+---------+---------+---------+

select * from dept;
+---------+-------------+-----------+
| deptno  |    dname    |    loc    |
+---------+-------------+-----------+
| 10      | ACCOUNTING  | NEW YORK  |
| 20      | RESEARCH    | DALLAS    |
| 30      | SALES       | CHICAGO   |
| 40      | OPERATIONS  | BOSTON    |
+---------+-------------+-----------+


DataFrame常用功能测试

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val emp = hc.table("emp")    //根据hive表创建DataFrame

emp.dtypes.foreach(println)   //查看所有字段名称和类型
(empno,IntegerType)
(ename,StringType)
(job,StringType)
(mgr,IntegerType)
(hiredate,StringType)
(sal,DoubleType)
(comm,DoubleType)
(deptno,IntegerType)

emp.columns.foreach(println)  //查看所有字段名称
empno
ename
job
mgr
hiredate
sal
comm
deptno

emp.printSchema    //打印schema信息
root
|-- empno: integer (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- mgr: integer (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: double (nullable = true)
|-- comm: double (nullable = true)
|-- deptno: integer (nullable = true)

emp.explain  //查看物理执行计划
== Physical Plan ==
HiveTableScan [empno#0,ename#1,job#2,mgr#3,hiredate#4,sal#5,comm#6,deptno#7], (MetastoreRelation default, emp, None), None

emp.show  #默认显示20行
empno ename  job       mgr  hiredate   sal    comm   deptno
7369  SMITH  CLERK     7902 1980-12-17 800.0  null   20
7499  ALLEN  SALESMAN  7698 1981-2-20  1600.0 300.0  30
7521  WARD   SALESMAN  7698 1981-2-22  1250.0 500.0  30
7566  JONES  MANAGER   7839 1981-4-2   2975.0 null   20
7654  MARTIN SALESMAN  7698 1981-9-28  1250.0 1400.0 30
7698  BLAKE  MANAGER   7839 1981-5-1   2850.0 null   30
7782  CLARK  MANAGER   7839 1981-6-9   2450.0 null   10
7788  SCOTT  ANALYST   7566 1987-4-19  3000.0 null   20
7839  KING   PRESIDENT null 1981-11-17 5000.0 null   10
7844  TURNER SALESMAN  7698 1981-9-8   1500.0 0.0    30
7876  ADAMS  CLERK     7788 1987-5-23  1100.0 null   20
7900  JAMES  CLERK     7698 1981-12-3  950.0  null   30
7902  FORD   ANALYST   7566 1981-12-3  3000.0 null   20
7934  MILLER CLERK     7782 1982-1-23  1300.0 null   10

emp.show(10) #显示指定行数

emp.limit(5).show
emp.head(3)
emp.head   #等价于head(1)
emp.first  #等价于head(1)
val emp_as = emp.as("emp_as")   #别名
emp_as.select("empno","ename","deptno").show

#查看指定列:
emp.select("empno","ename","deptno").show
emp.select($"empno",$"ename",$"deptno").show
emp.selectExpr("empno", "ename as name", "substr(ename,0,4)").show     #配合udf使用
emp.select($"empno",$"sal"+100).show  #给sal加100

#条件过滤:
emp.filter("empno>7698").show
emp.filter($"empno" > 7698).show
emp.where($"empno" > 7698).show

#排序:
emp.sort("empno").show  #默认升序
emp.sort($"empno").show
emp.sort("empno").show
emp.sort($"empno".desc).show
emp.sort($"deptno", $"empno".desc).show #多字段排序

emp.orderBy($"empno").show
emp.orderBy($"empno".desc).show
emp.orderBy($"deptno", $"empno".desc).show

#分组:
emp.groupBy("deptno").count.show
emp.groupBy($"deptno").avg().show   #所有的列求平均值
emp.groupBy($"deptno").avg("sal").show   #sal列求平均值
emp.groupBy($"deptno").agg("sal"->"max").show   #sal取最大
emp.groupBy($"deptno").agg("sal"->"min").show   #sal取最小
emp.groupBy($"deptno").agg("sal"->"sum").show   #sal求和
emp.groupBy($"deptno").agg("sal"->"avg").show   #sal求平均值
#agg中能有的方法有: avg/max/min/sum/count

#join:
val dept = hc.table("dept")
dept.show
emp.join(dept,emp.col("deptno") === dept.col("deptno"),"left_outer").show
emp.join(dept,emp.col("deptno") === dept.col("deptno"),"right_outer").show
emp.join(dept,emp.col("deptno") === dept.col("deptno"),"inner").show
emp.join(dept,$"emp.deptno"===$"dept.deptno" ,"inner").select("empno","ename","dname").show


DataFrames结合SQL使用测试

val emp_dept = emp.join(dept,emp.col("deptno") === dept.col("deptno"),"left_outer")
emp_dept.registerTempTable("emp_dept_temp")
hc.sql("select count(*) from emp_dept_temp").collect


DataFrames结合hive和mysql jdbc external datasource使用测试:

mysql中准备数据:

DROP TABLE IF EXISTS `dept`;
CREATE TABLE `dept` (
`deptno` int(11) NOT NULL DEFAULT '0',
`dname` varchar(30) DEFAULT NULL,
`loc` varchar(30) DEFAULT NULL,
PRIMARY KEY (`deptno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `dept` VALUES ('10', 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES ('20', 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES ('30', 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES ('40', 'OPERATIONS', 'BOSTON');


val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val emp = hc.table("emp")
val dept_jdbc = hc.jdbc("jdbc:mysql://hadoop000:3306/hive?user=root&password=root", "dept")
emp.join(dept_jdbc, emp.col("deptno") === dept_jdbc.col("deptno"), "left_outer").show

empno ename  job       mgr  hiredate   sal    comm   deptno deptno dname      loc
7782  CLARK  MANAGER   7839 1981-6-9   2450.0 null   10     10     ACCOUNTING NEW YORK
7839  KING   PRESIDENT null 1981-11-17 5000.0 null   10     10     ACCOUNTING NEW YORK
7934  MILLER CLERK     7782 1982-1-23  1300.0 null   10     10     ACCOUNTING NEW YORK
7369  SMITH  CLERK     7902 1980-12-17 800.0  null   20     20     RESEARCH   DALLAS
7566  JONES  MANAGER   7839 1981-4-2   2975.0 null   20     20     RESEARCH   DALLAS
7788  SCOTT  ANALYST   7566 1987-4-19  3000.0 null   20     20     RESEARCH   DALLAS
7876  ADAMS  CLERK     7788 1987-5-23  1100.0 null   20     20     RESEARCH   DALLAS
7902  FORD   ANALYST   7566 1981-12-3  3000.0 null   20     20     RESEARCH   DALLAS
7499  ALLEN  SALESMAN  7698 1981-2-20  1600.0 300.0  30     30     SALES      CHICAGO
7521  WARD   SALESMAN  7698 1981-2-22  1250.0 500.0  30     30     SALES      CHICAGO
7654  MARTIN SALESMAN  7698 1981-9-28  1250.0 1400.0 30     30     SALES      CHICAGO
7698  BLAKE  MANAGER   7839 1981-5-1   2850.0 null   30     30     SALES      CHICAGO
7844  TURNER SALESMAN  7698 1981-9-8   1500.0 0.0    30     30     SALES      CHICAGO
7900  JAMES  CLERK     7698 1981-12-3  950.0  null   30     30     SALES      CHICAGO


DataFrames结合parquet和mysql jdbc external datasource使用测试:
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: