Pipeline 配置
Pipeline 是 GreptimeDB 中对 log 数据进行解析和转换的一种机制,由一个唯一的名称和一组配置规则组成,这些规则定义了如何对日志数据进行格式化、拆分和转换。目前我们支持 JSON(application/json 或者 application/x-ndjson,推荐后者)和纯文本(text/plain)格式的日志数据作为输入。
这些配置以 YAML 格式提供,使得 Pipeline 能够在日志写入过程中,根据设定的规则对数据进行处理,并将处理后的数据存储到数据库中,便 于后续的结构化查询。
输入格式
一般情况下,Pipeline 接收一组键值对形式的 map 作为 Pipeline 的输入。如下三种格式均可作为 Pipeline 的输入:
JSON 格式
当请求的 Content-Type 为 application/json 时,请求的数据格式为标准的 JSON 格式。
[
{"message": "hello world", "time": "2024-07-12T16:18:53.048"},
{"message": "hello greptime", "time": "2024-07-12T16:18:53.048"}
]
NDJSON 格式
当请求的 Content-Type 为 application/x-ndjson 时,每行数据会被视为一条独立的标准 JSON 对象。
{"message": "hello world", "time": "2024-07-12T16:18:53.048"}
{"message": "hello greptime", "time": "2024-07-12T16:18:53.048"}
纯文本格式
当请求的 Content-Type 为 text/plain 时,每行数据会被视为一条独立的对象。并且会将整行数据视为一个字符串,存储在一个只包含 message 字段的 map 对象中
hello world 2024-07-12T16:18:53.048
hello greptime 2024-07-12T16:18:53.048
上述的纯文本格式数据会被转换为如下的等价形式:
{"message": "hello world 2024-07-12T16:18:53.048"}
{"message": "hello greptime 2024-07-12T16:18:53.048"}
也就是说,当输入内容为纯文本格式时,需要在编写 Processor 和 Transform 的过程中,使用 message 来指代每行数据的内容。
整体结构
Pipeline 由四部分组成:Processors、Dispatcher、Transform 和 Table suffix。 Processors 对数据进行预处理。 Dispatcher 可以将 pipeline 执行上下文转发到不同的后续 pipeline 上。 Transform 决定最终保存在数据库中的数据类型和表结构。 Table suffix 支持将数据保存到不同的表中。
- Version 用于指定 pipeline 配置的格式。虽然它是可选的,但是我们强烈建议使用 version 2 来编写新配置。更多详情请参考这个章节。
- Processor 用于对 log 数据进行预处理,例如解析时间字段,替换字段等。
- Dispatcher(可选) 用于将执行上下文转发到另一个 pipeline,同一个输入批次的数据可以基于特定的值被不同的 pipeline 进行处理。
- Transform(可选) 用于对数据进行格式转换,例如将字符串类型转换为数字类型,并且指定数据库表中列的索引信息。
- Table suffix(可选) 用于将数据存储到不同的表中,以便后续使用。
一个包含 Processor 和 Transform 的简单配置示例如下:
version: 2
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
dispatcher:
field: type
rules:
- value: http
table_suffix: http
pipeline: http
- value: db
table_suffix: db
transform:
- fields:
- string_field_a
- string_field_b
type: string
# 写入的数据必 须包含 timestamp 字段
- fields:
- reqTimeSec, req_time_sec
# epoch 是特殊字段类型,必须指定精度
type: epoch, ms
index: timestamp
table_suffix: _${string_field_a}
从 v0.15 开始, GreptimeDB 引入了一个新的文件格式版本。
其主要的差别在于 Transform 的处理逻辑。
请参考下述章节查看更多细节。
Processor
Processor 用于对 log 数据进行预处理,其配置位于 YAML 文件中的 processors 字段下。
Pipeline 会按照多个 Processor 的顺序依次加工数据,每个 Processor 都依赖于上一个 Processor 处理的结果。
Processor 由一个 name 和多个配置组成,不同类型的 Processor 配置有不同的字段。
我们目前内置了以下几种 Processor:
date: 解析格式化的时间字符串字段,例如2024-07-12T16:18:53.048。epoch: 解析数字时间戳字段,例如1720772378893。decolorize: 移除日志数据中的 ANSI 颜色代码。dissect: 对 log 数据字段进行拆分。gsub: 对 log 数据字段进行替换。join: 对 log 中的 array 类型字段进行合并。letter: 对 log 数据字段进行字母转换。regex: 对 log 数据字段进行正则匹配。urlencoding: 对 log 数据字段进行 URL 编解码。csv: 对 log 数据字段进行 CSV 解析。json_path: 从 JSON 数据中提取字段。(已废弃,请使用vrl)json_parse: 将一个字段解析成 JSON 对象。simple_extract: 使用简单的 key 从 JSON 数据中提取字段。digest: 提取日志消息模板。select: 从 pipeline 执行上下文中保留或移除字段。vrl: 使用 pipeline 上下文执行 vrl 脚本。filter: 过滤不需要写入的行数据。
Processor 的输入和输出
大多数 Processor 接收一个 field 或者 fields 参数(一个串行处理的 field 列表)作为输入数据。
Processor 会产出一个或者多个输出数据。
对于那些只产出一个输出数据的 processor,输出的数据会替换上下文中原始 key 所关联的数据。
下述的示例中,在 letter processor 之后,一个大写版本的字符串会被保存在 message 字段中。
processors:
- letter:
fields:
- message
method: upper
我们可以将输出数据保存到另一个字段中,使得原有的字段不变。
下述的示例中,我们依然使用 message 字段作为 letter processor 的输入,但是将输出保存到一个名为 upper_message 的新字段中。
processors:
- letter:
fields:
- key: message
rename_to: upper_message
method: upper
这个重命名的语法有一种便捷的书写方式:通过 , 将两个字段分割即可。
以下是一个示例:
processors:
- letter:
fields:
- message, upper_message
method: upper
重命名主要有两个场景:
- 保持原字段不变,使得它可以被多个 processor 使用,或者作为原始记录被保存到数据库中。
- 统一命名。例如为了一致性,将驼峰命名法的变量重命名为蛇形命名法。
date
date Processor 用于解析时间字段。示例配置如下:
processors:
- date:
fields:
- time
formats:
- '%Y-%m-%d %H:%M:%S%.3f'
ignore_missing: true
timezone: 'Asia/Shanghai'
如上所示,date Processor 的配置包含以下字段:
fields: 需要解析的时间字段名列表。formats: 时间格式化字符串,支持多个时间格式化字符串。按照提供的顺序尝试解析,直到解析成功。你可以在这里找到格式化的语法说明。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。timezone:时区。使用tz_database 中的时区标识符来指定时区。默认为UTC。
epoch
epoch Processor 用于解析时间戳字段,示例配置如下:
processors:
- epoch:
fields:
- reqTimeSec
resolution: millisecond
ignore_missing: true
如上所示,epoch Processor 的配置包含以下字段:
fields: 需要解析的时间戳字段名列表。resolution: 时间戳精度,支持s,sec,second,ms,millisecond,milli,us,microsecond,micro,ns,nanosecond,nano。默认为ms。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
decolorize
decolorize Processor 用于移除日志数据中的 ANSI 颜色代码。示例配置如下:
processors:
- decolorize:
fields:
- message
如上所示,decolorize Processor 的配置包含以下字段:
fields: 需要移除颜色代码的字段名列表。
dissect
dissect Processor 用于对 log 数据字段进行拆分,示例配置如下:
processors:
- dissect:
fields:
- message
patterns:
- '%{key1} %{key2}'
ignore_missing: true
append_separator: '-'
如上所示,dissect Processor 的配置包含以下字段:
fields: 需要拆分的字段名列表。不支持字段重命名。patterns: 拆分的 dissect 模式。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。append_separator: 对于多个追加到一起的字段,指定连接符。默认是一个空字符串。