您的位置:首页 > 大数据 > Hadoop

Spark Streaming、HDFS结合Spark JDBC External DataSouces处理案例

2015-01-26 14:43 519 查看
场景:使用Spark Streaming接收HDFS上的文件数据与关系型数据库中的表进行相关的查询操作;

使用技术:Spark Streaming + Spark JDBC External DataSources

HDFS上文件的数据格式为:id、name、cityId,分隔符为tab

1       zhangsan        1
2       lisi    1
3       wangwu  2
4       zhaoliu 3


MySQL的表city结构为:id int, name varchar

1    bj
2    sz
3    sh


本案例的结果为:select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;

示例代码:

package com.asiainfo.ocdc

case class Student(id: Int, name: String, cityId: Int)


package com.asiainfo.ocdc

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

/**
* Spark Streaming处理HDFS上的数据并结合Spark JDBC外部数据源处理
*
* @author luogankun
*/
object HDFSStreaming {
def main(args: Array[String]) {

if (args.length < 1) {
System.err.println("Usage: HDFSStreaming <path>")
System.exit(1)
}

val location = args(0)

val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(5))

val sqlContext = new HiveContext(sc)
import sqlContext._

import com.luogankun.spark.jdbc._
//使用External Data Sources处理MySQL中的数据
val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root", "select id, name from city")
//将cities RDD注册成city临时表
cities.registerTempTable("city")

val inputs = ssc.textFileStream(location)
inputs.foreachRDD(rdd => {
if (rdd.partitions.length > 0) {
//将Streaming中接收到的数据注册成student临时表
rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student");

//关联Streaming和MySQL表进行查询操作
sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id").collect().foreach(println)
}
})

ssc.start()
ssc.awaitTermination()
}
}


提交到集群执行脚本:sparkstreaming_hdfs_jdbc.sh

#!/bin/sh
. /etc/profile
set -x

cd $SPARK_HOME/bin

spark-submit \
--name HDFSStreaming \
--class com.asiainfo.ocdc.HDFSStreaming \
--master spark://hadoop000:7077 \
--executor-memory 1G \
--total-executor-cores 1 \
/home/spark/software/source/streaming-app/target/streaming-app-V00B01C00-SNAPSHOT-jar-with-dependencies.jar \
hdfs://hadoop000:8020/data/hdfs
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐