
Spark Streaming real-time WordCount, working with DStream, and using updateStateByKey(func) to accumulate word frequencies


I. Hands-On Practice

1. Real-time WordCount with Spark Streaming

Architecture diagram:



Overview: words are typed into nc listening on hadoop1:9999; the consumer side (the Spark Streaming program) receives the messages and computes the word counts.

2. Install and start the producer

First, install the nc tool with YUM on a Linux machine (IP: 192.168.10.101):

yum install -y nc

Start a listener on port 9999 (-l listens, -k keeps the listener open across connections):

nc -lk 9999

3. Write the Spark Streaming program

Write the POM file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.toto.spark</groupId>
<artifactId>bigdata</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.6</scala.version>
<spark.version>1.6.2</spark.version>
<hadoop.version>2.6.4</hadoop.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.toto.spark.JdbcRDDDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>


First, a small helper object for setting the log level (LoggerLevels.scala):

package cn.toto.spark.streams

import org.apache.log4j.{Level, Logger}
import org.apache.spark.Logging

object LoggerLevels extends Logging {

def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}

The WordCount program itself (NetworkWordCount.scala):

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Created by toto on 2017/7/13.
*/
object NetworkWordCount {
def main(args: Array[String]) {
//Set the log level
LoggerLevels.setStreamingLogLevels()
//Create the SparkConf and run in local mode
//Note: local[2] means two threads are started
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
//Set the DStream batch interval to 5 seconds
val ssc = new StreamingContext(conf, Seconds(5))
//Read data over the network
val lines = ssc.socketTextStream("hadoop1", 9999)
//Split each received line into words on spaces
val words = lines.flatMap(_.split(" "))
//Pair each word with a count of 1
val pairs = words.map(word => (word, 1))
//Reduce by key to count how often each word appears
val wordCounts = pairs.reduceByKey(_ + _)
//Print the results to the console
wordCounts.print()
//Start the computation
ssc.start()
//Wait for termination
ssc.awaitTermination()
}
}


4. Start the Spark Streaming program: because it uses local mode ("local[2]"), the program can be run directly on the local machine.

Note: you must provide enough parallelism. When running locally, setMaster("local[2]") starts two threads: one for the receiver and one for the computation. When running on a cluster, the cluster must have more than one available core.
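If you do submit it to a cluster instead, a sketch of the submit command might look like the following (the standalone master URL and core count are illustrative assumptions, and the hard-coded setMaster("local[2]") would have to be removed or overridden):

# assumes a standalone master at spark://hadoop1:7077 and the jar built by the shade plugin
spark-submit --master spark://hadoop1:7077 \
  --class cn.toto.spark.NetworkWordCount \
  --total-executor-cores 2 \
  target/bigdata-1.0-SNAPSHOT.jar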



5. Type words at the Linux command line (into the nc session)
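For example, type a line into the nc session (illustrative input):

hello world hello spark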



6. View the results in the IDEA console
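With the illustrative input above, each 5-second batch prints something like the following (the timestamp is made up):

-------------------------------------------
Time: 1499918400000 ms
-------------------------------------------
(hello,2)
(world,1)
(spark,1)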



II. Using DStream

package cn.toto.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Created by toto on 2017/7/13.
*/
object StreamingWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
//Create the StreamingContext and set the batch interval
val ssc = new StreamingContext(conf, Seconds(5))
//Create a DStream from the socket port
val lines:ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1",9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//Print the results
result.print()
//Start the program
ssc.start()
//Wait for termination
ssc.awaitTermination()
}
}


Run results:



In the examples above, every computation is transient: each batch produces its own result, and later results are not accumulated on top of earlier ones. The next example adds accumulation.

In the WordCount examples above, the words typed on the Linux side are counted correctly within each batch, but the counts are not carried over across batches. To accumulate them, use updateStateByKey(func) to update per-key state.
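A minimal sketch of the idea, using the simpler per-key variant of updateStateByKey (the full program below uses the iterator-based variant together with a HashPartitioner; the name updatePerKey is just for illustration):

// For each word, add this batch's counts to the previously accumulated total
val updatePerKey = (newValues: Seq[Int], state: Option[Int]) => Some(newValues.sum + state.getOrElse(0))
// wordAndOne.updateStateByKey[Int](updatePerKey)  // requires ssc.checkpoint(...) to be set first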

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkUpdateStateWordCount {

/**
* String : the word
* Seq[Int] : the number of times the word appears in the current batch
* Option[Int] : the previously accumulated (historical) count
*/
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
//iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
}

def main(args: Array[String]) {
LoggerLevels.setStreamingLogLevels()
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
//Checkpoint so state can be recovered; in production this would normally be shared storage (e.g. HDFS)
ssc.checkpoint("E://workspace//netresult")
val lines = ssc.socketTextStream("hadoop1", 9999)
//reduceByKey alone does not accumulate across batches:
//val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
//updateStateByKey does accumulate, but it needs a custom update function (updateFunc);
//the other arguments are a partitioner and whether to remember that partitioner across batches
val results = lines.flatMap(_.split(" "))
.map((_,1)).updateStateByKey(
updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
results.print()
ssc.start()
ssc.awaitTermination()
}

}
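Because this demo runs in local mode, the checkpoint goes to a local Windows path. On a real cluster you would checkpoint to shared storage such as HDFS; the path below is purely illustrative:

// illustrative HDFS checkpoint location (assumed NameNode address)
ssc.checkpoint("hdfs://hadoop1:9000/ck/networkwordcount")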


Type some input into nc:
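For example (illustrative input, typed a few seconds apart so the two lines land in different batches):

hello spark
hello streaming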



The run results are as follows (note that the counts accumulate):
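With the illustrative input above, the output looks roughly like this (timestamps are made up):

-------------------------------------------
Time: 1499918405000 ms
-------------------------------------------
(hello,1)
(spark,1)

-------------------------------------------
Time: 1499918410000 ms
-------------------------------------------
(hello,2)
(spark,1)
(streaming,1)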
