How-to: effectively store Kafka data into HDFS via Spark Streaming
2015-07-13 15:59
This is an improvement to "How-to: make spark streaming collect data from Kafka topics and store data into hdfs". In that earlier consumer, Spark Streaming generates a part-000* file for each log line, which wastes HDFS storage, since a single log line is usually far smaller than a block. The new consumer below collects data every few minutes and generates one file per hour.
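For comparison, the most direct way to dump a DStream to HDFS is saveAsTextFiles, which writes a new directory of part-000* files for every batch interval. A minimal sketch of that pattern (an illustration, not the earlier post's exact code; lines is the DStream of Kafka message values and output an HDFS path prefix):

// Naive write path: every batch interval produces a fresh directory of
// small part-000* files under the given prefix.
lines.saveAsTextFiles(output, "txt")

The consumer below avoids that by opening one HDFS file per hour and appending each batch to it: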
import java.util.Properties
import java.util.Calendar
import java.nio.charset.Charset

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.Path

object KafkaWriteHDFSConsumer {
  def main(args: Array[String]) {
    if (args.length < 6) {
      System.err.println("Usage: KafkaWriteHDFSConsumer <zkQuorum> <group> <topics> <numThreads> <output> <time>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads, output, time) = args

    val sparkConf = new SparkConf().setAppName("KafkaWriteHDFSConsumer")
    // The batch interval is <time> minutes, i.e. data is pulled from Kafka
    // and written out every <time> minutes.
    val ssc = new StreamingContext(sparkConf, Minutes(time.toLong))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    lines.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val config = new Configuration()
        config.addResource(new Path(System.getenv("HADOOP_HOME") + "/etc/hadoop/core-site.xml"))
        config.addResource(new Path(System.getenv("HADOOP_HOME") + "/etc/hadoop/hdfs-site.xml"))
        val fs = FileSystem.get(config)

        // Interesting here: the file name could not be defined once at the
        // start of main() due to Spark's lazy evaluation; it is computed
        // inside foreachRDD for every batch, so a new file is started when
        // the hour rolls over.
        val now = Calendar.getInstance()
        val day = now.get(Calendar.YEAR) + "-" + (now.get(Calendar.MONTH) + 1) + "-" + now.get(Calendar.DATE)
        val filenamePath = new Path(output + "/" + day + "/" + day + "-" + now.get(Calendar.HOUR_OF_DAY) + ".txt")

        // Create the hourly file on first use, append to it afterwards.
        val fin = if (!fs.exists(filenamePath)) fs.create(filenamePath) else fs.append(filenamePath)
        // collect() brings the whole batch to the driver, so this is only
        // suitable for modest data volumes.
        rdd.collect().foreach { string =>
          fin.writeUTF(string + "\n")
        }
        fin.close()
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
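One caveat in the write loop above: FSDataOutputStream inherits writeUTF from java.io.DataOutputStream, so each record is stored as modified UTF-8 preceded by a two-byte length field rather than as a plain text line. If plain newline-delimited text is what you want, a small variation (a suggestion, not part of the original code) is to write raw bytes:

// Write plain UTF-8 bytes instead of DataOutput's length-prefixed
// modified UTF-8 records.
rdd.collect().foreach { string =>
  fin.write((string + "\n").getBytes("UTF-8"))
}

Also note that fs.append relies on HDFS append support, which is enabled by default on Hadoop 2.x clusters.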
You can store this file as examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWriteHDFSConsumer.scala in the Spark project and generate a new spark-examples jar by running "mvn -Pyarn -DskipTests clean package" in the examples directory.
Run it with spark-submit:
${SPARK_HOME}/bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.streaming.KafkaWriteHDFSConsumer ${SPARK_HOME}/lib/spark-examples-1.3.0-cdh5.4.1-hadoop2.6.0-cdh5.4.1.jar zk_node:2181 hdfs-consumer topics 1 output 10
Here zk_node:2181 is the ZooKeeper quorum, hdfs-consumer is the consumer group, topics is the comma-separated list of Kafka topics, 1 is the number of consumer threads per topic, output is the HDFS output directory, and 10 is the batch interval in minutes.
The data is then stored in HDFS as:
[hadoop@master01 ~]$ hadoop fs -ls /user/chenfangfang/pinback/2015-6-13/
Found 2 items
-rw-r--r-- 3 chenfangfang supergroup 127147607 2015-07-13 15:59 /user/chenfangfang/pinback/2015-6-13/2015-6-13-15.txt
-rw-r--r-- 3 chenfangfang supergroup 6190824 2015-07-13 16:02 /user/chenfangfang/pinback/2015-6-13/2015-6-13-16.txt