SparkSQL读取Oracle数据到kafka中
2020-05-25 14:54
801 查看
import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.{DataFrame, SparkSession} object Conn_Oracle { def main(args: Array[String]): Unit = { //Spark 连接Oracle数据库 //获取sparkConf val conf = new SparkConf() .setMaster("local[*]") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .setAppName(this.getClass.getName) //获取sparksesssion val session = SparkSession .builder() .config(conf) .getOrCreate() val sc = session.sparkContext // --------------------------------连接oracle数据库 ---------------------------------- val reader = session.read.format("jdbc") .option("url", "jdbc:oracle:thin:@10.10.1.253:1521:bidwdev") .option("driver", "oracle.jdbc.driver.OracleDriver") .option("user", "user") .option("password", "password") .option("dbtable", "表名字") val jdbcDf: DataFrame = reader.load() jdbcDf.show() //----------------------------------广播变量------------------------------------------------ val kafkaProducer:Broadcast[KafkaSink[String,String]]={ val kafkaProducerConfig={ val p=new Properties() p.setProperty("bootstrap.servers","10.133.232.57:9092") // p.setProperty("zookeeper.connect","10.133.232.57:9093") p.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer") p.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer") // p.setProperty("partitioner.class","com.buba.kafka.producer.CustomPartitioner") p.put("acks", "all") p.put("retries", "0") p.put("delivery.timeout.ms","30001") p.put("request.timeout.ms","30000") // 请求延时 p.put("linger.ms", "1") p } sc.broadcast(KafkaSink[String,String](kafkaProducerConfig)) } //将读取的接收到的数据广播分发到Oracle_test这个topic中 jdbcDf.foreachPartition(rdd=>{ rdd.foreach(row => { kafkaProducer.value.send("Oracle_test", row.toString) }) }) // jdbcDf.show() sc.stop() session.stop() } }
相关文章推荐
- sparksql 从oracle读取数据然后整合到elasticsearch
- spark读取kafka数据写入hbase
- spark-sql读取映射hbase数据的hive外部表
- 大数据IMF传奇行动绝密课程第100-101课:使用Spark Streaming+Spark SQL+Kafka+FileSystem综合案例
- Java实现SparkSQL Thrift方式读取Hive数据
- SparkSQL读取Hive中的数据
- SparkSQL读取Hive中的数据
- Spark Streaming场景应用-Kafka数据读取方式
- spark streaming读取kafka数据,记录offset
- SparkStreaming读取kafka数据进行反序列化以及mapPartition优化实例
- Spark Streaming 读取Kafka数据写入Elasticsearch
- Oracle java.sql.SQLException: 无法从套接字读取更多的数据
- Kafka+Spark Streaming+Redis Spark streaming实时读取kafka中数据完成wordcount并写入redis中
- Spark Streaming场景应用-Kafka数据读取方式
- spark streaming 读取kafka数据问题
- SQL 数据的导入导出,对远程(MSsql,OracleAccess,)数据库的操作以及读取Excel,txt文件中的数据
- Oracle ORA-3137[12333] 关闭的连接 java.sql.SQLRecoverableException: 无法从套接字读取更多的数据 _optim_peek_user_binds
- SPark SQL 从 DB 读取数据方法和方式 scala
- Spark SQL读取hive数据时报找不到mysql驱动
- 从HBase数据库表中读取数据动态转为DataFrame格式,方便后续用Spark SQL操作(scala实现)