您的位置:首页 > 大数据 > Hadoop

spark streaming 写入db,hdfs

2016-03-21 10:57 405 查看
http://blog.csdn.net/zhong_han_jun/article/details/50813981

package main.java

import java.sql.Connection

import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory

/**
* Created  on 2016-3-15.
*/
object ConnectionPool {
val logger = LoggerFactory.getLogger(this.getClass)
private val connectionPool = {
try{
Class.forName("oracle.jdbc.driver.OracleDriver")//com.mysql.jdbc.Driver
val config = new BoneCPConfig()
config.setJdbcUrl("jdbc:oracle:thin:@xxx:1521:fzkfs")//jdbc:mysql://xxx:3306/test
config.setUsername("xxx")
config.setPassword("xxx")
config.setLazyInit(true)

config.setMinConnectionsPerPartition(3)
config.setMaxConnectionsPerPartition(5)
config.setPartitionCount(5)
config.setCloseConnectionWatch(true)
config.setLogStatementsEnabled(false)

Some(new BoneCP(config))
} catch {
case exception:Exception=>
logger.warn("Error in creation of connection pool"+exception.printStackTrace())
None
}
}
def getConnection:Option[Connection] ={
connectionPool match {
case Some(connPool) => Some(connPool.getConnection)
case None => None
}
}
def closeConnection(connection:Connection): Unit = {
if(!connection.isClosed) {
connection.close()

}
}
}


package main.java

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
* Created on 2016-3-15.
*/
object StreamingToDB {
val logger = LoggerFactory.getLogger(this.getClass)
//PropertyConfigurator.configure("log4j.properties")
def main(args: Array[String]) {

if (args.length < 4) {
System.err.println("Usage: StreamingToDB <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}

val sql ="insert into SPATK_TEST(time,context) values(?,?)"

val Array(zkQuorum, group, topics, numThreads) = args
//log4j.logger.org.apache.spark.rpc.akka.ErrorMonitor=FATAL
val sparkConf = new SparkConf().set("log4j.logger.org.apache.spark.rpc.akka.ErrorMonitor","FATAL").setAppName("ErrorLogtoDB").setMaster("local[2]")
val sc =  new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(1))
val totalcounts = sc.accumulator(0L,"Total count")

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val rowrdd = lines.filter(_.contains("ERROR")).map[(Date, String)](eventRecord => {
//[2016-03-03 10:54:33 ERROR] {DevAppWebDaoImpl.java:317}-For input string: "sd"
val time = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").parse(eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']')).trim)
logger.info("time:====="+time+"-------eventRecord:==="+eventRecord)
System.out.println("time:====="+time+"-------eventRecord:==="+eventRecord)
(time, eventRecord.toString)
})

rowrdd.foreachRDD(rdd =>if(rdd.count()!=0){
rdd.foreachPartition(
res =>{
if(!res.isEmpty){
val connection = ConnectionPool.getConnection.getOrElse(null)
res.foreach(r=>{
insertIntoMySQL(connection, sql,r._1,r._2)
logger.info("finish time:====="+r._1+"-------eventRecord:==="+r._2)
}
)
ConnectionPool.closeConnection(connection)
}
}
)
})

//写入hdfs
//    rowrdd.foreachRDD(rdd =>if(rdd.count()!=0){
//      val out = rdd.map(_._1).first().toString
//      rdd.map(_._2).saveAsTextFile("/user/root/hive/jboss/"+out)

ssc.start()
ssc.awaitTermination()

}

def insertIntoMySQL(con:Connection,sql:String,ftime:Date,fcotext:String): Unit ={
// println(data.toString)
try {
val ps = con.prepareStatement(sql)
ps.setString(1, ftime.toString)
ps.setString(2, fcotext)
ps.executeUpdate()
ps.close()

}catch{
case exception:Exception=>
logger.error("Error in execution of query "+exception.getMessage+"\n--------\n"+exception.printStackTrace()+"\n--------------")
}
}
def selectIntoMySQL(con:Connection,sql:String,ftime:Date,fcotext:String): Unit ={
// println(data.toString)
try {
val sql ="select * from SPATK_TEST t"
val ps = con.prepareStatement(sql)
val res = ps.executeQuery()
while(res.next()){       //ѭ
System.out.println(""+res.getString(1));
}
ps.close()
}catch{
case exception:Exception=>
logger.error("Error in execution of query "+exception.getMessage+"\n--------\n"+exception.printStackTrace()+"\n--------------")
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark