SparkException: org.apache.spark.streaming.dstream.MappedDStream has not been initialized
2016-12-16 16:33
711 查看
使用 StreamingContext.getOrCreate 进行故障恢复时,所有的业务逻辑(DStream 的创建与转换)都必须放在 functionToCreateContext
函数内部,才能从 checkpoint 目录中恢复数据;否则恢复时会抛出上述 "has not been initialized" 异常。
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
//org.spark.streaming.checkpoint.recovery.MyRecoverableNetworkWordCount
/**
 * Checkpoint-recoverable Spark Streaming job: reads string messages from a
 * Kafka topic through the direct stream API, appends a timestamp to each
 * message, prints a sample of every batch and writes the messages to HBase.
 *
 * All DStream set-up lives in [[createContext]] so that
 * `StreamingContext.getOrCreate` can rebuild the graph from the checkpoint.
 */
object RecoverableKafkaDirectWordCount {
  // var checkpointDirectory = "hdfs:///user/spark/streaming_checkpoint"

  /** Pattern used for every timestamp this job produces. */
  private val DatePattern = "yyyy-MM-dd HH:mm:ss"

  /** Number of Puts collected before they are handed to the HBase client. */
  private val PutBatchSize = 200

  /**
   * Retained so existing references keep working. SimpleDateFormat is not
   * thread-safe, so the streaming closures below create their own instances
   * instead of sharing this one across concurrent tasks.
   */
  val formatDate = new SimpleDateFormat(DatePattern)

  /**
   * Builds a new StreamingContext containing the complete processing graph.
   * Put all of the business logic in this method.
   *
   * @param topic               Kafka topic to consume
   * @param checkpointDirectory directory passed to `ssc.checkpoint`
   * @return the configured, not yet started, StreamingContext
   */
  def createContext(topic: String, checkpointDirectory: String): StreamingContext = {
    // If you do not see this printed, the StreamingContext has been loaded
    // from the checkpoint instead of being created here.
    println("------------Creating new context------------")
    val sparkConf = new SparkConf().setAppName("RecoverableKafkaDirectWordCount.scala")
    // Create the context with a 2 second batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // Background on Kafka offsets: with the ZooKeeper-based consumer, consuming
    // a message commits the consumer group's offset to ZooKeeper, and a
    // restarted consumer resumes from that offset. The consumer setting
    // "auto.offset.reset" (spelled "autooffset.reset" before Kafka 0.8) accepts
    // "largest" or "smallest" and defaults to "largest". It applies when the
    // group has no stored offset (a new group.id, or ZooKeeper data was wiped):
    // "largest" starts from the newest messages, "smallest" from the beginning
    // of the topic.
    // NOTE(review): the direct stream created below is documented to track
    // offsets in the Spark checkpoint rather than in ZooKeeper - confirm
    // against the Spark Streaming + Kafka integration guide.
    val kafkaParams = Map(
      // NOTE: placeholder broker address - replace with the real broker list.
      "metadata.broker.list" -> " xxx:9092",
      // "auto.offset.reset" -> "largest",
      // "auto.offset.reset" -> "smallest",
      "group.id" -> "streaming-group")
    val topics = Set(topic)
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics)

    // Copy the constants into locals so the closures capture plain values.
    val pattern = DatePattern
    val batchSize = PutBatchSize

    // kafkaStream.print()
    // Keep only the message value and append the processing timestamp.
    // One formatter per partition avoids sharing a SimpleDateFormat.
    val forAppDStream = kafkaStream.map(_._2).mapPartitions { messages =>
      val formatter = new SimpleDateFormat(pattern)
      messages.map(message => message + "-" + formatter.format(new Date()))
    }
    forAppDStream.print(2)

    forAppDStream.foreachRDD { locusRDD =>
      locusRDD.foreachPartition { locusPoints =>
        // Do not open an HBase connection for an empty partition.
        if (locusPoints.nonEmpty) {
          import scala.collection.JavaConverters._

          val myConf = HBaseConfiguration.create()
          // NOTE: placeholder quorum - replace with the real ZooKeeper hosts.
          myConf.set("hbase.zookeeper.quorum", " ")
          myConf.set("hbase.zookeeper.property.clientPort", "21810")
          val myTable = new HTable(myConf, TableName.valueOf("test:user"))
          try {
            // Buffer writes on the client; they are flushed explicitly below.
            myTable.setAutoFlush(false, false)
            myTable.setWriteBufferSize(3 * 1024 * 1024)
            val formatter = new SimpleDateFormat(pattern)
            val family = Bytes.toBytes("f1")
            val qualifier = Bytes.toBytes("gps_time")
            // grouped() yields chunks of at most batchSize rows, so no list
            // length has to be recomputed for every record.
            locusPoints.grouped(batchSize).foreach { chunk =>
              val puts = chunk.map { name =>
                val p = new Put(Bytes.toBytes(name))
                p.addColumn(family, qualifier, Bytes.toBytes(formatter.format(new Date())))
                p
              }
              myTable.put(puts.asJava)
            }
            myTable.flushCommits()
          } finally {
            // Always release the table, even when a put or flush fails.
            myTable.close()
          }
        }
      }
    }
    ssc
  }

  /**
   * Entry point. Expects exactly two arguments: the Kafka topic and the
   * checkpoint directory. Recovers the StreamingContext from the checkpoint
   * when one exists, otherwise creates it through [[createContext]].
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: RecoverableKafkaDirectWordCount <topic> <checkpoint-directory>
          |  <topic> is the Kafka topic to consume.
          |  <checkpoint-directory> is a directory on an HDFS-compatible file
          |  system in which checkpoint data is stored.
          |
          |<checkpoint-directory> must be an absolute path
        """.stripMargin
      )
      System.exit(1)
    }
    val Array(topic, checkpointDirectory) = args
    val ssc = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => createContext(topic, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}
I think you have to put your streaming-related logic into the function functionToCreateContext.
You can refer to the related Spark Streaming example, RecoverableNetworkWordCount, to
change your code.
Refer to this link -> https://issues.apache.org/jira/browse/SPARK-6770
所有的业务逻辑必须放在 functionToCreateContext 函数内部,才能实现 checkpoint 目录数据的恢复。
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
//org.spark.streaming.checkpoint.recovery.MyRecoverableNetworkWordCount
/**
 * Checkpoint-recoverable Spark Streaming job: reads string messages from a
 * Kafka topic through the direct stream API, appends a timestamp to each
 * message, prints a sample of every batch and writes the messages to HBase.
 *
 * All DStream set-up lives in [[createContext]] so that
 * `StreamingContext.getOrCreate` can rebuild the graph from the checkpoint.
 */
object RecoverableKafkaDirectWordCount {
  // var checkpointDirectory = "hdfs:///user/spark/streaming_checkpoint"

  /** Pattern used for every timestamp this job produces. */
  private val DatePattern = "yyyy-MM-dd HH:mm:ss"

  /** Number of Puts collected before they are handed to the HBase client. */
  private val PutBatchSize = 200

  /**
   * Retained so existing references keep working. SimpleDateFormat is not
   * thread-safe, so the streaming closures below create their own instances
   * instead of sharing this one across concurrent tasks.
   */
  val formatDate = new SimpleDateFormat(DatePattern)

  /**
   * Builds a new StreamingContext containing the complete processing graph.
   * Put all of the business logic in this method.
   *
   * @param topic               Kafka topic to consume
   * @param checkpointDirectory directory passed to `ssc.checkpoint`
   * @return the configured, not yet started, StreamingContext
   */
  def createContext(topic: String, checkpointDirectory: String): StreamingContext = {
    // If you do not see this printed, the StreamingContext has been loaded
    // from the checkpoint instead of being created here.
    println("------------Creating new context------------")
    val sparkConf = new SparkConf().setAppName("RecoverableKafkaDirectWordCount.scala")
    // Create the context with a 2 second batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // Background on Kafka offsets: with the ZooKeeper-based consumer, consuming
    // a message commits the consumer group's offset to ZooKeeper, and a
    // restarted consumer resumes from that offset. The consumer setting
    // "auto.offset.reset" (spelled "autooffset.reset" before Kafka 0.8) accepts
    // "largest" or "smallest" and defaults to "largest". It applies when the
    // group has no stored offset (a new group.id, or ZooKeeper data was wiped):
    // "largest" starts from the newest messages, "smallest" from the beginning
    // of the topic.
    // NOTE(review): the direct stream created below is documented to track
    // offsets in the Spark checkpoint rather than in ZooKeeper - confirm
    // against the Spark Streaming + Kafka integration guide.
    val kafkaParams = Map(
      // NOTE: placeholder broker address - replace with the real broker list.
      "metadata.broker.list" -> " xxx:9092",
      // "auto.offset.reset" -> "largest",
      // "auto.offset.reset" -> "smallest",
      "group.id" -> "streaming-group")
    val topics = Set(topic)
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics)

    // Copy the constants into locals so the closures capture plain values.
    val pattern = DatePattern
    val batchSize = PutBatchSize

    // kafkaStream.print()
    // Keep only the message value and append the processing timestamp.
    // One formatter per partition avoids sharing a SimpleDateFormat.
    val forAppDStream = kafkaStream.map(_._2).mapPartitions { messages =>
      val formatter = new SimpleDateFormat(pattern)
      messages.map(message => message + "-" + formatter.format(new Date()))
    }
    forAppDStream.print(2)

    forAppDStream.foreachRDD { locusRDD =>
      locusRDD.foreachPartition { locusPoints =>
        // Do not open an HBase connection for an empty partition.
        if (locusPoints.nonEmpty) {
          import scala.collection.JavaConverters._

          val myConf = HBaseConfiguration.create()
          // NOTE: placeholder quorum - replace with the real ZooKeeper hosts.
          myConf.set("hbase.zookeeper.quorum", " ")
          myConf.set("hbase.zookeeper.property.clientPort", "21810")
          val myTable = new HTable(myConf, TableName.valueOf("test:user"))
          try {
            // Buffer writes on the client; they are flushed explicitly below.
            myTable.setAutoFlush(false, false)
            myTable.setWriteBufferSize(3 * 1024 * 1024)
            val formatter = new SimpleDateFormat(pattern)
            val family = Bytes.toBytes("f1")
            val qualifier = Bytes.toBytes("gps_time")
            // grouped() yields chunks of at most batchSize rows, so no list
            // length has to be recomputed for every record.
            locusPoints.grouped(batchSize).foreach { chunk =>
              val puts = chunk.map { name =>
                val p = new Put(Bytes.toBytes(name))
                p.addColumn(family, qualifier, Bytes.toBytes(formatter.format(new Date())))
                p
              }
              myTable.put(puts.asJava)
            }
            myTable.flushCommits()
          } finally {
            // Always release the table, even when a put or flush fails.
            myTable.close()
          }
        }
      }
    }
    ssc
  }

  /**
   * Entry point. Expects exactly two arguments: the Kafka topic and the
   * checkpoint directory. Recovers the StreamingContext from the checkpoint
   * when one exists, otherwise creates it through [[createContext]].
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: RecoverableKafkaDirectWordCount <topic> <checkpoint-directory>
          |  <topic> is the Kafka topic to consume.
          |  <checkpoint-directory> is a directory on an HDFS-compatible file
          |  system in which checkpoint data is stored.
          |
          |<checkpoint-directory> must be an absolute path
        """.stripMargin
      )
      System.exit(1)
    }
    val Array(topic, checkpointDirectory) = args
    val ssc = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => createContext(topic, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}
I think you have to put your streaming-related logic into the function functionToCreateContext.
You can refer to the related Spark Streaming example, RecoverableNetworkWordCount, to
change your code.
Refer to this link -> https://issues.apache.org/jira/browse/SPARK-6770
相关文章推荐
- 解决:org.apache.axis2.AxisFault: Transport out has not been set
- spark streaming restart error: org.apache.spark.SparkException: Yarn application has already ended!
- 解决:org.apache.tomcat.jni.Error: 70023: This function has not been implemented on this platform
- Spark-sql执行部分语句是报错:WARN org.apache.spark.scheduler.TaskSchedulerImpl: Initial job has not accepted a
- error: object kafka is not a member of package org.apache.spark.streaming
- Tomcat 开发web项目报Illegal access: this web application instance has been stopped already. Could not load [org.apache.commons.pool.impl.CursorableLinkedList$Cursor]. 错误
- The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar
- org.apache.subversion.javahl.ClientException: Previous operation has not finished; run 'cleanup' if
- spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable
- Method createStream([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.uti
- spark + quartz : org.apache.spark.SparkException: Task not serializable
- org.apache.subversion.javahl.ClientException: Previous operation has not finished
- spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable
- SparkStreaming+Flume出现ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.jboss.netty.channel.ChannelException
- The Product License has not been initialized
- org.apache.subversion.javahl.ClientException: Previous operation has not finished
- value toDF is not a member of org.apache.spark.rdd.RDD[People]
- org.apache.subversion.javahl.ClientException:Previous operation has not finished; run 'cleanup'
- spark1.4 读取hbase 0.96 报错 java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytes
- org.apache.subversion.javahl.ClientException: Previous operation has not finished