
SparkException: org.apache.spark.streaming.dstream.MappedDStream has not been initialized

2016-12-16 16:33
When using checkpoint-based failure recovery with this approach, all of the business logic must be placed inside the functionToCreateContext function; only then can the data in the checkpoint directory be restored correctly.
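
The exception in the title usually means that DStream transformations or output operations were defined outside the function passed to StreamingContext.getOrCreate, so they are not part of the DStream graph restored from the checkpoint. Below is a minimal sketch of the required structure; the object name, the socket input source, and the checkpoint path are placeholders for illustration only (the actual program, which uses a Kafka direct stream, follows).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // All DStream creation, transformations and output operations must live inside
  // this function; anything defined outside it is not restored from the checkpoint.
  def functionToCreateContext(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointRecoverySketch")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir)

    // Placeholder input source; the original program below uses a Kafka direct stream.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.map(_.toUpperCase).print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDir = "hdfs:///tmp/checkpoint-sketch" // placeholder path
    // First run: the function above is executed. After a restart, the context and its
    // DStream graph are rebuilt from the checkpoint data and the function is NOT called.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => functionToCreateContext(checkpointDir))
    ssc.start()
    ssc.awaitTermination()
  }
}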

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

//org.spark.streaming.checkpoint.recovery.MyRecoverableNetworkWordCount
object RecoverableKafkaDirectWordCount {

//  var checkpointDirectory = "hdfs:///user/spark/streaming_checkpoint"

  val formatDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  // Put all of the business logic inside this method
  def createContext(topic: String, checkpointDirectory: String) = {
    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the checkpoint
    println("------------Creating new context------------")
    val sparkConf = new SparkConf().setAppName("RecoverableKafkaDirectWordCount.scala")
    // Create the context with a 2 second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // Create a direct Kafka stream on the given topic (no receiver is used)

    // Kafka + ZooKeeper: when a message is consumed, the consumer commits the current
    // groupId's offset to ZK, and on the next start it resumes consuming from that offset.
    // The consumer configuration (or ConsumerConfig) has an "autooffset.reset" setting
    // ("auto.offset.reset" in Kafka 0.8) with two legal values, "largest" and "smallest"
    // (default "largest"). It decides where a consumer in this groupId starts when ZK
    // holds no offset for it (e.g. a brand-new groupId, or the ZK data was cleared):
    // "largest" starts from the latest offset (the newest messages), "smallest" starts
    // from the earliest offset, i.e. consumes the whole topic from the beginning.

    val kafkaParams = Map(
      "metadata.broker.list" -> " xxx:9092",
      //      "auto.offset.reset" -> "largest",
      //      "auto.offset.reset" -> "smallest",
      "group.id" -> "streaming-group")
    val topics = Set(topic)
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics)
    val logger = LogFactory.getLog("RecoverableKafkaDirectWordCount.scala")

    var batchDur = 4
    var windowDuration = 20
    var parallelism = 20

    //  kafkaStream.print()
    // Append a timestamp to each message value
    val forAppDStream = kafkaStream.map(_._2).map { x =>
      x + "-" + formatDate.format(new Date())
    }
    forAppDStream.print(2)
    // Write each partition of every batch into HBase through a buffered table
    val forapp = forAppDStream.foreachRDD(locusRDD => {
      locusRDD.foreachPartition(
        locusPoints => {
          val tableName = "test:user"
          val myConf = HBaseConfiguration.create()
          myConf.set("hbase.zookeeper.quorum", " ")
          myConf.set("hbase.zookeeper.property.clientPort", "21810")
          val myTable = new HTable(myConf, TableName.valueOf(tableName))
          myTable.setAutoFlush(false, false)
          myTable.setWriteBufferSize(3 * 1024 * 1024)
          import collection.JavaConversions._
          var putList = scala.collection.immutable.List[Put]()
          while (locusPoints.hasNext) {
            val name = locusPoints.next()
            val p = new Put(Bytes.toBytes(name))
            p.addColumn("f1".getBytes, "gps_time".getBytes, Bytes.toBytes(formatDate.format(new Date())))
            putList = putList.::(p)
            // Flush to HBase in batches of 200 puts
            if (putList.size % 200 == 0) {
              myTable.put(putList)
              putList = scala.collection.immutable.List[Put]()
            }
          }
          myTable.put(putList)
          myTable.flushCommits()
          myTable.close()
        })
    })

    // Return the constructed StreamingContext so it can be checkpointed and recovered
    ssc
  }

  

  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: RecoverableKafkaDirectWordCount <topic> <checkpoint-directory>
          |     . <topic> is the Kafka topic that Spark Streaming will consume from.
          |     <checkpoint-directory> is a directory on an HDFS-compatible file system
          |     to which the checkpoint data will be written
          |
          |In local mode, <master> should be 'local[n]' with n > 1
          |<checkpoint-directory> must be an absolute path
        """.stripMargin
      )
      System.exit(1)
    }

    val Array(topic, checkpointDirectory) = args
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(topic, checkpointDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}

I think you have to put your streaming-related logic into the function functionToCreateContext; you could refer to the related Spark Streaming example RecoverableNetworkWordCount to change your code.

Refer to this link: https://issues.apache.org/jira/browse/SPARK-6770