搜索中...
🔍

未找到相关结果

Akemi

Dify创建工作流并处理Prometheus数据"

字数统计: 1.9k阅读时长: 8 min
2026/03/18

在开始之前,我收回之前对于langchain的所有诋毁,langchain难啃是难啃,东西确实都是干货

总体思路

Prometheus → Crontab脚本 → Dify API → DeepSeek

少了一层Alertmanager和Webhook转发,只需要维护脚本

  • 提取参数,从用户自然语言中提取登录总人数
  • 得到方案,通过代码判断异常登录的比例,从而判断是否要继续
  • 细化方案,输入基本解决方案,通过知识库检索详细解决方案
  • 生成报表LLM,输出解决方案

dify安装部署

1
2
3
4
5
6
7
8
9
10
11
git clone https://github.com/langgenius/dify.git

cd dify/docker/
cp .env.example .env

sed -i 's/^EXPOSE_NGINX_PORT.*/EXPOSE_NGINX_PORT=8085/g' .env
sed -i 's/^EXPOSE_NGINX_SSL_PORT.*/EXPOSE_NGINX_SSL_PORT=8443/g' .env

docker compose up -d

网页访问192.168.10.100:8085进行注册登录

创建工作流

1.开始节点,用来接受用户输入
2.参数提取器,将参数从用户输入中提取出来
3.代码执行模块,根据提取出的参数提出解决方案(粗略
4.知识检索,添加知识库并选择所使用的嵌入式模型,提供详细的方案(详细
5.大模型根据代码执行的结果和知识检索,给出最终的方案

参数提取

代码执行模块

基于不同的登录失败率,生成初步的方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def main(arg1: str, arg2: str) -> dict:
"""
参数:
arg1: 总登录人数(字符串类型,需要转换为整数)
arg2: 异常登录人数(字符串类型,需要转换为整数)

返回:
包含处理建议的字典
"""
# 将字符串参数转换为整数
try:
total_login = int(arg1)
failed_login_count = int(arg2)
except ValueError:
return {"result": "参数类型错误,请输入有效的数字"}

# 边界情况处理
if total_login == 0:
return {"result": "总登录人数为0,无法计算异常率,请检查指标采集是否正常"}

# 计算异常率(保留2位小数)
anomaly_rate = round(failed_login_count / total_login * 100, 2)

# 1级:轻微异常(异常率≤2%)
if anomaly_rate <= 2:
suggestion = """
1. 监控层面:开启5分钟粒度的异常趋势跟踪,无需触发告警
2. 操作层面:抽样检查3-5条异常日志,确认是否为用户操作失误
3. 后续跟进:若10分钟内异常率持续上升至2%以上,手动升级处理
""".strip()

# 2级:一般异常(2%<异常率≤5%)
elif anomaly_rate <= 5:
suggestion = """
1. 服务检查:执行kubectl get pods | grep login-service确认服务存活
2. 日志分析:按错误类型统计(4xx/5xx),排查是否集中在特定区域
3. 处理动作:重启异常服务实例,清理登录相关缓存
4. 通知:同步当日值班运维工程师关注
""".strip()

# 3级:中度异常(5%<异常率≤10%)
elif anomaly_rate <= 10:
suggestion = """
1. 紧急排查:检查服务资源使用率(CPU<80%、内存<85%),分析登录SQL慢查询
2. 控制影响:封禁1分钟内失败次数>20次的IP(临时1小时),启用缓存降级
3. 协同响应:通知运维团队全员,同步开发查看近期代码变更
4. 进度跟踪:每15分钟输出一次异常处理进展
""".strip()

# 4级:严重异常(10%<异常率≤20%)
elif anomaly_rate <= 20:
suggestion = """
1. 资源扩容:登录服务实例紧急扩容至原数量×2,数据库连接数调至2000
2. 服务降级:关闭登录积分等非核心功能,对非会员启用验证码
3. 应急响应:电话通知运维负责人(15分钟内到场),启动应急预案
4. 进度同步:每10分钟向技术总监汇报处理进度
""".strip()

# 5级:紧急异常(异常率>20%)
else:
suggestion = """
1. 全网告警:触发短信+电话告警(技术总监、运维/开发负责人)
2. 系统处理:启动灾备系统,实施流量限流(仅允许30%正常流量)
3. 业务协同:通知业务负责人评估影响,成立临时应急小组
4. 高层汇报:每5分钟在应急群更新进展,30分钟向管理层汇报
""".strip()

return {"result": suggestion}

创建知识库

rerank就是细化检索#

将最后的结果送给大模型分析

添加最后的输出结果

Agent调用工作流

打包完成后发布为工具,可以将其作为一个agent进行调用

创建AI Agent。说实话学过langchain,这种东西看一眼就知道是啥意思了,langchain还是太权威了

这里显然告警等级有点问题,但是不重要,估计是代码执行的时候出了点问题

Crontab脚本调用工作流

使用crontab调用脚本,脚本中使用api进行调用(api_key可以在dify网页端创建)

脚本一方面使用curl从prometheus中获取数据,一方面将获取的数据格式化成自然语言后,调用dify的接口发给它来运行工作流

注:我这个脚本抓取的是有史以来所有的数据,但在实践中会对抓取窗口做限制

省略的步骤:
使用了prometheus的sdk,在本机上暴露了端口发送登录的信息,prometheus配置抓取该端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/bin/bash

# ===== 配置部分 =====
API_KEY="app-RWFngEfUSKmWPVDHXQy4mHvB"
BASE_URL="http://192.168.10.100:8085/v1"
USER_ID="wangsheng" # 用户的ID,随意
PROM_URL="http://localhost:9090"
LOG_FILE="inspector.log"

# ===== jq 路径 =====
JQ_CMD="/usr/bin/jq"

# ===== 日志函数 =====
log() {
TIMESTAMP=$(date +"%Y-%m-%d %H:%M:%S")
echo "[$TIMESTAMP] $1" | tee -a "$LOG_FILE"
}

log "脚本开始执行"

# ===== 查询 Prometheus 指标 =====
log "查询 Prometheus: 总登录次数"
TOTAL_LOGINS=$(curl -s "${PROM_URL}/api/v1/query?query=user_login_total" \
| $JQ_CMD '[.data.result[] | .value[1] | tonumber] | add')
TOTAL_LOGINS=${TOTAL_LOGINS:-0}
log "总登录次数: $TOTAL_LOGINS"

log "查询 Prometheus: 登录成功次数"
SUCCESS_LOGINS=$(curl -s "${PROM_URL}/api/v1/query?query=user_login_total" \
| $JQ_CMD '[.data.result[] | select(.metric.login_status=="['\''success'\'']") | .value[1] | tonumber] | add')
SUCCESS_LOGINS=${SUCCESS_LOGINS:-0}
log "登录成功次数: $SUCCESS_LOGINS"

log "查询 Prometheus: 登录失败次数"
FAIL_LOGINS=$(curl -s "${PROM_URL}/api/v1/query?query=user_login_total" \
| $JQ_CMD '[.data.result[] | select(.metric.login_status=="['\''fail'\'']") | .value[1] | tonumber] | add')
FAIL_LOGINS=${FAIL_LOGINS:-0}
log "登录失败次数: $FAIL_LOGINS"

# ===== 生成 Workflow 输入文本 =====
INPUT_TEXT="用户登录统计:总登录次数: ${TOTAL_LOGINS},登录成功: ${SUCCESS_LOGINS},登录失败: ${FAIL_LOGINS}"
SAFE_INPUT_TEXT=$(echo "$INPUT_TEXT" | $JQ_CMD -Rs .)
log "生成 Workflow 输入文本: $INPUT_TEXT"

# ===== 调用 Workflow =====
log "🚀 正在调用 Workflow ..."
RESPONSE=$(curl -s -X POST "${BASE_URL}/workflows/run" \
-H "Authorization: Bearer ${API_KEY}" \
-H "Content-Type: application/json" \
-d "{
\"inputs\": {
\"user_query\": $SAFE_INPUT_TEXT
},
\"response_mode\": \"blocking\",
\"user\": \"${USER_ID}\"
}")

log "收到 Workflow 原始响应: $RESPONSE"

# ===== 输出结果 =====
echo "✅ Workflow 返回结果:"
RESULT=$(echo "${RESPONSE}" | $JQ_CMD -r '.data.outputs.tool_response')
echo "$RESULT"
log "Workflow 中文结果: $RESULT"

log "脚本执行完成"

# ===== 保存报告 =====
TIMESTAMP=$(date +"%Y%m%d%H%M%S")
REPORT_DIR="/Users/cuihao/docker/dify/report"
mkdir -p "$REPORT_DIR"
REPORT_FILE="${REPORT_DIR}/inspector_${TIMESTAMP}.json"
echo "$RESULT" > "$REPORT_FILE"
log "Workflow 结果已保存到: $REPORT_FILE"

log "脚本执行完成"

Crontab脚本调用结果

dify层面日志

CATALOG
  1. 1. 总体思路
  2. 2. dify安装部署
  3. 3. 创建工作流
    1. 3.1. 参数提取
    2. 3.2. 代码执行模块
    3. 3.3. 创建知识库
    4. 3.4. 将最后的结果送给大模型分析
  4. 4. Agent调用工作流
  5. 5. Crontab脚本调用工作流