[Scala] Flink项目实时热门商品统计(一)
2020-01-13 15:58
916 查看
传送区
[Scala] Flink项目实例系列(零)
[Scala] Flink项目实时热门商品统计(一)
[Scala] Flink项目实时流量统计(二)
[Scala] Flink项目恶意登录监控(三)
[Scala] Flink项目订单支付失效监控(四)
[Scala] Flink项目订单支付实时对账(五)
[Scala] Flink项目小彩蛋(六)
本项目的代码及文件见这这这,友情码是:3n9z。
项目需求
统计最近1小时的热门商品,每5分钟更新一次
思路
首先看到跟时间相关的需求,肯定需要使用于Window相关的算子,另外需要统计的主体是商品,所以需要针对商品的标识(也就是itemId做keyBy),然后窗口内聚合,再使用process进行排序处理。
代码
热门商品统计
数据源结构
userId | itemId | categoryId | behavior | timestamp |
---|---|---|---|---|
543462 | 1715 | 1464116 | pv | 1511658000 |
因为是第一份码,所以我在这里写了较多的注释,之后类似的地方会省略。
[code]import java.sql.Timestamp import java.util.Properties import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.java.tuple.{Tuple, Tuple1, Tuple2} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.util.Collector import scala.collection.mutable.ListBuffer case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long) // 定义窗口聚合结果样例类,这里主要作用还是当成一个数据结构来使用,方便管理内部的数据 case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long) object HotItems { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置时间语义,EventTime简单理解就是以时间戳的时间为准,因为需要考虑网络延迟等 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 实际项目多用kafka,以下是配置方法示例 val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") // 构建数据结构,指定EventTime字段 val source = getClass.getResource("/UserBehavior.csv") // 获取相对路径 // val dataStream = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties)) val dataStream = env.readTextFile(source.getPath) .map(data => { val dataArray = data.split(",") UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2) .trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong) }) // 将数据转为UserBehavior,理解为打包成一个整体作为流的一个单位(数据包?) // 因为源数据时间戳为升序,所以直接用下边这个API,乘1000转单位为秒,默认是以秒为单位;乱序时间会在下个案例中使用 .assignAscendingTimestamps(_.timestamp * 1000) .filter(_.behavior == "pv") // 筛出pv数据 .keyBy(_.itemId) // 用itemId划出keyedStream,简单理解就是变成多个流了 .timeWindow(Time.hours(1), Time.minutes(5)) // 对流进行窗口操作,前参为窗口大小,后为步长 .aggregate(new CountAgg(), new WindowResult()) // 窗口聚合,前为预聚合,可以提高效率,不至于把数据全摞到一起计算 val processedStream = dataStream .keyBy(_.windowEnd) // 因前边逻辑已经划好了1小时内的窗口,所以这里直接按窗口进行分组统计 .process(new TopNHotItems(3)) // 自定义ProcessFunction // Sink直接输出 processedStream.print("process") // 执行 env.execute("HotItems job") } } // 自定义预聚合是,减少State压力,效率更高 class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] { override def createAccumulator(): Long = 0L // 初始值 override def merge(acc: Long, acc1: Long): Long = acc + acc1 override def getResult(acc: Long): Long = acc // 输出终值 override def add(in: UserBehavior, acc: Long): Long = acc + 1 } // 自定义预聚合函数计算平均数,仅是个例子,与项目无关 class AverageAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] { override def add(value: UserBehavior, accumulator: (Long, Int)): (Long, Int) = ( accumulator._1 + value.timestamp, accumulator._2 + 1 ) override def createAccumulator(): (Long, Int) = (0L, 0) override def getResult(accumulator: (Long, Int)): Double = accumulator._1 / accumulator._2 override def merge(a: (Long, Int), b: (Long, Int)): (Long, Int) = (a._1 + b._1, a._2 + b._2) } // 将每个Key每个窗口聚合后的结果带上其它信息输出 // 注意WindowFunction源码第四个参数中<: Window,意思是参数必须要为Window的子类,<:表示上界 // WindowFunction接收的四个参数分别是IN OUT KEY Window,IN是CountAgg聚合后的结果,OUT一个ItemViewCount // key是keyBy所用的字段,最后一个不是很懂,只知道是Window的子类即可,传TimeWindow就完事儿了 // 输出ItemViewCount需要有三个字段,注意CountAgg保存在Iterable中,注意取出方式 class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] { override def apply(key: Long, w: TimeWindow, iterable: Iterable[Long], collector: Collector[ItemViewCount]): Unit = { collector.collect(ItemViewCount(key, w.getEnd, iterable.iterator.next)) } } // KeyedProcessFunction会处理流的每一个元素,输出0到多个元素 // KeyedProcessFunction需要的三个参数分别是key, in, out class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] { // 定义列表状态,就是用来保存数据流的数据结构,共有四种,初始化在open中完成,后续案例有简化写法 private var itemState: ListState[ItemViewCount] = _ // 初始化,定义列表状态中内容 override def open(parameters: Configuration): Unit = { itemState = getRuntimeContext .getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount])) } override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Long, ItemViewCount, String]#Context, collector: Collector[String]): Unit = { // 把每条数据存入状态列表 itemState.add(i) // 注册一个定时器,+ 100表示延迟100毫秒触发,触发指启动onTimer方法 context.timerService().registerEventTimeTimer(i.windowEnd + 100) } // 定时器触发时,对所有数据排序,并输出结果 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = { // 将所有State中数据取出放到一个List Buffer中 val allItems: ListBuffer[ItemViewCount] = new ListBuffer() // 注意遍历ListState需要引入下边这个包 import scala.collection.JavaConversions._ for (item <- itemState.get()) { allItems += item } // 按照count大小排序,并取前N个 val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize) // 清空状态 itemState.clear() // 将排名结果格式化输出 val result: StringBuilder = new StringBuilder() // 此处的- 100与定时器呼应,结果会保持0;Timestamp是格式化用的 result.append("时间:").append(new Timestamp(timestamp - 100)).append("\n") // 输出每一个商品的信息 for (i <- sortedItems.indices) { val currentItem = sortedItems(i) result.append("No").append(i + 1).append(":") .append(" 商品ID=").append(currentItem.itemId) .append(" 浏览量=").append(currentItem.count) .append("\n") } result.append("================================") // 控制输出频率 Thread.sleep(1000) out.collect(result.toString()) } }
Kafka生产者示例
实际工作中会有很多时候需要对接Kafka,上述代码中有consumer,正面再贴个生产者,方便今后查阅。
[code]import java.util.Properties import scala.io import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} object KafkaProducer { def main(args: Array[String]): Unit = { writeToKafka("hotItems") } def writeToKafka(topic: String): Unit ={ val properties = new Properties() properties.put("bootstrap.servers", "localhost:9092") properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // 定义一个kafka producer val producer = new KafkaProducer[String, String](properties) // 从文件中读取数据,发送 val bufferedSource = io.Source.fromFile("yourPath/UserBehavior.csv") for (line <- bufferedSource.getLines()) { val record = new ProducerRecord[String, String](topic, line) producer.send(record) } producer.close() } }
- 点赞
- 收藏
- 分享
- 文章举报
相关文章推荐
- [Scala] Flink项目实时流量统计(二)
- Spark日志分析项目Demo(7)--临时表查询,各区域top3热门商品统计
- Flink 零基础实战教程:如何计算实时热门商品
- [Scala] Flink项目订单支付实时对账(五)
- Flink 零基础实战教程:如何计算实时热门商品
- Kafka项目实战-用户日志上报实时统计之分析与设计
- javascript实现点击商品列表checkbox实时统计金额的方法
- Kafka项目实战-用户日志上报实时统计之编码实践
- 《三易通进销存系统——“采购进货,商品管理,进货统计分析”模块》项目研发阶段性总结
- Flink1.8实时数仓项目实战
- 大数据项目实战之新闻话题的实时统计分析
- Spark日志分析项目Demo(8)--SparkStream,广告点击流量实时统计
- 【Github热门项目】JavaScript实时图表库 Epoch
- 数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战
- javascript实现点击商品列表checkbox实时统计金额的方法
- 使用Project统计项目实时进度
- [Scala + Python] Flink实时分析B站公开数据
- 《商城项目06-2》--ActiveMQ整合Spring实现Solr库中商品信息的实时同步
- [Scala] Flink项目小彩蛋(六)
- 广告流量实时统计 scala版本 过滤黑名单 统计各省市实时广告用户点击量