管理 Pipeline
在 GreptimeDB 中,每个 pipeline
是一个数据处理单元集合,用于解析和转换写入的日志内容。本文档旨在指导你如何创建和删除 Pipeline,以便高效地管理日志数据的处理流程。
有关 Pipeline 的具体配置,请阅读 Pipeline 配置。
鉴权
在使用 HTTP API 进行 Pipeline 管理时,你需要提供有效的鉴权信息。 请参考鉴权文档了解详细信息。
创建 Pipeline
GreptimeDB 提供了专用的 HTTP 接口用于创建 Pipeline。
假设你已经准备好了一个 Pipeline 配置文件 pipeline.yaml,使用以下命令上传配置文件,其中 test
是你指定的 Pipeline 的名称:
## 上传 pipeline 文件。test 为 Pipeline 的名称
curl -X "POST" "http://localhost:4000/v1/pipelines/test" \
-H "Authorization: Basic {{authentication}}" \
-F "file=@pipeline.yaml"
你可以在所有 Database 中使用创建的 Pipeline。
删除 Pipeline
可以使用以下 HTTP 接口删除 Pipeline:
## test 为 Pipeline 的名称
curl -X "DELETE" "http://localhost:4000/v1/pipelines/test?version=2024-06-27%2012%3A02%3A34.257312110Z" \
-H "Authorization: Basic {{authentication}}"
上面的例子中,我们删除了名为 test
的 Pipeline。version
参数是必须的,用于指定要删除的 Pipeline 的版本号。
查询 Pipeline
可以使用以下 HTTP 接口查询 Pipeline:
## test 是 Pipeline 的名称,该查询将返回最新版本的 Pipeline。
curl "http://localhost:4000/v1/pipelines/test" \
-H "Authorization: Basic {{authentication}}"
## 如果你想查询某个 Pipeline 的历史版本,可以在 URL 中添加 `version` 参数
curl "http://localhost:4000/v1/pipelines/test?version=2025-04-01%2006%3A58%3A31.335251882%2B0000" \
-H "Authorization: Basic {{authentication}}"
如果这个 pipeline 存在,输出会如下所示:
{
"pipelines": [
{
"name": "test",
"version": "2025-04-01 06:58:31.335251882",
"pipeline": "version: 2\nprocessors:\n - dissect:\n fields:\n - message\n patterns:\n - '%{ip_address} - - [%{timestamp}] \"%{http_method} %{request_line}\" %{status_code} %{response_size} \"-\" \"%{user_agent}\"'\n ignore_missing: true\n - date:\n fields:\n - timestamp\n formats:\n - \"%d/%b/%Y:%H:%M:%S %z\"\n - select:\n type: exclude\n fields:\n - message\n\ntransform:\n - fields:\n - ip_address\n type: string\n index: inverted\n tag: true\n - fields:\n - status_code\n type: int32\n index: inverted\n tag: true\n - fields:\n - request_line\n - user_agent\n type: string\n index: fulltext\n - fields:\n - response_size\n type: int32\n - fields:\n - timestamp\n type: time\n index: timestamp\n"
}
],
"execution_time_ms": 7
}
在上面的输出中,pipeline 字段是 YAML 格式的字符串。 JSON 格式无法很好地展示 YMAL 字符串,使用 echo 命令可以将其以阅读友好的方式展示出来:
echo -e "version: 2\nprocessors:\n - dissect:\n fields:\n - message\n patterns:\n - '%{ip_address} - - [%{timestamp}] \"%{http_method} %{request_line}\" %{status_code} %{response_size} \"-\" \"%{user_agent}\"'\n ignore_missing: true\n - date:\n fields:\n - timestamp\n formats:\n - \"%d/%b/%Y:%H:%M:%S %z\"\n - select:\n type: exclude\n fields:\n - message\n\ntransform:\n - fields:\n - ip_address\n type: string\n index: inverted\n tag: true\n - fields:\n - status_code\n type: int32\n index: inverted\n tag: true\n - fields:\n - request_line\n - user_agent\n type: string\n index: fulltext\n - fields:\n - response_size\n type: int32\n - fields:\n - timestamp\n type: time\n index: timestamp\n"
version: 2
processors:
- dissect:
fields:
- message
patterns:
- '%{ip_address} - - [%{timestamp}] "%{http_method} %{request_line}" %{status_code} %{response_size} "-" "%{user_agent}"'
ignore_missing: true
- date:
fields:
- timestamp
formats:
- "%d/%b/%Y:%H:%M:%S %z"
- select:
type: exclude
fields:
- message
transform:
- fields:
- ip_address
type: string
index: inverted
tag: true
- fields:
- status_code
type: int32
index: inverted
tag: true
- fields:
- request_line
- user_agent
type: string
index: fulltext
- fields:
- response_size
type: int32
- fields:
- timestamp
type: time
index: timestamp
或者使用 SQL 来查询 Pipeline:
SELECT * FROM greptime_private.pipelines;
请注意,如果你使用 MySQL 或者 PostgreSQL 协议作为连接 GreptimeDB 的方式,查询出来的 Pipeline 时间信息精度可能有所不同,可能会丢失纳秒级别的精度。
为了解决这个问题,可以将 created_at
字段强制转换为 timestamp 来查看 Pipeline 的创建时间。例如,下面的查询将 created_at
以 bigint
的格式展示:
SELECT name, pipeline, created_at::bigint FROM greptime_private.pipelines;
查询结果如下:
name | pipeline | greptime_private.pipelines.created_at
------+-----------------------------------+---------------------------------------
test | processors: +| 1719489754257312110
| - date: +|
| field: time +|
| formats: +|
| - "%Y-%m-%d %H:%M:%S%.3f"+|
| ignore_missing: true +|
| +|
| transform: +|
| - fields: +|
| - id1 +|
| - id2 +|
| type: int32 +|
| - fields: +|
| - type +|
| - logger +|
| type: string +|
| index: inverted +|
| - fields: +|
| - log +|
| type: string +|
| index: fulltext +|
| - field: time +|
| type: time +|
| index: timestamp +|
| |
(1 row)
然后可以使用程序将 SQL 结果中的 bigint 类型的时间戳转换为时间字符串。
timestamp_ns="1719489754257312110"; readable_timestamp=$(TZ=UTC date -d @$((${timestamp_ns:0:10}+0)) +"%Y-%m-%d %H:%M:%S").${timestamp_ns:10}Z; echo "Readable timestamp (UTC): $readable_timestamp"
输出:
Readable timestamp (UTC): 2024-06-27 12:02:34.257312110Z
输出的 Readable timestamp (UTC)
即为 Pipeline 的创建时间同时也是版本号。
问题调试
首先,请参考 快速入门示例来查看 Pipeline 正确的执行情况。