第98课: 使用Spark Streaming实战对论坛网站动态行为的多维度分析(上)
2016-05-05 21:37
651 查看
第98课: 使用Spark Streaming实战对论坛网站动态行为的多维度分析(上)
三件事情:
1、明天开始100天,超越美国开发团队
2、2016年,投资1亿美金,投资100家spark创业服务公司,技术支撑
3、100天地狱式、死亡式训练
================================================================================
场景: 电商 库存,物流,采购
这节课程的内容
1、kafka的producer,模拟用户的点击行为,通过代码的方式操作kafka
2、pv或uv
SparkSQLDataManually.java的代码复制过来SparkStreamingDataMannuallyProducerForKafka
生成的数据作为Producer的方式发送给Kafka,然后SparkSTreaming程序在Kafka中在线pull到论坛或网站的用
户在线行为信息,进行多维度的分析
给kafka写数据,要弄一个线程,不断的写数据,那我们构建一个线程;或者继承线程也可以,简化一点。
public class SparkStreamingDataMannuallyProducerForKafka extends Thread
写一个构造器,指定topic:
private String topic ;//发送给Kafka的数据的类别
private Producer <Integer,String> producerForKafka;
private static Sting dateToday;
private static Random random;
public SparkStreamingDataMannuallyProducerForKafka(String topic){
dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
this.topic = topic ;
//连上kafka
Properties conf = new Properties();
conf.put("metadata.broker.list","master:9092,worker1:9092,worker2:9092");
conf.put("serializer.class","kafka.serializer.StringEncoder");
//构造器的核心是要生成数据
//先要导入kafka的jar包 import kafka。producer
producerForKafka= new Producer <Integer,String> (new ProducerConfig(conf));
}
//作为线程而言,要复写run方法,先写业务逻辑,再写控制
public void run() {
int cunter =0 ;//搞500条
while(true){ //模拟实际情况,不断循环,异步过程,不可能是同步
counter ++;
String userLog = userLogs();
System.out.println("product:"+userLog );
//send是核心代码
producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog ));
if( 0 == counter%500) {
counter = 0;
try {
Thread.sleep(1000);}catch (....)
}
}
userLogs()方法中 修改代码
userLogBuffer.append(dateToday)
======================================
main中很简单了
public static void main (String[] args){
new SparkStreamingDataMannuallyProducerForKafka("userlogs").start();
}
============================
数据生成好了,下面写一下uv pv
===========================
OnlineBBSUserLogs.java
val sparkConf = new SparkConf().setAppName("OnlineBBSUserLogs ")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(15))
Map<String,String> kafkaParameters =new HashMap<String,String>();
kafkaParameters .put("metadata.broker.list","master:9092,worker1:9092,worker2:9092");
Set topics = new HashSet<String>();
topics.add("UserLogs");
//StringDecoder.class要导入kafka的包
JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream
(jsc,String.class,String.class,
StringDecoder.class,StringDecoder.class,kafkaParametes,topics);
看源代码,createDirectStream返回的类型看一下,继承InputDStream。
业务代码找pv,有两种view register,那么写一个filter代码
JavaPairDStream<String,String> logsDStream= lines.filter(new
Funtion<Tuple2<String,Sting>,Boolean(){
@Override
public Boolean call(Tuple2<String,Sting>, v1) throws Exception {
String[] logs = v1._2.split("\t");
String action = logs[5];
if ("view".equals(action)) {
return true;
} else {
return false; }
}
});
JavaPairDStream<String,String> pairs= logsDStream.mapToPair(new
PairFuntion<Tuple2<String,Sting>,Long,Long (){
@Override
public Tuple2<Long,Long> call(Tuple2<String,Sting>, t) throws Exception {
String[] logs = t._2.split("\t");
Long pageId=Long.valudeOf(logs[3]);
retun new Tuple2<Long,Long>(pageId=Long.valudeOf,1L);
}
});
//过滤出了目标数据,下一步reducebykey
JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long,
Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
//打印出来,在企业生产环境中,一般会把计算的数据保存到Redis或者DB中,采用J2EE等技术进行趋势绘制,
领导爱看这个,就像动态更新的股票交易一样,实现在线的监控
wordsCount.print();
jsc.start();
jsc.awaitTermination();
三件事情:
1、明天开始100天,超越美国开发团队
2、2016年,投资1亿美金,投资100家spark创业服务公司,技术支撑
3、100天地狱式、死亡式训练
================================================================================
场景: 电商 库存,物流,采购
这节课程的内容
1、kafka的producer,模拟用户的点击行为,通过代码的方式操作kafka
2、pv或uv
SparkSQLDataManually.java的代码复制过来SparkStreamingDataMannuallyProducerForKafka
生成的数据作为Producer的方式发送给Kafka,然后SparkSTreaming程序在Kafka中在线pull到论坛或网站的用
户在线行为信息,进行多维度的分析
给kafka写数据,要弄一个线程,不断的写数据,那我们构建一个线程;或者继承线程也可以,简化一点。
public class SparkStreamingDataMannuallyProducerForKafka extends Thread
写一个构造器,指定topic:
private String topic ;//发送给Kafka的数据的类别
private Producer <Integer,String> producerForKafka;
private static Sting dateToday;
private static Random random;
public SparkStreamingDataMannuallyProducerForKafka(String topic){
dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
this.topic = topic ;
//连上kafka
Properties conf = new Properties();
conf.put("metadata.broker.list","master:9092,worker1:9092,worker2:9092");
conf.put("serializer.class","kafka.serializer.StringEncoder");
//构造器的核心是要生成数据
//先要导入kafka的jar包 import kafka。producer
producerForKafka= new Producer <Integer,String> (new ProducerConfig(conf));
}
//作为线程而言,要复写run方法,先写业务逻辑,再写控制
public void run() {
int cunter =0 ;//搞500条
while(true){ //模拟实际情况,不断循环,异步过程,不可能是同步
counter ++;
String userLog = userLogs();
System.out.println("product:"+userLog );
//send是核心代码
producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog ));
if( 0 == counter%500) {
counter = 0;
try {
Thread.sleep(1000);}catch (....)
}
}
userLogs()方法中 修改代码
userLogBuffer.append(dateToday)
======================================
main中很简单了
public static void main (String[] args){
new SparkStreamingDataMannuallyProducerForKafka("userlogs").start();
}
============================
数据生成好了,下面写一下uv pv
===========================
OnlineBBSUserLogs.java
val sparkConf = new SparkConf().setAppName("OnlineBBSUserLogs ")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(15))
Map<String,String> kafkaParameters =new HashMap<String,String>();
kafkaParameters .put("metadata.broker.list","master:9092,worker1:9092,worker2:9092");
Set topics = new HashSet<String>();
topics.add("UserLogs");
//StringDecoder.class要导入kafka的包
JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream
(jsc,String.class,String.class,
StringDecoder.class,StringDecoder.class,kafkaParametes,topics);
看源代码,createDirectStream返回的类型看一下,继承InputDStream。
业务代码找pv,有两种view register,那么写一个filter代码
JavaPairDStream<String,String> logsDStream= lines.filter(new
Funtion<Tuple2<String,Sting>,Boolean(){
@Override
public Boolean call(Tuple2<String,Sting>, v1) throws Exception {
String[] logs = v1._2.split("\t");
String action = logs[5];
if ("view".equals(action)) {
return true;
} else {
return false; }
}
});
JavaPairDStream<String,String> pairs= logsDStream.mapToPair(new
PairFuntion<Tuple2<String,Sting>,Long,Long (){
@Override
public Tuple2<Long,Long> call(Tuple2<String,Sting>, t) throws Exception {
String[] logs = t._2.split("\t");
Long pageId=Long.valudeOf(logs[3]);
retun new Tuple2<Long,Long>(pageId=Long.valudeOf,1L);
}
});
//过滤出了目标数据,下一步reducebykey
JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long,
Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
//打印出来,在企业生产环境中,一般会把计算的数据保存到Redis或者DB中,采用J2EE等技术进行趋势绘制,
领导爱看这个,就像动态更新的股票交易一样,实现在线的监控
wordsCount.print();
jsc.start();
jsc.awaitTermination();
相关文章推荐
- Reddit网站获赞最高文章/评论的爬取
- 网站应用架构
- 大型网站核心架构要素 之二(细解网站的高性能架构)
- 网站加载速度特别慢,是怎么回事?
- 猿题库 iOS 客户端架构设计
- iOS应用架构谈 view层的组织和调用方案
- 高可用Hadoop平台-Flume NG实战图解篇
- 三层架构和MVC
- Spark定制班第3课:通过案例对SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错
- Spark定制班第2课:通过案例对Spark Streaming透彻理解三板斧之二:解密Spark Streaming运行机制和架构
- 如何应付重度反爬虫的网站
- java从菜鸟到架构师的必看书籍
- 国外.net资源学习网站
- 大型网站的负载均衡器、db proxy和db
- 转:架构师实践日|亿级短视频应用秒拍的架构实践
- wget 下载整个网站,或者特定目录
- 使用wget递归下载某目录下的所有文件
- ImageMagick 严重漏洞导致大量网站面临被黑风险
- AngularJS相关网站存档
- 网易视频云:流媒体服务器原理和架构解析