当您的 CAPTCHA 管道处理数千个任务时,单靠 grep 排查日志已无法应对。ELK Stack(Elasticsearch、Logstash、Kibana)可让您搜索、聚合和可视化验证码求解日志——查找错误模式、跟踪延迟趋势,并在几秒钟内诊断问题。
架构
[CAPTCHA Workers] → JSON logs → [Filebeat] → [Logstash] → [Elasticsearch]
↓
[Kibana]
结构化日志记录
Python——JSON 日志输出
import os
import json
import time
import logging
import sys
import requests
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
class JSONFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object.

    Every entry carries ``timestamp``, ``level``, ``logger`` and ``message``.
    Any attribute named in ``EXTRA_FIELDS`` that is present on the record
    (normally supplied through ``logger.<level>(..., extra={...})``) is copied
    into the entry under the same key.
    """

    # Optional record attributes copied into the JSON entry, in output order.
    EXTRA_FIELDS = (
        "captcha_id",
        "captcha_type",
        "solve_time",
        "error_code",
        "target_url",
        "poll_count",
    )

    def format(self, record):
        """Return ``record`` serialized as a JSON string."""
        # Emit an explicit UTC ISO-8601 timestamp (e.g. 2024-01-01T12:00:00.123Z).
        # The default formatTime() output is local time without a zone, which a
        # downstream parser has to guess the offset for.
        seconds = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created))
        log_entry = {
            "timestamp": "%s.%03dZ" % (seconds, int(record.msecs)),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }

        # Add extra fields
        for name in self.EXTRA_FIELDS:
            if hasattr(record, name):
                log_entry[name] = getattr(record, name)

        # Keep the traceback when the record was logged with exc_info
        # (e.g. logger.exception); otherwise it would be lost.
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        # default=str keeps a non-JSON-serializable extra value from raising
        # inside the handler and dropping the whole record.
        return json.dumps(log_entry, default=str)
# Configure logger
# Module-level "captchaai" logger: INFO and above, written to stdout through
# JSONFormatter so each record is emitted as a JSON string.
logger = logging.getLogger("captchaai")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# Shared requests session used by solve_captcha for its HTTP calls.
session = requests.Session()
def solve_captcha(sitekey, pageurl, captcha_type="recaptcha_v2",
                  max_polls=60, poll_interval=5, request_timeout=30):
    """Submit a reCAPTCHA task, poll for its result, and log every step.

    Args:
        sitekey: Sent to the API as ``googlekey``.
        pageurl: Sent to the API as ``pageurl`` and logged as ``target_url``.
        captcha_type: Label written to the logs as ``captcha_type``; the
            request itself always uses ``method=userrecaptcha``.
        max_polls: Maximum number of result polls before giving up.
        poll_interval: Seconds to sleep before each poll.
        request_timeout: Timeout in seconds passed to every HTTP request so a
            stalled connection cannot block the worker indefinitely.

    Returns:
        ``{"solution": <token>}`` on success, otherwise ``{"error": <code>}``
        where the code is the API's ``request`` value or ``"TIMEOUT"`` when
        all polls were used up.

    Exceptions raised by ``requests`` (including timeouts) and by JSON
    decoding are not caught here.
    """
    extra = {"captcha_type": captcha_type, "target_url": pageurl}

    # Submit
    resp = session.post(
        "https://ocr.captchaai.com/in.php",
        data={
            "key": API_KEY,
            "method": "userrecaptcha",
            "googlekey": sitekey,
            "pageurl": pageurl,
            "json": 1,
        },
        timeout=request_timeout,
    )
    data = resp.json()

    if data.get("status") != 1:
        logger.error("Submit failed", extra={
            **extra, "error_code": data.get("request")
        })
        return {"error": data.get("request")}

    captcha_id = data["request"]
    extra["captcha_id"] = captcha_id
    logger.info("Task submitted", extra=extra)

    # Poll
    start = time.time()
    poll_count = 0

    for _ in range(max_polls):
        time.sleep(poll_interval)
        poll_count += 1

        result = session.get(
            "https://ocr.captchaai.com/res.php",
            params={"key": API_KEY, "action": "get", "id": captcha_id, "json": 1},
            timeout=request_timeout,
        ).json()

        if result.get("status") == 1:
            elapsed = round(time.time() - start, 2)
            logger.info("Solve success", extra={
                **extra,
                "solve_time": elapsed,
                "poll_count": poll_count,
            })
            return {"solution": result["request"]}

        # Any reply other than the "not ready yet" marker is a terminal error.
        if result.get("request") != "CAPCHA_NOT_READY":
            logger.error("Solve failed", extra={
                **extra,
                "error_code": result.get("request"),
                "poll_count": poll_count,
            })
            return {"error": result.get("request")}

    logger.error("Solve timeout", extra={
        **extra,
        "error_code": "TIMEOUT",
        "poll_count": poll_count,
    })
    return {"error": "TIMEOUT"}
JavaScript——结构化日志记录
const axios = require("axios");
const API_KEY = process.env.CAPTCHAAI_API_KEY;
/**
 * Write one structured log line to stdout as JSON.
 *
 * @param {string} level - Severity label stored under `level`.
 * @param {string} message - Text stored under `message`.
 * @param {object} [fields={}] - Additional keys merged in last, so they take
 *   precedence over the built-in keys of the same name.
 */
function log(level, message, fields = {}) {
  const timestamp = new Date().toISOString();
  const serialized = JSON.stringify({
    timestamp,
    level,
    message,
    service: "captcha-worker",
    ...fields,
  });
  console.log(serialized);
}
/**
 * Submit a reCAPTCHA task, poll for its result, and log every step.
 *
 * Log fields use snake_case (`captcha_type`, `solve_time`, `error_code`, ...)
 * so they line up with the Elasticsearch index template and the Kibana
 * queries, which are written against those names.
 *
 * @param {string} sitekey - Sent to the API as `googlekey`.
 * @param {string} pageurl - Sent to the API as `pageurl`; logged as `target_url`.
 * @param {string} [captchaType="recaptcha_v2"] - Label logged as `captcha_type`;
 *   the request itself always uses `method: "userrecaptcha"`.
 * @returns {Promise<{solution: string}|{error: string}>} The solved token, or
 *   the API's error code (`"TIMEOUT"` when all 60 polls were used up).
 *   Rejections from axios (including timeouts) are not caught here.
 */
async function solveCaptcha(sitekey, pageurl, captchaType = "recaptcha_v2") {
  // Without a timeout a stalled connection would block the worker forever.
  const requestTimeoutMs = 30000;
  const baseFields = { captcha_type: captchaType, target_url: pageurl };

  const submitResp = await axios.post("https://ocr.captchaai.com/in.php", null, {
    params: {
      key: API_KEY, method: "userrecaptcha",
      googlekey: sitekey, pageurl, json: 1,
    },
    timeout: requestTimeoutMs,
  });

  if (submitResp.data.status !== 1) {
    log("error", "Submit failed", { ...baseFields, error_code: submitResp.data.request });
    return { error: submitResp.data.request };
  }

  const captchaId = submitResp.data.request;
  const fields = { ...baseFields, captcha_id: captchaId };
  log("info", "Task submitted", fields);

  const startTime = Date.now();
  let pollCount = 0;

  for (let i = 0; i < 60; i++) {
    await new Promise((r) => setTimeout(r, 5000));
    pollCount++;

    const pollResp = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
      timeout: requestTimeoutMs,
    });

    if (pollResp.data.status === 1) {
      // Seconds since submission, rounded to two decimals.
      const solveTime = Number(((Date.now() - startTime) / 1000).toFixed(2));
      log("info", "Solve success", { ...fields, solve_time: solveTime, poll_count: pollCount });
      return { solution: pollResp.data.request };
    }

    // Any reply other than the "not ready yet" marker is a terminal error.
    if (pollResp.data.request !== "CAPCHA_NOT_READY") {
      log("error", "Solve failed", { ...fields, error_code: pollResp.data.request, poll_count: pollCount });
      return { error: pollResp.data.request };
    }
  }

  log("error", "Solve timeout", { ...fields, error_code: "TIMEOUT", poll_count: pollCount });
  return { error: "TIMEOUT" };
}
module.exports = { solveCaptcha };
Filebeat 配置
# filebeat.yml
# Tail the worker log files and forward each line to Logstash.
filebeat.inputs:
  - type: log
    paths:
      - /var/log/captcha-worker/*.log
    # Decode every line as JSON before shipping it.
    json:
      # Place the decoded keys at the top level of the event.
      keys_under_root: true
      # Record an error key on the event when a line cannot be decoded.
      add_error_key: true
      # JSON key that holds the log message text.
      message_key: message

output.logstash:
  hosts: ["logstash:5044"]
Logstash 管道
# logstash-captcha.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # Filebeat (json.keys_under_root: true) has already decoded each log line,
  # so the log fields arrive at the top level of the event. No json filter is
  # needed here: "message" now holds plain text such as "Solve success", and
  # parsing it as JSON would only tag the event with _jsonparsefailure.

  # Add computed fields: fast (<= 30 s), medium (> 30 s), slow (> 90 s)
  if [solve_time] {
    mutate {
      add_field => {
        "solve_time_bucket" => "fast"
      }
    }
    if [solve_time] > 30 {
      mutate { update => { "solve_time_bucket" => "medium" } }
    }
    if [solve_time] > 90 {
      mutate { update => { "solve_time_bucket" => "slow" } }
    }
  }

  # Use the worker's own timestamp as the event time
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "captcha-logs-%{+YYYY.MM.dd}"
  }
}
Elasticsearch 索引模板
{
"index_patterns": ["captcha-logs-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"captcha_type": { "type": "keyword" },
"captcha_id": { "type": "keyword" },
"error_code": { "type": "keyword" },
"solve_time": { "type": "float" },
"poll_count": { "type": "integer" },
"target_url": { "type": "keyword" },
"level": { "type": "keyword" },
"message": { "type": "text" }
}
}
}
}
Kibana 仪表板面板
| 面板 | 可视化类型 | 查询 |
|---|---|---|
| 求解成功率 | 指标 | level:info AND message:"Solve success" / 总计 |
| 错误细分 | 饼图 | level:error 按 error_code 分组 |
| 延迟随时间变化 | 折线图 | 按时间统计 solve_time 的平均值 |
| 错误随时间变化 | 条形图 | 每 5 分钟一个桶,统计 level:error 的数量 |
| 最慢的求解 | 数据表 | 按 solve_time 降序排列的前 10 条 |
| 队列活动 | 面积图 | 按 message 计数("Task submitted" 与 "Solve success") |
有用的查询
# All errors in the last hour
level:error AND @timestamp:[now-1h TO now]
# Timeout errors for reCAPTCHA
error_code:TIMEOUT AND captcha_type:recaptcha_v2
# Slow solves (> 60 seconds)
solve_time:>60
# Errors for a specific target URL
level:error AND target_url:*example.com*
# Specific CAPTCHA ID investigation
captcha_id:"73519847"
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 日志未出现在 Kibana 中 | Filebeat 不传送日志 | 检查Filebeat日志;验证路径模式匹配 |
| JSON 解析错误 | 日志文件中的非 JSON 行 | 将 json.keys_under_root 添加到 Filebeat;修复记录器输出 |
| 索引过多 | 无 ILM 的每日索引 | 设置保留 30 天的索引生命周期管理 |
| 查询速度慢 | 缺少关键字映射 | 对可过滤字段使用 keyword 类型,而不是 text |
常见问题
我应该保留验证码日志多长时间?
操作日志 30 天。如果您需要趋势分析,则需要 90 天。使用 Elasticsearch ILM 自动删除旧索引。
我可以使用 OpenSearch 代替 Elasticsearch 吗?
是的。 OpenSearch 与 Elasticsearch 的 API 兼容。 Logstash 输出插件、Filebeat 和 Kibana 替代品(OpenSearch 仪表板)的工作方式相同。
我应该记录验证码解决方案文本吗?
不应该。解决方案是一次性令牌,没有诊断价值。记录它们会增加存储成本,并可能带来安全问题。仅记录元数据(ID、类型、延迟、状态)。
下一步
搜索并分析您的验证码日志——获取您的 CaptchaAI API 密钥并搭建 ELK。
相关指南:
- 结构化日志记录
- Datadog 监控
- OpenTelemetry 追踪