API Tutorials

构建 CaptchaAI 使用仪表板和监控

跟踪验证码解决指标有助于优化成本、及早发现问题并规划容量。本指南构建了一个监控系统,用于记录每个解决方案并生成使用情况报告。


监控什么

指标 为什么它很重要
解决计数 跟踪使用量
成功率 检测质量问题
响应时间 识别减速因素
消费率 预算管理
误差分布 调试失败模式
余额 防止因余额耗尽而服务中断
方法分解 了解使用模式

指标收集器

import time
import csv
import datetime
import threading
from collections import defaultdict


class MetricsCollector:
    """Collect and store CaptchaAI solve metrics.

    Keeps per-method counters in memory for the current session and appends
    one row per solve attempt to a CSV log for long-term reporting. All
    updates happen under a single lock, so one collector instance can be
    shared safely across solver threads.
    """

    def __init__(self, log_file="captchaai_metrics.csv"):
        """Initialize the collector.

        Args:
            log_file: Path of the CSV log. Created with a header row on
                first use if it does not already exist.
        """
        self.log_file = log_file
        self.lock = threading.Lock()
        # Per-method counters for this process lifetime only; the CSV log
        # is the durable record.
        self.session_stats = defaultdict(lambda: {
            "count": 0, "success": 0, "error": 0,
            "timeout": 0, "total_time": 0,
        })
        self._init_log()

    @staticmethod
    def _utc_now_iso():
        """Return the current UTC time as a naive ISO-8601 string.

        datetime.utcnow() is deprecated since Python 3.12; this produces the
        same naive-UTC string via an aware "now" stripped of tzinfo, so
        existing logs and readers keep parsing identically.
        """
        now = datetime.datetime.now(datetime.timezone.utc)
        return now.replace(tzinfo=None).isoformat()

    def _init_log(self):
        """Create the CSV log with a header row if it does not exist yet."""
        try:
            # "x" mode creates the file atomically and fails if it already
            # exists, avoiding the check-then-create race of probing with
            # open("r") before writing.
            with open(self.log_file, "x", newline="") as f:
                csv.writer(f).writerow([
                    "timestamp", "method", "duration_s",
                    "status", "error_code", "task_id",
                ])
        except FileExistsError:
            pass  # Log already initialized; keep existing rows.

    def record(self, method, duration, status, error_code="", task_id=""):
        """Record a solve attempt.

        Args:
            method: CAPTCHA method name (e.g. "userrecaptcha").
            duration: Wall-clock seconds the attempt took.
            status: "success", "timeout", or anything else (counted as error).
            error_code: Optional API/exception error string.
            task_id: Optional task id returned by the submit endpoint.
        """
        with self.lock:
            # Update in-memory session counters.
            stats = self.session_stats[method]
            stats["count"] += 1
            stats["total_time"] += duration
            if status == "success":
                stats["success"] += 1
            elif status == "timeout":
                stats["timeout"] += 1
            else:
                # Any non-success, non-timeout status counts as an error.
                stats["error"] += 1

            # Append to the CSV while still holding the lock so rows from
            # concurrent threads cannot interleave.
            with open(self.log_file, "a", newline="") as f:
                csv.writer(f).writerow([
                    self._utc_now_iso(),
                    method, f"{duration:.2f}",
                    status, error_code, task_id,
                ])

    def get_session_summary(self):
        """Return {method: stats} for everything recorded this session.

        Each stats dict holds raw counts plus pre-formatted "success_rate"
        (percent string) and "avg_time" (seconds string).
        """
        summary = {}
        for method, stats in self.session_stats.items():
            count = stats["count"]
            avg_time = stats["total_time"] / count if count else 0
            success_rate = stats["success"] / count * 100 if count else 0
            summary[method] = {
                "total": count,
                "success": stats["success"],
                "errors": stats["error"],
                "timeouts": stats["timeout"],
                "success_rate": f"{success_rate:.1f}%",
                "avg_time": f"{avg_time:.1f}s",
            }
        return summary

仪器化求解器

包装您的求解器以自动收集指标:

import requests
import time


class MonitoredSolver:
    """CaptchaAI solver wrapper that records a metric row for every attempt."""

    def __init__(self, api_key, metrics=None):
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        # Fall back to a private collector when none is shared with us.
        self.metrics = metrics or MetricsCollector()

    def solve(self, method, **params):
        """Submit a CAPTCHA task, poll for its token, and record the outcome.

        The metric row is written in every case (success, timeout, error)
        via the finally block, so the stats never miss an attempt.
        """
        t0 = time.time()
        task_id = ""
        outcome = "error"
        err = ""

        try:
            # Submit the task.
            payload = {"key": self.api_key, "method": method, "json": 1, **params}
            submit = requests.post(
                f"{self.base}/in.php", data=payload, timeout=30,
            )
            reply = submit.json()

            if reply.get("status") != 1:
                err = reply.get("request", "UNKNOWN")
                raise RuntimeError(f"Submit error: {err}")

            task_id = reply["request"]

            # Wait for the solution.
            token = self._poll(task_id)
            outcome = "success"
            return token

        except TimeoutError:
            outcome = "timeout"
            raise
        except Exception as exc:
            err = str(exc)[:50]
            raise
        finally:
            # Always record, whatever happened above.
            self.metrics.record(method, time.time() - t0, outcome, err, task_id)

    def _poll(self, task_id, timeout=120):
        """Poll res.php every 5s until a token arrives or `timeout` elapses."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            time.sleep(5)
            reply = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15).json()
            if reply["request"] == "CAPCHA_NOT_READY":
                continue
            if reply.get("status") == 1:
                return reply["request"]
            raise RuntimeError(f"Solve error: {reply['request']}")
        raise TimeoutError("Poll timeout")

    def print_summary(self):
        """Print the per-method session metrics collected so far."""
        print("\n=== CaptchaAI Usage Summary ===")
        for method, stats in self.metrics.get_session_summary().items():
            print(f"\n{method}:")
            for label, value in stats.items():
                print(f"  {label}: {value}")


# Usage example: wire a shared collector into a monitored solver.
metrics = MetricsCollector()
solver = MonitoredSolver("YOUR_API_KEY", metrics)

# Fire off a batch of solves; a failed attempt is reported but does not
# stop the batch.
captcha_params = {
    "googlekey": "SITE_KEY",
    "pageurl": "https://example.com",
}
for _ in range(10):
    try:
        token = solver.solve("userrecaptcha", **captcha_params)
    except Exception as e:
        print(f"Failed: {e}")

# Dump the per-method session statistics.
solver.print_summary()

使用报告生成器

根据收集的指标生成每日/每周报告:

import csv
import datetime
from collections import defaultdict


class UsageReport:
    """Generate usage reports from the metrics CSV written by MetricsCollector."""

    def __init__(self, log_file="captchaai_metrics.csv"):
        """Args:
            log_file: CSV produced by MetricsCollector (header + one row
                per solve attempt).
        """
        self.log_file = log_file

    @staticmethod
    def _naive_utc_now():
        """Return "now" as a naive UTC datetime.

        datetime.utcnow() is deprecated since Python 3.12; staying naive
        keeps comparisons with the naive timestamps stored in the CSV valid
        (mixing aware and naive datetimes raises TypeError).
        """
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def _load_data(self, days=None):
        """Load metric rows, optionally filtered to the last `days` days.

        Each returned row dict gains "_ts" (parsed datetime) and
        "_duration" (float seconds) keys.

        Returns an empty list when the log file does not exist yet (e.g. a
        report run before any solve was recorded) instead of raising,
        matching BalanceDashboard.get_spending's behavior.
        """
        cutoff = None
        if days:
            cutoff = self._naive_utc_now() - datetime.timedelta(days=days)

        records = []
        try:
            with open(self.log_file, "r", newline="") as f:
                reader = csv.DictReader(f)
                for row in reader:
                    ts = datetime.datetime.fromisoformat(row["timestamp"])
                    if cutoff and ts < cutoff:
                        continue
                    row["_ts"] = ts
                    row["_duration"] = float(row["duration_s"])
                    records.append(row)
        except FileNotFoundError:
            return []
        return records

    def daily_summary(self, days=7):
        """Print a per-day table: total, successes, success rate, avg time."""
        records = self._load_data(days=days)
        by_day = defaultdict(lambda: {"count": 0, "success": 0, "total_time": 0})

        for rec in records:
            day = rec["_ts"].date().isoformat()
            by_day[day]["count"] += 1
            if rec["status"] == "success":
                by_day[day]["success"] += 1
            by_day[day]["total_time"] += rec["_duration"]

        print(f"=== Daily Summary (last {days} days) ===")
        print(f"{'Date':<12} {'Total':>6} {'Success':>8} {'Rate':>7} {'Avg Time':>9}")
        for day in sorted(by_day.keys()):
            stats = by_day[day]
            # count >= 1 for every key (created only when a row is seen),
            # but guard anyway for robustness.
            rate = stats["success"] / stats["count"] * 100 if stats["count"] > 0 else 0
            avg = stats["total_time"] / stats["count"] if stats["count"] > 0 else 0
            print(f"{day:<12} {stats['count']:>6} {stats['success']:>8} {rate:>6.1f}% {avg:>8.1f}s")

    def method_breakdown(self, days=30):
        """Print per-CAPTCHA-type usage: total, successes, rate, avg time."""
        records = self._load_data(days=days)
        by_method = defaultdict(lambda: {"count": 0, "success": 0, "total_time": 0})

        for rec in records:
            method = rec["method"]
            by_method[method]["count"] += 1
            if rec["status"] == "success":
                by_method[method]["success"] += 1
            by_method[method]["total_time"] += rec["_duration"]

        print(f"\n=== Method Breakdown (last {days} days) ===")
        print(f"{'Method':<25} {'Total':>6} {'Success':>8} {'Rate':>7} {'Avg Time':>9}")
        for method in sorted(by_method.keys()):
            stats = by_method[method]
            # count is always >= 1 here: an entry exists only after a row
            # incremented it above, so these divisions are safe.
            rate = stats["success"] / stats["count"] * 100
            avg = stats["total_time"] / stats["count"]
            print(f"{method:<25} {stats['count']:>6} {stats['success']:>8} {rate:>6.1f}% {avg:>8.1f}s")

    def error_breakdown(self, days=7):
        """Print non-success error codes sorted by frequency (most common first)."""
        records = self._load_data(days=days)
        errors = defaultdict(int)

        for rec in records:
            # Timeouts carry an empty error_code and are excluded here;
            # they appear in the status columns of the other reports.
            if rec["status"] != "success" and rec["error_code"]:
                errors[rec["error_code"]] += 1

        if errors:
            print(f"\n=== Error Breakdown (last {days} days) ===")
            for error, count in sorted(errors.items(), key=lambda x: -x[1]):
                print(f"  {error}: {count}")


# Usage example: emit the three standard reports from the shared CSV log.
report = UsageReport()
for window_days, render in (
    (7, report.daily_summary),
    (30, report.method_breakdown),
    (7, report.error_breakdown),
):
    render(days=window_days)

余额历史追踪

import requests
import time
import csv
import datetime


class BalanceDashboard:
    """Track CaptchaAI account balance over time for spending analysis."""

    def __init__(self, api_key, log_file="balance_history.csv"):
        """Args:
            api_key: CaptchaAI API key used for the getbalance call.
            log_file: CSV of (timestamp, balance) rows, appended on record().
        """
        self.api_key = api_key
        self.log_file = log_file

    @staticmethod
    def _naive_utc_now():
        # datetime.utcnow() is deprecated (Python 3.12+); keep naive UTC so
        # comparisons with the naive timestamps already in the CSV stay valid.
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def record(self):
        """Fetch the current balance and append it to the history CSV.

        Returns:
            The balance as a float.

        Raises:
            requests.RequestException: on network failure or timeout.
            ValueError / KeyError: on an unparsable API response.
        """
        resp = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": self.api_key,
            "action": "getbalance",
            "json": 1,
        }, timeout=15)  # timeout added: a hung request previously blocked forever
        balance = float(resp.json()["request"])

        with open(self.log_file, "a", newline="") as f:
            csv.writer(f).writerow([
                self._naive_utc_now().isoformat(),
                f"{balance:.4f}",
            ])
        return balance

    def get_spending(self, hours=24):
        """Return the balance drop over the last `hours` hours.

        Computed as first-minus-last recorded balance inside the window, so
        a top-up during the window makes this understate actual spending.

        Returns 0 when the history file is missing or holds fewer than two
        samples in the window.
        """
        cutoff = self._naive_utc_now() - datetime.timedelta(hours=hours)
        balances = []

        try:
            with open(self.log_file, "r", newline="") as f:
                for row in csv.reader(f):
                    ts = datetime.datetime.fromisoformat(row[0])
                    if ts > cutoff:
                        balances.append(float(row[1]))
        except FileNotFoundError:
            return 0

        if len(balances) < 2:
            return 0
        return balances[0] - balances[-1]

故障排除

问题 原因 处理方式
CSV 文件变得太大 长时间运行监控 每日或每周轮换文件
缺少解决记录 求解器未检测 使用 MonitoredSolver 包装所有求解器
统计数据与账单不符 缺少错误记录 确保 finally 块始终记录
仪表板错误率高 API参数错误 检查错误细分报告

常问问题

我应该保留多少数据?

详细指标保留 30 天,汇总数据保留 90 天。归档较旧的数据以减小文件大小。

我可以将指标导出到 Prometheus/Grafana 吗?

是的。可以扩展 MetricsCollector 以使用 prometheus_client 库将指标推送到 Prometheus。请参阅 Prometheus 监控指南。

我应该在生产中进行监控吗?

是的。将每个求解记录到 CSV 的开销可以忽略不计(每次写入 <1 毫秒)。它提供的可见性对于生产管道至关重要。


相关指南


了解你的数据——今天就开始监控 CaptchaAI。

该文章已禁用评论。

相关文章

DevOps & Scaling 使用 PagerDuty 构建自定义 CaptchaAI 警报
使用 Pager Duty 构建自定义 Captcha AI 警报的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

使用 Pager Duty 构建自定义 Captcha AI 警报的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意...

May 08, 2026
DevOps & Scaling 用于 CaptchaAI Worker 部署的 Ansible Playbook
使用 Captcha AI Worker 部署 Ansible Playbook 的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

使用 Captcha AI Worker 部署 Ansible Playbook 的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注...

Apr 19, 2026
DevOps & Scaling AWS Lambda + CaptchaAI:无服务器验证码解决
AWS Lambda + Captcha AI 的开发运营指南:无服务器验证码解决方案,包含生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

AWS Lambda + Captcha AI 的开发运营指南:无服务器验证码解决方案,包含生产中 Captcha AI 工作流程的架构决策、操作...

Apr 21, 2026