Flink: Real-Time TopN of Hot Items Within a Given Time Range (a Hot-Brand Leaderboard for One Hour of Double 11)
2020-04-04 07:13
I. Requirements
Count the hot items/brands (TopN) within a given time range.
1. Learning from an example
- Data (one JSON record per line):
{"userId": "u001", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u002", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u003", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u003", "itemId": "p1001", "categoryId": "c11", "type": "cart", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u012", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
{"userId": "u012", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u001", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u002", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u003", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u003", "itemId": "p1001", "categoryId": "c11", "type": "cart", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u012", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
{"userId": "u011", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:13:01"}
II. Key techniques
- Flink event time (EventTime)
- Flink sliding windows (tumbling windows also work, but their results change abruptly instead of smoothly) ☆☆☆☆
- Flink timers ☆☆☆☆☆
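III. Implementation (1): the crude version
- Uses window.apply() → see step 5
- When a window fires, apply runs once over the full contents of that window (every element is taken out and then computed)
- Until a window fires, all of its elements are buffered in memory (with sliding windows, each element is stored once for every window it belongs to), so apply is a poor fit for very long windows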
1. Use the low-level process() (which can do map-like work) to parse each JSON string into a MyBehavior object
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @date: 2020/3/14 14:04
 * @site: www.ianlou.cn
 * @author: lekko 六水
 * @qq: 496208110
 * @description:
 */
public class HotGoodsTopN {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time as Flink's time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Checkpoint every 60 seconds
        env.enableCheckpointing(60000);
        // Set the parallelism
        env.setParallelism(1);

        DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);

        SingleOutputStreamOperator<MyBehavior> process = lines.process(new ProcessFunction<String, MyBehavior>() {
            @Override
            public void processElement(String input, Context ctx, Collector<MyBehavior> out) throws Exception {
                try {
                    // FastJson converts the "timestamp" string into a long timestamp (epoch milliseconds)
                    MyBehavior behavior = JSON.parseObject(input, MyBehavior.class);
                    out.collect(behavior);
                } catch (Exception e) {
                    e.printStackTrace();
                    // TODO: record the malformed input
                }
            }
        });
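The code relies on FastJson to turn the `"timestamp"` string into a `long`. The printed results further down (for example `1583636520000` shown as `2020-03-08 11:02:00.0`) imply that the strings are read as UTC+8 local times. The plain-JDK sketch below makes that conversion explicit with `java.time`. The zone `Asia/Shanghai` is an assumption inferred from that output, and `EventTimeParsing` is a hypothetical helper, not part of the original project.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: converts the "timestamp" field of the sample data
// into epoch milliseconds, the unit Flink uses for event time.
final class EventTimeParsing {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // Assumption: the timestamps are local times in UTC+8.
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    static long toEpochMillis(String text) {
        return LocalDateTime.parse(text, FORMAT)
                .atZone(ZONE)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2020-03-08 11:11:11")); // 1583637071000
    }
}
```

Pinning the zone explicitly makes the event times independent of the JVM's default time zone.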
2. Extract the event time as a long timestamp and generate watermarks
        // Maximum out-of-orderness (delay): 0 seconds
        SingleOutputStreamOperator<MyBehavior> behaviorDSWithWaterMark = process.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyBehavior>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(MyBehavior element) {
                        // Event time in epoch milliseconds
                        return element.timestamp;
                    }
                });
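With `Time.seconds(0)` the watermark simply follows the largest event time seen so far. The following plain-Java model mimics the rule this extractor applies (watermark = max timestamp − maximum out-of-orderness) and the condition under which an event-time window `[start, end)` fires (watermark ≥ end − 1). It is a simplification for illustration: the real extractor emits watermarks periodically rather than after every element, and `WatermarkSketch` is a hypothetical name.

```java
// Simplified, single-threaded model of a bounded-out-of-orderness watermark.
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Records an event time and returns the resulting watermark.
    long onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return maxTimestamp - maxOutOfOrdernessMs;
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1.
    static boolean windowFires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long t1111 = 1583637071000L;   // 2020-03-08 11:11:11 (UTC+8)
        long t1201 = 1583637121000L;   // 2020-03-08 11:12:01 (UTC+8)
        long end1112 = 1583637120000L; // end of window [11:02, 11:12)

        WatermarkSketch wm = new WatermarkSketch(0);
        System.out.println(windowFires(wm.onEvent(t1111), end1112)); // false
        System.out.println(windowFires(wm.onEvent(t1201), end1112)); // true
    }
}
```

With a 5-second bound instead of 0, the same 11:12:01 record would not yet close the window; the bound trades latency for tolerance of out-of-order data.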
3. Key the stream by the chosen fields (item ID and event type)
        // How many times each item was viewed, bought, added to the cart, or favorited within the window
        KeyedStream<MyBehavior, Tuple> keyed = behaviorDSWithWaterMark.keyBy("itemId", "type");
4. Assign windows to the keyed stream: each window is 10 minutes long and slides every 1 minute
WindowedStream<MyBehavior, Tuple, TimeWindow> window = keyed.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));
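With a 10-minute window sliding every minute, each element belongs to size / slide = 10 windows. The sketch below reproduces, in plain Java, the window-start arithmetic of `SlidingEventTimeWindows` assuming the default offset of 0: align the latest start to a multiple of the slide, then step back one slide at a time while the window still covers the timestamp. It also explains why the first results in the sample run end at 11:12: the earliest window containing 11:11:11 is `[11:02, 11:12)`. `SlidingWindowSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    // Returns {start, end} pairs of every sliding window containing the timestamp.
    // Assumes offset 0 and a non-negative timestamp.
    static List<long[]> assignWindows(long timestamp, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slideMs);
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long t = 1583637071000L; // 2020-03-08 11:11:11 (UTC+8)
        List<long[]> ws = assignWindows(t, 10 * minute, minute);
        System.out.println(ws.size()); // 10
        long[] earliest = ws.get(ws.size() - 1);
        System.out.println(earliest[0] + " - " + earliest[1]); // 1583636520000 - 1583637120000
    }
}
```

Because apply keeps every element in each of these 10 windows, this multiplication is one reason the incremental version later in the article uses less memory.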
5. Aggregate the data in each window and take the window start and end times
        // Type parameters: input type, output type, key (a Tuple of the keyBy fields), window type (TimeWindow)
        SingleOutputStreamOperator<ItemViewCount> result = window.apply(
                new WindowFunction<MyBehavior, ItemViewCount, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<MyBehavior> input,
                                      Collector<ItemViewCount> out) throws Exception {
                        // Key fields, in keyBy order
                        String itemId = tuple.getField(0);
                        String type = tuple.getField(1);
                        // Window start and end (the end is exclusive)
                        long start = window.getStart();
                        long end = window.getEnd();
                        // Count every buffered element of this window
                        int count = 0;
                        for (MyBehavior myBehavior : input) {
                            count += 1;
                        }
                        // Emit the result
                        out.collect(ItemViewCount.of(itemId, type, start, end, count));
                    }
                });

        result.print();

        env.execute("HotGoodsTopN");
    }
}
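To check this first version by hand, the following plain-Java sketch does for a single window what the `WindowFunction` above does: it keeps the events whose timestamps fall in `[start, end)`, groups them by `itemId` and `type`, and counts them. The `Event` class and the `itemId|type` string key are simplifications introduced for illustration; the events are the sample records up to 11:12:01, and the counts match the first window in the printed results.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowCountSketch {
    static final class Event {
        final String itemId;
        final String type;
        final long timestamp;
        Event(String itemId, String type, long timestamp) {
            this.itemId = itemId;
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    // Full-window counting, like window.apply(): every buffered event is visited.
    static Map<String, Long> countInWindow(List<Event> events, long start, long end) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            if (e.timestamp >= start && e.timestamp < end) {
                counts.merge(e.itemId + "|" + e.type, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        long t1 = 1583637071000L; // 11:11:11
        long t2 = 1583637121000L; // 11:12:01
        List<Event> events = Arrays.asList(
                new Event("p1001", "pv", t1), new Event("p1001", "pv", t1),
                new Event("p1001", "pv", t1), new Event("p1001", "cart", t1),
                new Event("p2222", "pv", t1), new Event("p2222", "pv", t1),
                new Event("p2222", "pv", t2)); // outside [11:02, 11:12)
        System.out.println(countInWindow(events, 1583636520000L, 1583637120000L));
        // {p1001|cart=1, p1001|pv=3, p2222|pv=2}
    }
}
```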
IV. The helper classes MyBehavior and ItemViewCount
- MyBehavior → the JavaBean produced by parsing a JSON string
- ItemViewCount → the format of the final output
1. MyBehavior
import java.sql.Timestamp;

public class MyBehavior {

    public String userId;     // user ID
    public String itemId;     // item ID
    public String categoryId; // item category ID
    public String type;       // user behavior: "pv", "buy", "cart", or "fav"
    public long timestamp;    // event time in epoch milliseconds
    public long counts = 1;   // each record contributes 1 by default

    public static MyBehavior of(String userId, String itemId, String categoryId, String type, long timestamp) {
        MyBehavior behavior = new MyBehavior();
        behavior.userId = userId;
        behavior.itemId = itemId;
        behavior.categoryId = categoryId;
        behavior.type = type;
        behavior.timestamp = timestamp;
        return behavior;
    }

    public static MyBehavior of(String userId, String itemId, String categoryId, String type, long timestamp, long counts) {
        MyBehavior behavior = of(userId, itemId, categoryId, type, timestamp);
        behavior.counts = counts;
        return behavior;
    }

    @Override
    public String toString() {
        return "MyBehavior{" +
                "userId='" + userId + '\'' +
                ", itemId='" + itemId + '\'' +
                ", categoryId='" + categoryId + '\'' +
                ", type='" + type + '\'' +
                ", timestamp=" + timestamp + ", " + new Timestamp(timestamp) +
                ", counts=" + counts +
                '}';
    }

    public String getUserId() { return userId; }

    public String getItemId() { return itemId; }

    public String getCategoryId() { return categoryId; }

    public String getType() { return type; }

    public long getTimestamp() { return timestamp; }

    public long getCounts() { return counts; }
}
2. ItemViewCount
import java.sql.Timestamp;

public class ItemViewCount {

    public String itemId;     // item ID
    public String type;       // event type
    public long windowStart;  // window start timestamp (ms)
    public long windowEnd;    // window end timestamp (ms, exclusive)
    public long viewCount;    // number of events of this type for the item in the window

    public static ItemViewCount of(String itemId, String type, long windowStart, long windowEnd, long viewCount) {
        ItemViewCount result = new ItemViewCount();
        result.itemId = itemId;
        result.type = type;
        result.windowStart = windowStart;
        result.windowEnd = windowEnd;
        result.viewCount = viewCount;
        return result;
    }

    @Override
    public String toString() {
        return "{" +
                "itemId='" + itemId + '\'' +
                "type='" + type + '\'' +
                ", windowStart=" + windowStart + " , " + new Timestamp(windowStart) +
                ", windowEnd=" + windowEnd + " , " + new Timestamp(windowEnd) +
                ", viewCount=" + viewCount +
                '}';
    }
}
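V. Results
- The window slides every minute, and each slide produces one round of counts
- The record at 11:12:01 triggers the counts for the window ending at 11:12:00, and the record at 11:13:01 triggers the counts for the window ending at 11:13:00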
{itemId='p1001'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=3}
{itemId='p1001'type='cart', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=1}
{itemId='p2222'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=2}
{itemId='p1001'type='cart', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=2}
{itemId='p1001'type='pv', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=6}
{itemId='p2222'type='pv', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=5}
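VI. Implementation (2): a more advanced version
Optimization: aggregate incrementally inside the window (each element is added as it arrives, so only a single number per key and window is kept in state instead of every element)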
        /**
         * Uses this overload of aggregate:
         *
         * public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
         *         AggregateFunction<T, ACC, V> aggFunction,
         *         WindowFunction<V, R, K, W> windowFunction) {}
         */
        SingleOutputStreamOperator<ItemViewCount> windowAggregate =
                window.aggregate(new MyWindowAggFunction(), new MyWindowFunction());
1. The MyWindowAggFunction class
- Aggregates the counts field of MyBehavior
Three type parameters:
- first: the input type
- second: the accumulator (counter) type
- third: the output type
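    // Requires: import org.apache.flink.api.common.functions.AggregateFunction;
    public static class MyWindowAggFunction implements AggregateFunction<MyBehavior, Long, Long> {

        // Create the accumulator: a counter starting at 0
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        // Called once for every incoming element
        @Override
        public Long add(MyBehavior input, Long accumulator) {
            return accumulator + input.counts;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        // Only called for merging windows such as session windows; tumbling and sliding
        // windows never call it, but returning the sum keeps it correct in every case
        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }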
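The division of labor among the four methods can be seen without Flink. The sketch below defines a hypothetical `Agg` interface that mirrors the shape of Flink's `AggregateFunction` (it is not the Flink interface itself) and folds a few elements through the counting logic, including a `merge` call of the kind merging windows such as session windows would make.

```java
import java.util.Arrays;
import java.util.List;

final class IncrementalCountSketch {
    // Hypothetical interface mirroring the shape of Flink's AggregateFunction.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    // Adds each element's "counts" value (1 per record in this article).
    static final Agg<Long, Long, Long> COUNT = new Agg<Long, Long, Long>() {
        public Long createAccumulator() { return 0L; }
        public Long add(Long counts, Long acc) { return acc + counts; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    };

    // Folds elements one at a time: only the accumulator is kept, never the elements.
    static <IN, ACC, OUT> OUT run(Agg<IN, ACC, OUT> agg, List<IN> elements) {
        ACC acc = agg.createAccumulator();
        for (IN e : elements) {
            acc = agg.add(e, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(run(COUNT, Arrays.asList(1L, 1L, 1L))); // 3
        // Two partial windows being merged (session-window style):
        System.out.println(COUNT.merge(3L, 2L)); // 5
    }
}
```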
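2. The MyWindowFunction class
- Gets the window start and end times and the key fields
Four type parameters:
- first: the input type (the Long count), i.e. the aggregated result of MyWindowAggFunction
- second: the output type (ItemViewCount)
- third: the key type (the key fields)
- fourth: the window type (start and end times)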
    public static class MyWindowFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input,
                          Collector<ItemViewCount> out) throws Exception {
            String itemId = tuple.getField(0);
            String type = tuple.getField(1);
            long windowStart = window.getStart();
            long windowEnd = window.getEnd();
            // The pre-aggregated count: the only element of the Iterable
            Long aLong = input.iterator().next();
            // Emit the result
            out.collect(ItemViewCount.of(itemId, type, windowStart, windowEnd, aLong));
        }
    }
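When `aggregate(aggFunction, windowFunction)` is used, the window function only receives the single pre-aggregated value, which is why `MyWindowFunction` can just call `input.iterator().next()`. The following plain-Java sketch models that two-stage hand-off; `TwoStageSketch`, `Counted` (a stand-in for `ItemViewCount`), and the string key are hypothetical and illustrate the data flow only, not Flink's internals.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class TwoStageSketch {
    // Stand-in for ItemViewCount: key, window bounds, and the count.
    static final class Counted {
        final String key;
        final long windowStart;
        final long windowEnd;
        final long count;
        Counted(String key, long windowStart, long windowEnd, long count) {
            this.key = key;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    // Stage 1: incremental count (the role of MyWindowAggFunction).
    static long preAggregate(List<Long> countsField) {
        long acc = 0L;
        for (long value : countsField) {
            acc += value;
        }
        return acc;
    }

    // Stage 2: receives a one-element Iterable holding the aggregated value
    // and attaches the key and window metadata (the role of MyWindowFunction).
    static Counted windowFunction(String key, long start, long end, Iterable<Long> input) {
        return new Counted(key, start, end, input.iterator().next());
    }

    public static void main(String[] args) {
        long agg = preAggregate(Arrays.asList(1L, 1L, 1L));
        Counted c = windowFunction("p1001|pv", 1583636520000L, 1583637120000L,
                Collections.singletonList(agg));
        System.out.println(c.key + " " + c.count); // p1001|pv 3
    }
}
```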
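VII. Sorting the aggregated window results
- Key the results by window start and end so that counts from the same window are sorted together
- Only counts from the same window (the same time range) can be ranked against each other
1. Keying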
        // Key by type and window bounds so that only counts from the same window are ranked together
        KeyedStream<ItemViewCount, Tuple> soredKeyed = windowAggregate.keyBy("type", "windowStart", "windowEnd");
2. Sorting
        // Additional imports: java.util.ArrayList, java.util.Comparator, java.util.List,
        // org.apache.flink.api.common.state.ValueState, org.apache.flink.api.common.state.ValueStateDescriptor,
        // org.apache.flink.api.common.typeinfo.TypeHint, org.apache.flink.api.common.typeinfo.TypeInformation,
        // org.apache.flink.configuration.Configuration,
        // org.apache.flink.streaming.api.functions.KeyedProcessFunction
        SingleOutputStreamOperator<List<ItemViewCount>> sored = soredKeyed.process(
                new KeyedProcessFunction<Tuple, ItemViewCount, List<ItemViewCount>>() {

                    // Collects all ItemViewCounts of one window (for this key) as an intermediate result
                    private transient ValueState<List<ItemViewCount>> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<List<ItemViewCount>> VSDescriptor = new ValueStateDescriptor<>(
                                "list-state",
                                TypeInformation.of(new TypeHint<List<ItemViewCount>>() {
                                })
                        );
                        valueState = getRuntimeContext().getState(VSDescriptor);
                    }

                    // Add the element to the state and register a timer
                    @Override
                    public void processElement(ItemViewCount input, Context ctx,
                                               Collector<List<ItemViewCount>> out) throws Exception {
                        List<ItemViewCount> buffer = valueState.value();
                        if (buffer == null) {
                            buffer = new ArrayList<>();
                        }
                        buffer.add(input);
                        valueState.update(buffer);
                        // windowEnd is exclusive; the timer at windowEnd + 1 fires once the watermark has
                        // passed the window end, i.e. after all counts of this window have arrived
                        ctx.timerService().registerEventTimeTimer(input.windowEnd + 1);
                    }

                    // Sort when the timer fires
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx,
                                        Collector<List<ItemViewCount>> out) throws Exception {
                        List<ItemViewCount> buffer = valueState.value();
                        // Descending by count; Long.compare avoids the overflow of casting a long difference to int
                        buffer.sort(new Comparator<ItemViewCount>() {
                            @Override
                            public int compare(ItemViewCount o1, ItemViewCount o2) {
                                return Long.compare(o2.viewCount, o1.viewCount);
                            }
                        });
                        valueState.clear();
                        out.collect(buffer);
                    }
                });

        sored.print();

        env.execute("HotGoodsTopNAdv");
    }
}
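The `KeyedProcessFunction` above buffers one window's counts and sorts them when the event-time timer at `windowEnd + 1` fires. The single-threaded plain-Java sketch below models that behavior. It also adds the truncation to the first N entries that the title implies but the job above does not perform, and it shows why a descending comparator built on `Long.compare` is safer than negating a `long` difference cast to `int`. `TopNSketch`, `Row` (a stand-in for `ItemViewCount`), the `n` parameter, and the item `p3333` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TopNSketch {
    static final class Row {
        final String itemId;
        final long windowEnd;
        final long viewCount;
        Row(String itemId, long windowEnd, long viewCount) {
            this.itemId = itemId;
            this.windowEnd = windowEnd;
            this.viewCount = viewCount;
        }
    }

    // Descending by viewCount, without int overflow.
    static final Comparator<Row> BY_COUNT_DESC =
            (a, b) -> Long.compare(b.viewCount, a.viewCount);

    private final int n;
    // Rows buffered per timer timestamp (windowEnd + 1), like ValueState plus timers.
    private final TreeMap<Long, List<Row>> pending = new TreeMap<>();

    TopNSketch(int n) { this.n = n; }

    void add(Row row) {
        pending.computeIfAbsent(row.windowEnd + 1, k -> new ArrayList<>()).add(row);
    }

    // Fires every "timer" whose timestamp <= watermark and returns the top-N lists.
    List<List<Row>> onWatermark(long watermark) {
        List<List<Row>> fired = new ArrayList<>();
        Map<Long, List<Row>> due = pending.headMap(watermark, true);
        for (List<Row> rows : due.values()) {
            rows.sort(BY_COUNT_DESC);
            fired.add(new ArrayList<>(rows.subList(0, Math.min(n, rows.size()))));
        }
        due.clear(); // removes fired entries from pending, like valueState.clear()
        return fired;
    }

    // Comparator in the style of the original code; wrong once counts differ by more than Integer.MAX_VALUE.
    static int naiveCompare(Row a, Row b) {
        return -(int) (a.viewCount - b.viewCount);
    }

    public static void main(String[] args) {
        long end = 1583637120000L; // window [11:02, 11:12)
        TopNSketch topN = new TopNSketch(2);
        topN.add(new Row("p1001", end, 3));
        topN.add(new Row("p2222", end, 2));
        topN.add(new Row("p3333", end, 5)); // hypothetical extra item
        System.out.println(topN.onWatermark(end).size()); // 0: the timer at end + 1 is not due yet
        for (Row r : topN.onWatermark(end + 1).get(0)) {
            System.out.println(r.itemId + " " + r.viewCount); // p3333 5, then p1001 3
        }
        Row small = new Row("a", end, 0L), big = new Row("b", end, 3_000_000_000L);
        System.out.println(naiveCompare(small, big) < 0); // true: "a" wrongly sorts first
    }
}
```

A `TreeMap` keyed by timer timestamp stands in for Flink's per-key state and timer service here; in the real job each window is a separate key, so the state holds one list per key instead.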