
Using the Flink SQL Client


Introduction to the Flink SQL Client

The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.

The flink sql-client is a handy tool that lets flink developers write, debug, and submit real-time table programs without writing any Java or Scala code. At the same time, the sql-client visualizes the real-time retract and append results of running aggregations directly on the command line.
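As a quick illustration (a sketch against the orders table registered later in this article): a plain GROUP BY aggregation keeps updating rows it has already emitted, so the client displays retract results; windowed aggregations, by contrast, emit each window once as append results.

[code]-- Retract-style query: the running total per shop changes as new
-- orders arrive, so earlier result rows are retracted and re-emitted.
SELECT shop_id, SUM(trade_amt) AS total_amt
FROM orders
GROUP BY shop_id;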

Deployment Environment (Standalone)

  • java 1.8
  • zookeeper 3.4.13
  • kafka 0.11
  • flink 1.6
  1. Start zookeeper

 

[code]yizhou@pro:~$ zkServer start
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED

yizhou@pro:~$ zkServer status
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Mode: standalone
  2. Start kafka

 

[code]yizhou@pro:${KAFKA_HOME}$ nohup bin/kafka-server-start.sh config/server.properties &
[1] 70358

# list existing topics
yizhou@pro:${KAFKA_HOME}$ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
idea
order_sql

# send JSON messages to the order_sql topic
yizhou@pro:${KAFKA_HOME}$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_sql
>{"order_id": "1","shop_id": "AF18","member_id": "3410211","trade_amt": "100.00","pay_time": "1556420980000"}
>{"order_id": "2","shop_id": "AF20","member_id": "3410213","trade_amt": "130.00","pay_time": "1556421040000"}
>{"order_id": "3","shop_id": "AF18","member_id": "3410212","trade_amt": "120.00","pay_time": "1556421100000"}
>{"order_id": "4","shop_id": "AF19","member_id": "3410212","trade_amt": "100.00","pay_time": "1556421120000"}
>{"order_id": "5","shop_id": "AF18","member_id": "3410211","trade_amt": "150.00","pay_time": "1556421480000"}
>{"order_id": "6","shop_id": "AF18","member_id": "3410211","trade_amt": "110.00","pay_time": "1556421510000"}
>{"order_id": "7","shop_id": "AF19","member_id": "3410213","trade_amt": "110.00","pay_time": "1556421570000"}
>{"order_id": "8","shop_id": "AF20","member_id": "3410211","trade_amt": "100.00","pay_time": "1556421630000"}
>{"order_id": "9","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "1556421655000"}
>
  3. Start flink

 

[code]yizhou@pro:${FLINK_HOME}/bin$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host pro.local.
Starting taskexecutor daemon on host pro.local.

Registering Kafka Messages as a Flink SQL Dynamic Table

Mapping kafka messages to a flink sql dynamic table is an essential step. The whole mapping is configured through a yaml file in the ${FLINK_HOME}/conf directory. The following example maps the kafka topic order_sql to the table orders.
 

[code]#==============================================================================
# Table Sources
#==============================================================================

# Define table sources and sinks here.

tables:
  # A typical table source definition looks like:
  - name: orders
    type: source
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: order_sql
      startup-mode: earliest-offset
      properties:
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
        - key: group.id
          value: test-consumer-group
    format:
      property-version: 1
      type: json
      schema: "ROW(order_id LONG, shop_id VARCHAR, member_id LONG, trade_amt DOUBLE, pay_time TIMESTAMP)"
    schema:
      - name: order_id
        type: LONG
      - name: shop_id
        type: VARCHAR
      - name: member_id
        type: LONG
      - name: trade_amt
        type: DOUBLE
      - name: payment_time
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "pay_time"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"

#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 3600000
  # maximum idle state retention in ms
  max-idle-state-retention: 7200000

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0

From this configuration we can see that the table registered in flink is named orders; it acts as the source table, continuously extended in append mode. Kafka 0.11 serves as the connector, and the topic, message offsets, zookeeper address, broker addresses, and consumer group are all written into the configuration. The format section defines how the kafka JSON messages map onto the flink sql table schema; pay_time is chosen as the event-time attribute, and the watermark delay is set to 60 s.
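For comparison only, here is a hedged sketch of roughly the same source declared with the SQL DDL supported by newer Flink versions (1.11+); the Flink 1.6 client used in this article understands only the YAML form, and the YAML's pay_time-to-payment_time rename is folded into a single column here:

[code]-- Sketch only, not runnable on Flink 1.6: DDL equivalent for newer versions.
CREATE TABLE orders (
  order_id BIGINT,
  shop_id STRING,
  member_id BIGINT,
  trade_amt DOUBLE,
  pay_time TIMESTAMP(3),
  -- 60 s watermark delay, matching delay: "60000" in the YAML
  WATERMARK FOR pay_time AS pay_time - INTERVAL '60' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_sql',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test-consumer-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);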

Starting the Flink SQL Client

Launch the client from the command line with bin/sql-client.sh embedded -d conf/sql.my.yaml -l sql-libs/, where sql.my.yaml is the yaml configuration file described above. The ${FLINK_HOME}/sql-libs directory must already contain two jars downloaded in advance: the flink-connector-kafka-0.11 SQL jar and flink-json-1.6.1-sql-jar.jar.

 

[code]yizhou@pro:${FLINK_HOME}$ bin/sql-client.sh embedded -d conf/sql.my.yaml -l sql-libs/
Reading default environment from: file:/usr/local/Cellar/apache-flink/1.6.2/libexec/conf/sql.my.yaml
No session environment specified.

▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░        ▒▒▒▓██▒  ▒
░██▒   ▒▒▓▓█▓▓▒░      ▒████
██▒         ░▒▓███▒    ▒█▒█▒
░▓█            ███   ▓░▒██
▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
████░   ▒▓█▓      ██▒▒▒ ▓███▒
░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
▓█   ▒█▓   ░     █░                ▒█              █▓
█▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
█▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░

______ _ _       _       _____  ____  _         _____ _ _            _  BETA
|  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
| |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
|  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
| |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
|_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

Welcome! Enter HELP to list all available commands. QUIT to exit.

Flink SQL> show tables;
orders

Flink SQL> describe orders;
root
|-- order_id: Long
|-- shop_id: String
|-- member_id: Long
|-- trade_amt: Double
|-- payment_time: TimeIndicatorTypeInfo(rowtime)

Flink SQL>

Running SQL Statements

  • First, run the simplest query: select *

 

[code]Flink SQL> select * from orders;

[Screenshot: streaming results of select * from orders]
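Projections and filters over the stream run the same way; for example (a sketch against the same table):

[code]-- Only orders of at least 110.00, with a narrower projection.
SELECT order_id, shop_id, trade_amt
FROM orders
WHERE trade_amt >= 110;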

  • 1-minute tumbling window aggregation

 

[code]SELECT
shop_id
, TUMBLE_START(payment_time, INTERVAL '1' MINUTE) AS tumble_start
, TUMBLE_END(payment_time, INTERVAL '1' MINUTE)   AS tumble_end
, sum(trade_amt)                             AS amt
FROM orders
GROUP BY shop_id, TUMBLE(payment_time, INTERVAL '1' MINUTE);

[Screenshot: 1-minute tumbling window results]
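With event time and the 60 s watermark delay configured above, a window fires once the watermark passes its end; e.g. when order 5 arrives (pay_time 1556421480000), the watermark trails it by 60 s, which is already late enough to close the windows covering orders 1 to 4. Sliding windows work analogously via HOP; a sketch that emits a 1-minute window every 30 seconds:

[code]SELECT
  shop_id
, HOP_START(payment_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS hop_start
, HOP_END(payment_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)   AS hop_end
, SUM(trade_amt) AS amt
FROM orders
GROUP BY shop_id, HOP(payment_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE);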

