您的位置:首页 > 运维架构

Flink 实践教程-进阶(4):TOP-N

2021-12-22 02:08 806 查看

​作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。本文将会介绍如何使用 Flink 实现常见的 TopN 统计需求。首先使用 Python 脚本模拟生成商品购买数据(每秒钟发送一条)并发送到 CKafka,随后在 Oceanus 平台创建 Flink SQL 作业实时读取 CKafka 中的商品数据,经过滚动窗口(基于事件时间)统计每分钟内商品购买种类的前三名(Top3),最后将结果存储于 PostgreSQL。

 

操作视频

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,创建 CKafka 实例,具体可参考 CKafka 创建实例 [4]。随后点击进入实例,单击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。

数据准备

本示例使用 Python 脚本向 Topic 发送模拟数据,前提条件需要网络互通。这里我们选择的是与 CKafka 同 VPC 的 CVM 进入,并且安装 Python 环境。如若网络不通,可在 CKafka 实例里面【基本信息】>【接入方式】>【添加路由策略】>【路由类型】里面选择 VPC 网络公网域名接入 的方式打通网络,具体可参考 CKafka 官网 入门流程指引 [6]。

#!/usr/bin/python3
# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块
import json
import random
import time
from kafka import KafkaProducer
broker_lists = ['10.0.0.29:9092']
kafka_topic_oceanus = 'oceanus_advanced4_input'
producer = KafkaProducer(bootstrap_servers=broker_lists,
value_serializer=lambda m: json.dumps(m).encode('ascii'))
def send_data(topic):
user_id = random.randint(1,50)
item_id = random.randint(1,1000)
category_id = random.randint(1,20)
user_behaviors = ['pv','buy','cart','fav']
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
msg = {
'user_id':user_id,
'item_id':item_id,
'category_id':category_id,
'user_behavior':user_behaviors[random.randint(0,len(user_behaviors)-1)],
'time_stamp':current_time
}
producer.send(topic, msg)
print(msg)
producer.flush()
if __name__ == '__main__':
count = 1
while True:
# 每秒发送一条数据
time.sleep(1)
send_data(kafka_topic_oceanus)

更多接入方式请参考 CKafka 收发消息 [7]

创建 PostgreSQL 实例

进入 PostgreSQL 控制台 [8],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [9]。进入实例数据库,创建 

oceanus_advanced4_output
 表,用于接收数据。

-- 建表语句
create table public.oceanus_advanced4_output (
win_start     TIMESTAMP,
category_id   INT,
buy_count     INT,
PRIMARY KEY(win_start,category_id)  );

笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]

 

流计算 Oceanus 作业

1. 创建 Source

CREATE TABLE `kafka_json_source_table` (
user_id        INT,
item_id        INT,
category_id    INT,
user_behavior  VARCHAR,
time_stamp     TIMESTAMP(3),
WATERMARK FOR time_stamp AS time_stamp - INTERVAL '3' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'oceanus_advanced4_input',    -- 替换为您要消费的 Topic
'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址
'properties.group.id' = 'testGroup',     -- 必选参数, 一定要指定 Group ID
'format' = 'json',
'json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。
'json.ignore-parse-errors' = 'true'      -- 如果设置为 true,则忽略任何解析报错。
);

2. 创建 Sink

CREATE TABLE `jdbc_upsert_sink_table` (
win_start     TIMESTAMP(3),
category_id   INT,
buy_count     INT,
PRIMARY KEY (win_start,category_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true',              -- 请替换为您的实际 MySQL 连接参数
'table-name' = 'oceanus_advanced4_output', -- 需要写入的数据表
'username' = 'root',                      -- 数据库访问的用户名(需要提供 INSERT 权限)
'password' = 'Tencent123$',               -- 数据库访问的密码
'sink.buffer-flush.max-rows' = '200',     -- 批量输出的条数
'sink.buffer-flush.interval' = '2s'       -- 批量输出的间隔
);

3. 编写业务 SQL

-- 创建临时视图,用于将原始数据过滤、窗口聚合
CREATE VIEW `kafka_json_source_view` AS
SELECT
TUMBLE_START(time_stamp,INTERVAL '1' MINUTE) AS win_start,
category_id,
COUNT(1) AS buy_count
FROM `kafka_json_source_table`
WHERE user_behavior = 'buy'
GROUP BY TUMBLE(time_stamp,INTERVAL '1' MINUTE),category_id;
-- 统计每分钟 Top3 购买种类
INSERT INTO `jdbc_upsert_sink_table`
SELECT
b.win_start,
b.category_id,
CAST(b.buy_count AS INT) AS buy_count
FROM (SELECT *
,ROW_NUMBER() OVER (PARTITION BY win_start ORDER BY buy_count DESC) AS rn
FROM `kafka_json_source_view`
) b
WHERE b.rn <= 3;

 

总结

本文使用 TUMBLE WINDOW 配合 ROW_NUMBER 函数,统计分析了每分钟内购买量前三的商品种类,用户可根据实际需求选择相应的窗口函数统计对应的 TopN。更多窗口函数的使用参考 时间窗口函数 [11]。

作者在落表时将
rn
字段和
win_end
字段裁剪后写入(即无排名优化写入),在使用无
rn
的场景下,需对结果表主键的特别小心,如果定义有误会直接导致 TopN 的结果不准确。

 

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview  

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298  

[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1  

[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839  

[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854  

[6] CKafka 入门流程指引:https://cloud.tencent.com/document/product/597/54837  

[7] CKafka 收发消息:https://cloud.tencent.com/document/product/597/54834  

[8] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index  

[9] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961  

[10] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429  

[11] 时间窗口函数:https://cloud.tencent.com/document/product/849/18077 

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

 

关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~




内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: