Spark Streaming、HDFS结合Spark JDBC External DataSouces处理案例
2015-01-26 14:43
519 查看
场景:使用Spark Streaming接收HDFS上的文件数据与关系型数据库中的表进行相关的查询操作;
使用技术:Spark Streaming + Spark JDBC External DataSources
HDFS上文件的数据格式为:id、name、cityId,分隔符为tab
MySQL的表city结构为:id int, name varchar
本案例的结果为:select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;
示例代码:
提交到集群执行脚本:sparkstreaming_hdfs_jdbc.sh
使用技术:Spark Streaming + Spark JDBC External DataSources
HDFS上文件的数据格式为:id、name、cityId,分隔符为tab
1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3
MySQL的表city结构为:id int, name varchar
1 bj 2 sz 3 sh
本案例的结果为:select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;
示例代码:
package com.asiainfo.ocdc case class Student(id: Int, name: String, cityId: Int)
package com.asiainfo.ocdc import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.SparkContext import org.apache.spark.sql.hive.HiveContext /** * Spark Streaming处理HDFS上的数据并结合Spark JDBC外部数据源处理 * * @author luogankun */ object HDFSStreaming { def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: HDFSStreaming <path>") System.exit(1) } val location = args(0) val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(5)) val sqlContext = new HiveContext(sc) import sqlContext._ import com.luogankun.spark.jdbc._ //使用External Data Sources处理MySQL中的数据 val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root", "select id, name from city") //将cities RDD注册成city临时表 cities.registerTempTable("city") val inputs = ssc.textFileStream(location) inputs.foreachRDD(rdd => { if (rdd.partitions.length > 0) { //将Streaming中接收到的数据注册成student临时表 rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student"); //关联Streaming和MySQL表进行查询操作 sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id").collect().foreach(println) } }) ssc.start() ssc.awaitTermination() } }
提交到集群执行脚本:sparkstreaming_hdfs_jdbc.sh
#!/bin/sh . /etc/profile set -x cd $SPARK_HOME/bin spark-submit \ --name HDFSStreaming \ --class com.asiainfo.ocdc.HDFSStreaming \ --master spark://hadoop000:7077 \ --executor-memory 1G \ --total-executor-cores 1 \ /home/spark/software/source/streaming-app/target/streaming-app-V00B01C00-SNAPSHOT-jar-with-dependencies.jar \ hdfs://hadoop000:8020/data/hdfs
相关文章推荐
- Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例
- 第85讲:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- Spark Streaming 图片处理案例介绍
- 第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- Spark Streaming 实时监控一个HDFS的文件夹,当新的文件进来(名字不能重复),将对新文件进行处理。
- Spark Streaming结合Spark SQL开发案例:电商中不同类别中最热门的商品排名
- IMF传奇行动第85课:Spark Streaming第四课:基于HDFS的Spark Streaming案例实战和内幕源码解密
- 第121课:Spark Streaming性能优化:通过摄像头图像处理案例来说明Spark流处理性能评估新方法及性能调优参数调试
- 第121课: Spark Streaming性能优化:通过摄像头图像处理案例来说明Spark流处理性能评估新方法及性能调优参数测试
- 第97课:Spark Streaming 结合Spark SQL 案例
- Spark Streaming 结合Spark SQL 案例
- 大数据IMF传奇行动绝密课程第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- Spark Streaming 图片处理案例介绍
- 自己标注(不注意坑不少)-Spark+Kafka构建实时分析Dashboard案例——步骤三:Spark Streaming实时处理数据
- 基于HDFS的SparkStreaming案例实战和内幕源码解密
- 基于HDFS的SparkStreaming案例实战
- 发表在IBM Developworks上的文章,Spark Streaming 图片处理案例介绍
- 第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- Spark 2.2.1 处理HDFS文件数据源的案例与解读