您的位置:首页 > 数据库 > MySQL

Spark Streaming之使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中

2018-02-17 23:26 423 查看
package com.yys.spark.project

import java.sql.DriverManager

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

  * 使用Spark Streaming完成词频统计,并将结果写入到MySQL数据库中

  */

object ForeachRDDApp {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("spark01", 9999)

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    //result.print()  //此处仅仅是将统计结果输出到控制台

    //TODO... 将结果写入到MySQL

    //    result.foreachRDD(rdd =>{

    //      val connection = createConnection()  // executed at the driver

    //      rdd.foreach { record =>

    //        val sql = "insert into wordcount(word, wordcount) values('"+record._1 + "'," + record._2 +")"

    //        connection.createStatement().execute(sql)

    //      }

    //    })

    result.print()

    result.foreachRDD(rdd => {

      rdd.foreachPartition(partitionOfRecords => {

        val connection = createConnection()

        partitionOfRecords.foreach(record => {

          val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"

          connection.createStatement().execute(sql)

        })

        connection.close()

      })

    })

    ssc.start()

    ssc.awaitTermination()

  }

  /**

    * 获取MySQL的连接

    */

  def createConnection() = {

    Class.forName("com.mysql.jdbc.Driver")

    DriverManager.getConnection("jdbc:mysql://192.168.20.128:3306/yys_spark", "root", "Root-123")

  }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐