您的位置:首页 > 运维架构 > 网站架构

第98课: 使用Spark Streaming实战对论坛网站动态行为的多维度分析(上)

2016-05-05 21:37 651 查看
第98课: 使用Spark Streaming实战对论坛网站动态行为的多维度分析(上)

三件事情:

1、明天开始100天,超越美国开发团队

2、2016年,投资1亿美金,投资100家spark创业服务公司,技术支撑

3、100天地狱式、死亡式训练

================================================================================

场景: 电商 库存,物流,采购

这节课程的内容

1、kafka的producer,模拟用户的点击行为,通过代码的方式操作kafka

2、pv或uv

SparkSQLDataManually.java的代码复制过来SparkStreamingDataMannuallyProducerForKafka

生成的数据作为Producer的方式发送给Kafka,然后SparkSTreaming程序在Kafka中在线pull到论坛或网站的用

户在线行为信息,进行多维度的分析

给kafka写数据,要弄一个线程,不断的写数据,那我们构建一个线程;或者继承线程也可以,简化一点。

public class SparkStreamingDataMannuallyProducerForKafka extends Thread

写一个构造器,指定topic:

private String topic ;//发送给Kafka的数据的类别

private Producer <Integer,String> producerForKafka;

private static Sting dateToday;

private static Random random;

public SparkStreamingDataMannuallyProducerForKafka(String topic){

dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());

this.topic = topic ;

//连上kafka

Properties conf = new Properties();

conf.put("metadata.broker.list","master:9092,worker1:9092,worker2:9092");

conf.put("serializer.class","kafka.serializer.StringEncoder");

//构造器的核心是要生成数据

//先要导入kafka的jar包 import kafka。producer

producerForKafka= new Producer <Integer,String> (new ProducerConfig(conf));

}

//作为线程而言,要复写run方法,先写业务逻辑,再写控制

public void run() {

int cunter =0 ;//搞500条

while(true){ //模拟实际情况,不断循环,异步过程,不可能是同步

counter ++;

String userLog = userLogs();

System.out.println("product:"+userLog );

//send是核心代码

producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog ));

if( 0 == counter%500) {

counter = 0;

try {

Thread.sleep(1000);}catch (....)

}

}

userLogs()方法中 修改代码

userLogBuffer.append(dateToday)

======================================

main中很简单了

public static void main (String[] args){

new SparkStreamingDataMannuallyProducerForKafka("userlogs").start();

}

============================

数据生成好了,下面写一下uv pv

===========================

OnlineBBSUserLogs.java

val sparkConf = new SparkConf().setAppName("OnlineBBSUserLogs ")

.setMaster("local[2]")

val ssc = new StreamingContext(sparkConf, Seconds(15))

Map<String,String> kafkaParameters =new HashMap<String,String>();

kafkaParameters .put("metadata.broker.list","master:9092,worker1:9092,worker2:9092");

Set topics = new HashSet<String>();

topics.add("UserLogs");

//StringDecoder.class要导入kafka的包

JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream

(jsc,String.class,String.class,

StringDecoder.class,StringDecoder.class,kafkaParametes,topics);

看源代码,createDirectStream返回的类型看一下,继承InputDStream。

业务代码找pv,有两种view register,那么写一个filter代码

JavaPairDStream<String,String> logsDStream= lines.filter(new

Funtion<Tuple2<String,Sting>,Boolean(){

@Override

public Boolean call(Tuple2<String,Sting>, v1) throws Exception {

String[] logs = v1._2.split("\t");

String action = logs[5];

if ("view".equals(action)) {

return true;

} else {

return false; }

}

});

JavaPairDStream<String,String> pairs= logsDStream.mapToPair(new

PairFuntion<Tuple2<String,Sting>,Long,Long (){

@Override

public Tuple2<Long,Long> call(Tuple2<String,Sting>, t) throws Exception {

String[] logs = t._2.split("\t");

Long pageId=Long.valudeOf(logs[3]);

retun new Tuple2<Long,Long>(pageId=Long.valudeOf,1L);

}

});

//过滤出了目标数据,下一步reducebykey

JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long,

Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)

@Override

public Long call(Long v1, Long v2) throws Exception {

return v1 + v2;

}

});

//打印出来,在企业生产环境中,一般会把计算的数据保存到Redis或者DB中,采用J2EE等技术进行趋势绘制,

领导爱看这个,就像动态更新的股票交易一样,实现在线的监控

wordsCount.print();

jsc.start();

jsc.awaitTermination();
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: