DevOps 与扩展

用于验证码解决日志分析的 ELK Stack

当您的 CAPTCHA 管道处理数千个任务时,grep 无法扩展。 ELK Stack(Elasticsearch、Logstash、Kibana)可让您搜索、聚合和可视化解决日志 - 查找错误模式、跟踪延迟趋势并在几秒钟内诊断问题。

建筑学

[CAPTCHA Workers] → JSON logs → [Filebeat] → [Logstash] → [Elasticsearch]
                                                                ↓
                                                           [Kibana]

结构化日志记录

Python——JSON 日志输出

import os
import json
import time
import logging
import sys
import requests

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Add extra fields
        if hasattr(record, "captcha_id"):
            log_entry["captcha_id"] = record.captcha_id
        if hasattr(record, "captcha_type"):
            log_entry["captcha_type"] = record.captcha_type
        if hasattr(record, "solve_time"):
            log_entry["solve_time"] = record.solve_time
        if hasattr(record, "error_code"):
            log_entry["error_code"] = record.error_code
        if hasattr(record, "target_url"):
            log_entry["target_url"] = record.target_url
        if hasattr(record, "poll_count"):
            log_entry["poll_count"] = record.poll_count
        return json.dumps(log_entry)


# Configure logger
logger = logging.getLogger("captchaai")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

session = requests.Session()


def solve_captcha(sitekey, pageurl, captcha_type="recaptcha_v2"):
    extra = {"captcha_type": captcha_type, "target_url": pageurl}

    # Submit
    resp = session.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    })
    data = resp.json()

    if data.get("status") != 1:
        logger.error("Submit failed", extra={
            **extra, "error_code": data.get("request")
        })
        return {"error": data.get("request")}

    captcha_id = data["request"]
    extra["captcha_id"] = captcha_id
    logger.info("Task submitted", extra=extra)

    # Poll
    start = time.time()
    poll_count = 0
    for _ in range(60):
        time.sleep(5)
        poll_count += 1
        result = session.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get", "id": captcha_id, "json": 1
        }).json()

        if result.get("status") == 1:
            elapsed = round(time.time() - start, 2)
            logger.info("Solve success", extra={
                **extra,
                "solve_time": elapsed,
                "poll_count": poll_count
            })
            return {"solution": result["request"]}

        if result.get("request") != "CAPCHA_NOT_READY":
            logger.error("Solve failed", extra={
                **extra,
                "error_code": result.get("request"),
                "poll_count": poll_count
            })
            return {"error": result.get("request")}

    logger.error("Solve timeout", extra={
        **extra,
        "error_code": "TIMEOUT",
        "poll_count": poll_count
    })
    return {"error": "TIMEOUT"}

JavaScript——结构化日志记录

const axios = require("axios");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

function log(level, message, fields = {}) {
  const entry = {
    timestamp: new Date().toISOString(),
    level,
    message,
    service: "captcha-worker",
    ...fields,
  };
  console.log(JSON.stringify(entry));
}

async function solveCaptcha(sitekey, pageurl, captchaType = "recaptcha_v2") {
  const fields = { captchaType, targetUrl: pageurl };

  const submitResp = await axios.post("https://ocr.captchaai.com/in.php", null, {
    params: {
      key: API_KEY, method: "userrecaptcha",
      googlekey: sitekey, pageurl, json: 1,
    },
  });

  if (submitResp.data.status !== 1) {
    log("error", "Submit failed", { ...fields, errorCode: submitResp.data.request });
    return { error: submitResp.data.request };
  }

  const captchaId = submitResp.data.request;
  fields.captchaId = captchaId;
  log("info", "Task submitted", fields);

  const startTime = Date.now();
  let pollCount = 0;

  for (let i = 0; i < 60; i++) {
    await new Promise((r) => setTimeout(r, 5000));
    pollCount++;

    const pollResp = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (pollResp.data.status === 1) {
      const solveTime = ((Date.now() - startTime) / 1000).toFixed(2);
      log("info", "Solve success", { ...fields, solveTime: parseFloat(solveTime), pollCount });
      return { solution: pollResp.data.request };
    }

    if (pollResp.data.request !== "CAPCHA_NOT_READY") {
      log("error", "Solve failed", { ...fields, errorCode: pollResp.data.request, pollCount });
      return { error: pollResp.data.request };
    }
  }

  log("error", "Solve timeout", { ...fields, errorCode: "TIMEOUT", pollCount });
  return { error: "TIMEOUT" };
}

module.exports = { solveCaptcha };

Filebeat 配置

# filebeat.yml
filebeat.inputs:

  - type: log
    paths:

      - /var/log/captcha-worker/*.log
    json:
      keys_under_root: true
      add_error_key: true
      message_key: message

output.logstash:
  hosts: ["logstash:5044"]

Logstash 管道

# logstash-captcha.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # Parse JSON logs
  json {
    source => "message"
    target => "captcha"
  }

  # Add computed fields
  if [captcha][solve_time] {
    mutate {
      add_field => {
        "solve_time_bucket" => "fast"
      }
    }
    if [captcha][solve_time] > 30 {
      mutate { update => { "solve_time_bucket" => "medium" } }
    }
    if [captcha][solve_time] > 90 {
      mutate { update => { "solve_time_bucket" => "slow" } }
    }
  }

  # Extract date
  date {
    match => ["[captcha][timestamp]", "ISO8601"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "captcha-logs-%{+YYYY.MM.dd}"
  }
}

Elasticsearch 索引模板

{
  "index_patterns": ["captcha-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0
    },
    "mappings": {
      "properties": {
        "captcha_type": { "type": "keyword" },
        "captcha_id": { "type": "keyword" },
        "error_code": { "type": "keyword" },
        "solve_time": { "type": "float" },
        "poll_count": { "type": "integer" },
        "target_url": { "type": "keyword" },
        "level": { "type": "keyword" },
        "message": { "type": "text" }
      }
    }
  }
}

Kibana 仪表板面板

控制板 可视化 询问
解决成功率 公制 level:info AND message:"Solve success" / 总计
错误细分 饼图 level:errorerror_code 分组
随着时间的推移延迟 折线图 随着时间的推移平均 solve_time
随着时间的推移出现错误 条形图 每 5 分钟桶计数 level:error
最慢的解决方案 数据表 solve_time 降序排列的前 10 名
队列活动 面积图 message计数(“任务已提交”与“解决成功”)

有用的查询

# All errors in the last hour
level:error AND @timestamp:[now-1h TO now]

# Timeout errors for reCAPTCHA
error_code:TIMEOUT AND captcha_type:recaptcha_v2

# Slow solves (> 60 seconds)
solve_time:>60

# Errors for a specific target URL
level:error AND target_url:"example.com"

# Specific CAPTCHA ID investigation
captcha_id:"73519847"

故障排除

问题 原因 处理方式
日志未出现在 Kibana 中 Filebeat 不传送日志 检查Filebeat日志;验证路径模式匹配
JSON 解析错误 日志文件中的非 JSON 行 json.keys_under_root 添加到 Filebeat;修复记录器输出
索引过多 无 ILM 的每日索引 设置保留 30 天的索引生命周期管理
查询速度慢 缺少关键字映射 对可过滤字段使用 keyword 类型,而不是 text

常问问题

我应该保留验证码日志多长时间?

操作日志 30 天。如果您需要趋势分析,则需要 90 天。使用 Elasticsearch ILM 自动删除旧索引。

我可以使用 OpenSearch 代替 Elasticsearch 吗?

是的。 OpenSearch 与 Elasticsearch 的 API 兼容。 Logstash 输出插件、Filebeat 和 Kibana 替代品(OpenSearch 仪表板)的工作方式相同。

我应该记录验证码解决方案文本吗?

不会。解决方案是一次性令牌,没有诊断价值。记录它们会增加存储成本并可能产生安全问题。仅记录元数据(ID、类型、延迟、状态)。

下一步

搜索并分析您的验证码日志 -获取您的 CaptchaAI API 密钥并设置ELK。

相关指南:

该文章已禁用评论。