Spark Streaming之使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中
2018-02-17 23:26
423 查看
package com.yys.spark.project
import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中
*/
object ForeachRDDApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("spark01", 9999)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
//result.print() //此处仅仅是将统计结果输出到控制台
//TODO... 将结果写入到MySQL
// result.foreachRDD(rdd =>{
// val connection = createConnection() // executed at the driver
// rdd.foreach { record =>
// val sql = "insert into wordcount(word, wordcount) values('"+record._1 + "'," + record._2 +")"
// connection.createStatement().execute(sql)
// }
// })
result.print()
result.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
connection.createStatement().execute(sql)
})
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
/**
* 获取MySQL的连接
*/
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://192.168.20.128:3306/yys_spark", "root", "Root-123")
}
}
import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中
*/
object ForeachRDDApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("spark01", 9999)
val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
//result.print() //此处仅仅是将统计结果输出到控制台
//TODO... 将结果写入到MySQL
// result.foreachRDD(rdd =>{
// val connection = createConnection() // executed at the driver
// rdd.foreach { record =>
// val sql = "insert into wordcount(word, wordcount) values('"+record._1 + "'," + record._2 +")"
// connection.createStatement().execute(sql)
// }
// })
result.print()
result.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
val connection = createConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
connection.createStatement().execute(sql)
})
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
/**
* 获取MySQL的连接
*/
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://192.168.20.128:3306/yys_spark", "root", "Root-123")
}
}
相关文章推荐
- sparkstreaming之使用Spark Streaming完成有状态统计
- 利用Spark解析Tomcat日志,并将统计结果存入Mysql数据库
- Spark-StructStreaming-计算结果写入到文本文件
- Spark Streaming从Kafka自定义时间间隔内实时统计行数、TopN并将结果存到hbase中
- spark-python从hdfs文本数据(美国历年出生人数)统计然后把结果数据写入mysql
- 记录一下SparkStreaming中因为使用redis做数据验证而导致数据结果不对的问题
- 将Hive统计分析结果导入到MySQL数据库表中(二)——使用Hive和MySQL JDBC驱动
- Spark Streaming从Kafka中拉取数据,并且使用过“窗口函数”统计一些流量信息
- 利用Spark解析Tomcat日志,并将统计结果存入Mysql数据库
- 将Hive统计分析结果导入到MySQL数据库表中(三)——使用Hive UDF或GenericUDF
- Spark Streaming从Kafka自定义时间间隔内实时统计行数、TopN并将结果存到hbase中
- 在Spark中使用UDF对HIVE表进行查询,再将查询结果RDD写入另一个HIVE表
- 使用Spark、Ansj分词进行词频统计
- 使用hadoop实现IP个数统计~并将结果写入数据库
- Spark查找某个IP的归属地,二分算法,try{}catch{}的使用,将结果存MySQL数据库
- SparkSQL结合SparkStreaming,使用SQL完成实时计算中的数据统计
- 使用SQL统计某个表每天的总量和增量 mysql数据库
- SparkSQL和Spark Streaming结合统计热词
- 大数据IMF传奇行动绝密课程第118课:Spark Streaming性能优化:如何获得和持续使用足够的集群计算资源
- SparkStreaming updateStateByKey 使用