您的位置:首页 > 其它

Spark Streaming 实战(2) kafka+zookeeper+spark streaming 的windows本地测试Demo

2017-05-16 11:14 465 查看
Spark Streaming 实战(2) kafka+zookeeper+spark streaming 的windows本地测试Demo
之前搭建了kafka+zookeeper+spark streaming 的windows本地测试环境,先做一个小demo,spark和本地kafka跑通

1,spark streaming测试代码
public class BeijingStreamingStatsDemo {

public static void main(String[] args) {

/*
* 第一步:配置SparkConf: 1,至少2条线程:因为Spark Streaming应用程序在运行的时候,至少有一条
* 线程用于不断的循环接收数据,并且至少有一条线程用于处理接受的数据(否则的话无法
* 有线程用于处理数据,随着时间的推移,内存和磁盘都会不堪重负);
* 2,对于集群而言,每个Executor一般肯定不止一个Thread,那对于处理Spark Streaming的
* 应用程序而言,每个Executor一般分配多少Core比较合适?根据我们过去的经验,5个左右的
* Core是最佳的(一个段子分配为奇数个Core表现最佳,例如3个、5个、7个Core等);
*
*
*/
SparkConf conf = new SparkConf().setMaster("local")
.setAppName("BeijingStreamingStatsDemo");

/*
* 第二步:创建SparkStreamingContext: 1,这个是SparkStreaming应用程序所有功能的起始点和程序调度的核心
* SparkStreamingContext的构建可以基于SparkConf参数,
* 也可基于持久化的SparkStreamingContext的内容 来恢复过来(典型的场景是Driver崩溃后重新启动,由于Spark
* Streaming具有连续7*24小时不间断运行的特征,
* 所有需要在Driver重新启动后继续上次的状态,此时的状态恢复需要基于曾经的Checkpoint); 2,在一个Spark
* Streaming应用程序中可以创建若干个SparkStreamingContext对象,
* 使用下一个SparkStreamingContext 之前需要把前面正在运行的SparkStreamingContext对象关闭掉,由此,
* 我们获得一个重大的启发SparkStreaming框架也只是 Spark Core上的一个应用程序而已,只不过Spark
* Streaming框架箱运行的话需要Spark工程师写业务逻辑处理代码;
*/
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
jsc.checkpoint("data/BeijingCheckpoint");
/*
* 第三步:创建Spark Streaming输入数据来源input Stream:
* 1,数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 2,
* 在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口
* 的数据(当然该端口服务首先必须存在),并且在后续会根据业务需要不断的有数据产生(当然对于Spark Streaming
* 应用程序的运行而言,有无数据其处理流程都是一样的);
* 3,如果经常在每间隔5秒钟没有数据的话不断的启动空的Job其实是会造成调度资源的浪费,因为并没有数据需要发生计算,所以
* 实例的企业级生成环境的代码在具体提交Job前会判断是否有数据,如果没有的话就不再提交Job; 4,在本案例中具体参数含义:
* 第一个参数是StreamingContext实例;
* 第二个参数是ZooKeeper集群信息(接受Kafka数据的时候会从ZooKeeper中获得Offset等元数据信息)
* 第三个参数是Consumer Group 第四个参数是消费的Topic以及并发读取Topic中Partition的线程数
*/

/**
* 创建Kafka元数据,来让Spark Streaming这个Kafka Consumer利用
*/
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list", "localhost:9092");

Set<String> topics = new HashSet<String>();
topics.add("AdClicked114");

JavaPairInputDStream<String, String> BeijingStreaming = KafkaUtils.createDirectStream(jsc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParameters, topics);

BeijingStreaming.print();

/*
* Spark
* Streaming执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于
* 接受应用程序本身或者Executor中的消息;
*/
jsc.start();

try {
jsc.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
jsc.close();

}

}


2,本地运行报错
17/05/16 10:59:06 INFO VerifiableProperties: Property zookeeper.connect is overridden to
Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
at kafka.utils.Pool.<init>(Pool.scala:28)
at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<init>(FetchRequestAndResponseStats.scala:60)
at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<clinit>(FetchRequestAndResponseStats.scala)
at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:59)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:364)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:361)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:361)
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:132)
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.dt.beijing.YueMeStreamingStatsDemo.main(YueMeStreamingStatsDemo.java:74)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more

3,  java.lang.ClassNotFoundException 初步分析是scala的版本问题,scala的版本现在是scala 2.1.1.8,替换成scala 2.1.0 版本
scala 2.1.0.4 下载地址 http://www.scala-lang.org/download/2.10.4.html
部署目录G:\1.BeiJingSpark\scala-2.10.4
修改path配置 G:\1.BeiJingSpark\scala-2.10.4\bin;
注销windows 生效。

4,检查scala版本
C:\Windows\System32>scala -version
Scala code runner version 2.10.4 -- Copyright 2002-2013, LAMP/EPFL

5,IDEA中更新scala版本原来的版本



更新以后的版本



6,再次运行spark,报错
17/05/16 13:32:19 INFO VerifiableProperties: Property zookeeper.connect is overridden to
17/05/16 13:32:20 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)
at scala.util.Either.fold(Either.scala:98)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:384)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.dt.beijing.YueMeStreamingStatsDemo.main(YueMeStreamingStatsDemo.java:74)

7,zookeeper,kafka本地没有启动,kafka本地运行报错:
at kafka.utils.VerifiableProperties.logger$lzycompute(VerifiableProperti
es.scala:26)
at kafka.utils.VerifiableProperties.logger(VerifiableProperties.scala:26
)
at kafka.utils.Logging$class.info(Logging.scala:67)
at kafka.utils.VerifiableProperties.info(VerifiableProperties.scala:26)
at kafka.utils.VerifiableProperties.verify(VerifiableProperties.scala:21
7)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:32)
at kafka.Kafka$.main(Kafka.scala:35)
at kafka.Kafka.main(Kafka.scala)
log4j:ERROR Either File or DatePattern options are not set for appender [control
lerAppender].
log4j:ERROR setFile(null,true) call failed.
java.io.FileNotFoundException: logs\server.log (系统找不到指定的路径。)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:133)
at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
at org.apache.log4j.DailyRollingFileAppender.activateOptions(DailyRollin
gFileAppender.java:223)
8,kafka更新本地版本:kafka之前安装的本地版本是\kafka_2.11-0.8.2.0是基于scala 2.1.1.版本的 ,scala更新版本以后,kafka运行出错,kakfa更新版本为
更新版本:kafka_2.10-0.10.2.0
下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
进入G:\1.BeiJingSpark\kafka_2.10-0.10.2.0\config

编辑文件“server.properties”

log.dirs=G:\1.BeiJingSpark\kafka_2.10-0.10.2.0\kafkalog

9,kafka_2.10-0.10.2.0 更新的版本运行

启动本地zookeeper    G:\1.BeiJingSpark\zookeeper-3.4.6\bin\zkserver
启动本地kafkakafka_2.10-0.10.2.0 
       G:\1.BeiJingSpark\kafka_2.10-0.10.2.0\bin\windows\kafka-server-start.bat G:\1.BeiJingSpark\kafka_2.10-0.10.2.0\config\server.properties 

      测试kafka kafka_2.10-0.10.2.0,新建一个topic
G:\1.BeiJingSpark\kafka_2.10-0.10.2.0\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Topic114

生产数据  G:\1.BeiJingSpark\kafka_2.10-0.10.2.0\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic Topic114
消费数据  
         G:\1.BeiJingSpark\kafka_2.10-0.10.2.0\bin\windows\kafka-console-consumer.bat --zookeeper  localhost:2181  --topic Topic114



10,再次运行spark streaming,运行OK,跟kafka的连接已经ok
17/05/16 14:10:40 INFO ReceivedBlockTracker: Deleting batches:
17/05/16 14:10:40 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Attempting to clear 0 old log files in file:/G:/1.BeiJingSpark/BeiJingGitRepository2017new/regional_offline_quality_analysis_hour/data/YueMeCheckpoint/receivedBlockMetadata older than 1494915030000:
17/05/16 14:10:40 INFO InputInfoTracker: remove old batch metadata: 1494915020000 ms
-------------------------------------------
Time: 1494915050000 ms
-------------------------------------------

17/05/16 14:10:50 INFO JobScheduler: Added jobs for time 1494915050000 ms
17/05/16 14:10:50 INFO JobGenerator: Checkpointing graph for time 1494915050000 ms
17/05/16 14:10:50 INFO DStreamGraph: Updating checkpoint data for time 1494915050000 ms
17/05/16 14:10:50 INFO DStreamGraph: Updated checkpoint data for time 1494915050000 ms
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐