
How-to: effectively store Kafka data into HDFS via Spark Streaming

2015-07-13 15:59
This is an improvement to

How-to: make spark streaming collect data from Kafka topics and store data into hdfs

In that post, the Spark Streaming job generates a part-000* file for each batch of log lines, which wastes HDFS storage: a single batch of logs may be far smaller than an HDFS block. Here is a new consumer that collects data every few minutes and writes one file per hour.
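For reference, the small-files behavior comes from writing every batch straight out as text files. Below is a minimal sketch of that pattern (an approximation of the earlier consumer, not its exact code; the output prefix is a placeholder):

import org.apache.spark.streaming.dstream.DStream

// Each batch interval creates a fresh <prefix>-<timestamp> directory of
// part-000* files, most of them far smaller than an HDFS block.
def naiveSink(lines: DStream[String]): Unit = {
  lines.saveAsTextFiles("hdfs:///user/someuser/pinback/raw")
}

The consumer below avoids this by appending every batch to a single hourly file instead.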

import java.util.Calendar

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWriteHDFSConsumer {
  def main(args: Array[String]) {
    if (args.length < 6) {
      System.err.println("Usage: KafkaWriteHDFSConsumer <zkQuorum> <group> <topics> <numThreads> <output> <time>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads, output, time) = args
    val sparkConf = new SparkConf().setAppName("KafkaWriteHDFSConsumer")
    // <time> is the batch interval in minutes: how often collected data is flushed to HDFS.
    val ssc = new StreamingContext(sparkConf, Minutes(time.toLong))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    lines.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val config = new Configuration()
        config.addResource(new Path(System.getenv("HADOOP_HOME") + "/etc/hadoop/core-site.xml"))
        config.addResource(new Path(System.getenv("HADOOP_HOME") + "/etc/hadoop/hdfs-site.xml"))
        val fs = FileSystem.get(config)

        // Interesting point: the file name must be computed here, inside
        // foreachRDD, not once at setup time. foreachRDD runs for every
        // batch, so each batch picks up the current date and hour; computed
        // outside, the name would be frozen at job-submission time due to
        // Spark's lazy evaluation model.
        val now = Calendar.getInstance()
        val day = now.get(Calendar.YEAR) + "-" + (now.get(Calendar.MONTH) + 1) + "-" + now.get(Calendar.DATE)
        val filenamePath = new Path(output + "/" + day + "/" + day + "-" + now.get(Calendar.HOUR_OF_DAY) + ".txt")

        // Create the hourly file on the first batch of the hour, append afterwards.
        val fin = if (fs.exists(filenamePath)) fs.append(filenamePath) else fs.create(filenamePath)
        // writeUTF would prefix every record with a two-byte length, corrupting
        // a plain-text file, so write raw UTF-8 bytes instead.
        rdd.collect().foreach { string =>
          fin.write((string + "\n").getBytes("UTF-8"))
        }
        fin.close()
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
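One design note: because each hour maps to a single HDFS file, there can be only one writer, so the job pulls each batch to the driver with rdd.collect() before appending. That is fine as long as a batch fits in driver memory. For heavier traffic, a variant along the following lines (a sketch only; writePerPartition and the path layout are made up here) lets each executor write its own partition, trading one file per hour for one file per partition per hour:

import java.util.Calendar

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.DStream

// Sketch: write each RDD partition from its own executor, so no batch is
// ever collected on the driver. Each partition appends to its own hourly
// file, named with the partition id to avoid concurrent appends.
def writePerPartition(lines: DStream[String], output: String): Unit = {
  lines.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      rdd.foreachPartition { iter =>
        // Executors pick up the cluster's Hadoop config from their classpath.
        val fs = FileSystem.get(new Configuration())
        val now = Calendar.getInstance()
        val day = now.get(Calendar.YEAR) + "-" + (now.get(Calendar.MONTH) + 1) + "-" +
          now.get(Calendar.DATE)
        val path = new Path(output + "/" + day + "/" + day + "-" +
          now.get(Calendar.HOUR_OF_DAY) + "-part-" + TaskContext.get().partitionId() + ".txt")
        val out = if (fs.exists(path)) fs.append(path) else fs.create(path)
        iter.foreach(line => out.write((line + "\n").getBytes("UTF-8")))
        out.close()
      }
    }
  }
}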

You can save this file as examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWriteHDFSConsumer.scala in the Spark project and build a new spark-examples jar with "mvn -Pyarn -DskipTests clean package" in the examples directory.

Run it as follows (the trailing arguments map to <zkQuorum> <group> <topics> <numThreads> <output> <time>, so this example uses a 10-minute batch interval):
${SPARK_HOME}/bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.streaming.KafkaWriteHDFSConsumer ${SPARK_HOME}/lib/spark-examples-1.3.0-cdh5.4.1-hadoop2.6.0-cdh5.4.1.jar zk_node:2181 hdfs-consumer topics 1 output 10
The data is then stored in HDFS as:

[hadoop@master01 ~]$ hadoop fs -ls /user/chenfangfang/pinback/2015-6-13/
Found 2 items
-rw-r--r--   3 chenfangfang supergroup  127147607 2015-07-13 15:59 /user/chenfangfang/pinback/2015-6-13/2015-6-13-15.txt
-rw-r--r--   3 chenfangfang supergroup    6190824 2015-07-13 16:02 /user/chenfangfang/pinback/2015-6-13/2015-6-13-16.txt