
Spark-Sql

2016-01-05 23:42
Converting an RDD into a SchemaRDD / DataFrame:

Spark 1.4 API (sample code, also in the official docs):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
Spark 1.1 API (from the official docs):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
The examples above use SQLContext; a HiveContext works just as well, so pick whichever fits your needs.
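A minimal sketch of the HiveContext variant (reusing the Person case class and people.txt path from the example above; the DataFrame is built through hiveContext because temp tables are scoped to the context that registered them):

import org.apache.spark.sql.hive.HiveContext

// HiveContext (Spark 1.x) is a superset of SQLContext: same API, plus HiveQL,
// Hive UDFs and access to the Hive metastore.
val hiveContext = new HiveContext(sc)

val peopleRdd = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
val peopleDF = hiveContext.createDataFrame(peopleRdd)
peopleDF.registerTempTable("people")

val teenagers = hiveContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")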
Quick notes:

1. HiveContext's cacheTable keeps a temporary table in memory; if the same data is read repeatedly, cache that table. The effect is much like calling cache on an RDD (see the sketch after this list).
2. Parquet background: https://blog.twitter.com/2013/dremel-made-simple-with-parquet
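A minimal sketch of both notes, reusing the people/teenagers tables from the 1.4 example above (the output path /tmp/teenagers.parquet is only an illustrative choice):

// Pin the temp table in memory so repeated queries skip re-reading the source file.
sqlContext.cacheTable("people")
val count  = sqlContext.sql("SELECT count(*) FROM people").collect()
val avgAge = sqlContext.sql("SELECT avg(age) FROM people").collect() // served from the in-memory table
sqlContext.uncacheTable("people")

// Parquet round trip with the Spark 1.4 DataFrame API.
teenagers.write.parquet("/tmp/teenagers.parquet")
val parquetTeens = sqlContext.read.parquet("/tmp/teenagers.parquet")
parquetTeens.registerTempTable("teenagers_parquet")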

Configuring spark-sql to use Hive for metadata storage:
# Unpack the prebuilt Spark package from the official site. Although the docs say the default build lacks Hive support, in practice it is included. As of this writing Spark is at 1.4.1, which supports Hive 0.13 by default; newer Hive versions are compatible, but their newer features cannot yet be used from spark-sql.
# If your build does not include Hive support, compile it yourself with Maven:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

# Simply copy Hive's hive-site.xml into ${SPARK_HOME}/conf and Spark can use the Hive metastore directly.
cp ${HIVE_HOME}/conf/hive-site.xml ${SPARK_HOME}/conf
# Next, add Spark's lib directory to the classpath by editing spark-env.sh:
vim ${SPARK_HOME}/conf/spark-env.sh
# Add the following line:
export SPARK_CLASSPATH=${SPARK_HOME}/lib/*
# Put the MySQL JDBC driver on ${SPARK_CLASSPATH} by creating a symlink from Hive's lib directory:
ln -s ${HIVE_HOME}/lib/mysql-connector-java-5.1.26-bin.jar ${SPARK_HOME}/lib/mysql-connector-java-5.1.26-bin.jar
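A quick way to check the metastore wiring from spark-shell is to point a HiveContext at it and list what Hive already knows about (a sketch; some_existing_table is a placeholder for a table in your Hive warehouse):

import org.apache.spark.sql.hive.HiveContext

// With hive-site.xml on the classpath, HiveContext connects to the shared
// Hive metastore instead of spinning up a local Derby one.
val hiveContext = new HiveContext(sc)

hiveContext.sql("SHOW DATABASES").collect().foreach(println)
hiveContext.sql("SHOW TABLES").collect().foreach(println)
// Querying an existing Hive table works the same way (placeholder name):
// hiveContext.sql("SELECT * FROM some_existing_table LIMIT 10").show()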

Command notes:
set; # lists all of spark-sql's current parameters.

Parameter notes:

When running spark-sql fully distributed with Hive as the backend, there is no need to configure every node; configuring the master node alone is enough.

Integrating spark-sql with HBase:
Hive can be integrated with HBase, and spark-sql can operate on the integrated Hive/HBase tables just as well. Simply put the hive-site.xml produced by that integration into Spark's conf directory. If spark-sql then throws errors when operating on HBase, try starting Spark first and setting the parameters inside the running spark-sql session:
set hbase.zookeeper.quorum=slave01,slave02,slave03,slave04,slave05,slave11;
set hbase.zookeeper.property.clientPort=2222;
https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration
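For reference, a sketch of what such a mapping looks like when driven through a HiveContext, following the HBaseIntegration wiki above; the table and column names (hbase_people, people, info:name, info:age) are placeholders, and the underlying HBase table must already exist:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Point the client at the HBase ZooKeeper ensemble (same values as the SETs above).
hiveContext.sql("set hbase.zookeeper.quorum=slave01,slave02,slave03,slave04,slave05,slave11")
hiveContext.sql("set hbase.zookeeper.property.clientPort=2222")

// A Hive table backed by HBase through the HBase storage handler (placeholder names).
hiveContext.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS hbase_people(rowkey STRING, name STRING, age INT)
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age")
  TBLPROPERTIES ("hbase.table.name" = "people")
""")

hiveContext.sql("SELECT name, age FROM hbase_people LIMIT 10").show()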