Using the Flink SQL Client
2020-06-03 06:18
Introduction to the Flink SQL Client
The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.
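The Flink SQL Client is a practical tool that lets Flink developers write, debug, and submit real-time table programs without writing any Java or Scala code. It also displays the continuously updated query results, in either retract or append form, directly in the client.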
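To make the difference between append and retract results concrete, here is a minimal Python sketch. It is not Flink code: the function names and the `+`/`-` markers are illustrative assumptions. It assumes a running `SUM(trade_amt)` grouped by `shop_id`, where every update first withdraws the previously emitted total and then adds the new one, while an append result only ever adds rows.

```python
def append_stream(rows):
    """Append mode: every result row is only ever added, never changed."""
    return [("+", row) for row in rows]


def retract_stream(rows):
    """Retract mode for a running SUM(amount) grouped by shop_id.

    Each update withdraws the previously emitted total ("-")
    before adding the new total ("+").
    """
    totals = {}
    changes = []
    for shop_id, amount in rows:
        if shop_id in totals:
            # Withdraw the old aggregate for this key.
            changes.append(("-", (shop_id, totals[shop_id])))
        totals[shop_id] = totals.get(shop_id, 0.0) + amount
        changes.append(("+", (shop_id, totals[shop_id])))
    return changes


if __name__ == "__main__":
    sample = [("AF18", 100.0), ("AF20", 130.0), ("AF18", 120.0)]
    for change in retract_stream(sample):
        print(change)
```

With the three sample rows, the second `AF18` row produces a withdrawal of the old total 100.0 followed by the new total 220.0.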
Deployment environment (single machine)
- java 1.8
- zookeeper 3.4.13
- kafka 0.11
- flink 1.6
- Start ZooKeeper
[code]
yizhou@pro:~$ zkServer start
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Starting zookeeper ... STARTED
yizhou@pro:~$ zkServer status
ZooKeeper JMX enabled by default
Using config: /usr/local/etc/zookeeper/zoo.cfg
Mode: standalone
- Start Kafka
[code]
yizhou@pro:${KAFKA_HOME}$ nohup bin/kafka-server-start.sh config/server.properties &
[1] 70358
# List the topics
yizhou@pro:${KAFKA_HOME}$ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
idea
order_sql
# Send JSON messages to the order_sql topic
yizhou@pro:${KAFKA_HOME}$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_sql
>{"order_id": "1","shop_id": "AF18","member_id": "3410211","trade_amt": "100.00","pay_time": "1556420980000"}
>{"order_id": "2","shop_id": "AF20","member_id": "3410213","trade_amt": "130.00","pay_time": "1556421040000"}
>{"order_id": "3","shop_id": "AF18","member_id": "3410212","trade_amt": "120.00","pay_time": "1556421100000"}
>{"order_id": "4","shop_id": "AF19","member_id": "3410212","trade_amt": "100.00","pay_time": "1556421120000"}
>{"order_id": "5","shop_id": "AF18","member_id": "3410211","trade_amt": "150.00","pay_time": "1556421480000"}
>{"order_id": "6","shop_id": "AF18","member_id": "3410211","trade_amt": "110.00","pay_time": "1556421510000"}
>{"order_id": "7","shop_id": "AF19","member_id": "3410213","trade_amt": "110.00","pay_time": "1556421570000"}
>{"order_id": "8","shop_id": "AF20","member_id": "3410211","trade_amt": "100.00","pay_time": "1556421630000"}
>{"order_id": "9","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "1556421655000"}
>
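Typing the messages by hand is error-prone, so the same nine sample orders could also be generated by a small script. The following Python sketch is one possible way to do that; the helper name `to_json_lines` and the file name `gen_orders.py` used below are assumptions for illustration, not part of the original setup. It prints one JSON object per line, which is the form the console producer reads from standard input.

```python
import json

FIELDS = ("order_id", "shop_id", "member_id", "trade_amt", "pay_time")

# The nine sample orders from the producer session above.
# All values are strings, exactly as they were typed into the console.
ORDERS = [
    ("1", "AF18", "3410211", "100.00", "1556420980000"),
    ("2", "AF20", "3410213", "130.00", "1556421040000"),
    ("3", "AF18", "3410212", "120.00", "1556421100000"),
    ("4", "AF19", "3410212", "100.00", "1556421120000"),
    ("5", "AF18", "3410211", "150.00", "1556421480000"),
    ("6", "AF18", "3410211", "110.00", "1556421510000"),
    ("7", "AF19", "3410213", "110.00", "1556421570000"),
    ("8", "AF20", "3410211", "100.00", "1556421630000"),
    ("9", "AF17", "3410212", "110.00", "1556421655000"),
]


def to_json_lines(orders):
    """Serialize each order tuple as one JSON object per line."""
    return [json.dumps(dict(zip(FIELDS, order))) for order in orders]


if __name__ == "__main__":
    for line in to_json_lines(ORDERS):
        print(line)
```

Assuming the script is saved as gen_orders.py, its output can be piped into the producer: python gen_orders.py | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_sql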
- Start Flink
[code]
yizhou@pro:${FLINK_HOME}/bin$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host pro.local.
Starting taskexecutor daemon on host pro.local.
Registering Kafka messages as a Flink SQL dynamic table
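Mapping Kafka messages to a Flink SQL dynamic table is an essential step. The whole mapping is configured through a YAML file in the ${FLINK_HOME}/conf directory. The following example maps the Kafka topic order_sql to a table.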
[code]
#==============================================================================
# Table Sources
#==============================================================================

# Define table sources and sinks here.

tables: # empty list
# A typical table source definition looks like:
  - name: orders
    type: source
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: order_sql
      startup-mode: earliest-offset
      properties:
        - key: zookeeper.connect
          value: localhost:2181
        - key: bootstrap.servers
          value: localhost:9092
        - key: group.id
          value: test-consumer-group
    format:
      property-version: 1
      type: json
      schema: "ROW(order_id LONG, shop_id VARCHAR, member_id LONG, trade_amt DOUBLE, pay_time TIMESTAMP)"
    schema:
      - name: order_id
        type: LONG
      - name: shop_id
        type: VARCHAR
      - name: member_id
        type: LONG
      - name: trade_amt
        type: DOUBLE
      - name: payment_time
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "pay_time"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"

#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 3600000
  # maximum idle state retention in ms
  max-idle-state-retention: 7200000

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0
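As the configuration file shows, the table is registered in Flink under the name orders. It is a source table, and new rows are continuously added in append mode. The connector is Kafka version 0.11, and the topic, starting offset, ZooKeeper address, broker address, and consumer group are all written into the configuration. The configuration also defines how the JSON messages from Kafka map to the schema of the Flink SQL table: the pay_time field is used as the event time and is exposed in the table as the payment_time column, and the watermark delay is set to 60 s (60000 ms).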
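The effect of a bounded watermark delay of 60000 ms can be illustrated with a short Python sketch. This is a simplified model, not Flink's implementation: it assumes the watermark equals the largest event timestamp seen so far minus the delay, and it recomputes the watermark after every event, whereas Flink emits watermarks periodically (every 200 ms with the periodic-watermarks-interval configured above). The function name `watermarks` is hypothetical.

```python
DELAY_MS = 60_000  # mirrors the watermark delay of "60000" in the YAML configuration


def watermarks(timestamps_ms, delay_ms=DELAY_MS):
    """Return the watermark after each event.

    Simplified model: watermark = (max timestamp seen so far) - delay.
    An out-of-order event never moves the watermark backwards.
    """
    result = []
    max_ts = None
    for ts in timestamps_ms:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        result.append(max_ts - delay_ms)
    return result


if __name__ == "__main__":
    # Third event arrives out of order; the watermark stays where it was.
    print(watermarks([100_000, 160_000, 130_000]))
```

In this model an event that arrives up to 60 s behind the newest timestamp is still ahead of the watermark, which is why a delay of this kind tolerates moderately out-of-order messages.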
Starting the Flink SQL Client
Start the client from the command line with bin/sql-client.sh embedded -d conf/sql.my.yaml -l sql-libs/, where sql.my.yaml is the name of the YAML configuration file described above.
The two JARs flink-connector-kafka-0.11 and flink-json-1.6.1-sql-jar.jar must be downloaded into the ${FLINK_HOME}/sql-libs directory beforehand.
[code]
yizhou@pro:${FLINK_HOME}$ bin/sql-client.sh embedded -d conf/sql.my.yaml -l sql-libs/
Reading default environment from: file:/usr/local/Cellar/apache-flink/1.6.2/libexec/conf/sql.my.yaml
No session environment specified.

[Flink SQL Client ASCII-art banner (BETA)]

Welcome! Enter HELP to list all available commands. QUIT to exit.

Flink SQL> show tables;
orders

Flink SQL> describe orders;
root
 |-- order_id: Long
 |-- shop_id: String
 |-- member_id: Long
 |-- trade_amt: Double
 |-- payment_time: TimeIndicatorTypeInfo(rowtime)

Flink SQL>
Running SQL statements
- Start with the simplest query, select *
[code]Flink SQL> select * from orders;
Figure: select * from orders
- 1-minute tumbling (fixed) window computation
[code]
SELECT
  shop_id,
  TUMBLE_START(payment_time, INTERVAL '1' MINUTE) AS tumble_start,
  TUMBLE_END(payment_time, INTERVAL '1' MINUTE) AS tumble_end,
  SUM(trade_amt) AS amt
FROM orders
GROUP BY shop_id, TUMBLE(payment_time, INTERVAL '1' MINUTE);
Figure: 1-minute tumbling window computation
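As a cross-check of what the 1-minute tumbling window query groups together, the same aggregation can be sketched offline in Python over the nine sample messages. This is only an approximation of the SQL semantics under stated assumptions: windows are aligned to the epoch, each event belongs to the window that starts at its timestamp rounded down to a full minute, and watermarks are ignored, so it shows the final sums per window rather than when Flink would emit them. The names `tumble_start` and `tumbling_sum` are hypothetical helpers.

```python
from collections import defaultdict

WINDOW_MS = 60_000  # INTERVAL '1' MINUTE

# (shop_id, trade_amt, pay_time in epoch milliseconds) from the sample messages.
ORDERS = [
    ("AF18", 100.0, 1556420980000),
    ("AF20", 130.0, 1556421040000),
    ("AF18", 120.0, 1556421100000),
    ("AF19", 100.0, 1556421120000),
    ("AF18", 150.0, 1556421480000),
    ("AF18", 110.0, 1556421510000),
    ("AF19", 110.0, 1556421570000),
    ("AF20", 100.0, 1556421630000),
    ("AF17", 110.0, 1556421655000),
]


def tumble_start(ts_ms, size_ms=WINDOW_MS):
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % size_ms


def tumbling_sum(orders, size_ms=WINDOW_MS):
    """Sum trade_amt per (shop_id, window start)."""
    totals = defaultdict(float)
    for shop_id, amount, ts_ms in orders:
        totals[(shop_id, tumble_start(ts_ms, size_ms))] += amount
    return dict(totals)


if __name__ == "__main__":
    rows = sorted(tumbling_sum(ORDERS).items(), key=lambda kv: (kv[0][1], kv[0][0]))
    for (shop_id, start), amount in rows:
        # The window end is exclusive: start + window size.
        print(shop_id, start, start + WINDOW_MS, amount)
```

Only orders 5 and 6 (both shop AF18) fall into the same window, so their amounts are combined into 260.0; every other order lands in its own (shop, window) group.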