跳到主要内容
版本:nightly

Pipeline 配置

Pipeline 是 GreptimeDB 中对 log 数据进行解析和转换的一种机制, 由一个唯一的名称和一组配置规则组成,这些规则定义了如何对日志数据进行格式化、拆分和转换。目前我们支持 JSON(application/json)和纯文本(text/plain)格式的日志数据作为输入。

这些配置以 YAML 格式提供,使得 Pipeline 能够在日志写入过程中,根据设定的规则对数据进行处理,并将处理后的数据存储到数据库中,便于后续的结构化查询。

整体结构

Pipeline 由两部分组成:Processors 和 Transform,这两部分均为数组形式。一个 Pipeline 配置可以包含多个 Processor 和多个 Transform。Transform 所描述的数据类型会决定日志数据保存到数据库时的表结构。

  • Processor 用于对 log 数据进行预处理,例如解析时间字段,替换字段等。
  • Transform 用于对数据进行格式转换,例如将字符串类型转换为数字类型。

一个包含 Processor 和 Transform 的简单配置示例如下:

processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
transform:
- fields:
- string_field_a
- string_field_b
type: string
# 写入的数据必须包含 timestamp 字段
- fields:
- reqTimeSec, req_time_sec
# epoch 是特殊字段类型,必须指定精度
type: epoch, ms
index: timestamp

Processor

Processor 用于对 log 数据进行预处理,其配置位于 YAML 文件中的 processors 字段下。 Pipeline 会按照多个 Processor 的顺序依次加工数据,每个 Processor 都依赖于上一个 Processor 处理的结果。 Processor 由一个 name 和多个配置组成,不同类型的 Processor 配置有不同的字段。

我们目前内置了以下几种 Processor:

  • date: 解析格式化的时间字符串字段,例如 2024-07-12T16:18:53.048
  • epoch: 解析数字时间戳字段,例如 1720772378893
  • dissect: 对 log 数据字段进行拆分。
  • gsub: 对 log 数据字段进行替换。
  • join: 对 log 中的 array 类型字段进行合并。
  • letter: 对 log 数据字段进行字母转换。
  • regex: 对 log 数据字段进行正则匹配。
  • urlencoding: 对 log 数据字段进行 URL 编解码。
  • csv: 对 log 数据字段进行 CSV 解析。

大多数 Processor 都有 fieldfields 字段,用于指定需要被处理的字段。大部分 Processor 处理完成后会覆盖掉原先的 field。如果你不想影响到原数据中的对应字段,我们可以把结果输出到其他字段来避免覆盖。

当字段名称包含 , 时,该字段将被重命名。例如,reqTimeSec, req_time_sec 表示将 reqTimeSec 字段重命名为 req_time_sec,处理完成后的数据将写入中间状态的 req_time_sec 字段中。原始的 reqTimeSec 字段不受影响。如果某些 Processor 不支持字段重命名,则重命名字段名称将被忽略,并将在文档中注明。

例如:

processors:
- letter:
fields:
- message, message_upper
method: upper
ignore_missing: true

message 字段将被转换为大写并存储在 message_upper 字段中。

date

date Processor 用于解析时间字段。示例配置如下:

processors:
- date:
fields:
- time
formats:
- '%Y-%m-%d %H:%M:%S%.3f'
ignore_missing: true
timezone: 'Asia/Shanghai'

如上所示,date Processor 的配置包含以下字段:

  • fields: 需要解析的时间字段名列表。
  • formats: 时间格式化字符串,支持多个时间格式化字符串。按照提供的顺序尝试解析,直到解析成功。你可以在这里找到格式化的语法说明。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。
  • timezone: 时区。使用tz_database 中的时区标识符来指定时区。默认为 UTC

epoch

epoch Processor 用于解析时间戳字段,示例配置如下:

processors:
- epoch:
fields:
- reqTimeSec
resolution: millisecond
ignore_missing: true

如上所示,epoch Processor 的配置包含以下字段:

  • fields: 需要解析的时间戳字段名列表。
  • resolution: 时间戳精度,支持 s, sec , second , ms, millisecond, milli, us, microsecond, micro, ns, nanosecond, nano。默认为 ms
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

dissect

dissect Processor 用于对 log 数据字段进行拆分,示例配置如下:

processors:
- dissect:
fields:
- message
patterns:
- '%{key1} %{key2}'
ignore_missing: true
append_separator: '-'

如上所示,dissect Processor 的配置包含以下字段:

  • fields: 需要拆分的字段名列表。不支持字段重命名。
  • patterns: 拆分的 dissect 模式。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。
  • append_separator: 对于多个追加到一起的字段,指定连接符。默认是一个空字符串。

Dissect 模式

和 Logstash 的 Dissect 模式类似,Dissect 模式由 %{key} 组成,其中 %{key} 为一个字段名。例如:

"%{key1} %{key2} %{+key3} %{+key4/2} %{key5->} %{?key6}"

Dissect 修饰符

Dissect 模式支持以下修饰符:

修饰符说明示例
+将两个或多个字段追加到一起%{+key} %{+key}
+/n按照指定的顺序将两个或多个字段追加到一起%{+key/2} %{+key/1}
->忽略右侧的任何重复字符%{key1->} %{key2->}
?忽略匹配的值%{?key}

dissect 示例

例如,对于以下 log 数据:

"key1 key2 key3 key4 key5 key6"

使用以下 Dissect 模式:

"%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6}"

将得到以下结果:

{
"key1": "key1",
"key2": "key2",
"key3": "key3 key4",
"key5": "key5"
}

gsub

gsub Processor 用于对 log 数据字段进行替换,示例配置如下:

processors:
- gsub:
fields:
- message
pattern: 'old'
replacement: 'new'
ignore_missing: true

如上所示,gsub Processor 的配置包含以下字段:

  • fields: 需要替换的字段名列表。
  • pattern: 需要替换的字符串。支持正则表达式。
  • replacement: 替换后的字符串。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

join

join Processor 用于对 log 中的 Array 类型字段进行合并,示例配置如下:

processors:
- join:
fields:
- message
separator: ','
ignore_missing: true

如上所示,join Processor 的配置包含以下字段:

  • fields: 需要合并的字段名列表。注意,这里每行字段的值需要是 Array 类型,每行字段会单独合并自己数组内的值,所有行的字段不会合并到一起。
  • separator: 合并后的分隔符。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

join 示例

例如,对于以下 log 数据:

{
"message": ["a", "b", "c"]
}

使用以下配置:

processors:
- join:
fields:
- message
separator: ','

将得到以下结果:

{
"message": "a,b,c"
}

letter

letter Processor 用于对 log 数据字段进行字母转换,示例配置如下:

processors:
- letter:
fields:
- message
method: upper
ignore_missing: true

如上所示,letter Processor 的配置包含以下字段:

  • fields: 需要转换的字段名列表。
  • method: 转换方法,支持 upper, lowercapital。默认为 lower。注意 capital 只会将第一个字母转换为大写。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

regex

regex Processor 用于对 log 数据字段进行正则匹配,示例配置如下:

processors:
- regex:
fields:
- message
patterns:
- ':(?<id>[0-9])'
ignore_missing: true

如上所示,regex Processor 的配置包含以下字段:

  • fields: 需要匹配的字段名列表。如果重命名了字段,重命名后的字段名将与 pattern 中的命名捕获组名进行拼接。
  • pattern: 要进行匹配的正则表达式,需要使用命名捕获组才可以从对应字段中取出对应数据。
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

regex 命名捕获组的规则

regex Processor 支持使用 (?<group-name>...) 的语法来命名捕获组,最终将数据处理为这种形式:

{
"<field-name>_<group-name>": "<value>"
}

例如 regex Processor 中 field 填写的字段名为 message,对应的内容为 "[ERROR] error message", 你可以将 pattern 设置为 \[(?<level>[A-Z]+)\] (?<content>.+), 最终数据会被处理为:

{
"message_level": "ERROR",
"message_content": "error message"
}

urlencoding

urlencoding Processor 用于对 log 数据字段进行 URL 编码,示例配置如下:

processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true

如上所示,urlencoding Processor 的配置包含以下字段:

  • fields: 需要编码的字段名列表。
  • method: 编码方法,支持 encode, decode。默认为 encode
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

csv

csv Processor 用于对 log 数据中没有携带 header 的 CSV 类型字段解析,示例配置如下:

processors:
- csv:
fields:
- message
separator: ','
quote: '"'
trim: true
ignore_missing: true

如上所示,csv Processor 的配置包含以下字段:

  • fields: 需要解析的字段名列表。
  • separator: 分隔符。
  • quote: 引号。
  • trim: 是否去除空格。默认为 false
  • ignore_missing: 忽略字段不存在的情况。默认为 false。如果字段不存在,并且此配置为 false,则会抛出异常。

json_path(实验性)

注意:json_path 处理器目前处于实验阶段,可能会有所变动。

json_path 处理器用于从 JSON 数据中提取字段。以下是一个配置示例:

processors:
- json_path:
fields:
- complex_object
json_path: "$.shop.orders[?(@.active)].id"
ignore_missing: true
result_index: 1

在上述示例中,json_path processor 的配置包括以下字段:

  • fields:要提取的字段名称列表。
  • json_path:要提取的 JSON 路径。
  • ignore_missing:忽略字段缺失的情况。默认为 false。如果字段缺失且此配置设置为 false,将抛出异常。
  • result_index:指定提取数组中要用作结果值的下标。默认情况下,包含所有值。Processor 提取的结果值是包含 path 中所有值的数组。如果指定了索引,将使用提取数组中对应的下标的值作为最终结果。

JSON 路径语法

JSON 路径语法基于 jsonpath-rust 库。

在此阶段,我们仅推荐使用一些简单的字段提取操作,以便将嵌套字段提取到顶层。

json_path 示例

例如,给定以下日志数据:

{
"product_object": {
"hello": "world"
},
"product_array": [
"hello",
"world"
],
"complex_object": {
"shop": {
"orders": [
{
"id": 1,
"active": true
},
{
"id": 2
},
{
"id": 3
},
{
"id": 4,
"active": true
}
]
}
}
}

使用以下配置:

processors:
- json_path:
fields:
- product_object, object_target
json_path: "$.hello"
result_index: 0
- json_path:
fields:
- product_array, array_target
json_path: "$.[1]"
result_index: 0
- json_path:
fields:
- complex_object, complex_target_1
json_path: "$.shop.orders[?(@.active)].id"
- json_path:
fields:
- complex_target_1, complex_target_2
json_path: "$.[1]"
result_index: 0
- json_path:
fields:
- complex_object, complex_target_3
json_path: "$.shop.orders[?(@.active)].id"
result_index: 1
transform:
- fields:
- object_target
- array_target
type: string
- fields:
- complex_target_3
- complex_target_2
type: uint32
- fields:
- complex_target_1
type: json

结果将是:

{
"object_target": "world",
"array_target": "world",
"complex_target_3": 4,
"complex_target_2": 4,
"complex_target_1": [1, 4]
}

Transform

Transform 用于对 log 数据进行转换,其配置位于 YAML 文件中的 transform 字段下。

Transform 由一个或多个配置组成,每个配置包含以下字段:

  • fields: 需要转换的字段名列表。
  • type: 转换类型
  • index: 索引类型(可选)
  • on_failure: 转换失败时的处理方式(可选)
  • default: 默认值(可选)

fields 字段

每个字段名都是一个字符串,当字段名称包含 , 时,会进行字段重命名。例如,reqTimeSec, req_time_sec 表示将 reqTimeSec 字段重命名为 req_time_sec, 最终数据将被写入到 GreptimeDB 的 req_time_sec 列。

type 字段

GreptimeDB 目前内置了以下几种转换类型:

  • int8, int16, int32, int64: 整数类型。
  • uint8, uint16, uint32, uint64: 无符号整数类型。
  • float32, float64: 浮点数类型。
  • string: 字符串类型。
  • time: 时间类型。将被转换为 GreptimeDB timestamp(9) 类型。
  • epoch: 时间戳类型。将被转换为 GreptimeDB timestamp(n) 类型。n 为时间戳精度,n 的值视 epoch 精度而定。当精度为 s 时,n 为 0;当精度为 ms 时,n 为 3;当精度为 us 时,n 为 6;当精度为 ns 时,n 为 9。

如果字段在转换过程中获得了非法值,Pipeline 将会抛出异常。例如将一个字符串 abc 转换为整数时,由于该字符串不是一个合法的整数,Pipeline 将会抛出异常。

index 字段

Pipeline 会将处理后的数据写入到 GreptimeDB 自动创建的数据表中。为了提高查询效率,GreptimeDB 会为表中的某些列创建索引。index 字段用于指定哪些字段需要被索引。关于 GreptimeDB 的列类型,请参考数据模型文档。

GreptimeDB 支持以下三种字段的索引类型:

  • tag: 用于指定某列为 Tag 列
  • fulltext: 用于指定某列使用 fulltext 类型的索引,该列需要是字符串类型
  • timestamp: 用于指定某列是时间索引列

不提供 index 字段时,GreptimeDB 会将该字段作为 Field 列。

在 GreptimeDB 中,一张表里必须包含一个 timestamp 类型的列作为该表的时间索引列,因此一个 Pipeline 有且只有一个时间索引列。

时间戳列

通过 index: timestamp 指定哪个字段是时间索引列,写法请参考下方的 Transform 示例

Tag 列

通过 index: tag 指定哪个字段是 Tag 列,写法请参考下方的 Transform 示例

Fulltext 列

通过 index: fulltext 指定哪个字段将会被用于全文搜索,该索引可大大提升 日志搜索 的性能,写法请参考下方的 Transform 示例

on_failure 字段

on_failure 字段用于指定转换失败时的处理方式,支持以下几种方式:

  • ignore: 忽略转换失败的字段,不写入数据库。
  • default: 写入默认值。默认值由 default 字段指定。

default 字段

default 字段用于指定转换失败时的默认值。

Transform 示例

例如,对于以下 log 数据:

{
"num_field_a": "3",
"string_field_a": "john",
"string_field_b": "It was snowing when he was born.",
"time_field_a": 1625760000
}

使用以下配置:

transform:
- fields:
- string_field_a, name
type: string
index: tag
- fields:
- num_field_a, age
type: int32
- fields:
- string_field_b, description
type: string
index: fulltext
- fields:
- time_field_a, bron_time
type: epoch, s
index: timestamp

将得到以下结果:

{
"name": "john",
"age": 3,
"description": "It was snowing when he was born.",
"bron_time": 2021-07-08 16:00:00
}