搜索中...
🔍

未找到相关结果

Akemi

两周复习EFK第八天之Fluentd+Fluent Bit数据清洗

2026/04/03

Fluentd数据处理模型理论深入

tag与多级tag

tag是Fluentd和fluent bit都有的机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
K8s节点 → Fluent Bit → Fluentd → ES
↓ ↓ ↓
生成Tag 基于Tag路由 基于内容存储
↓ ↓ ↓
kube.** 匹配规则 索引选择

第一层:Fluent Bit的Tag路由
# Fluent Bit配置示例
[INPUT]
Name tail
Tag kube.nginx # 生成Tag

[OUTPUT]
Name forward
Match kube.nginx # 匹配Tag,发送到Fluentd

第二层:Fluentd的Tag路由
# Fluentd配置示例
<source>
@type forward
# 接收Fluent Bit发来的数据,保持Tag不变
</source>

<match kube.nginx> # 基于同样的Tag继续路由
# 深度清洗、字段丰富
</match>

<match kube.**> # 更通用的匹配规则
# 通用处理逻辑
</match>

tag分层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1.常见分层模式
kube.nginx.access # K8s中Nginx访问日志
kube.nginx.error # K8s中Nginx错误日志
kube.system.kubelet # K8s系统组件日志
host.system.ssh # 主机SSH日志
app.payment.api # 应用支付接口日志

2.match匹配规则与方式
Fluentd按配置文件顺序匹配,第一个匹配的规则生效:
所以具体的规则在前,通用规则在后
<match kube.nginx> # 只匹配 "kube.nginx"
<match kube.nginx.access> # 只匹配 "kube.nginx.access"
<match kube.**> # 匹配所有以 kube. 开头的Tag
<match **.error> # 匹配所有以 .error 结尾的Tag
<match app.*.api> # 匹配 app.xxx.api 模式
<match kube.{nginx,apache}> # 匹配 kube.nginx 或 kube.apache
<match {app,web}.api> # 匹配 app.api 或 web.api

3.分层的意义
(1)处理逻辑分离(给不同filter
kube.nginx.access → 解析访问日志,提取URL、状态码
kube.nginx.error → 解析错误日志,提取错误级别、堆栈
kube.system.kubelet → 解析系统日志,关注资源指标

(2)输出目标分流(给不同output
app.payment.** → 发送到高安全级别的ES集群
app.analytics.** → 发送到分析专用ES集群
** → 发送到通用ES集群(兜底)

(3)性能优化
kube.nginx.access(高频)→ 简单解析,快速转发
kube.system.kubelet(低频)→ 复杂解析,深度处理
**(其他)→ 基础处理,保证不丢数据

4.推荐的分层结构
第一级:环境标识 → {cluster}
第二级:命名空间 → {cluster}.{namespace}
第三级:工作负载 → {cluster}.{namespace}.{workload}
第四级:容器 → {cluster}.{namespace}.{workload}.{container}
第五级:日志类型 → {cluster}.{namespace}.{workload}.{container}.{log_type}

Fluentd插件链架构解析

Fluent Bit能做的

  • 多行日志合并
  • 简单正则解析(如提取IP、时间戳)
  • K8s元数据自动添加
  • 字段重命名/删除

Fluentd能做的

  • 复杂grok/regex解析
  • 日期时间标准化
  • 用户代理、地理位置等数据丰富
  • 基于业务逻辑的字段计算和转换
  • 多级插件链协同处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
输入插件 → Filter A → Filter B → Filter C → 输出插件
↓ ↓ ↓ ↓ ↓
接收数据 → 清洗数据 → 增强数据 → 转换数据 → 发送数据

顺序性:插件按配置顺序依次执行
数据传递:每个插件接收上一步的输出作为输入
增量处理:数据在链中逐步丰富和转换
条件执行:某些插件可以基于数据内容决定是否执行

# 举例说明
原始日志
192.168.1.100 - - [02/Apr/2026:11:45:32 +0800] "GET /api/v1/users HTTP/1.1" 200 1234 "Mozilla/5.0" "http://example.com"

1.解析器parser
将文本日志拆解为结构化字段
client_ip: "192.168.1.100"
timestamp: "02/Apr/2026:11:45:32 +0800"
method: "GET"
url: "/api/v1/users"
status_code: "200"
response_size: "1234"
user_agent: "Mozilla/5.0"
referrer: "http://example.com"

2.日期处理插件Date
将文本时间戳转换为标准化时间格式
@timestamp: "2026-04-02T03:45:32.000Z"(UTC时间)
移除原始的timestamp字段

3.用户代理解析插件User Agent
解析User-Agent字符串,提取设备信息
browser: {name: "Chrome", version: "120.0.0.0"}
os: {name: "Windows", version: "10"}
device: {type: "desktop"}

4.字段增强插件Record Transformer
添加业务逻辑相关的派生字段
api_version: "v1" # 从URL提取
endpoint_category: "users" #从URL提取
is_success: true # 基于status_code>199且<300

5.地理位置插件(GeoIP)
geo: {country: "中国", city: "北京", coordinates: {lat: 39.9042, lon: 116.4074}}

# 其他高级特性
# 条件执行
插件链:解析器 → [条件]用户代理解析 → 地理位置 → 输出
条件逻辑:
- 如果`user_agent`字段存在 → 执行用户代理解析插件
- 如果`client_ip`是公网IP → 执行地理位置插件
- 否则 → 跳过该插件

# 分支处理
状态码为5xx的错误日志 → 进入"错误处理链"(添加告警标记,发送到专门索引)
状态码为2xx的成功日志 → 进入"分析处理链"(添加业务标签,发送到分析索引)

# 并行处理
分支A:解析URL结构
分支B:分析User-Agent
分支C:验证请求合法性
聚合:合并所有结果

# 插件链设计原则
1. 失败率低的插件在前(避免后续插件白执行)
2. 计算量小的插件在前(快速过滤无效数据)
3. 必需的插件在前(确保基础字段存在)
4. 可选插件在后(基于已有字段决定是否执行)

与logstash架构的对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Logstash:经典的“管道-过滤器”模型
Input → Filter → Output
核心思想:数据流过固定的三段式管道
类比:工厂流水线,每个工位完成特定任务
配置范式:集中式、声明式配置

Fluentd:基于“标签路由”的插件链模型
输入 → 路由决策 → 插件链A → 输出A
→ 插件链B → 输出B
→ 插件链C → 输出C
核心思想:数据根据标签被路由到不同的处理链
类比:快递分拣中心,按地址分发到不同处理线
配置范式:分布式、路由式配置

1.配置差异
Logstash:在代码中表达路由逻辑(if [type] == "nginx" { ... })
Fluentd:在结构中表达路由逻辑(<match nginx.**>)

2.数据处理差异
隔离性:Fluentd不同链间完全隔离,Logstash共享同一处理上下文
并发性:Fluentd天然支持并行处理不同数据流
故障域:Fluentd链间故障隔离更好

3. 性能与资源特性
Logstash:单点处理能力强,适合集中式日志处理
Fluentd:资源效率高,适合云原生分布式环境

4.错误处理
Logstash:一个插件崩溃可能影响所有数据处理
Fluentd:一个链故障通常不影响其他链

5. 扩展与生态
两者都强:Elasticsearch输出、文件输入、正则解析
Logstash特强:复杂Grok模式、数据丰富化、协议解析
Fluentd特强:K8s集成、容器日志、指标收集

Fluentd数据清洗实践

当前我的k8s环境日志来源包括:
k8s组件日志
milvus 向量数据库组件日志
Prometheus及组件日志

fluent bit分流日志

  • 识别特征:容器名包含kindnet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 filters: |
[FILTER]
Name kubernetes
Match kube.var.log.containers.*
Merge_Log On
Keep_Log Off

[FILTER]
Name rewrite_tag
Match kube.var.log.containers.*
Rule $kubernetes['container_name'] ^(kindnet-cni)$ kube.net.kindnet false
# container_name 匹配成功,则改标签为 kube.system.go.apiserver
# false表示匹配到后,旧标签的数据就不再往下流了
# 如果要区分Nginx
# Rule $kubernetes['container_name'] ^.*nginx.*$ kube.app.nginx false


helm upgrade --install fluent-bit . -f ./values.yaml -n fluent

fluentd清洗kindnet日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 添加fluentd配置,使其直接打印出kindnet的日志
02_output.conf: |-
<match kube.net.kindnet>
@type stdout
</match>
...

helm upgrade --install fluentd . -f ./values.yaml -n fluent

可以看到日志的格式为:
I0402 08:41:30.590752 1 main.go:297] Handling node with IPs: map[172.19.0.3:{}]

level: I
k8s_timestamp: 0402 08:41:30.590752
pid: 1
source_file: main.go
line: 297
message: Handling node with IPs: map[172.19.0.3:{}]

# 添加fluentd配置(Ruby DSL 语法)
01_filter.conf: |-
# 数据清洗,expression使用正则将整句文本进行拆解,grok插件最终也会转化成正则
<filter kube.net.kindnet>
@type parser
key_name log # 指定要对哪个字段进行正则拆解
reserve_data true # 保留原有的kubernetes元数据字段
remove_key_name_field true # 解析成功后,删除原始的log字符串
<parse>
@type regexp
expression /^(?<level>[IWEF])(?<date>\d{4})\s+(?<time>\d{2}:\d{2}:\d{2}\.\d{6})\s+(?<pid>\d+)\s+(?<file>[^:]+):(?<line>\d+)\]\s+(?<message>.*)$/
</parse>
</filter>

# 数据(写时)增强,将告警等级变具体
<filter kube.net.kindnet>
@type record_transformer
enable_ruby true
<record>
level_name ${ {"I"=>"INFO", "W"=>"WARN", "E"=>"ERROR", "F"=>"FATAL"}[record["level"]] }
clean_status "success" # 标记该记录已成功通过清洗逻辑
processed_at ${Time.now.to_s} # 记录 Fluentd 正确处理这条日志的具体时间
processed_by "fluentd-aio-cleaner" # 记录处理这条日志的具体组件名称
</record>
</filter>

# 更新helm
helm upgrade --install fluentd . -f ./values.yaml -n fluent

# 重新查看fluentd输出的日志
2026-04-02 09:03:31.181201000 +0000 kube.net.kindnet: {"time":"2026-04-02T09:03:31.181227255Z","stream":"stderr","_p":"F","kubernetes":{"pod_name":"kindnet-dztmq","namespace_name":"kube-system","pod_id":"e3df4eab-ac3b-41eb-9c50-86269927556f","labels":{"app":"kindnet","controller-revision-hash":"5b49848c94","k8s-app":"kindnet","pod-template-generation":"1","tier":"node"},"host":"ws-k8s-worker2","pod_ip":"172.19.0.3","container_name":"kindnet-cni","docker_id":"914c97f17227aaf4940bb038b409c46fe5a58b6429d362ca7408b55687a34ca0","container_hash":"sha256:50415e5d05f05adbdfd902507532ebb86f924dc2e05511a3b47920156ee4236e","container_image":"docker.io/kindest/kindnetd:v20241108-5c6d2daf"},"level":"I","date":"0402","pid":"1","file":"main.go","line":"324","message":"Node ws-k8s-worker has CIDR [10.244.1.0/24] ","level_name":"INFO","clean_status":"success","processed_at":"2026-04-02 09:03:31 +0000","processed_by":"fluentd-aio-cleaner"}
正则片段 匹配内容 示例结果 (JSON Key: Value)
^(?<level>[IWEF]) 日志级别(首字母) "level": "I"
(?<date>\d{4}) 4位数字日期 (MMDD) "date": "0402"
\s+(?<time>...) 空格 + 详细时间戳 "time": "08:41:30.590752"
\s+(?<pid>\d+) 空格 + 进程 ID "pid": "1"
\s+(?<file>[^:]+) 空格 + 文件名(直到冒号) "file": "main.go"
:(?<line>\d+)\] 冒号 + 行号 + 右中括号 "line": "297"
\s+(?<message>.*)$ 空格 + 剩下的所有消息内容 "message": "Handling node..."

fluent bit多行日志合并(未完成)

如果 kindnet 抛出一个 Go 的多行报错,现在的正则会失效(只能匹配到第一行)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
多行日志合并的重点是定义"开始行"
开始行 (start_state):匹配到这个模式,说明这是一个新的日志块
后续行 (cont):如果不匹配"开始行",就一直把内容追加到上一条日志里

I0402 08:41:30.590752 1 main.go:297] Handling node with IPs: map[172.19.0.3:{}]

# 在 custom_parsers.conf 中定义解析器,让input中应用它
custom_parsers.conf: |
[MULTILINE_PARSER]
name klog-multiline
type regex
flush_timeout 1000
rule "start_state" "/^([IWEF])\d{4}/" "cont"
rule "cont" "/^(?![IWEF]\d{4})/" "cont"
# 规则:匹配 I/W/E/F + 4位数字(如 I0402)。只要符合这个,就是新的一行
# 规则:如果不符合上面的开头,就属于持续行,继续合并

# input使用新解析器
inputs: |
[INPUT]
Name tail
Path /var/log/containers/*.log
# Kind 环境建议加上 DB 记录读取位点
DB /fluent-bit/tail_db.db
multiline.parser klog-multiline
Tag kube.*
Mem_Buf_Limit 50MB
Skip_Long_Lines On

# 验证
kubectl exec -it -n kube-system kindnet-dztmq -- sh
# 先写一行标准的开头(符合 start_state)
echo "I0403 09:40:00.123456 1 main.go:100] Standard log line"

# 紧接着写几行不符合开头的(应该被合并进上一行)
echo " This is a fake stacktrace line 1"
echo " This is a fake stacktrace line 2"

# 再写一行标准开头(触发上一条合并结束,开启新的一条)
echo "I0403 09:40:05.654321 1 main.go:101] New standard line"

但是没反应。。而且会导致fluentd解析错误,算了,这个需求再说吧

分析与清洗milvus数据库日志(未完成)

  • 配置fluent bit重定向tag,筛选出milvus相关的日志
  • 配置fluentd将日志输出在控制台来调试,并查看日志格式
  • 配置fluentd里针对该tag的日志格式,进行数据清洗和增强
  • 根据清洗完成后的数据,定义es索引模板
  • 如果milvus的多个服务日志格式不同,需要fluentd中进行路由处理

分析日志格式

milvus组件我开启了下面这些,并观察日志格式
milvus-datanode
milvus-etcd
mil
CATALOG
  1. 1. Fluentd数据处理模型理论深入
    1. 1.1. tag与多级tag
    2. 1.2. Fluentd插件链架构解析
    3. 1.3. 与logstash架构的对比
  2. 2. Fluentd数据清洗实践
    1. 2.1. fluent bit分流日志
    2. 2.2. fluentd清洗kindnet日志
    3. 2.3. fluent bit多行日志合并(未完成)
  3. 3. 分析与清洗milvus数据库日志(未完成)
    1. 3.1. 分析日志格式