Kafka + Spark Streaming + Redis: Spark Streaming reads data from Kafka in real time, runs a word count, and writes the results to Redis
2019-07-23 10:54
Copyright notice: This is an original article by the blogger, licensed under CC 4.0 BY-SA. Please attach the original source link and this notice when reposting.
本文链接:https://blog.csdn.net/qq_42520986/article/details/96971199
Install Spark, Kafka, and Redis on the cluster or virtual machine in advance.
Install Scala in the Windows environment.
Create a Maven project in IDEA.
Spark version: 2.2.0
The pom file content is as follows:
[code]
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>spark</groupId>
    <artifactId>sparklearning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Spark 2.2 dropped Java 7 support, so use 1.8 rather than 1.7 -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.7</scala.version>
        <spark.version>2.2.0</spark.version>
        <!--<hadoop.version>2.6.0</hadoop.version>-->
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
        <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <testResources>
            <testResource>
                <directory>${basedir}/src/test/resources</directory>
            </testResource>
        </testResources>
        <plugins>
            <!-- Compiles the Scala sources; without it "mvn package" would produce a jar with no classes -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                                    <resource>log4j.properties</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
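The shade plugin above is there to build a fat jar for submitting to a cluster. This walkthrough runs the job from IDEA in local mode, so packaging is optional, but when it is needed it is just the standard Maven invocation:
[code]mvn clean package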
Start Kafka:
[code]bin/kafka-server-start.sh config/server.properties
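Both the broker and the topic-creation command in the next step rely on ZooKeeper, so it must already be running; if it is not, the instance bundled with the Kafka distribution can be started first:
[code]bin/zookeeper-server-start.sh config/zookeeper.properties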
Create a topic named wordcount2 on the cluster or virtual machine.
My machine's hostname is storm.
[code]bin/kafka-topics.sh --create --zookeeper storm:2181 --replication-factor 1 --partitions 3 --topic wordcount2
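To confirm the topic exists with the expected partition count and replication factor, describe it with the same tool:
[code]bin/kafka-topics.sh --describe --zookeeper storm:2181 --topic wordcount2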
Open IDEA.
Create an object file named StatefulKafkaWordCountNew.scala:
[code]
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

// RedisClient (defined in the next file) sits in the same default package, so no import is needed for it
object StatefulKafkaWordCountNew {

  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

  /**
   * First element: the aggregation key, i.e. the word
   * Second element: how many times the word appeared in the current batch
   * Third element: the initial value or the accumulated intermediate result
   */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
    iter.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaWordCountNew").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))
    // To keep updatable history (accumulate across batches), the intermediate state must be checkpointed
    ssc.checkpoint("ck-streaming")
    val zkQuorum = "storm:2181"
    val groupId = "wordcountnew"
    val topic = Map[String, Int]("wordcount2" -> 1)
    // Create the Kafka receiver DStream
    val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
    val lines: DStream[String] = data.map(_._2)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val reduced: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Write the results to Redis
    reduced.foreachRDD { rdd =>
      rdd.foreachPartition { it =>
        // Borrow one connection from the pool per partition rather than per record
        val jedis: Jedis = RedisClient.pool.getResource
        jedis.auth("DEVElop123") // the Redis password
        // Write the partition's records out one by one; reduced already carries the
        // accumulated totals, so overwrite with hset (hincrBy would double-count across batches)
        it.foreach { wordCount =>
          jedis.hset("wordCount", wordCount._1, wordCount._2.toString)
        }
        jedis.close() // return the connection to the pool
      }
    }
    reduced.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
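To make the state-update semantics concrete, here is a tiny standalone sketch (illustrative only, not part of the job) showing how updateFunc folds one batch into the accumulated state:
[code]
object UpdateFuncDemo extends App {
  // Same shape as the job's updateFunc: (word, counts in this batch, previous total)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
    iter.map { case (word, batchCounts, state) => (word, batchCounts.sum + state.getOrElse(0)) }

  // "hello" was seen twice before and twice in this batch; "spark" is brand new
  val batch = Iterator(("hello", Seq(1, 1), Some(2)), ("spark", Seq(1, 1, 1), None))
  updateFunc(batch).foreach(println) // prints (hello,4) then (spark,3)
}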
Create an object file named RedisClient.scala:
[code]
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

object RedisClient extends Serializable {
  val redisHost = "fri.robot.kafka" // the Redis host name
  val redisPort = 6379              // the Redis port
  val redisTimeOut = 30000

  lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeOut)

  lazy val hook = new Thread {
    override def run = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }
  sys.addShutdownHook(hook.run)
}
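Two design points worth noting here: pool is a lazy val, so each executor JVM builds its own connection pool the first time it touches RedisClient.pool (a JedisPool itself is not serializable and cannot be shipped inside the closure), and the registered shutdown hook destroys the pool so connections are released when the JVM exits.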
Open Xshell and start a console producer on the wordcount2 topic to generate data:
[code]kafka-console-producer.sh --broker-list storm:9092 --topic wordcount2
Type hello hello spark spark spark, and press Enter once the IDEA program is up and running.
Run StatefulKafkaWordCountNew in IDEA.
Next, send another batch to generate new data.
This time type hello hello USTB USTB.
Then look at the program's console output in IDEA again.
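Given the inputs above, the totals after the second batch are deterministic: hello was seen four times in all, spark three times, and USTB twice. So the print() output for the batch containing the second line should look roughly like this (the timestamp is of course illustrative):
[code]
-------------------------------------------
Time: 1563850263000 ms
-------------------------------------------
(hello,4)
(spark,3)
(USTB,2)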
Finally, check whether the data was recorded in Redis.
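One quick way to check is redis-cli, reading back the wordCount hash the job writes to (host and password as configured in the code above); it should report the same totals as the console:
[code]redis-cli -h fri.robot.kafka -a DEVElop123 HGETALL wordCount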
That completes the whole process.