Pipeline 配置
Pipeline 是 GreptimeDB 中对 log 数据进行解析和转换的一种机制,由一个唯一的名称和一组配置规则组成,这些规则定义了如何对日志数据进行格式化、拆分和转换。目前我们支持 JSON(application/json
或者 application/x-ndjson
,推荐后者)和纯文本(text/plain
)格式的日志数据作为输入。
这些配置以 YAML 格式提供,使得 Pipeline 能够在日志写入过程中,根据设定的规则对数据进行处理,并将处理后的数据存储到数据库中,便于后续的结构化查询。
输入格式
一般情况下,Pipeline 接收一组键值对形式的 map 作为 Pipeline 的输入。如下三种格式均可作为 Pipeline 的输入:
JSON 格式
当请求的 Content-Type 为 application/json
时,请求的数据格式为标准的 JSON 格式。
[
{"message": "hello world", "time": "2024-07-12T16:18:53.048"},
{"message": "hello greptime", "time": "2024-07-12T16:18:53.048"}
]
NDJSON 格式
当请求的 Content-Type 为 application/x-ndjson
时,每行数据会被视为一条独立的标准 JSON 对象。
{"message": "hello world", "time": "2024-07-12T16:18:53.048"}
{"message": "hello greptime", "time": "2024-07-12T16:18:53.048"}
纯文本格式
当请求的 Content-Type 为 text/plain
时,每行数据会被视为一条独立的对象。并且会将整行数据视为一个字符串,存储在一个只包含 message
字段的 map 对象中
hello world 2024-07-12T16:18:53.048
hello greptime 2024-07-12T16:18:53.048
上述的纯文本格式数据会被转换为如下的等价形式:
{"message": "hello world 2024-07-12T16:18:53.048"}
{"message": "hello greptime 2024-07-12T16:18:53.048"}
也就是说,当输入内容为纯文本格式时,需要在编写 Processor
和 Transform
的过程中,使用 message
来指代每行数据的内容。
整体结构
Pipeline 由四部分组成:Processors、Dispatcher、Transform 和 Table suffix。 Processors 对数据进行预处理。 Dispatcher 可以将 pipeline 执行上下文转发到不同的后续 pipeline 上。 Transform 决定最终保存在数据库中的数据类型和表结构。 Table suffix 支持将数据保存到不同的表中。
- Version 用于指定 pipeline 配置的格式。虽然它是可选的,但是我们强烈建议使用 version 2 来编写新配置。更多详情请参考这个章节。
- Processor 用于对 log 数据进行预处理,例如解析时间字段,替换字段等。
- Dispatcher(可选) 用于将执行上下文转发到另一个 pipeline,同一个输入批次的数据可以基于特定的值被不同的 pipeline 进行处理。
- Transform(可选) 用于对数据进行格式转换,例如将字符串类型转换为数字类型,并且指定数据库表中列的索引信息。
- Table suffix(可选) 用于将数据存储到不同的表中,以便后续使用。
一个包含 Processor 和 Transform 的简单配置示例如下:
version: 2
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
dispatcher:
field: type
rules:
- value: http
table_suffix: http
pipeline: http
- value: db
table_suffix: db
transform:
- fields:
- string_field_a
- string_field_b
type: string
# 写入的数据必须包含 timestamp 字段
- fields:
- reqTimeSec, req_time_sec
# epoch 是特殊字段类型,必须指定精度
type: epoch, ms
index: timestamp
table_suffix: _${string_field_a}
从 v0.15
开始, GreptimeDB 引入了一个新的文件格式版本。
其主要的差别在于 Transform 的处理逻辑。
请参考下述章节查看更多细节。
Processor
Processor 用于对 log 数据进行预处理,其配置位于 YAML 文件中的 processors
字段下。
Pipeline 会按照多个 Processor 的顺序依次加工数据,每个 Processor 都依赖于上一个 Processor 处理的结果。
Processor 由一个 name 和多个配置组成,不同类型的 Processor 配置有不同的字段。
我们目前内置了以下几种 Processor:
date
: 解析格式化的时间字符串字段,例如2024-07-12T16:18:53.048
。epoch
: 解析数字时间戳字段,例如1720772378893
。decolorize
: 移除日志数据中的 ANSI 颜色代码。dissect
: 对 log 数据字段进行拆分。gsub
: 对 log 数据字段进行替换。join
: 对 log 中的 array 类型字段进行合并。letter
: 对 log 数据字段进行字母转换。regex
: 对 log 数据字段进行正则匹配。urlencoding
: 对 log 数据字段进行 URL 编解码。csv
: 对 log 数据字段进行 CSV 解析。json_path
: 从 JSON 数据中提取字段。(已废弃,请使用vrl
)json_parse
: 将一个字段解析成 JSON 对象。simple_extract
: 使用简单的 key 从 JSON 数据中提取字段。digest
: 提取日志消息模板。select
: 从 pipeline 执行上下文中保留或移除字段。vrl
: 使用 pipeline 上下文执行 vrl 脚本。filter
: 过滤不需要写入的行数据。
Processor 的输入和输出
大多数 Processor 接收一个 field
或者 fields
参数(一个串行处理的 field
列表)作为输入数据。
Processor 会产出一个或者多个输出数据。
对于那些只产出一个输出数据的 processor,输出的数据会替换上下文中原始 key 所关联的数据。
下述的示例中,在 letter
processor 之后,一个大写版本的字符串会被保存在 message
字段中。
processors:
- letter:
fields:
- message
method: upper
我们可以将输出数据保存到另一个字段中,使得原有的字段不变。
下述的示例中,我们依然使用 message
字段作为 letter
processor 的输入,但是将输出保存到一个名为 upper_message
的新字段中。
processors:
- letter:
fields:
- key: message
rename_to: upper_message
method: upper
这个重命名的语法有一种便捷的书写方式:通过 ,
将两个字段分割即可。
以下是一个示例:
processors:
- letter:
fields:
- message, upper_message
method: upper
重命名主要有两个场景:
- 保持原字段不变,使得它可以被多个 processor 使用,或者作为原始记录被保存到数据库中。
- 统一命名。例如为了一致性,将驼峰命名法的变量重命名为蛇形命名法。
date
date
Processor 用于解析时间字段。示例配置如下:
processors:
- date:
fields:
- time
formats:
- '%Y-%m-%d %H:%M:%S%.3f'
ignore_missing: true
timezone: 'Asia/Shanghai'
如上所示,date
Processor 的配置包含以下字段:
fields
: 需要解析的时间字段名列表。formats
: 时间格式化字符串,支持多个时间格式化字符串。按照提供的顺序尝试解析,直到解析成功。你可以在这里找到格式化的语法说明。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。timezone
:时区。使用tz_database 中的时区标识符来指定时区。默认为UTC
。
epoch
epoch
Processor 用于解析时间戳字段,示例配置如下:
processors:
- epoch:
fields:
- reqTimeSec
resolution: millisecond
ignore_missing: true
如上所示,epoch
Processor 的配置包含以 下字段:
fields
: 需要解析的时间戳字段名列表。resolution
: 时间戳精度,支持s
,sec
,second
,ms
,millisecond
,milli
,us
,microsecond
,micro
,ns
,nanosecond
,nano
。默认为ms
。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
decolorize
decolorize
Processor 用于移除日志数据中的 ANSI 颜色代码。示例配置如下:
processors:
- decolorize:
fields:
- message
如上所示,decolorize
Processor 的配置包含以下字段:
fields
: 需要移除颜色代码的字段名列表。
dissect
dissect
Processor 用于对 log 数据字段进行拆分,示例配置如下:
processors:
- dissect:
fields:
- message
patterns:
- '%{key1} %{key2}'
ignore_missing: true
append_separator: '-'
如上所示,dissect
Processor 的配置包含以下字段:
fields
: 需要拆分的字段名列表。不支持字段重命名。patterns
: 拆分的 dissect 模式。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。append_separator
: 对于多个追加到一起的字段,指定连接符。默认是一个空字符串。
Dissect 模式
和 Logstash 的 Dissect 模式类似,Dissect 模式由 %{key}
组成,其中 %{key}
为一个字段名。例如:
"%{key1} %{key2} %{+key3} %{+key4/2} %{key5->} %{?key6}"
Dissect 修饰符
Dissect 模式支持以下修饰符:
修饰符 | 说明 | 示例 |
---|---|---|
+ | 将两个或多个字段追加到一起 | %{+key} %{+key} |
+ 和 /n | 按照指定的顺序将两个或多个字段追加到一起 | %{+key/2} %{+key/1} |
-> | 忽略右侧的任何重复字符 | %{key1->} %{key2->} |
? | 忽略匹配的值 | %{?key} |
dissect
示例
例如,对于以下 log 数据:
"key1 key2 key3 key4 key5 key6"
使用以下 Dissect 模式:
"%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6}"
将得到以下结果:
{
"key1": "key1",
"key2": "key2",
"key3": "key3 key4",
"key5": "key5"
}
gsub
gsub
Processor 用于对 log 数据字段进行替换,示例配置如下:
processors:
- gsub:
fields:
- message
pattern: 'old'
replacement: 'new'
ignore_missing: true
如上所示,gsub
Processor 的配置包含以下字段:
fields
: 需要替换的字段名列表。pattern
: 需要替换的字符串。支持正则表达式。replacement
: 替换后的字符串。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
join
join
Processor 用于对 log 中的 Array 类型字段进行合并,示例配置如下:
processors:
- join:
fields:
- message
separator: ','
ignore_missing: true
如上所示,join
Processor 的配置包含以下字段:
fields
: 需要合并的字段名列表。注意,这里每行字段的值需要是 Array 类型,每行字段会单独合并自己数组内的值,所有行的字段不会合并到一起。separator
: 合并后的分隔符。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
join
示例
例如,对于以下 log 数据:
{
"message": ["a", "b", "c"]
}
使用以下配置:
processors:
- join:
fields:
- message
separator: ','
将得到以下结果:
{
"message": "a,b,c"
}
letter
letter
Processor 用于对 log 数据字段进行字母转换,示例配置如下:
processors:
- letter:
fields:
- message
method: upper
ignore_missing: true
如上所示,letter
Processor 的配置包含以下字段:
fields
: 需要转换的字段名列表。method
: 转换方法,支持upper
,lower
,capital
。默认为lower
。 注意capital
只会将第一个字母转换为大写。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
regex
regex
Processor 用于对 log 数据字段进行正则匹配,示例配置如下:
processors:
- regex:
fields:
- message
patterns:
- ':(?<id>[0-9])'
ignore_missing: true
如上所示,regex
Processor 的配置包含以下字段:
fields
: 需要匹配的字段名列表。如果重命名了字段,重命名后的字段名将与pattern
中的命名捕获组名进行拼接。pattern
: 要进行匹配的正则表达式,需要使用命名捕获组才可以从对应字段中取出对应数据。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
regex 命名捕获组的规则
regex
Processor 支持使用 (?<group-name>...)
的语法来命名捕获组,最终将数据处理为这种形式:
{
"<field-name>_<group-name>": "<value>"
}
例如 regex
Processor 中 field 填写的字段名为 message
,对应的内容为 "[ERROR] error message"
,
你可以将 pattern 设置为 \[(?<level>[A-Z]+)\] (?<content>.+)
,
最终数据会被处理为:
{
"message_level": "ERROR",
"message_content": "error message"
}
urlencoding
urlencoding
Processor 用于对 log 数据字段进行 URL 编码,示例配置如下:
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
如上所示,urlencoding
Processor 的配置包含以下字段:
fields
: 需要编码的字段名列表。method
: 编码方法,支持encode
,decode
。默认为encode
。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
csv
csv
Processor 用于对 log 数据中没有携带 header 的 CSV 类型字段解析,示例配置如下:
processors:
- csv:
fields:
- message
separator: ','
quote: '"'
trim: true
ignore_missing: true
如上所示,csv
Processor 的配置包含以下字段:
fields
: 需要解析的字段名列表。separator
: 分隔符。quote
: 引号。trim
: 是否去除空格。默认为false
。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此 配置为 false,则会抛出异常。
json_path
(废弃)
增加 vrl processor 后,json_path
处理器的使用场景已经大大减少。
如果你需要从 JSON 数据中提取字段,建议使用 vrl
处理器来实现更灵活的处理。
我们计划在未来的版本中废弃 json_path
处理器。
json_path
处理器用于从 JSON 数据中提取字段。以下是一个配置示例:
processors:
- json_path:
fields:
- complex_object
json_path: "$.shop.orders[?(@.active)].id"
ignore_missing: true
result_index: 1
在上述示例中,json_path
processor 的配置包括以下字段:
fields
:要提取的字段名称列表。json_path
:要提取的 JSON 路径。ignore_missing
:忽略字段缺失的情况。默认为false
。如果字段缺失且此配置设置为false
,将抛出异常。result_index
:指定提取数组中要用作结果值的下标。默认情况下,包含所有值。Processor 提取的结果值是包含 path 中所有值的数组。如果指定了索引,将使用提取数组中对应的下标的值作为最终结果。
JSON 路径语法
JSON 路径语法基于 jsonpath-rust 库。
在此阶段,我们仅推荐使用一些简单的字段提取操作,以便将嵌套字段提取到顶层。
json_path
示例
例如,给定以下日志数据:
{
"product_object": {
"hello": "world"
},
"product_array": [
"hello",
"world"
],
"complex_object": {
"shop": {
"orders": [
{
"id": 1,
"active": true
},
{
"id": 2
},
{
"id": 3
},
{
"id": 4,
"active": true
}
]
}
}
}
使用以下配置:
processors:
- json_path:
fields:
- product_object, object_target
json_path: "$.hello"
result_index: 0
- json_path:
fields:
- product_array, array_target
json_path: "$.[1]"
result_index: 0
- json_path:
fields:
- complex_object, complex_target_1
json_path: "$.shop.orders[?(@.active)].id"
- json_path:
fields:
- complex_target_1, complex_target_2
json_path: "$.[1]"
result_index: 0
- json_path:
fields:
- complex_object, complex_target_3
json_path: "$.shop.orders[?(@.active)].id"
result_index: 1
transform:
- fields:
- object_target
- array_target
type: string
- fields:
- complex_target_3
- complex_target_2
type: uint32
- fields:
- complex_target_1
type: json
结果将是:
{
"object_target": "world",
"array_target": "world",
"complex_target_3": 4,
"complex_target_2": 4,
"complex_target_1": [1, 4]
}
json_parse
json_parse
,如其名字所示,将一个字符串解析成一个 JSON 对象。以下是一份示例配置:
processors:
- json_parse:
fields:
- complex_object, target_field
ignore_missing: true
在上述示例中,json_parse
处理器的配置包含以下字段:
fields
:对于每个字段,第一个 key 表示输入字符串中的 key,第二个 key 表示输出的 key 名称。如果不填写第二个 key,则默认使用第一个 key 名作为输出名,也就是替换第一个 key 中的值。上面的示例表示解析complex_object
并将输出放入target_field
中。ignore_missing
:忽略字段不存在的情况。默认为false
。如果字段不存在且此配置为false
,将抛出异常。
json_parse
example
例如,给定以下日志数据:
{
"product_object": "{\"hello\":\"world\"}",
}
使用以下配置:
processors:
- json_parse:
fields:
- product_object
结果将是:
{
"product_object": {
"hello": "world"
}
}
simple_extract
虽然 json_path
处理器能够使用复杂表达式从 JSON 对象中提取字段,但它相对较慢且成本较高。simple_extract
处理器提供了一种简单的方法,仅使用键名来提取字段。以下是示例配置:
processors:
- simple_extract:
fields:
- complex_object, target_field
key: "shop.name"
ignore_missing: true
在上述示例中,simple_extract
处理器的配置包含以下字段:
fields
:对于每个字段,第一个键表示上下文中的输入 JSON 对象,第二个键表示输出键名。上面的示例表示从complex_object
中提取数据并将输出放入target_field
中。key
:提取键,使用x.x
格式,每个.
表示一个新的嵌套层。ignore_missing
:忽略字段不存在的情况。默认为false
。如果字段不存在且此配置为false
,将抛出异常。
simple_extract
示例
例如,给定以下日志数据:
{
"product_object": {
"hello": "world"
},
"product_array": [
"hello",
"world"
],
"complex_object": {
"shop": {
"name": "some_shop_name"
}
}
}
使用以下配置:
processors:
- simple_extract:
fields:
- complex_object, shop_name
key: "shop.name"
transform:
- fields:
- shop_name
type: string
结果将是:
{
"shop_name": "some_shop_name"
}
digest
digest
处理器用于从日志内容中提取日志模板,它通过识别并移除可变内容(如数字、UUID、IP 地址、引号中的内容和括号中的文本等)来实现。提取出的模板可用于日志的分类和分析。配置示例如下:
processors:
- digest:
fields:
- message
presets:
- numbers
- uuid
- ip
- quoted
- bracketed
regex:
- "foobar"
ignore_missing: true
在上述示例中,digest
处理器的配置包含以下字段:
fields
:要进行摘要处理的字段名列表。处理后的结果将存储在带有_digest
后缀的新字段中。presets
:要移除的预设模式列表。支持以下模式:numbers
:匹配数字序列uuid
:匹配 UUID 字符串,如123e4567-e89b-12d3-a456-426614174000
ip
:匹配 IPv4/IPv6 地址(可选带端口号)quoted
:匹配单引号/双引号内的文本(包括各种 Unicode 引号)bracketed
:匹配各种类型括号内的文本(包括各种 Unicode 括号)
regex
:要移除的自定义正则表达式列表ignore_missing
:是否忽略字段不存在的情况。默认为false
。如果字段不存在且此配置为false
,将抛出异常。
digest
示例
例如,给定以下日志数据:
{
"message": "User 'john.doe' from [192.168.1.1] accessed resource 12345 with UUID 123e4567-e89b-12d3-a456-426614174000"
}
使用以下配置:
processors:
- digest:
fields:
- message
presets:
- numbers
- uuid
- ip
- quoted
- bracketed