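Disaster Recovery Planning for CAPTCHA Solving Pipelines

A CAPTCHA solving pipeline that loses in-flight tasks during an outage costs you data, time, and money. Disaster recovery (DR) planning ensures you can recover from infrastructure failures, API outages, or misconfigurations with minimal data loss.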

Disaster Recovery Objectives

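Metric | Definition | CAPTCHA pipeline target
RPO (Recovery Point Objective) | Maximum tolerable data loss | < 5 minutes of queued tasks
RTO (Recovery Time Objective) | Maximum time to restore service | < 15 minutes
MTTR (Mean Time to Recovery) | Average time to recover | < 10 minutes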
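
These targets only help if you measure incidents against them. The following is a minimal sketch, assuming you record a detection time and a restoration time for each incident. The `Incident` type and the helper names are hypothetical and not part of any library.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

# Targets taken from the objectives table
RTO = timedelta(minutes=15)
MTTR_TARGET = timedelta(minutes=10)


@dataclass
class Incident:
    """Hypothetical incident record: detection time and restoration time."""
    detected_at: datetime
    restored_at: datetime

    @property
    def recovery_time(self) -> timedelta:
        return self.restored_at - self.detected_at


def mean_time_to_recovery(incidents):
    """Average recovery time across a non-empty list of incidents (MTTR)."""
    total = sum((i.recovery_time for i in incidents), timedelta())
    return total / len(incidents)


def rto_breaches(incidents, rto=RTO):
    """Incidents whose recovery took longer than the RTO."""
    return [i for i in incidents if i.recovery_time > rto]
```

Comparing `mean_time_to_recovery(...)` with `MTTR_TARGET` after each post-mortem shows whether the pipeline is trending toward or away from its targets.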

Failure Scenarios

Scenario 1: Worker crash         → Restart workers, replay queue
Scenario 2: Queue data loss      → Restore from persistent backup
Scenario 3: Network partition    → Failover to secondary region
Scenario 4: API key compromised  → Rotate key, update workers
Scenario 5: Config corruption    → Rollback to last known good

Task Persistence Layer

Never solve CAPTCHAs from an in-memory-only queue. Persist tasks so they survive crashes.

Python: Persistent Task Queue

import os
import json
import time
import sqlite3
import threading
import requests
from datetime import datetime

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


class PersistentTaskQueue:
    """SQLite-backed task queue that survives crashes."""

    def __init__(self, db_path="captcha_tasks.db"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.lock = threading.Lock()
        self._init_db()

    def _init_db(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                payload TEXT NOT NULL,
                status TEXT DEFAULT 'pending',
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                started_at TEXT,
                completed_at TEXT,
                result TEXT,
                attempts INTEGER DEFAULT 0
            )
        """)
        self.conn.commit()

    def enqueue(self, task_id, payload):
        with self.lock:
            self.conn.execute(
                "INSERT INTO tasks (id, payload) VALUES (?, ?)",
                (task_id, json.dumps(payload))
            )
            self.conn.commit()

    def dequeue(self):
        with self.lock:
            cursor = self.conn.execute(
                "SELECT id, payload FROM tasks "
                "WHERE status = 'pending' ORDER BY created_at LIMIT 1"
            )
            row = cursor.fetchone()
            if not row:
                return None

            task_id, payload = row
            self.conn.execute(
                "UPDATE tasks SET status = 'processing', "
                "started_at = ?, attempts = attempts + 1 WHERE id = ?",
                (datetime.utcnow().isoformat(), task_id)
            )
            self.conn.commit()
            return {"id": task_id, "payload": json.loads(payload)}

    def complete(self, task_id, result):
        with self.lock:
            self.conn.execute(
                "UPDATE tasks SET status = 'completed', "
                "completed_at = ?, result = ? WHERE id = ?",
                (datetime.utcnow().isoformat(), json.dumps(result), task_id)
            )
            self.conn.commit()

    def fail(self, task_id, error):
        with self.lock:
            # Requeue if under retry limit
            cursor = self.conn.execute(
                "SELECT attempts FROM tasks WHERE id = ?", (task_id,)
            )
            row = cursor.fetchone()
            if row and row[0] < 3:
                self.conn.execute(
                    "UPDATE tasks SET status = 'pending' WHERE id = ?",
                    (task_id,)
                )
            else:
                self.conn.execute(
                    "UPDATE tasks SET status = 'failed', "
                    "result = ? WHERE id = ?",
                    (json.dumps({"error": error}), task_id)
                )
            self.conn.commit()

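    def recover_stale(self, timeout_seconds=600):
        """Reset tasks stuck in 'processing' after a crash."""
        with self.lock:
            # started_at is stored as an ISO-8601 UTC string, so build the
            # cutoff in the same format to keep the string comparison valid
            cutoff = datetime.utcfromtimestamp(
                time.time() - timeout_seconds
            ).isoformat()
            cursor = self.conn.execute(
                "UPDATE tasks SET status = 'pending' "
                "WHERE status = 'processing' AND started_at < ?",
                (cutoff,)
            )
            self.conn.commit()
            # rowcount covers only this UPDATE; total_changes would count
            # every change made on the connection so far
            return cursor.rowcount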

    @property
    def stats(self):
        cursor = self.conn.execute(
            "SELECT status, COUNT(*) FROM tasks GROUP BY status"
        )
        return dict(cursor.fetchall())


# On startup: recover tasks that were processing during a crash
queue = PersistentTaskQueue()
recovered = queue.recover_stale(timeout_seconds=600)
print(f"Recovered {recovered} stale tasks after restart")
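
A worker loop ties the queue methods together. The sketch below assumes a queue object exposing the `dequeue`, `complete`, and `fail` methods from the class above, and a `solve` callable that you supply. Both `run_worker` and `solve` are illustrative names, not part of any library.

```python
def run_worker(queue, solve, max_tasks=None):
    """Process tasks until the queue is empty or max_tasks is reached.

    queue: object with dequeue(), complete(task_id, result), fail(task_id, error)
    solve: callable that takes a task payload and returns a result
    Returns the number of tasks that were attempted.
    """
    processed = 0
    while max_tasks is None or processed < max_tasks:
        task = queue.dequeue()
        if task is None:
            break  # a long-running worker would sleep and poll again instead
        try:
            result = solve(task["payload"])
            queue.complete(task["id"], result)
        except Exception as exc:
            # fail() decides whether to requeue the task or mark it failed
            queue.fail(task["id"], str(exc))
        processed += 1
    return processed
```

Because every state change goes through the persistent queue, a crash in the middle of `solve` leaves the task in `processing`, where `recover_stale` can pick it up after restart.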

JavaScript: Recovery Manager

const axios = require("axios");
const fs = require("fs");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

class DisasterRecoveryManager {
  constructor(checkpointDir = "./dr-checkpoints") {
    this.checkpointDir = checkpointDir;
    if (!fs.existsSync(checkpointDir)) {
      fs.mkdirSync(checkpointDir, { recursive: true });
    }
  }

  checkpoint(label, data) {
    const filename = `${this.checkpointDir}/${label}-${Date.now()}.json`;
    fs.writeFileSync(filename, JSON.stringify(data, null, 2));
    this.pruneOldCheckpoints(label, 10); // Keep last 10
    return filename;
  }

  restore(label) {
    const files = fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(label) && f.endsWith(".json"))
      .sort()
      .reverse();

    if (files.length === 0) return null;
    const latest = fs.readFileSync(
      `${this.checkpointDir}/${files[0]}`, "utf8"
    );
    return JSON.parse(latest);
  }

  pruneOldCheckpoints(label, keep) {
    const files = fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(label) && f.endsWith(".json"))
      .sort();

    while (files.length > keep) {
      const old = files.shift();
      fs.unlinkSync(`${this.checkpointDir}/${old}`);
    }
  }

  async healthCheck() {
    try {
      const resp = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "getbalance", json: 1 },
        timeout: 10000,
      });
      return {
        healthy: resp.data.status === 1,
        balance: parseFloat(resp.data.request || 0),
      };
    } catch (err) {
      return { healthy: false, error: err.message };
    }
  }
}

class ResilientSolver {
  constructor() {
    this.dr = new DisasterRecoveryManager();
    this.pendingTasks = [];
  }

  async solveBatch(tasks) {
    // Checkpoint before starting; startedAt doubles as a batch identifier
    const startedAt = new Date().toISOString();
    this.dr.checkpoint("batch-pending", { tasks, startedAt });

    const results = [];
    for (const task of tasks) {
      try {
        const result = await this.solveSingle(task);
        results.push({ taskId: task.id, ...result });
      } catch (err) {
        results.push({ taskId: task.id, error: err.message });
      }

      // Checkpoint progress every 10 tasks
      if (results.length % 10 === 0) {
        this.dr.checkpoint("batch-progress", {
          startedAt,
          results,
          remaining: tasks.length - results.length,
        });
      }
    }

    // Final checkpoint marks this batch as finished
    this.dr.checkpoint("batch-complete", { startedAt, results });
    return results;
  }

  async recover() {
    const pending = this.dr.restore("batch-pending");
    if (!pending) return [];

    // A matching "batch-complete" checkpoint means nothing is left to resume
    const complete = this.dr.restore("batch-complete");
    if (complete && complete.startedAt === pending.startedAt) return [];

    // Only trust progress checkpoints written for this same batch
    const progress = this.dr.restore("batch-progress");
    if (progress && progress.startedAt === pending.startedAt) {
      const completedIds = new Set(progress.results.map((r) => r.taskId));
      const remaining = pending.tasks.filter((t) => !completedIds.has(t.id));
      console.log(
        `Recovering: ${progress.results.length} done, ${remaining.length} remaining`
      );
      return remaining;
    }

    console.log(`Recovering full batch: ${pending.tasks.length} tasks`);
    return pending.tasks;
  }

  async solveSingle(task) {
    const resp = await axios.post("https://ocr.captchaai.com/in.php", null, {
      params: {
        key: API_KEY,
        method: "userrecaptcha",
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    });

    if (resp.data.status !== 1) throw new Error(resp.data.request);

    const captchaId = resp.data.request;
    for (let i = 0; i < 60; i++) {
      await new Promise((r) => setTimeout(r, 5000));
      const poll = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
      });
      if (poll.data.status === 1) return { solution: poll.data.request };
      if (poll.data.request !== "CAPCHA_NOT_READY")
        throw new Error(poll.data.request);
    }
    throw new Error("TIMEOUT");
  }
}

// Start with recovery check
const solver = new ResilientSolver();
solver.recover().then((remaining) => {
  if (remaining.length > 0) {
    console.log(`Resuming ${remaining.length} tasks from checkpoint`);
    solver.solveBatch(remaining);
  }
});

Disaster Recovery Runbook Template

RUNBOOK: CAPTCHA Pipeline Recovery
====================================

1. DETECT
   - Alert fires: [PagerDuty / Slack / Email]
   - Symptom: [Queue growing / Workers offline / Error spike]

2. ASSESS
   - Check worker health: curl http://workers/health
   - Check API status: GET /res.php?action=getbalance
   - Check queue depth: SELECT COUNT(*) FROM tasks WHERE status='pending'

3. RECOVER
   If: Workers crashed
     → Restart worker containers: docker-compose up -d workers
     → Run stale task recovery: recovery.py --recover-stale

   If: Network partition
     → Failover to secondary region
     → Update DNS or load balancer routing

   If: API key compromised
     → Generate new key at captchaai.com
     → Update secret store
     → Rolling restart workers

4. VERIFY
   - Confirm solve rate > 90%
   - Confirm queue draining
   - Confirm no duplicate solves

5. POST-MORTEM
   - Document root cause
   - Update runbook if needed
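
The queue-depth check in ASSESS and the solve-rate check in VERIFY can be scripted against the task table. This is a minimal sketch, assuming the SQLite `tasks` table and status values from the Python queue earlier in this article. The function name `verify_recovery` is hypothetical, and the solve rate is computed here as completed tasks divided by completed plus failed tasks, which is one possible definition.

```python
import sqlite3


def verify_recovery(conn, min_solve_rate=0.90):
    """Summarize queue state for the ASSESS and VERIFY runbook steps."""
    counts = dict(
        conn.execute("SELECT status, COUNT(*) FROM tasks GROUP BY status").fetchall()
    )
    completed = counts.get("completed", 0)
    failed = counts.get("failed", 0)
    finished = completed + failed
    # No finished tasks yet means the solve rate is unknown, not zero
    solve_rate = completed / finished if finished else None
    return {
        "queue_depth": counts.get("pending", 0),
        "in_flight": counts.get("processing", 0),
        "solve_rate": solve_rate,
        "solve_rate_ok": solve_rate is not None and solve_rate > min_solve_rate,
    }
```

Calling this twice a few minutes apart and comparing `queue_depth` gives a simple signal for the "queue draining" check.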

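Recovery Objective Mapping

  • Define which data can be replayed later and which user-facing flows must be restored immediately after a failure.
  • Map each pipeline component to its own RPO and RTO targets instead of treating the whole system as a single block.
  • Document who can trigger a failover, who verifies recovery, and when normal routing is restored.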
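
One way to keep per-component targets explicit is a small table in code. The component names and numbers below are hypothetical placeholders rather than recommendations, and should be replaced with values that fit your own pipeline.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class RecoveryTarget:
    rpo: timedelta      # maximum tolerable data loss
    rto: timedelta      # maximum time to restore the component
    replayable: bool    # True if lost work can simply be replayed later


# Hypothetical components and targets
TARGETS = {
    "task_queue": RecoveryTarget(timedelta(minutes=5), timedelta(minutes=15), True),
    "workers": RecoveryTarget(timedelta(0), timedelta(minutes=5), True),
    "user_facing_api": RecoveryTarget(timedelta(minutes=1), timedelta(minutes=2), False),
}


def recovery_order(targets):
    """Component names sorted by tightest RTO first."""
    return sorted(targets, key=lambda name: targets[name].rto)
```

Sorting by RTO gives a default restore order for the runbook: the components with the least slack come back first.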

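Troubleshooting

Problem | Cause | Fix
Tasks lost during a crash | In-memory-only queue | Use a persistent queue (SQLite, Redis with AOF)
Duplicate solves after recovery | Stale tasks reprocessed without deduplication | Add idempotency keys; check whether the task was already solved
Recovery time > RTO | Database backup too old | Increase checkpoint frequency
Failover to the wrong region | DNS TTL too high | Lower the TTL to 60 seconds before a planned failover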
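
For the duplicate-solve case, an idempotency key lets a recovered worker check for an existing result before spending another API call. This is a minimal sketch that uses the task ID as the key and a separate SQLite table. The `solved` table and the helper names are hypothetical.

```python
import sqlite3


def init_store(conn):
    """Create the table that records one solution per task ID."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS solved (task_id TEXT PRIMARY KEY, solution TEXT)"
    )


def solve_once(conn, task_id, solve):
    """Return (solution, was_cached); solve() runs only if no result is stored."""
    row = conn.execute(
        "SELECT solution FROM solved WHERE task_id = ?", (task_id,)
    ).fetchone()
    if row:
        return row[0], True
    solution = solve()
    # INSERT OR IGNORE keeps the first stored result if another worker raced us
    conn.execute(
        "INSERT OR IGNORE INTO solved (task_id, solution) VALUES (?, ?)",
        (task_id, solution),
    )
    conn.commit()
    return solution, False
```

The primary key on `task_id` is what enforces the guarantee: a second insert for the same task is silently ignored.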

FAQ

How often should I checkpoint?

Every 5-10 completed tasks, or every 30 seconds, whichever comes first. More frequent checkpoints lower your RPO but add I/O overhead.
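
The "whichever comes first" rule can be sketched as a small policy object. The class and method names are hypothetical. The clock is injectable so the logic can be tested without waiting.

```python
import time


class CheckpointPolicy:
    """Signals a checkpoint after N completed tasks or T seconds, whichever comes first."""

    def __init__(self, every_tasks=10, every_seconds=30.0, clock=time.monotonic):
        self.every_tasks = every_tasks
        self.every_seconds = every_seconds
        self.clock = clock
        self._tasks_since = 0
        self._last = clock()

    def record_task(self):
        """Call after each completed task; returns True when a checkpoint is due."""
        self._tasks_since += 1
        return self.due()

    def due(self):
        elapsed = self.clock() - self._last
        return self._tasks_since >= self.every_tasks or elapsed >= self.every_seconds

    def mark_done(self):
        """Call after the checkpoint has been written to reset both counters."""
        self._tasks_since = 0
        self._last = self.clock()
```

In a solve loop, call `record_task()` after each result, and when it returns `True`, write the checkpoint and then call `mark_done()`.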

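Should I use SQLite or Redis for task persistence?

Use SQLite for single-node setups (simpler, no extra infrastructure). Use Redis with AOF persistence for distributed systems (faster, built-in pub/sub).

What if CaptchaAI itself has an outage?

Queue tasks locally and retry when the API recovers. CaptchaAI has high uptime, but your pipeline should still handle temporary unavailability gracefully with circuit breakers and retry logic.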
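
A circuit breaker stops workers from hammering an API that is already down and lets a single trial call through once a cool-down has passed. This is a minimal sketch. The class names, thresholds, and the injectable clock are assumptions for illustration, not part of any CaptchaAI client.

```python
import time


class CircuitOpenError(Exception):
    """Raised when calls are being short-circuited."""


class CircuitBreaker:
    """Opens after consecutive failures, then allows a trial call after a timeout."""

    def __init__(self, failure_threshold=5, reset_timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("API marked unavailable; keep the task queued")
            # Timeout elapsed: half-open, let this one trial call through
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # open (or re-open) the circuit
            raise
        # Success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

When `CircuitOpenError` is raised, the task should stay in the persistent queue as pending so that it is retried once the API is reachable again.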

Next Steps

Plan for the worst: get your CaptchaAI API key and build disaster recovery into your pipeline from day one.
