Pipeline 配置
Pipeline 是 GreptimeDB 中对 log 数据进行解析和转换的一种机制,由一个唯一的名称和一组配置规则组成,这些规则定义了如何对日志数据进行格式化、拆分和转换。目前我们支持 JSON(application/json
或者 application/x-ndjson
,推荐后者)和纯文本(text/plain
)格式的日志数据作为输入。
这些配置以 YAML 格式提供,使得 Pipeline 能够在日志写入过程中,根据设定的规则对数据进行处理,并将处理后的数据存储到数据库中,便于后续的结构化查询。
输入格式
一般情况下,Pipeline 接收一组键值对 形式的 map 作为 Pipeline 的输入。如下三种格式均可作为 Pipeline 的输入:
JSON 格式
当请求的 Content-Type 为 application/json
时,请求的数据格式为标准的 JSON 格式。
[
{"message": "hello world", "time": "2024-07-12T16:18:53.048"},
{"message": "hello greptime", "time": "2024-07-12T16:18:53.048"}
]
NDJSON 格式
当请求的 Content-Type 为 application/x-ndjson
时,每行数据会被视为一条独立的标准 JSON 对象。
{"message": "hello world", "time": "2024-07-12T16:18:53.048"}
{"message": "hello greptime", "time": "2024-07-12T16:18:53.048"}
纯文本格式
当请求的 Content-Type 为 text/plain
时,每行数据会被视为一条独立的对象。并且会将整行数据视为一个字符串,存储在一个只包含 message
字段的 map 对象中
hello world 2024-07-12T16:18:53.048
hello greptime 2024-07-12T16:18:53.048
上述的纯文本格式数据会被转换为如下的等价形式:
{"message": "hello world 2024-07-12T16:18:53.048"}
{"message": "hello greptime 2024-07-12T16:18:53.048"}
也就是说,当输入内容为纯文本格式时,需要在编写 Processor
和 Transform
的过程中,使用 message
来指代每行数据的内容。
整体结构
Pipeline 由四部分组成:Processors、Dispatcher、Transform 和 Table suffix。 Processors 对数据进行预处理。 Dispatcher 可以将 pipeline 执行上下文转发到不同的后续 pipeline 上。 Transform 决定最终保存在数据库中的数据类型和表结构。 Table suffix 支持将数据保存到不同的表中。
- Version 用于指定 pipeline 配置的格式。虽然它是可选的,但是我们强烈建议使用 version 2 来编写新配置。更多详情请参考这个章节。
- Processor 用于对 log 数据进行预处理,例如解析时间字段,替换字段等。
- Dispatcher(可选) 用于将执行上下文转发到另一个 pipeline,同一个输入批次的数据可以基于特定的值被不同的 pipeline 进行处理。
- Transform(可选) 用于对数据进行格式转换,例如将字符串类型转换为数字类型,并且指定数据库表中列的索引信息。
- Table suffix(可选) 用于将数据存储到不同的表中,以便后续使用。
一个包含 Processor 和 Transform 的简单配置示例如下:
version: 2
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
dispatcher:
field: type
rules:
- value: http
table_suffix: http
pipeline: http
- value: db
table_suffix: db
transform:
- fields:
- string_field_a
- string_field_b
type: string
# 写入的数据必须包含 timestamp 字段
- fields:
- reqTimeSec, req_time_sec
# epoch 是特殊字段类型,必须指定精度
type: epoch, ms
index: timestamp
table_suffix: _${string_field_a}
从 v0.15
开始, GreptimeDB 引入了一个新的文件格式版本。
其主要的差别在于 Transform 的处理逻辑。
请参考下述章节查看更多细节。
Processor
Processor 用于对 log 数据进行预处理,其配置位于 YAML 文件中的 processors
字段下。
Pipeline 会按照多个 Processor 的顺序依次加工数据,每个 Processor 都依赖于上一个 Processor 处理的结果。
Processor 由一个 name 和多个配置组成,不同类型的 Processor 配置有不同的字段。
我们目前内置了以下几种 Processor:
date
: 解析格式化的时间字符串字段,例如2024-07-12T16:18:53.048
。epoch
: 解析数字时间戳字段,例如1720772378893
。decolorize
: 移除日志数据中的 ANSI 颜色代码。dissect
: 对 log 数据字段进行拆分。gsub
: 对 log 数据字段进行替换。join
: 对 log 中的 array 类型字段进行合并。letter
: 对 log 数据字段进行字母转换。regex
: 对 log 数据字段进行正则匹配。urlencoding
: 对 log 数据字段进行 URL 编解码。csv
: 对 log 数据字段进行 CSV 解析。json_path
: 从 JSON 数据中提取字段。(已废弃,请使用vrl
)json_parse
: 将一个字段解析成 JSON 对象。simple_extract
: 使用简单的 key 从 JSON 数据中提取字段。digest
: 提取日志消息模板。select
: 从 pipeline 执行上下文中保留或移除字段。vrl
: 使用 pipeline 上下文执行 vrl 脚本。filter
: 过滤不需要写入的行数据。
Processor 的输入和输出
大多数 Processor 接收一个 field
或者 fields
参数(一个串行处理的 field
列表)作为输入数据。
Processor 会产出一个或者多个输出数据。
对于那些只产出一个输出数据的 processor,输出的数据会替换上下文中原始 key 所关联的数据。
下述的示例中,在 letter
processor 之后,一个大写版本的字符串会被保存在 message
字段中。
processors:
- letter:
fields:
- message
method: upper
我们可以将输出数据保存到另一个字段中,使得原有的字段不变。
下述的示例中,我们依然使用 message
字段作为 letter
processor 的输入,但是将输出保存到一个名为 upper_message
的新字段中。
processors:
- letter:
fields:
- key: message
rename_to: upper_message
method: upper