spark streaming - kafka updateStateByKey 统计用户消费金额
2015-09-08 22:09
429 查看
场景
餐厅老板想要统计每个用户在他的店里总共消费了多少金额。我们从kafka接收用户消费的json数据,先统计每分钟内每个用户的消费金额,再使用updateStateByKey把每分钟的结果累加起来,得到每个用户在所有时间内的消费总额。
数据格式
{"user":"zhangsan","payment":8} {"user":"wangwu","payment":7} ....
往kafka写入消息(kafka producer)
package producer

import java.util.Properties

import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.codehaus.jettison.json.JSONObject

import scala.util.Random

/**
 * Demo Kafka producer: every 200 ms it sends one JSON event of the form
 * `{"user": <name>, "payment": <amount>}` to the `user_payment` topic,
 * picking the user and the amount at random.
 *
 * Written with an explicit `main` instead of `extends App` so that member
 * initialization does not depend on `App`'s delayed-init behavior.
 */
object KafkaProducer {

  // All users that can appear in a generated event.
  private val users = Array("zhangsan", "lisi", "wangwu", "zhaoliu")

  private val random = new Random()

  // Kafka parameters.
  val topic: String = "user_payment"
  val brokers: String = "192.168.6.55:9092,192.168.6.56:9092"

  val props: Properties = {
    val p = new Properties()
    p.put("metadata.broker.list", brokers)
    p.put("serializer.class", "kafka.serializer.StringEncoder")
    p
  }

  val kafkaConfig: ProducerConfig = new ProducerConfig(props)

  // Lazy so that merely touching this object (e.g. to read `topic`) does not
  // construct a producer; it is created on first use inside `main`.
  lazy val producer: Producer[String, String] = new Producer[String, String](kafkaConfig)

  /** Payment amount: a random whole number in 0..9, returned as a Double. */
  def payMount(): Double = random.nextInt(10)

  /** Picks one of the known user names at random. */
  def getUserName(): String = users(random.nextInt(users.length))

  /**
   * Sends events forever. The loop only ends when an exception escapes
   * (a failed send, or an interrupt during the sleep); the `finally`
   * makes sure the producer is closed in that case.
   */
  def main(args: Array[String]): Unit = {
    try {
      while (true) {
        // Build the JSON event.
        val event = new JSONObject()
        event
          .put("user", getUserName())
          .put("payment", payMount())

        // Send it to Kafka.
        producer.send(new KeyedMessage[String, String](topic, event.toString))
        println(s"Message sent: $event")

        // One message every 200 ms.
        Thread.sleep(200)
      }
    } finally {
      // Called without parentheses on purpose: this compiles whether the
      // Kafka API declares `close` with or without an empty parameter list.
      producer.close
    }
  }
}
使用spark Streaming处理数据
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.{SparkContext, SparkConf}
import net.liftweb.json._

import scala.util.Try

/**
 * Spark Streaming job that reads user-payment JSON events from Kafka,
 * prints the per-user payment sum of each 60-second batch, and keeps a
 * running per-user total across batches with `updateStateByKey`.
 */
object UpdateStateByKeyTest {

  // Checkpoint directory. It is used both when the context is created and when
  // it is recovered, so the two call sites must agree; hence a single constant.
  // (A local directory for the demo; the original note says to use HDFS in production.)
  private val CheckpointDir = "checkPoint"

  /**
   * Parses one Kafka message into a `(user, payment)` pair.
   *
   * The payment is read as a string and then converted to Double, exactly as
   * before, so the set of accepted inputs is unchanged. The difference is the
   * failure mode: a message that is not valid JSON, lacks a field, or carries a
   * non-numeric payment yields `None` instead of throwing, so one bad record no
   * longer fails the whole streaming job.
   */
  private def parsePayment(message: String): Option[(String, Double)] = {
    implicit val formats: Formats = DefaultFormats
    for {
      json    <- parseOpt(message)
      user    <- (json \ "user").extractOpt[String]
      payment <- (json \ "payment").extractOpt[String]
      amount  <- Try(payment.toDouble).toOption
    } yield (user, amount)
  }

  def main(args: Array[String]): Unit = {

    /** Builds a fresh StreamingContext with the whole pipeline wired up. */
    def functionToCreateContext(): StreamingContext = {
      // Create the streaming context with a 60-second batch interval.
      // NOTE(review): the master is hard-coded to local[*] for the demo.
      val conf = new SparkConf().setAppName("test").setMaster("local[*]")
      val ssc = new StreamingContext(conf, Seconds(60))

      // Checkpointing is configured here; the job uses updateStateByKey below.
      ssc.checkpoint(CheckpointDir)

      val zkQuorum = "192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181"
      val consumerGroupName = "user_payment"
      val kafkaTopic = "user_payment"
      val kafkaThreadNum = 1
      val topicMap = kafkaTopic.split(",").map(topic => topic -> kafkaThreadNum).toMap

      // Read from Kafka and parse each message value; unparseable records are skipped.
      val userPayments = KafkaUtils
        .createStream(ssc, zkQuorum, consumerGroupName, topicMap)
        .flatMap { case (_, value) => parsePayment(value).toList }

      // Per-user sum for the current one-minute batch.
      val paymentSum = userPayments.reduceByKey(_ + _)

      // Print the per-minute result.
      paymentSum.print()

      // Add the newest batch's sum to the total accumulated so far.
      val addFunction: (Seq[Double], Option[Double]) => Option[Double] =
        (currValues, prevValueState) => Some(currValues.sum + prevValueState.getOrElse(0.0))

      val totalPayment = paymentSum.updateStateByKey[Double](addFunction)

      // Print the running totals.
      totalPayment.print()

      ssc
    }

    // If the checkpoint directory holds earlier state, restore the context from it;
    // otherwise build a new one.
    val context = StreamingContext.getOrCreate(CheckpointDir, functionToCreateContext _)
    context.start()
    context.awaitTermination()
  }
}
运行结果节选
//-----------第n分钟的结果------------------ //1分钟结果 ------------------- (zhangsan,23.0) (lisi,37.0) (wangwu,31.0) (zhaoliu,34.0) ------------------- //总和结果 (zhangsan,101.0) (lisi,83.0) (wangwu,80.0) (zhaoliu,130.0) //-----------第n+1分钟的结果------------------ //1分钟结果 ------------------- (zhangsan,43.0) (lisi,16.0) (wangwu,21.0) (zhaoliu,54.0) ------------------- //总和结果 ------------------- (zhangsan,144.0) (lisi,99.0) (wangwu,101.0) (zhaoliu,184.0) -------------------
后记
下一篇文章将统计不同时间段用户的平均消费金额、消费次数、消费总额等指标。点击这里
相关文章推荐
- HDU 5046 Airport (2014年上海赛区网络赛E题)
- 我的算法学习之路
- 阅读《21天学通Java》
- Xtrabackup进行MySQL备份
- Struts2的文件上传
- Linux/Unix设计思想
- iOS 之美:iOS Delegate 使用五步曲
- 使用一维数组进行简单排序(冒泡法)
- PS的简单操作!
- Linux系统上的任务计划相关命令at、crontab的使用方法
- APACHE+PHP+MYSQL环境搭建
- Ubuntu12.04安装Apache2+PHP5+MySql
- jsp学习----纯jsp页面
- iOS——UINavigationController
- 关于 assign与weak的区别?什么时候使用weak.
- MAC上安装Docker
- Java基础知识回顾
- 一个应届计算机毕业生的2012求职之路
- PS
- noip2005 篝火晚会 (模拟)