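A CAPTCHA-solving pipeline that loses in-flight tasks during an outage wastes data, time, and money. Disaster recovery (DR) planning ensures you can recover from infrastructure failures, API outages, or misconfigurations with minimal data loss.
Disaster recovery objectives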
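| Metric | Definition | CAPTCHA pipeline target |
|---|---|---|
| RPO (Recovery Point Objective) | Maximum tolerable data loss | < 5 minutes of queued tasks |
| RTO (Recovery Time Objective) | Maximum time to restore service | < 15 minutes |
| MTTR (Mean Time To Recovery) | Average time to recover from an incident | < 10 minutes |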
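To check whether a real incident met these targets, compare three timestamps: the last durable checkpoint, the moment of failure, and the moment service was restored. The sketch below assumes you log those three timestamps per incident; `evaluate_incident` and `mttr` are hypothetical helper names, and the targets are copied from the table.

```python
from datetime import datetime, timedelta

# Targets copied from the table above
RPO_TARGET = timedelta(minutes=5)
RTO_TARGET = timedelta(minutes=15)


def evaluate_incident(last_checkpoint, failure_at, restored_at):
    """Compare one incident's data-loss window and downtime with the targets."""
    actual_rpo = failure_at - last_checkpoint  # work since the last durable save is lost
    actual_rto = restored_at - failure_at      # time until the service was usable again
    return {
        "rpo": actual_rpo,
        "rto": actual_rto,
        "rpo_met": actual_rpo < RPO_TARGET,
        "rto_met": actual_rto < RTO_TARGET,
    }


def mttr(incidents):
    """Mean time to recovery over a list of (failure_at, restored_at) pairs."""
    durations = [restored - failed for failed, restored in incidents]
    return sum(durations, timedelta()) / len(durations)
```

For example, a crash three minutes after the last checkpoint that takes 20 minutes to fix meets the RPO but misses the RTO.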
Failure scenarios
```text
Scenario 1: Worker crash         → Restart workers, replay queue
Scenario 2: Queue data loss      → Restore from persistent backup
Scenario 3: Network partition    → Fail over to secondary region
Scenario 4: API key compromised  → Rotate key, update workers
Scenario 5: Config corruption    → Roll back to last known good
```
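Scenario 5 only works if a known-good configuration always exists to roll back to. A minimal Python sketch, assuming a JSON config file; the file names, the `REQUIRED_KEYS` schema, and the `apply_config`/`load_config` helpers are hypothetical placeholders for your own config loader.

```python
import json
import os
import shutil
import tempfile

# Hypothetical minimal schema; replace with your pipeline's real settings
REQUIRED_KEYS = {"api_url", "workers", "poll_interval_s"}


def validate(config):
    """Return True if config looks usable by the workers."""
    if not isinstance(config, dict) or not REQUIRED_KEYS.issubset(config):
        return False
    workers, poll = config["workers"], config["poll_interval_s"]
    return (isinstance(workers, int) and workers > 0
            and isinstance(poll, (int, float)) and poll > 0)


def read_valid(path):
    """Parse path as JSON; return the config if it validates, else None."""
    try:
        with open(path, encoding="utf-8") as f:
            config = json.load(f)
    except (OSError, json.JSONDecodeError):
        return None
    return config if validate(config) else None


def apply_config(path, new_config):
    """Install new_config, saving the current config as last-known-good first."""
    if not validate(new_config):
        raise ValueError("new config failed validation; current config kept")
    if read_valid(path) is not None:
        shutil.copy2(path, path + ".last-good")  # only a valid config becomes the fallback
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(new_config, f)
    os.replace(tmp, path)  # atomic swap: readers never see a half-written file


def load_config(path):
    """Load the active config, rolling back to last-known-good if it is corrupt."""
    config = read_valid(path)
    if config is not None:
        return config
    fallback = read_valid(path + ".last-good")
    if fallback is None:
        raise RuntimeError("no valid config and no last-known-good copy")
    shutil.copy2(path + ".last-good", path)  # restore the good copy in place
    return fallback


# Demo in a throwaway directory
with tempfile.TemporaryDirectory() as workdir:
    cfg_path = os.path.join(workdir, "pipeline.json")
    good = {"api_url": "https://ocr.captchaai.com", "workers": 4, "poll_interval_s": 5}
    apply_config(cfg_path, good)
    apply_config(cfg_path, {**good, "workers": 8})
    with open(cfg_path, "w", encoding="utf-8") as f:
        f.write("{corrupt")  # simulate Scenario 5
    recovered = load_config(cfg_path)
    print(recovered["workers"])  # 4: rolled back to the last known good
```

Validating before promoting a file to `.last-good` matters: otherwise a corrupt config could overwrite the only good fallback.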
Task persistence layer
Never solve CAPTCHAs from an in-memory-only queue. Persist tasks so they survive crashes.
Python: persistent task queue
```python
import os
import json
import sqlite3
import threading
from datetime import datetime, timedelta, timezone

# Read by the worker that submits tasks to CaptchaAI; the queue itself never calls the API
API_KEY = os.environ["CAPTCHAAI_API_KEY"]

MAX_ATTEMPTS = 3


def utc_now_iso():
    """UTC timestamp in one fixed ISO-8601 format, so string comparison matches time order."""
    return datetime.now(timezone.utc).isoformat(timespec="microseconds")


class PersistentTaskQueue:
    """SQLite-backed task queue that survives crashes."""

    def __init__(self, db_path="captcha_tasks.db"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.lock = threading.Lock()
        self._init_db()

    def _init_db(self):
        with self.lock:
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS tasks (
                    id TEXT PRIMARY KEY,
                    payload TEXT NOT NULL,
                    status TEXT DEFAULT 'pending',
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    started_at TEXT,
                    completed_at TEXT,
                    result TEXT,
                    attempts INTEGER DEFAULT 0
                )
            """)
            self.conn.commit()

    def enqueue(self, task_id, payload):
        with self.lock:
            self.conn.execute(
                "INSERT INTO tasks (id, payload) VALUES (?, ?)",
                (task_id, json.dumps(payload)),
            )
            self.conn.commit()

    def dequeue(self):
        """Claim the oldest pending task and mark it as processing."""
        with self.lock:
            row = self.conn.execute(
                "SELECT id, payload FROM tasks "
                "WHERE status = 'pending' ORDER BY created_at LIMIT 1"
            ).fetchone()
            if not row:
                return None
            task_id, payload = row
            self.conn.execute(
                "UPDATE tasks SET status = 'processing', "
                "started_at = ?, attempts = attempts + 1 WHERE id = ?",
                (utc_now_iso(), task_id),
            )
            self.conn.commit()
            return {"id": task_id, "payload": json.loads(payload)}

    def complete(self, task_id, result):
        with self.lock:
            self.conn.execute(
                "UPDATE tasks SET status = 'completed', "
                "completed_at = ?, result = ? WHERE id = ?",
                (utc_now_iso(), json.dumps(result), task_id),
            )
            self.conn.commit()

    def fail(self, task_id, error):
        with self.lock:
            row = self.conn.execute(
                "SELECT attempts FROM tasks WHERE id = ?", (task_id,)
            ).fetchone()
            if row and row[0] < MAX_ATTEMPTS:
                # Under the retry limit: put the task back in the queue
                self.conn.execute(
                    "UPDATE tasks SET status = 'pending' WHERE id = ?",
                    (task_id,),
                )
            else:
                # Retries exhausted: mark it failed and keep the error for the post-mortem
                self.conn.execute(
                    "UPDATE tasks SET status = 'failed', "
                    "result = ? WHERE id = ?",
                    (json.dumps({"error": error}), task_id),
                )
            self.conn.commit()

    def recover_stale(self, timeout_seconds=600):
        """Reset tasks stuck in 'processing' after a crash; returns how many were reset."""
        # Same format as started_at, so the string comparison below is chronological
        cutoff = (datetime.now(timezone.utc) - timedelta(seconds=timeout_seconds)).isoformat(
            timespec="microseconds"
        )
        with self.lock:
            cursor = self.conn.execute(
                "UPDATE tasks SET status = 'pending' "
                "WHERE status = 'processing' AND started_at < ?",
                (cutoff,),
            )
            self.conn.commit()
            # rowcount covers only this UPDATE; total_changes is cumulative per connection
            return cursor.rowcount

    @property
    def stats(self):
        with self.lock:
            return dict(self.conn.execute(
                "SELECT status, COUNT(*) FROM tasks GROUP BY status"
            ).fetchall())


# On startup: recover tasks that were processing during a crash
queue = PersistentTaskQueue()
recovered = queue.recover_stale(timeout_seconds=600)
print(f"Recovered {recovered} stale tasks after restart")
```
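Stale-task recovery only works if `started_at` and the cutoff are compared in the same timestamp format. Mixing `datetime.isoformat()` (with a `T` separator) and SQLite's `datetime(..., 'unixepoch')` (with a space) makes the string comparison silently wrong. The self-contained crash simulation below uses an in-memory database and a trimmed-down, hypothetical `tasks` table to show that a task claimed 20 minutes ago is reset while one claimed a minute ago is left alone.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def iso(dt):
    # Fixed-width UTC ISO-8601, so lexical order equals chronological order
    return dt.astimezone(timezone.utc).isoformat(timespec="microseconds")


def simulate_crash_and_recover(timeout_seconds=600):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, started_at TEXT)")
    now = datetime.now(timezone.utc)
    # Task 'a' was claimed 20 minutes ago by a worker that then crashed
    conn.execute("INSERT INTO tasks VALUES ('a', 'processing', ?)",
                 (iso(now - timedelta(minutes=20)),))
    # Task 'b' was claimed 1 minute ago and may still be running on a live worker
    conn.execute("INSERT INTO tasks VALUES ('b', 'processing', ?)",
                 (iso(now - timedelta(minutes=1)),))
    conn.commit()

    cutoff = iso(now - timedelta(seconds=timeout_seconds))
    cur = conn.execute(
        "UPDATE tasks SET status = 'pending' "
        "WHERE status = 'processing' AND started_at < ?",
        (cutoff,),
    )
    conn.commit()
    statuses = dict(conn.execute("SELECT id, status FROM tasks ORDER BY id").fetchall())
    return cur.rowcount, statuses


reset, statuses = simulate_crash_and_recover()
print(reset, statuses)  # 1 {'a': 'pending', 'b': 'processing'}
```

Keep `timeout_seconds` longer than the longest possible solve. The JavaScript poller in this guide gives up after 60 polls × 5 s = 5 minutes, so a 10-minute timeout will not reclaim tasks that are still legitimately running.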
JavaScript: recovery manager
```javascript
const axios = require("axios");
const fs = require("fs");
const path = require("path");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

class DisasterRecoveryManager {
  constructor(checkpointDir = "./dr-checkpoints") {
    this.checkpointDir = checkpointDir;
    fs.mkdirSync(checkpointDir, { recursive: true }); // no-op if it already exists
  }

  // Checkpoint files for one label, oldest first. 13-digit Date.now()
  // suffixes sort chronologically as strings.
  listCheckpoints(label) {
    return fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(`${label}-`) && f.endsWith(".json"))
      .sort();
  }

  checkpoint(label, data) {
    const filename = path.join(this.checkpointDir, `${label}-${Date.now()}.json`);
    // Write to a temp file, then rename: a crash mid-write cannot leave a
    // truncated checkpoint behind (".json.tmp" files are ignored above)
    const tmp = `${filename}.tmp`;
    fs.writeFileSync(tmp, JSON.stringify(data, null, 2));
    fs.renameSync(tmp, filename);
    this.pruneOldCheckpoints(label, 10); // Keep the last 10
    return filename;
  }

  restore(label) {
    const files = this.listCheckpoints(label);
    if (files.length === 0) return null;
    const latest = path.join(this.checkpointDir, files[files.length - 1]);
    return JSON.parse(fs.readFileSync(latest, "utf8"));
  }

  pruneOldCheckpoints(label, keep) {
    const files = this.listCheckpoints(label);
    while (files.length > keep) {
      fs.unlinkSync(path.join(this.checkpointDir, files.shift()));
    }
  }

  clear(label) {
    this.pruneOldCheckpoints(label, 0);
  }

  async healthCheck() {
    try {
      const resp = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "getbalance", json: 1 },
        timeout: 10000,
      });
      return {
        healthy: resp.data.status === 1,
        balance: parseFloat(resp.data.request || 0),
      };
    } catch (err) {
      return { healthy: false, error: err.message };
    }
  }
}

class ResilientSolver {
  constructor() {
    this.dr = new DisasterRecoveryManager();
  }

  async solveBatch(tasks) {
    // Drop progress from any earlier batch, then checkpoint this batch before starting
    this.dr.clear("batch-progress");
    this.dr.checkpoint("batch-pending", {
      tasks,
      startedAt: new Date().toISOString(),
    });

    const results = [];
    for (const task of tasks) {
      try {
        const result = await this.solveSingle(task);
        results.push({ taskId: task.id, ...result });
      } catch (err) {
        results.push({ taskId: task.id, error: err.message });
      }
      // Checkpoint progress every 10 tasks
      if (results.length % 10 === 0) {
        this.dr.checkpoint("batch-progress", {
          results,
          remaining: tasks.length - results.length,
        });
      }
    }

    // Final checkpoint, then clear the in-flight markers so the next
    // restart does not replay a batch that already finished
    this.dr.checkpoint("batch-complete", { results });
    this.dr.clear("batch-progress");
    this.dr.clear("batch-pending");
    return results;
  }

  async recover() {
    // An unfinished batch leaves a "batch-pending" checkpoint behind
    const pending = this.dr.restore("batch-pending");
    if (!pending) return [];

    const progress = this.dr.restore("batch-progress");
    const doneIds = new Set((progress?.results ?? []).map((r) => r.taskId));
    const remaining = pending.tasks.filter((t) => !doneIds.has(t.id));
    console.log(`Recovering: ${doneIds.size} done, ${remaining.length} remaining`);
    return remaining;
  }

  async solveSingle(task) {
    const submit = await axios.post("https://ocr.captchaai.com/in.php", null, {
      params: {
        key: API_KEY,
        method: "userrecaptcha",
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
      timeout: 30000,
    });
    if (submit.data.status !== 1) throw new Error(submit.data.request);
    const captchaId = submit.data.request;

    // Poll every 5 s, for at most 60 attempts (5 minutes)
    for (let i = 0; i < 60; i++) {
      await new Promise((r) => setTimeout(r, 5000));
      const poll = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
        timeout: 30000,
      });
      if (poll.data.status === 1) return { solution: poll.data.request };
      if (poll.data.request !== "CAPCHA_NOT_READY") {
        throw new Error(poll.data.request);
      }
    }
    throw new Error("TIMEOUT");
  }
}

// On startup: resume any batch interrupted by a crash
const solver = new ResilientSolver();
solver.recover()
  .then((remaining) => {
    if (remaining.length > 0) {
      console.log(`Resuming ${remaining.length} tasks from checkpoint`);
      return solver.solveBatch(remaining);
    }
    return [];
  })
  .catch((err) => {
    console.error("Recovery failed:", err);
    process.exitCode = 1;
  });
```
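The same checkpoint pattern ported to Python, for pipelines built on the persistent queue above. This is a sketch under stated assumptions: `write_checkpoint` and `latest_checkpoint` are hypothetical names, files follow the same `label-<number>.json` naming as the JavaScript manager, and atomicity relies on `os.replace` renaming within a single directory. For the rename itself to survive a power loss on Linux, you would also need to fsync the directory, which is omitted here.

```python
import contextlib
import json
import os
import re
import tempfile
import time


def write_checkpoint(directory, label, data):
    """Write data as JSON atomically: readers see the old file or the new one, never half."""
    os.makedirs(directory, exist_ok=True)
    final_path = os.path.join(directory, f"{label}-{time.time_ns()}.json")
    # Temp file in the same directory, so os.replace is a rename rather than a copy
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # force the bytes to disk before the rename
        os.replace(tmp_path, final_path)
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_path)
        raise
    return final_path


def latest_checkpoint(directory, label):
    """Return the newest checkpoint for label, or None. Leftover .tmp files are ignored."""
    pattern = re.compile(rf"{re.escape(label)}-(\d+)\.json")
    stamped = []
    for name in os.listdir(directory):
        match = pattern.fullmatch(name)
        if match:
            stamped.append((int(match.group(1)), name))
    if not stamped:
        return None
    _, newest = max(stamped)
    with open(os.path.join(directory, newest), encoding="utf-8") as f:
        return json.load(f)
```

The exact-match pattern keeps `batch-pending` from picking up `batch-progress` files, which a bare prefix check on `batch-` would not.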
DR runbook template
```text
RUNBOOK: CAPTCHA Pipeline Recovery
====================================

1. DETECT
   - Alert fires: [PagerDuty / Slack / Email]
   - Symptom: [Queue growing / Workers offline / Error spike]

2. ASSESS
   - Check worker health: curl http://workers/health
   - Check API status: GET https://ocr.captchaai.com/res.php?key=<API_KEY>&action=getbalance&json=1
   - Check queue depth: SELECT COUNT(*) FROM tasks WHERE status = 'pending'

3. RECOVER
   If workers crashed:
     → Restart worker containers: docker-compose up -d workers
     → Run stale-task recovery: recovery.py --recover-stale
   If network partition:
     → Fail over to the secondary region
     → Update DNS or load-balancer routing
   If API key compromised:
     → Generate a new key at captchaai.com
     → Update the secret store
     → Rolling-restart the workers

4. VERIFY
   - Confirm solve rate > 90%
   - Confirm the queue is draining
   - Confirm no duplicate solves

5. POST-MORTEM
   - Document the root cause
   - Update this runbook if needed
```
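Steps 2 (ASSESS) and 4 (VERIFY) can be scripted against the SQLite `tasks` table used by the Python queue. A minimal sketch, assuming `started_at` holds UTC ISO-8601 strings in one fixed format (for example `datetime.now(timezone.utc).isoformat(timespec="microseconds")`); `assess` and `verify` are hypothetical helpers, and the 90% bar comes from the runbook.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def assess(conn, stale_after_s=600):
    """ASSESS: queue depth by status, plus tasks stuck in 'processing'."""
    by_status = dict(conn.execute(
        "SELECT status, COUNT(*) FROM tasks GROUP BY status"
    ).fetchall())
    cutoff = (datetime.now(timezone.utc) - timedelta(seconds=stale_after_s)).isoformat(
        timespec="microseconds"
    )
    stale = conn.execute(
        "SELECT COUNT(*) FROM tasks WHERE status = 'processing' AND started_at < ?",
        (cutoff,),
    ).fetchone()[0]
    return {"pending": by_status.get("pending", 0), "by_status": by_status, "stale": stale}


def verify(conn, min_solve_rate=0.90):
    """VERIFY: solve rate among finished tasks, and whether it clears the bar."""
    done, failed = conn.execute(
        "SELECT SUM(status = 'completed'), SUM(status = 'failed') FROM tasks"
    ).fetchone()
    done, failed = done or 0, failed or 0  # SUM over zero rows is NULL
    finished = done + failed
    rate = done / finished if finished else None
    return {"solve_rate": rate, "ok": rate is not None and rate > min_solve_rate}
```

Running `assess` twice a few minutes apart and comparing `pending` is a simple way to confirm the queue is draining.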
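Recovery target mapping
- Define which data can be replayed later and which user-facing flows must be restored immediately after a failure.
- Map each pipeline component to its own RPO and RTO targets instead of treating the whole system as a single block.
- Document who can trigger failover, who verifies recovery, and when normal routing is restored.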
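One way to make this mapping concrete is to keep it as data that both the runbook and your tooling can read. The components, numbers, and owners below are hypothetical examples, and ordering recovery as "non-replayable first, then tightest RTO" is one reasonable policy, not the only one.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Component:
    name: str
    rpo_s: int        # max tolerable data loss, in seconds (0 = stateless)
    rto_s: int        # max time to restore, in seconds
    replayable: bool  # can lost work be re-queued later?
    owner: str        # who may trigger failover and verify recovery


# Hypothetical pipeline map; replace with your own components and targets
COMPONENTS = [
    Component("task-queue", rpo_s=300, rto_s=600, replayable=False, owner="platform on-call"),
    Component("solver-workers", rpo_s=0, rto_s=300, replayable=True, owner="platform on-call"),
    Component("results-api", rpo_s=60, rto_s=120, replayable=False, owner="product on-call"),
    Component("analytics-export", rpo_s=86400, rto_s=86400, replayable=True, owner="data team"),
]


def recovery_order(components):
    """Non-replayable components first (False sorts before True), then by tightest RTO."""
    return [c.name for c in sorted(components, key=lambda c: (c.replayable, c.rto_s))]


print(recovery_order(COMPONENTS))
```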
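Troubleshooting
| Problem | Cause | Fix |
|---|---|---|
| Tasks lost during a crash | In-memory-only queue | Use a persistent queue (SQLite, or Redis with AOF) |
| Duplicate solves after recovery | Stale tasks reprocessed without deduplication | Add idempotency keys; check whether a task is already solved before resubmitting |
| Recovery time exceeds RTO | Database backups are too old, so too much work must be replayed | Increase checkpoint frequency |
| Failover routes to the wrong region | DNS TTL too high | Lower the TTL to 60 s ahead of a planned failover |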
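For the duplicate-solve row, an idempotency key derived from the task's inputs lets a replayed task detect that it was already solved. A sketch, assuming each task carries a hypothetical `job_id` alongside `sitekey` and `pageurl`, and assuming solved tokens expire after roughly two minutes (typical for reCAPTCHA tokens), so only recent solutions are reused; the 110-second default is an assumption to tune.

```python
import hashlib
import json
import sqlite3
import time


def idempotency_key(task):
    """Same logical request gives the same key, regardless of dict ordering."""
    canonical = json.dumps(
        {"job_id": task["job_id"], "sitekey": task["sitekey"], "pageurl": task["pageurl"]},
        sort_keys=True,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class SolveLedger:
    """Remembers recent solutions so a replayed task does not pay for a second solve."""

    def __init__(self, conn):
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS solved "
            "(key TEXT PRIMARY KEY, solution TEXT NOT NULL, solved_at REAL NOT NULL)"
        )
        conn.commit()

    def solve_once(self, task, solve, max_age_s=110):
        """Return a stored solution younger than max_age_s, or call solve(task) and store it."""
        key = idempotency_key(task)
        row = self.conn.execute(
            "SELECT solution, solved_at FROM solved WHERE key = ?", (key,)
        ).fetchone()
        if row and time.time() - row[1] < max_age_s:
            return row[0]  # solved before the crash and still fresh: skip the API call
        solution = solve(task)
        self.conn.execute(
            "INSERT OR REPLACE INTO solved (key, solution, solved_at) VALUES (?, ?, ?)",
            (key, solution, time.time()),
        )
        self.conn.commit()
        return solution
```

`solve` is whatever function submits the task to the API; passing it in keeps the ledger independent of the HTTP client.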
FAQ
How often should I checkpoint?
Every 5 to 10 completed tasks or every 30 seconds, whichever comes first. More frequent checkpoints lower the RPO but add I/O overhead.
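The "whichever comes first" rule is a small state machine. A sketch with an injectable clock so it can be tested; `CheckpointPolicy` is a hypothetical name, and the defaults mirror the numbers above. The time limit is only checked when a task completes, which is fine: while no task finishes, there is nothing new to save.

```python
import time


class CheckpointPolicy:
    """Signal a checkpoint every N completed tasks or every T seconds, whichever comes first."""

    def __init__(self, every_n=10, every_s=30.0, clock=time.monotonic):
        self.every_n = every_n
        self.every_s = every_s
        self.clock = clock  # injectable for testing
        self._since = 0
        self._last = clock()

    def task_done(self):
        """Call after each completed task; returns True when a checkpoint should be written."""
        self._since += 1
        if self._since >= self.every_n or self.clock() - self._last >= self.every_s:
            self._since = 0
            self._last = self.clock()
            return True
        return False
```

In a worker loop, call `policy.task_done()` after each `complete()` and write a checkpoint whenever it returns `True`.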
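Should I use SQLite or Redis for task persistence?
Use SQLite for single-node setups (simpler, no extra infrastructure). Use Redis with AOF persistence for distributed systems (in-memory speed, shared by every worker node, built-in pub/sub).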
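What if CaptchaAI itself has an outage?
Queue tasks locally and retry once the API recovers. CaptchaAI generally maintains high uptime, but your pipeline should still use a circuit breaker and retry logic to handle temporary unavailability gracefully.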
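A circuit breaker stops a pipeline from hammering an API that is already down, leaving tasks safely in the persistent queue until it recovers. The sketch below is a generic, simplified breaker (a consecutive-failure threshold plus a fixed cool-down), not a CaptchaAI feature; the class name and defaults are assumptions.

```python
import time


class CircuitBreaker:
    """Opens after repeated failures; after a cool-down, trial calls are allowed again."""

    def __init__(self, failure_threshold=5, reset_timeout_s=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow(self):
        """True if a call may be attempted now."""
        if self.opened_at is None:
            return True
        # Half-open once the cool-down has elapsed
        return self.clock() - self.opened_at >= self.reset_timeout_s

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            # (Re)open and restart the cool-down; in half-open state one failure is enough
            self.opened_at = self.clock()

    def call(self, fn, *args, **kwargs):
        if not self.allow():
            raise RuntimeError("circuit open: leave the task queued and retry later")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.record_failure()
            raise
        self.record_success()
        return result
```

Combined with the persistent queue, a task rejected by an open circuit simply stays `pending` instead of burning retry attempts.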
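Next steps
Plan for the worst: get your CaptchaAI API key and build disaster recovery into your pipeline from day one.
Related guides:
- High-availability failover
- Multi-region architecture
- Implementing retry logic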