
Reading Oracle data into Kafka with SparkSQL

2020-05-25 14:54
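The example below reads an Oracle table through Spark's JDBC data source and forwards every row to a Kafka topic via a broadcast producer. Running it requires the Oracle JDBC driver and the Kafka client on the classpath; a possible sbt dependency block is sketched here (the artifact coordinates and versions are assumptions, not taken from the original post):

// build.sbt (illustrative versions; match them to your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"     % "2.4.5" % "provided",
  "org.apache.kafka"  % "kafka-clients" % "2.4.1",
  "com.oracle.ojdbc"  % "ojdbc8"        % "19.3.0.0"  // Oracle thin JDBC driver
)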
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Conn_Oracle {
  def main(args: Array[String]): Unit = {

    // Connect Spark to the Oracle database
    // Build the SparkConf: local mode with Kryo serialization
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setAppName(this.getClass.getName)

    // Build the SparkSession
    val session = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = session.sparkContext

    // -------------------- Read the Oracle table over JDBC --------------------
    val reader = session.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@10.10.1.253:1521:bidwdev")
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .option("user", "user")
      .option("password", "password")
      .option("dbtable", "TABLE_NAME") // replace with the source table name
    val jdbcDf: DataFrame = reader.load()
    jdbcDf.show()

    // -------------------- Broadcast a Kafka producer wrapper --------------------
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "10.133.232.57:9092")
        // p.setProperty("zookeeper.connect", "10.133.232.57:9093")
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // p.setProperty("partitioner.class", "com.buba.kafka.producer.CustomPartitioner")
        p.put("acks", "all")
        p.put("retries", "0")
        p.put("delivery.timeout.ms", "30001")
        p.put("request.timeout.ms", "30000")
        // how long the producer waits before sending a batch
        p.put("linger.ms", "1")
        p
      }
      sc.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    // Send every row read from Oracle to the Oracle_test topic
    jdbcDf.foreachPartition((rows: Iterator[Row]) => {
      rows.foreach(row => {
        kafkaProducer.value.send("Oracle_test", row.toString)
      })
    })

    session.stop()
  }
}
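The main object references a KafkaSink class that the original post does not include. A minimal sketch of such a wrapper, assuming the common lazy-producer broadcast pattern (the class name, method names, and shutdown hook are illustrative, not part of the original code):

import java.util.Properties
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Serializable wrapper around a lazily created KafkaProducer, safe to broadcast:
// only the factory function is shipped to executors; the producer itself is
// created once per executor on first use.
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  lazy val producer: KafkaProducer[K, V] = createProducer()

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
}

object KafkaSink {
  def apply[K, V](config: Properties): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      // Flush pending records when the executor JVM shuts down
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
}

Broadcasting a factory function rather than the producer itself matters because KafkaProducer is not serializable; with this pattern each executor lazily creates a single producer the first time send is called.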

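In the main object, each record is published as row.toString, which yields a bracketed list of values without column names. If the downstream consumer needs structured messages, one alternative (a sketch, not part of the original post) is to serialize the rows to JSON with the standard Dataset.toJSON method before sending:

// Column names become JSON keys; the broadcast producer is reused from the main object
jdbcDf.toJSON.foreachPartition((rows: Iterator[String]) =>
  rows.foreach(json => kafkaProducer.value.send("Oracle_test", json))
)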