Flink: Real-Time Top-N Hot Items over a Given Time Range (e.g. the Hottest Brands within One Hour of Double 11)

2020-04-04 07:13

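Contents

  • I. Requirements
  • II. Key techniques
  • III. Implementation (1): the naive version
  • 1. Parse each JSON string into a MyBehavior object with process()
  • 2. Extract the event time as a timestamp and generate watermarks
  • 3. Key the stream by the chosen fields
  • 4. Assign the keyed data to windows: 10-minute windows that slide every 1 minute
  • 5. Aggregate the data in each window and read the window's start and end times
  • IV. The standalone classes MyBehavior and ItemViewCount
  • V. Final results
  • VI. Implementation (2): the improved version
  • VII. Sorting the aggregated window data

    I. Requirements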

    Compute the Top N most popular items/brands within a given time range.

    1. Understanding the requirement through an example

    • Data:
    {"userId": "u001", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
    {"userId": "u002", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
    {"userId": "u003", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
    {"userId": "u003", "itemId": "p1001", "categoryId": "c11", "type": "cart", "timestamp": "2020-03-08 11:11:11"}
    {"userId": "u011", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
    {"userId": "u012", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:11:11"}
    {"userId": "u012", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
    {"userId": "u001", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
    {"userId": "u002", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
    {"userId": "u003", "itemId": "p1001", "categoryId": "c11", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
    {"userId": "u003", "itemId": "p1001", "categoryId": "c11", "type": "cart", "timestamp": "2020-03-08 11:12:01"}
    {"userId": "u011", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:12:01"}
    {"userId": "u012", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:12:01"}

    {"userId": "u011", "itemId": "p2222", "categoryId": "c22", "type": "pv", "timestamp": "2020-03-08 11:13:01"}

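    II. Key techniques

    • Flink event time (EventTime)
    • Flink sliding windows (tumbling windows would also work, but their results jump abruptly from one window to the next instead of changing smoothly) ☆☆☆☆
    • Flink timers ☆☆☆☆☆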
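
As a rough mental model for the timer item in the list above, the plain-Java sketch below (no Flink dependency) mimics how an event-time timer behaves: a timer is registered for a timestamp, duplicates for the same timestamp collapse into one, and the timer fires once the watermark reaches that timestamp. `TimerModel` and its methods are hypothetical names made up for this illustration; they are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for an event-time timer service; not a Flink class.
final class TimerModel {
    // Pending timer timestamps, de-duplicated and kept in ascending order.
    private final TreeSet<Long> pending = new TreeSet<>();

    // Registering the same timestamp twice still leaves a single timer.
    void registerEventTimeTimer(long timestamp) {
        pending.add(timestamp);
    }

    // Advancing the watermark fires every timer whose timestamp is <= the watermark.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.first() <= watermark) {
            fired.add(pending.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerModel timers = new TimerModel();
        timers.registerEventTimeTimer(1_000L);
        timers.registerEventTimeTimer(1_000L); // duplicate, collapses into the first
        timers.registerEventTimeTimer(2_000L);
        System.out.println(timers.advanceWatermark(999L));   // prints []
        System.out.println(timers.advanceWatermark(1_500L)); // prints [1000]
        System.out.println(timers.advanceWatermark(2_000L)); // prints [2000]
    }
}
```

This is why a job can register one timer per incoming record without worrying about it firing more than once for the same timestamp.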

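    III. Implementation (1): the naive version

    • Uses the window.apply() method → see step 5
    • When the window fires, apply() runs once over the full contents of the window (all elements are fetched first, then computed on)
    • Until the window fires, its elements are buffered in memory, so apply() is a poor fit for very long windows

    1. Call the low-level process() (which can do map-like work) to parse each JSON string into a MyBehavior object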
    import com.alibaba.fastjson.JSON;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.WindowedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    /**
     * @date: 2020/3/14 14:04
     * @site: www.ianlou.cn
     * @author: lekko 六水
     * @qq: 496208110
     * @description:
     */
    public class HotGoodsTopN {
        public static void main(String[] args) throws Exception {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Use event time as Flink's time characteristic
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // Checkpoint every 60 seconds
            env.enableCheckpointing(60000);
            // Set the parallelism
            env.setParallelism(1);

            DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);

            SingleOutputStreamOperator<MyBehavior> process = lines.process(new ProcessFunction<String, MyBehavior>() {
                @Override
                public void processElement(String input, Context ctx, Collector<MyBehavior> out) throws Exception {

                    try {
                        // fastjson automatically converts the date string into a long timestamp (epoch milliseconds)
                        MyBehavior behavior = JSON.parseObject(input, MyBehavior.class);
                        out.collect(behavior);
                    } catch (Exception e) {
                        e.printStackTrace();
                        // TODO: record the input that caused the exception
                    }
                }
            });
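
To make the date-to-long conversion mentioned in the comment above concrete, here is a minimal sketch that performs an equivalent conversion with `java.time` instead of fastjson. The time zone `Asia/Shanghai` is an assumption, inferred from the results section where 1583637120000 is printed as 11:12:00; fastjson's own result depends on the time zone it is configured with. `TimestampSketch` and `toEpochMillis` are hypothetical names.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class TimestampSketch {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Converts a "yyyy-MM-dd HH:mm:ss" string to epoch milliseconds in the given zone.
    static long toEpochMillis(String text, ZoneId zone) {
        return LocalDateTime.parse(text, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Assumption: the sample data is expressed in China Standard Time (UTC+8).
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        System.out.println(toEpochMillis("2020-03-08 11:11:11", zone)); // prints 1583637071000
    }
}
```

The value is in milliseconds, which is the unit Flink expects for event-time timestamps.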

    2. Extract the event time as a timestamp and generate watermarks

            // Set the maximum allowed out-of-orderness (0 seconds here, so no lateness is tolerated)
            SingleOutputStreamOperator<MyBehavior> behaviorDSWithWaterMark =
                    process.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyBehavior>(Time.seconds(0)) {
                        @Override
                        public long extractTimestamp(MyBehavior element) {
                            return element.timestamp;
                        }
                    });
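
The effect of the out-of-orderness bound can be illustrated without Flink. The sketch below models the usual bounded-out-of-orderness rule: the watermark trails the largest timestamp seen so far by the configured bound. `WatermarkModel` is a hypothetical class written for this illustration, not the extractor used above.

```java
// Hypothetical model of a bounded-out-of-orderness watermark; not a Flink class.
final class WatermarkModel {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    WatermarkModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Track the largest event timestamp seen so far; late events never move it back.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the largest timestamp by the allowed out-of-orderness.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        WatermarkModel zeroDelay = new WatermarkModel(0L);
        zeroDelay.onEvent(1_000L);
        zeroDelay.onEvent(3_000L);
        zeroDelay.onEvent(2_000L); // out of order: the watermark stays where it is
        System.out.println(zeroDelay.currentWatermark()); // prints 3000

        WatermarkModel twoSeconds = new WatermarkModel(2_000L);
        twoSeconds.onEvent(3_000L);
        System.out.println(twoSeconds.currentWatermark()); // prints 1000
    }
}
```

With a bound of 0, as in this job, any record older than the newest one seen counts as late, so a larger bound may be worth considering for real traffic.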

    3. Key the stream by the chosen fields

            // How many times an item was viewed, bought, added to the cart, or favorited within the window
            KeyedStream<MyBehavior, Tuple> keyed = behaviorDSWithWaterMark.keyBy("itemId", "type");

    4. Assign the keyed data to windows: 10-minute windows that slide every 1 minute

            WindowedStream<MyBehavior, Tuple, TimeWindow> window =
                    keyed.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));
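
With a 10-minute size and a 1-minute slide, every record belongs to ten overlapping windows. The plain-Java sketch below works through that arithmetic for the first sample record; it is a model of how sliding windows are laid out, assuming a window offset of zero and non-negative timestamps, and `SlidingWindowSketch` is a hypothetical name. The timestamp assumes the sample data is in UTC+8.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    // Returns the [start, end) pair of every sliding window that contains the timestamp.
    // Assumes a non-negative timestamp and a window offset of zero.
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that contains the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long event = 1583637071000L; // 2020-03-08 11:11:11, assuming UTC+8
        List<long[]> windows = windowsFor(event, 10 * 60_000L, 60_000L);
        System.out.println(windows.size()); // prints 10
        long[] earliest = windows.get(windows.size() - 1);
        System.out.println(earliest[0] + " " + earliest[1]); // prints 1583636520000 1583637120000
    }
}
```

The earliest of those windows, 11:02:00 to 11:12:00, is the first one to close, which is why it is the first window to appear in the output.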

    5. Aggregate the data in each window and read the window's start and end times

            // Type parameters: input type, output type, key (the grouping fields as a Tuple), window type (TimeWindow)
            SingleOutputStreamOperator<ItemViewCount> result = window.apply(new WindowFunction<MyBehavior, ItemViewCount,
                    Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple tuple, TimeWindow window, Iterable<MyBehavior> input,
                                  Collector<ItemViewCount> out) throws Exception {
                    // Read the grouping fields
                    String itemId = tuple.getField(0);
                    String type = tuple.getField(1);

                    // Read the window's start and end times
                    long start = window.getStart();
                    long end = window.getEnd();

                    // Count the elements in the window
                    int count = 0;

                    for (MyBehavior myBehavior : input) {
                        count += 1;
                    }

                    // Emit the result
                    out.collect(ItemViewCount.of(itemId, type, start, end, count));
                }
            });

            result.print();
            env.execute("HotGoodsTopN");

        }
    }

    IV. The standalone classes MyBehavior and ItemViewCount

    • MyBehavior → the JavaBean produced by parsing the JSON string
    • ItemViewCount → the class that defines the format of the final output

    1. MyBehavior

    import java.sql.Timestamp;

    public class MyBehavior {
        public String userId;           // user ID
        public String itemId;           // item ID
        public String categoryId;       // item category ID
        public String type;             // user action, one of ("pv", "buy", "cart", "fav")
        public long timestamp;          // time of the action, in epoch milliseconds
        public long counts = 1;

        public static MyBehavior of(String userId, String itemId, String categoryId, String type, long timestamp) {
            MyBehavior behavior = new MyBehavior();
            behavior.userId = userId;
            behavior.itemId = itemId;
            behavior.categoryId = categoryId;
            behavior.type = type;
            behavior.timestamp = timestamp;
            return behavior;
        }

        public static MyBehavior of(String userId, String itemId, String categoryId, String type, long timestamp,
                                    long counts) {
            MyBehavior behavior = new MyBehavior();
            behavior.userId = userId;
            behavior.itemId = itemId;
            behavior.categoryId = categoryId;
            behavior.type = type;
            behavior.timestamp = timestamp;
            behavior.counts = counts;
            return behavior;
        }

        @Override
        public String toString() {
            return "MyBehavior{" + "userId='" + userId + '\'' + ", itemId='" + itemId + '\''
                    + ", categoryId='" + categoryId + '\'' + ", type='" + type + '\''
                    + ", timestamp=" + timestamp + "," + new Timestamp(timestamp)
                    + ", counts=" + counts + '}';
        }

        public String getUserId() {
            return userId;
        }
        public String getItemId() {
            return itemId;
        }
        public String getCategoryId() {
            return categoryId;
        }
        public String getType() {
            return type;
        }
        public long getTimestamp() {
            return timestamp;
        }
        public long getCounts() {
            return counts;
        }
    }

    2. ItemViewCount

    import java.sql.Timestamp;

    public class ItemViewCount {
        public String itemId;     // item ID
        public String type;       // event type
        public long windowStart;  // window start timestamp
        public long windowEnd;    // window end timestamp
        public long viewCount;    // view count of the item

        public static ItemViewCount of(String itemId, String type, long windowStart, long windowEnd, long viewCount) {
            ItemViewCount result = new ItemViewCount();
            result.itemId = itemId;
            result.type = type;
            result.windowStart = windowStart;
            result.windowEnd = windowEnd;
            result.viewCount = viewCount;
            return result;
        }

        @Override
        public String toString() {
            return "{" +
                    "itemId='" + itemId + '\'' +
                    "type='" + type + '\'' +
                    ", windowStart=" + windowStart + " , " + new Timestamp(windowStart) +
                    ", windowEnd=" + windowEnd + " , " + new Timestamp(windowEnd) +
                    ", viewCount=" + viewCount +
                    '}';
        }
    }

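    V. Final results

    • A result is computed every time the window slides, i.e. once per minute
    • The record stamped 11:12:01 triggers a computation over the data before it (the window ending at 11:12:00), and the record stamped 11:13:01 triggers the next one (the window ending at 11:13:00)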
    {itemId='p1001'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=3}
    {itemId='p1001'type='cart', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=1}
    {itemId='p2222'type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=2}
    
    {itemId='p1001'type='cart', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=2}
    {itemId='p1001'type='pv', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=6}
    {itemId='p2222'type='pv', windowStart=1583636580000 , 2020-03-08 11:03:00.0, windowEnd=1583637180000 , 2020-03-08 11:13:00.0, viewCount=5}
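
The counts above can be checked by hand. The sketch below replays the sample records in plain Java (no Flink) and counts them per itemId/type inside the two windows that were printed. The epoch values assume the sample timestamps are in UTC+8, and `WindowCountSketch` is a hypothetical helper written only for this check.

```java
import java.util.Map;
import java.util.TreeMap;

final class WindowCountSketch {
    // Epoch milliseconds of the three distinct sample timestamps, assuming UTC+8.
    static final long T_11_11_11 = 1583637071000L;
    static final long T_11_12_01 = 1583637121000L;
    static final long T_11_13_01 = 1583637181000L;

    // "itemId/type" of each sample record, in input order.
    static final String[] KEYS = {
        "p1001/pv", "p1001/pv", "p1001/pv", "p1001/cart", "p2222/pv", "p2222/pv",
        "p2222/pv", "p1001/pv", "p1001/pv", "p1001/pv", "p1001/cart", "p2222/pv", "p2222/pv",
        "p2222/pv"
    };

    // The first 6 records are stamped 11:11:11, the next 7 are 11:12:01, the last is 11:13:01.
    static long timestampOf(int index) {
        if (index < 6) return T_11_11_11;
        if (index < 13) return T_11_12_01;
        return T_11_13_01;
    }

    // Counts the records per key whose timestamp falls inside [start, end).
    static Map<String, Long> countsInWindow(long start, long end) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < KEYS.length; i++) {
            long ts = timestampOf(i);
            if (ts >= start && ts < end) {
                counts.merge(KEYS[i], 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Window 11:02:00 to 11:12:00
        System.out.println(countsInWindow(1583636520000L, 1583637120000L));
        // prints {p1001/cart=1, p1001/pv=3, p2222/pv=2}
        // Window 11:03:00 to 11:13:00
        System.out.println(countsInWindow(1583636580000L, 1583637180000L));
        // prints {p1001/cart=2, p1001/pv=6, p2222/pv=5}
    }
}
```

The record stamped 11:13:01 is counted in neither window; its only role here is to push the watermark past 11:13:00 so that the second window fires.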

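    VI. Implementation (2): the improved version

    Optimization: aggregate incrementally inside the window (each arriving element is added to a running total, so only a single number is kept in memory per window)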

            /**
             * Uses this aggregate() overload:
             *
             *   public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
             *           AggregateFunction<T, ACC, V> aggFunction,
             *           WindowFunction<V, R, K, W> windowFunction) {}
             */
            SingleOutputStreamOperator<ItemViewCount> windowAggregate = window.aggregate(new MyWindowAggFunction(),
                    new MyWindowFunction());

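    1. The standalone class MyWindowAggFunction

    • Aggregates the count field (counts in MyBehavior)

    Three type parameters:

    • First: the input type
    • Second: the type of the counter/accumulator
    • Third: the output type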
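        // Requires: import org.apache.flink.api.common.functions.AggregateFunction;
        public static class MyWindowAggFunction implements AggregateFunction<MyBehavior, Long, Long> {

            // Initialize the counter
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            // Called once for every incoming element
            @Override
            public Long add(MyBehavior input, Long accumulator) {
                return accumulator + input.counts;
            }

            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }

            // Only called for merging windows such as session windows; tumbling and sliding
            // windows never call it. Returning the sum keeps it correct if it is ever called.
            @Override
            public Long merge(Long a, Long b) {
                return a + b;
            }
        }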
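
To show why incremental aggregation needs so little memory, the sketch below replays the accumulator life cycle in plain Java: create an accumulator, fold each element into it as it arrives, and read the result at the end. The `Agg` interface is a hypothetical miniature that only mirrors the four method names used above; it is not Flink's interface, and `IncrementalCountSketch` is likewise made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

final class IncrementalCountSketch {
    // Hypothetical miniature of the accumulator contract; not Flink's interface.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Counts elements: the accumulator is a single Long.
    static final class CountAgg implements Agg<String, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(String value, Long accumulator) { return accumulator + 1; }
        public Long getResult(Long accumulator) { return accumulator; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    // Feeds elements one at a time; only the accumulator is retained between elements.
    static <IN, ACC, OUT> OUT aggregate(Agg<IN, ACC, OUT> agg, Iterable<IN> elements) {
        ACC acc = agg.createAccumulator();
        for (IN element : elements) {
            acc = agg.add(element, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("pv", "pv", "pv");
        System.out.println(aggregate(new CountAgg(), events)); // prints 3
        System.out.println(new CountAgg().merge(2L, 3L));       // prints 5
    }
}
```

Compared with buffering every element until the window fires, the state held per key and window here is one number, regardless of how long the window is.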

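    2. The standalone class MyWindowFunction

    • Reads the window's start and end times and the grouping fields

    Takes four type parameters:

    • First: the input type (the Long count), i.e. the aggregated result produced by MyWindowAggFunction
    • Second: the output type (ItemViewCount)
    • Third: the grouping key (the fields the stream is keyed by)
    • Fourth: the window type (provides the start and end times)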
        public static class MyWindowFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
                String itemId = tuple.getField(0);
                String type = tuple.getField(1);

                long windowStart = window.getStart();
                long windowEnd = window.getEnd();

                // The window's aggregated result (a single value produced by MyWindowAggFunction)
                Long aLong = input.iterator().next();

                // Emit the result
                out.collect(ItemViewCount.of(itemId, type, windowStart, windowEnd, aLong));
            }
        }

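    VII. Sorting the aggregated window data

    • Key the results by window start and end, then sort the records that belong to the same window
    • Only records from the same window (the same time range) may be ranked against each other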

    1. Grouping

            KeyedStream<ItemViewCount, Tuple> sortedKeyed = windowAggregate.keyBy("type", "windowStart",
                    "windowEnd");

    2. Sorting

            // Besides the imports of implementation (1), this step needs:
            //   java.util.ArrayList, java.util.List,
            //   org.apache.flink.api.common.state.ValueState, org.apache.flink.api.common.state.ValueStateDescriptor,
            //   org.apache.flink.api.common.typeinfo.TypeHint, org.apache.flink.api.common.typeinfo.TypeInformation,
            //   org.apache.flink.configuration.Configuration,
            //   org.apache.flink.streaming.api.functions.KeyedProcessFunction
            SingleOutputStreamOperator<List<ItemViewCount>> sorted = sortedKeyed.process(new KeyedProcessFunction<Tuple, ItemViewCount, List<ItemViewCount>>() {
                private transient ValueState<List<ItemViewCount>> valueState;

                // All ItemViewCount records of one window are collected as an intermediate result, hence the ValueState
                @Override
                public void open(Configuration parameters) throws Exception {
                    ValueStateDescriptor<List<ItemViewCount>> VSDescriptor =
                            new ValueStateDescriptor<>("list-state",
                                    TypeInformation.of(new TypeHint<List<ItemViewCount>>() {
                                    })
                            );

                    valueState = getRuntimeContext().getState(VSDescriptor);

                }

                // Update the ValueState and register the timer
                @Override
                public void processElement(ItemViewCount input, Context ctx, Collector<List<ItemViewCount>> out) throws Exception {
                    List<ItemViewCount> buffer = valueState.value();
                    if (buffer == null) {
                        buffer = new ArrayList<>();
                    }
                    buffer.add(input);
                    valueState.update(buffer);
                    // Register an event-time timer at windowEnd + 1; it fires once the watermark passes the end of the window
                    ctx.timerService().registerEventTimeTimer(input.windowEnd + 1);

                }

                // Sort when the timer fires
                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemViewCount>> out) throws Exception {

                    // Take the buffered records out of the ValueState
                    List<ItemViewCount> buffer = valueState.value();
                    // Sort by viewCount in descending order; Long.compare avoids the overflow
                    // that casting a long difference to int can cause
                    buffer.sort((o1, o2) -> Long.compare(o2.viewCount, o1.viewCount));
                    valueState.clear();
                    out.collect(buffer);
                }
            });
            sorted.print();
            env.execute("HotGoodsTopNAdv");
        }
    }
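
The job above emits the whole sorted list for each window. If only the first N entries are wanted, one option is to truncate the list after sorting. The sketch below shows that step in plain Java; `Entry` is a hypothetical stand-in for ItemViewCount, `topN` is a made-up helper, and the item `p3333` with its count is invented test data chosen to exceed the int range.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TopNSketch {
    // Hypothetical stand-in for ItemViewCount with only the fields needed here.
    static final class Entry {
        final String itemId;
        final long viewCount;

        Entry(String itemId, long viewCount) {
            this.itemId = itemId;
            this.viewCount = viewCount;
        }
    }

    // Returns the n entries with the highest viewCount, highest first.
    static List<Entry> topN(List<Entry> entries, int n) {
        List<Entry> sorted = new ArrayList<>(entries);
        // Long.compare stays correct even when the difference would not fit in an int.
        sorted.sort((a, b) -> Long.compare(b.viewCount, a.viewCount));
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        List<Entry> window = Arrays.asList(
                new Entry("p2222", 5), new Entry("p1001", 6), new Entry("p3333", 3_000_000_000L));
        for (Entry e : topN(window, 2)) {
            System.out.println(e.itemId + " " + e.viewCount);
        }
        // prints:
        // p3333 3000000000
        // p1001 6
    }
}
```

Sorting the full list is fine for a handful of items per window; for a very large number of items, a bounded priority queue of size N would avoid sorting entries that are discarded anyway.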
    Posted by IT_但丁