Skip to content

管理 Flows

每一个 flow 是 GreptimeDB 中的一个连续聚合查询。 它根据传入的数据持续更新并聚合数据。 本文档描述了如何创建、更新和删除一个 flow。

创建或更新 flow

创建 flow 的语法是:

sql
CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS 
<SQL>;
CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS 
<SQL>;
  • flow-name 是目录级别的唯一标识符。
  • sink-table-name 是存储聚合数据的表名。 它可以是一个现有的表或一个新表。如果目标表不存在,flow 将创建目标表。
  • EXPIRE AFTER 是一个可选的时间间隔,用于从 Flow 引擎中过期数据。 有关更多详细信息,请参考 EXPIRE AFTER 部分。
  • COMMENT 是 flow 的描述。
  • SQL 部分定义了用于连续聚合的查询。 它定义了为 flow 提供数据的源表。 每个 flow 可以有多个源表。 有关详细信息,请参考编写查询 部分。

一个创建 flow 的简单示例:

sql
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, INTERVAL '5 minutes', '2024-05-20 00:00:00');
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, INTERVAL '5 minutes', '2024-05-20 00:00:00');

创建的 flow 将每 5 分钟计算 count(item) 并将结果存储在 my_sink_table 中。所有在 1 小时内的数据将在 flow 中使用。有关 tumble() 函数,请参考定义时间窗口 部分。

EXPIRE AFTER 语句

Flow 引擎使用两个时间概念:数据时间戳和处理时间。

数据时间戳是 source 表中 time index 列中存储的时间, 处理时间是 Flow 引擎执行聚合操作的时刻。

EXPIRE AFTER 子句指定数据过期的时间间隔。 任何时间戳早于处理时间减去时间间隔的数据将被过期。

例如,如果 Flow 引擎在 10:00:00 执行聚合操作,并设置了 INTERVAL '1 hour', 则在处理时间之前 1 小时的数据(09:00:00 之前的数据)将被过期。 只有在 09:00:00 之后的数据将被用于聚合。

EXPIRE 操作仅从 Flow 引擎中过期数据,不会影响源表中的数据。

删除 flow

请使用以下 DROP FLOW 子句删除 flow:

To delete a flow, use the following DROP FLOW clause:

sql
DROP FLOW [IF EXISTS] <name>
DROP FLOW [IF EXISTS] <name>

例如:

sql
DROP FLOW IF EXISTS my_flow;
DROP FLOW IF EXISTS my_flow;