基于HDFS,Spark Stream的实时统计
2016-06-27 17:15
351 查看
最近在搞一个小功能,具体要求是:数据到了hdfs,然后统计。需求很简,程序实现也挺简单的,但是目录有点复杂,如base目录下面有/业务/省/yyyyMMdd/h/aa.txt文件
如果是按照之前的约定的方式的话,是可以实现的,但是这个文件夹太复杂了,所以按照约定的方式来弄好像难度也挺复杂的,所以这种方法我放弃了。还有一种方案就是把文件目录放到kafka中,然后订阅kafka的内容,取得了之后将参数传进程序执行,但是这个方案的问题卡在了如何实时订阅kafka的内容,而且把参数传到spark程序中执行,而且肯定是使用shell来订阅,然后获取参数再提交给spark应用,没人用过,暂时搁置了;还有一种方案就是使用spark stream 实时的从hdfs中监控,但是貌似这东西只能从监控hdfs的某个目录下面的文件,而监控不了某个目录下面的子目录。代码如下。但是不应该啊,为什么这点功能都做不到,难道设计者都没想到这个问题吗?很简单的需求啊。
public class HDFSWordCount {
private static String BASE="hdfs://hadoop0:9000/data/xx/yy/zz/";
public static void main(String[] args) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("HDFSWordCount");
// sc.textFile("hdfs://n1:8020/user/hdfs/input");
// sc.textFile("hdfs://hadoop0:9000/spark/");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
// 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流
JavaDStream<String> callLines = jssc.textFileStream(BASE+"oidd_call/");
// JavaDStream<String> smsLines = jssc.textFileStream(BASE+"oidd_sms/*/*/*/");
// JavaDStream<String> locationLines = jssc.textFileStream(BASE+"oidd_location/*/*/*/");
callLines.print();
// smsLines.print();
// locationLines.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
}
如果是按照之前的约定的方式的话,是可以实现的,但是这个文件夹太复杂了,所以按照约定的方式来弄好像难度也挺复杂的,所以这种方法我放弃了。还有一种方案就是把文件目录放到kafka中,然后订阅kafka的内容,取得了之后将参数传进程序执行,但是这个方案的问题卡在了如何实时订阅kafka的内容,而且把参数传到spark程序中执行,而且肯定是使用shell来订阅,然后获取参数再提交给spark应用,没人用过,暂时搁置了;还有一种方案就是使用spark stream 实时的从hdfs中监控,但是貌似这东西只能从监控hdfs的某个目录下面的文件,而监控不了某个目录下面的子目录。代码如下。但是不应该啊,为什么这点功能都做不到,难道设计者都没想到这个问题吗?很简单的需求啊。
public class HDFSWordCount {
private static String BASE="hdfs://hadoop0:9000/data/xx/yy/zz/";
public static void main(String[] args) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("HDFSWordCount");
// sc.textFile("hdfs://n1:8020/user/hdfs/input");
// sc.textFile("hdfs://hadoop0:9000/spark/");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
// 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流
JavaDStream<String> callLines = jssc.textFileStream(BASE+"oidd_call/");
// JavaDStream<String> smsLines = jssc.textFileStream(BASE+"oidd_sms/*/*/*/");
// JavaDStream<String> locationLines = jssc.textFileStream(BASE+"oidd_location/*/*/*/");
callLines.print();
// smsLines.print();
// locationLines.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
}
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- Java IO与NIO的一些文件拷贝测试
- Spark,一种快速数据分析替代方案
- Stream、WshShell、WshUrlShortcut对象及Shell.Application的参数与使用
- Node.js中的流(Stream)介绍
- php中stream(流)的用法
- php错误提示failed to open stream: HTTP request failed!的完美解决方法
- C# Stream 和 byte[] 之间的转换
- 浅析Node.js 中 Stream API 的使用
- Nodejs Stream 数据流使用手册
- 利用stream实现一个简单的http下载器
- 浅谈PHP中Stream(流)
- php常用Stream函数集介绍
- eclipse 开发 spark Streaming wordCount
- Understanding Spark Caching
- ClassNotFoundException:scala.PreDef$
- Windows 下Spark 快速搭建Spark源码阅读环境
- Spark中将对象序列化存储到hdfs