Skip to main content
Version: 0.11

Manage Flows

Each flow is a continuous aggregation query in GreptimeDB. It continuously updates the aggregated data based on the incoming data. This document describes how to create, and delete a flow.

Create a Source Table

Before creating a flow, you need to create a source table to store the raw data. Like this:

CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
);

However, if you don't want to store the raw data, you can use a temporary table as the source table by creating table using WITH ('ttl' = 'instant') table option:

CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
) WITH ('ttl' = 'instant');

Setting 'ttl' to 'instant' will make the table a temporary table, which means it will automatically discard all inserted data and the table will always be empty, only sending them to flow task for computation.

Create a Sink Table

Before creating a flow, you need a sink table to store the aggregated data generated by the flow. While it is the same to a regular time series table, there are a few important considerations:

  • Column order and type: Ensure the order and type of the columns in the sink table match the query result of the flow.
  • Time index: Specify the TIME INDEX for the sink table, typically using the time window column generated by the time window function.
  • Update time: The Flow engine automatically appends the update time to the end of each computation result row. This update time is stored in the updated_at column. Ensure that this column is included in the sink table schema.
  • Tags: Use PRIMARY KEY to specify Tags, which together with the time index serves as a unique identifier for row data and optimizes query performance.

For example:

/* Create sink table */
CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP,
PRIMARY KEY(sensor_id, loc)
);

CREATE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
sensor_id,
loc,
max(temperature) AS max_temp,
date_bin(INTERVAL '10 seconds', ts) AS time_window
FROM temp_sensor_data
GROUP BY
sensor_id,
loc,
time_window
HAVING max_temp > 100;

The sink table has the columns sensor_id, loc, max_temp, time_window, and update_at.

  • The first four columns correspond to the query result columns of flow: sensor_id, loc, max(temperature) and date_bin(INTERVAL '10 seconds', ts) respectively.
  • The time_window column is specified as the TIME INDEX for the sink table.
  • The update_at column is the last one in the schema to store the update time of the data.
  • The PRIMARY KEY at the end of the schema definition specifies sensor_id and loc as the tag columns. This means the flow will insert or update data based on the tags sensor_id and loc along with the time index time_window.

Create a flow

The grammar to create a flow is:

CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
AS
<SQL>;

When OR REPLACE is specified, any existing flow with the same name will be updated to the new version. It's important to note that this only affects the flow task itself; the source and sink tables will remain unchanged.

Conversely, when IF NOT EXISTS is specified, the command will have no effect if the flow already exists, rather than reporting an error. Additionally, please note that OR REPLACE cannot be used in conjunction with IF NOT EXISTS.

  • flow-name is an unique identifier in the catalog level.
  • sink-table-name is the table name where the materialized aggregated data is stored. It can be an existing table or a new one. flow will create the sink table if it doesn't exist.
  • EXPIRE AFTER is an optional interval to expire the data from the Flow engine. For more details, please refer to the EXPIRE AFTER part.
  • COMMENT is the description of the flow.
  • SQL part defines the continuous aggregation query. It defines the source tables provide data for the flow. Each flow can have multiple source tables. Please Refer to Write a Query for the details.

A simple example to create a flow:

CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT 'My first flow in GreptimeDB'
AS
SELECT
max(temperature) as max_temp,
date_bin(INTERVAL '10 seconds', ts) as time_window,
FROM temp_sensor_data
GROUP BY time_window;

The created flow will compute max(temperature) for every 10 seconds and store the result in my_sink_table. All data comes within 1 hour will be used in the flow.

EXPIRE AFTER clause

The EXPIRE AFTER clause specifies the interval after which data will expire from the flow engine. This expiration only affects the data in the flow engine and does not impact the data in the source table.

When the flow engine processes the aggregation operation (the update_at time), data with a time index older than the specified interval will expire.

For example, if the flow engine processes the aggregation at 10:00:00 and the INTERVAL '1 hour' is set, any data older than 1 hour (before 09:00:00) will expire. Only data timestamped from 09:00:00 onwards will be used in the aggregation.

Write a SQL query

The SQL part of the flow is similar to a standard SELECT clause with a few differences. The syntax of the query is as follows:

SELECT AGGR_FUNCTION(column1, column2,..) [, TIME_WINDOW_FUNCTION() as time_window] FROM <source_table> GROUP BY {time_window | column1, column2,.. };

Only the following types of expressions are allowed after the SELECT keyword:

  • Aggregate functions: Refer to the Expressions documentation for details.
  • Time window functions: Refer to the define time window section for details.
  • Scalar functions: Such as col, to_lowercase(col), col + 1, etc. This part is the same as in a standard SELECT clause in GreptimeDB.

The following points should be noted about the rest of the query syntax:

  • The query must include a FROM clause to specify the source table. As join clauses are currently not supported, the query can only aggregate columns from a single table.
  • WHERE and HAVING clauses are supported. The WHERE clause filters data before aggregation, while the HAVING clause filters data after aggregation.
  • DISTINCT currently only works with the SELECT DISTINCT column1 .. syntax. It is used to remove duplicate rows from the result set. Support for SELECT count(DISTINCT column1) ... is not available yet but will be added in the future.
  • The GROUP BY clause works the same as a standard queries, grouping data by specified columns. The time window column in the GROUP BY clause is crucial for continuous aggregation scenarios. Other expressions in GROUP BY can include literals, columns, or scalar expressions.
  • ORDER BY, LIMIT, and OFFSET are not supported.

Refer to Continuous Aggregation for more examples of how to use continuous aggregation in real-time analytics, monitoring, and dashboards.

Define time window

A time window is a crucial attribute of your continuous aggregation query. It determines how data is aggregated within the flow. These time windows are left-closed and right-open intervals.

A time window represents a specific range of time. Data from the source table is mapped to the corresponding window based on the time index column. The time window also defines the scope for each calculation of an aggregation expression, resulting in one row per time window in the result table.

You can use date_bin() after the SELECT keyword to define fixed time windows. For example:

SELECT
max(temperature) as max_temp,
date_bin(INTERVAL '10 seconds', ts) as time_window
FROM temp_sensor_data
GROUP BY time_window;

In this example, the date_bin(INTERVAL '10 seconds', ts) function creates 10-second time windows starting from UTC 00:00:00. The max(temperature) function calculates the maximum temperature value within each time window.

For more details on the behavior of the function, please refer to date_bin.

NOTE

Currently, the internal state of the flow, such as the accumulator's value for incremental query results (e.g., the accumulator for count(col) which records the current count), is not persistently stored. To minimize data loss in case of internal state failure, it is advisable to use smaller time windows.

This internal state loss does not affect the existing data in the sink table.

Flush a flow

The flow engine automatically processes aggregation operations within 1 second when new data arrives in the source table. However, you can manually trigger the flow engine to process the aggregation operation immediately using the ADMIN FLUSH_FLOW command.

ADMIN FLUSH_FLOW('<flow-name>')

Delete a flow

To delete a flow, use the following DROP FLOW clause:

DROP FLOW [IF EXISTS] <name>

For example:

DROP FLOW IF EXISTS my_flow;