Spark Streaming: writing to DB and HDFS
2016-03-21 10:57
Reposted from http://blog.csdn.net/zhong_han_jun/article/details/50813981
package main.java

import java.sql.Connection

import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory

/**
 * Created on 2016-3-15.
 */
object ConnectionPool {
  val logger = LoggerFactory.getLogger(this.getClass)

  // BoneCP pool built once at startup; None if initialization fails.
  private val connectionPool = {
    try {
      Class.forName("oracle.jdbc.driver.OracleDriver") // for MySQL: com.mysql.jdbc.Driver
      val config = new BoneCPConfig()
      config.setJdbcUrl("jdbc:oracle:thin:@xxx:1521:fzkfs") // for MySQL: jdbc:mysql://xxx:3306/test
      config.setUsername("xxx")
      config.setPassword("xxx")
      config.setLazyInit(true)
      config.setMinConnectionsPerPartition(3)
      config.setMaxConnectionsPerPartition(5)
      config.setPartitionCount(5)
      config.setCloseConnectionWatch(true)
      config.setLogStatementsEnabled(false)
      Some(new BoneCP(config))
    } catch {
      case exception: Exception =>
        // Pass the exception to SLF4J instead of concatenating printStackTrace(),
        // which returns Unit and only prints to stderr.
        logger.warn("Error in creation of connection pool", exception)
        None
    }
  }

  def getConnection: Option[Connection] = {
    connectionPool match {
      case Some(connPool) => Some(connPool.getConnection)
      case None => None
    }
  }

  // For a pooled BoneCP connection, close() returns it to the pool.
  def closeConnection(connection: Connection): Unit = {
    if (!connection.isClosed) {
      connection.close()
    }
  }
}
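For reference, a minimal sketch of exercising the pool outside the streaming job; the ConnectionPoolDemo name and the count query against SPATK_TEST are illustrative, not part of the original post:

object ConnectionPoolDemo {
  def main(args: Array[String]): Unit = {
    // Borrow a connection; getConnection is None if pool creation failed at startup
    ConnectionPool.getConnection match {
      case Some(conn) =>
        try {
          val ps = conn.prepareStatement("select count(*) from SPATK_TEST") // illustrative query
          val rs = ps.executeQuery()
          if (rs.next()) println("rows: " + rs.getLong(1))
          rs.close()
          ps.close()
        } finally {
          ConnectionPool.closeConnection(conn) // returns the connection to the pool
        }
      case None =>
        println("connection pool was not initialized")
    }
  }
}

Wrapping the work in try/finally guarantees the connection goes back to the pool even if the query throws.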
package main.java

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created on 2016-3-15.
 */
object StreamingToDB {
  val logger = LoggerFactory.getLogger(this.getClass)
  // PropertyConfigurator.configure("log4j.properties")

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: StreamingToDB <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val sql = "insert into SPATK_TEST(time,context) values(?,?)"
    val Array(zkQuorum, group, topics, numThreads) = args
    // log4j.logger.org.apache.spark.rpc.akka.ErrorMonitor=FATAL
    val sparkConf = new SparkConf()
      .set("log4j.logger.org.apache.spark.rpc.akka.ErrorMonitor", "FATAL")
      .setAppName("ErrorLogtoDB")
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(1))
    val totalcounts = sc.accumulator(0L, "Total count") // declared but not used below

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Sample record: [2016-03-03 10:54:33 ERROR] {DevAppWebDaoImpl.java:317}-For input string: "sd"
    val rowrdd = lines.filter(_.contains("ERROR")).map[(Date, String)](eventRecord => {
      // "HH" (24-hour clock) rather than "hh" (12-hour), so afternoon timestamps parse correctly
      val time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        .parse(eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']')).trim)
      logger.info("time:=====" + time + "-------eventRecord:===" + eventRecord)
      (time, eventRecord)
    })

    // Open one connection per partition, not per record, and return it to the pool when done
    rowrdd.foreachRDD(rdd => if (rdd.count() != 0) {
      rdd.foreachPartition(res => {
        if (res.nonEmpty) {
          ConnectionPool.getConnection.foreach { connection =>
            res.foreach(r => {
              insertIntoMySQL(connection, sql, r._1, r._2)
              logger.info("finish time:=====" + r._1 + "-------eventRecord:===" + r._2)
            })
            ConnectionPool.closeConnection(connection)
          }
        }
      })
    })

    // Alternative sink: write each batch to HDFS instead of the database
    // rowrdd.foreachRDD(rdd => if (rdd.count() != 0) {
    //   val out = rdd.map(_._1).first().toString
    //   rdd.map(_._2).saveAsTextFile("/user/root/hive/jboss/" + out)
    // })

    ssc.start()
    ssc.awaitTermination()
  }

  def insertIntoMySQL(con: Connection, sql: String, ftime: Date, fcontext: String): Unit = {
    try {
      val ps = con.prepareStatement(sql)
      ps.setString(1, ftime.toString)
      ps.setString(2, fcontext)
      ps.executeUpdate()
      ps.close()
    } catch {
      case exception: Exception =>
        logger.error("Error in execution of query", exception)
    }
  }

  def selectIntoMySQL(con: Connection): Unit = {
    try {
      val sql = "select * from SPATK_TEST t"
      val ps = con.prepareStatement(sql)
      val res = ps.executeQuery()
      // Loop over the result set and print the first column of each row
      while (res.next()) {
        System.out.println(res.getString(1))
      }
      ps.close()
    } catch {
      case exception: Exception =>
        logger.error("Error in execution of query", exception)
    }
  }
}
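The commented-out HDFS branch above can be fleshed out into a complete sink. A sketch, assuming the same DStream[(Date, String)] produced in main; the HdfsSink and writeToHdfs names and the basePath parameter are mine, and the timestamp is reformatted because Date.toString contains spaces and colons, which are awkward in an HDFS path:

object HdfsSink {
  import java.text.SimpleDateFormat
  import java.util.Date
  import org.apache.spark.streaming.dstream.DStream

  def writeToHdfs(rowrdd: DStream[(Date, String)], basePath: String): Unit = {
    val fmt = new SimpleDateFormat("yyyyMMdd_HHmmss") // used on the driver inside foreachRDD
    rowrdd.foreachRDD { rdd =>
      if (rdd.count() != 0) {
        val out = fmt.format(rdd.map(_._1).first())  // batch key: first record's timestamp
        rdd.map(_._2).saveAsTextFile(basePath + out) // one output directory per batch
      }
    }
  }
}

It would be called before ssc.start(), e.g. HdfsSink.writeToHdfs(rowrdd, "/user/root/hive/jboss/"). Note that saveAsTextFile refuses to overwrite an existing directory, so the key must be unique per batch; if two batches could share a first timestamp, something unique such as the batch time should be mixed into the path instead.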