Kafka
如果你使用 Kafka 或兼容 Kafka 的消息队列进行可观测性数据传输,可以直接将数据写入到 GreptimeDB 中。
这里我们使用 Vector 作为工具将数据从 Kafka 传输到 GreptimeDB。
指标
从 Kafka 写入指标到 GreptimeDB 时,消息应采用 InfluxDB 行协议格式。例如:
census,location=klamath,scientist=anderson bees=23 1566086400000000000
然后配置 Vector 使用 influxdb 解码器来处理这些消息。
[sources.metrics_mq]
# 指定源类型为 Kafka
type = "kafka"
# Kafka 的消费者组 ID
group_id = "vector0"
# 要消费消息的 Kafka 主题列表
topics = ["test_metric_topic"]
# 要连接的 Kafka 地址
bootstrap_servers = "kafka:9092"
# `influxdb` 表示消息应采用 InfluxDB 行协议格式
decoding.codec = "influxdb"
[sinks.metrics_in]
inputs = ["metrics_mq"]
# 指定接收器类型为 `greptimedb_metrics`
type = "greptimedb_metrics"
# GreptimeDB 服务器的端点
# 将 <host> 替换为实际的主机名或 IP 地址
endpoint = "<host>:4001"
dbname = "<dbname>"
username = "<username>"
password = "<password>"
tls = {}
有关 InfluxDB 行协议指标如何映射到 GreptimeDB 数据的详细信息,请参阅 InfluxDB 行协议文档中的数据模型部分。
日志
开发人员通常处理两种类型的日志:JSON 日志和纯文本日志。 例如以下从 Kafka 发送的日志示例。
纯文本日志:
127.0.0.1 - - [25/May/2024:20:16:37 +0000] "GET /index.html HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
或 JSON 日志:
{
"timestamp": "2024-12-23T10:00:00Z",
"level": "INFO",
"message": "Service started"
}
GreptimeDB 将这些日志转换为具有多个列的结构化数据,并自动创建必要的表。 Pipeline 在写入到 GreptimeDB 之前将日志处理为结构化数据。 不同的日志格式需要不同的 Pipeline 来解析,详情请继续阅读下面的内容。