Flink Sql
Table API
和 SQL
是最上层的 API ,在 Flink
中这两种 API 被集成在一起,SQL 执行的对象是 Flink
中的表(Table
),所以一般认为他们是一体的。
流处理中的表
无界流的数据是源源不断的,这点和传统关系型数据库中的表有些区别。 Flink
采用 Sql
的方式处理流,步骤如下:
- 流
stream
被转换为动态表dynamic table
- 对动态表进行持续查询
continuous query
,生成新的动态表 - 生成的动态表被转换为流
将流转换为动态表
持续查询
流中每条数据来的时候都会持续查询,会触发 追加 和 更新 查询。
动态表转换为流
- 仅追加流 (
Append-only
)
仅通过插入 Insert
更改来修改的动态表,可以直接转换为 仅追加 流。这个流中发出的数据,其实就是动态表中新增的每一行。
- 撤回流 (
Retract
)
撤回流包含两类消息的流,添加消息add
和撤回消息retract
。如下图所示,当需要更新 Alice,2
时,先将 Alice,1
这条数据撤回,再添加 Alice,2
这条数据。
- 更新插入流
Upsert
注意,在代码里将动态表转换为数据流时,只支持仅追加和撤回流,调用 toChangelogStream()
得到的其实就是撤回流。Upsert
流则取决于外部系统是否支持。
时间属性
时间属性是每个表模式结构 schema
的一部分,它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 里转换为表时定义。时间属性的数据类型为 Timestamp
,它的行为类似于常规时间戳,可以直接访问并且进行计算。
事件时间
sql
CREATE TABLE EventTable (
user STRING,
url STRING,
ts TIMESTAMP(3),
# 通过 WATERMARK 来定义事件时间
WATERMARK FOR ts AS CURRENT_TIMESTAMP - INTERVAL '5' SECOND
)
处理时间
sql
CRAETE TABLE EventTable(
user STRING,
url STRING,
# 调用系统内置函数 PROCTIME() 来定义处理时间
ts AS PROCTIME()
)
DDL 语法
创建库
sql
CRAETE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment] WITH (key1=val1,key2=val2,...);
查询库
- 查询所有数据库
sql
SHOW DATABASES;
- 查询当前数据库
sql
SHOW CURRENT DATABASE;
修改数据库
sql
ALTER DATABASE [catalog_name.] db_name SET (key1=val1,key2=val2,...)
删除数据库
- RESTRICT (默认) 删除数据库时,如果数据库中有表,则报错;
- CASCADE 删除数据库时,如果数据库中有表,则删除表
sql
DROP DATABASE [IF EXISTS] [catalog_name.] db_name [CASCADE | RESTRICT];
切换数据库
sql
USE db_name;
表操作语法
创建表
sql
CREATE TABLE [IF NOT EXISTS]