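When a CAPTCHA solve fails after all retries, the task data is lost unless you capture it. A dead-letter queue (DLQ) stores failed tasks for later retry, analysis, or alerting, so no work is silently dropped.
When tasks fail
Common reasons a CAPTCHA task ends up in the DLQ:
- ERROR_CAPTCHA_UNSOLVABLE: the solver could not complete the challenge
- ERROR_NO_SLOT_AVAILABLE: all workers were busy and retries were exhausted
- Timeout: the solver did not return a result before the deadline
- Network error: the connection dropped during polling
Without a DLQ, these failures produce a log line and are then forgotten.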
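One way to make these categories actionable is to tag each failure before it enters the queue. The sketch below is a minimal illustration, not part of the CaptchaAI API: the `classify_failure` helper and its category names are hypothetical, and matching on message text assumes errors are stored as strings, as the examples in this article do with `str(e)`.

```python
def classify_failure(error: str) -> str:
    """Map a stored error message to a coarse failure category.

    The category names are illustrative assumptions, not API values.
    """
    if error.startswith("ERROR_CAPTCHA_UNSOLVABLE"):
        return "unsolvable"
    if error.startswith("ERROR_NO_SLOT_AVAILABLE"):
        return "no_capacity"
    if "timed out" in error.lower():
        return "timeout"
    if error.startswith("ERROR_"):
        return "api_error"
    # Anything else (dropped connections, DNS failures, ...) lands here.
    return "network_or_unknown"


for message in ("ERROR_CAPTCHA_UNSOLVABLE", "Task 71823460 timed out"):
    print(message, "->", classify_failure(message))
```

A category field like this makes it easier to retry capacity and timeout failures sooner while routing unsolvable tasks to manual review.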
Python: in-memory DLQ with retries
import time
import json
import requests
from collections import deque
from dataclasses import dataclass, asdict
from typing import Optional
API_KEY = "YOUR_API_KEY"
SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"

@dataclass
class FailedTask:
    sitekey: str
    page_url: str
    error: str
    attempts: int
    timestamp: float
    task_id: Optional[str] = None


class DeadLetterQueue:
    def __init__(self, max_size=1000, max_retries=3):
        self._queue = deque(maxlen=max_size)
        self.max_retries = max_retries

    def push(self, task: FailedTask):
        self._queue.append(task)
        print(f"[dlq] Added: {task.error} (attempts: {task.attempts})")

    def pop(self) -> Optional[FailedTask]:
        return self._queue.popleft() if self._queue else None

    def size(self) -> int:
        return len(self._queue)

    def peek_all(self) -> list:
        return [asdict(t) for t in self._queue]

    def export_json(self, path: str):
        with open(path, "w") as f:
            json.dump(self.peek_all(), f, indent=2)
        print(f"[dlq] Exported {self.size()} tasks to {path}")


dlq = DeadLetterQueue(max_retries=3)

def solve_captcha(sitekey, page_url, max_retries=3):
    # One initial attempt plus up to max_retries retries.
    for attempt in range(max_retries + 1):
        try:
            # Submit the task.
            resp = requests.post(SUBMIT_URL, data={
                "key": API_KEY,
                "method": "userrecaptcha",
                "googlekey": sitekey,
                "pageurl": page_url,
                "json": "1",
            }, timeout=15)
            data = resp.json()
            if data["status"] != 1:
                raise Exception(data["request"])
            task_id = data["request"]

            # Poll every 5 seconds, up to 24 times (about 2 minutes).
            for _ in range(24):
                time.sleep(5)
                poll = requests.get(RESULT_URL, params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": "1",
                }, timeout=15).json()
                if poll["status"] == 1:
                    return poll["request"]
                if poll["request"] != "CAPCHA_NOT_READY":
                    raise Exception(poll["request"])
            raise TimeoutError(f"Task {task_id} timed out")
        except Exception as e:
            if attempt == max_retries:
                # Retries exhausted: park the task in the DLQ instead of dropping it.
                dlq.push(FailedTask(
                    sitekey=sitekey,
                    page_url=page_url,
                    error=str(e),
                    attempts=attempt + 1,
                    timestamp=time.time(),
                ))
                return None
            # Exponential backoff before the next attempt: 1 s, 2 s, 4 s, ...
            time.sleep(2 ** attempt)
    return None

# Process a batch
urls = [f"https://example.com/page/{i}" for i in range(5)]
for url in urls:
    token = solve_captcha("6Le-SITEKEY", url)
    if token:
        print(f"Solved: {token[:40]}...")

print(f"\nDLQ size: {dlq.size()}")
Example output (tokens and failures will vary from run to run):
Solved: 03AGdBq26ZfPxL...
Solved: 03AGdBq27AbCdE...
[dlq] Added: ERROR_CAPTCHA_UNSOLVABLE (attempts: 4)
Solved: 03AGdBq28FgHiJ...
[dlq] Added: Task 71823460 timed out (attempts: 4)
DLQ size: 2
Retrying from the DLQ
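def retry_dlq(dlq: DeadLetterQueue, max_retries=2):
    retried = 0
    recovered = 0
    # Drain only the tasks queued at the start. solve_captcha() pushes a new
    # entry whenever a retry fails, so looping until the queue is empty
    # would never terminate for a task that keeps failing.
    for _ in range(dlq.size()):
        task = dlq.pop()
        if task.attempts >= dlq.max_retries + max_retries:
            print(f"[dlq] Permanently failed: {task.sitekey} ({task.error})")
            continue
        retried += 1
        token = solve_captcha(
            task.sitekey, task.page_url, max_retries=max_retries
        )
        if token:
            recovered += 1
            print(f"[dlq-retry] Recovered: {token[:40]}...")
        else:
            # The entry pushed by solve_captcha() counts only this round;
            # carry the earlier attempts forward so the cap above can trigger.
            dlq._queue[-1].attempts += task.attempts
    print(f"[dlq] Retried: {retried}, Recovered: {recovered}")


# Run DLQ retry after main batch
retry_dlq(dlq)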
JavaScript: DLQ with file persistence
const fs = require('fs');
const axios = require('axios');
const API_KEY = 'YOUR_API_KEY';
const DLQ_FILE = './captcha-dlq.json';

class DeadLetterQueue {
  constructor(maxRetries = 3) {
    this.maxRetries = maxRetries;
    this.queue = this._load();
  }

  push(task) {
    this.queue.push({
      ...task,
      timestamp: Date.now(),
    });
    this._save();
    console.log(`[dlq] Added: ${task.error} (attempts: ${task.attempts})`);
  }

  pop() {
    const task = this.queue.shift();
    if (task) this._save();
    return task || null;
  }

  size() {
    return this.queue.length;
  }

  _load() {
    try {
      return JSON.parse(fs.readFileSync(DLQ_FILE, 'utf8'));
    } catch {
      return [];
    }
  }

  _save() {
    fs.writeFileSync(DLQ_FILE, JSON.stringify(this.queue, null, 2));
  }
}

const dlq = new DeadLetterQueue(3);

async function solveCaptcha(sitekey, pageurl, maxRetries = 3) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const submit = await axios.post('https://ocr.captchaai.com/in.php', null, {
        params: { key: API_KEY, method: 'userrecaptcha', googlekey: sitekey, pageurl, json: 1 }
      });
      if (submit.data.status !== 1) throw new Error(submit.data.request);

      const taskId = submit.data.request;
      for (let i = 0; i < 24; i++) {
        await new Promise(r => setTimeout(r, 5000));
        const poll = await axios.get('https://ocr.captchaai.com/res.php', {
          params: { key: API_KEY, action: 'get', id: taskId, json: 1 }
        });
        if (poll.data.status === 1) return poll.data.request;
        if (poll.data.request !== 'CAPCHA_NOT_READY') throw new Error(poll.data.request);
      }
      throw new Error(`Task ${taskId} timed out`);
    } catch (err) {
      if (attempt === maxRetries) {
        dlq.push({ sitekey, pageurl, error: err.message, attempts: attempt + 1 });
        return null;
      }
      await new Promise(r => setTimeout(r, 2 ** attempt * 1000));
    }
  }
}

// Process tasks
(async () => {
  for (let i = 0; i < 5; i++) {
    const token = await solveCaptcha('6Le-SITEKEY', `https://example.com/page/${i}`);
    if (token) console.log(`Solved: ${token.substring(0, 40)}...`);
  }
  console.log(`DLQ size: ${dlq.size()}`);
})();
DLQ analysis
Export and analyze failed tasks to find patterns:
# Export DLQ for analysis
dlq.export_json("failed-tasks.json")

# Analyze error distribution
from collections import Counter

errors = Counter(t["error"] for t in dlq.peek_all())
for error, count in errors.most_common():
    print(f" {error}: {count}")
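Use this data to:
- Identify sitekeys that consistently fail → check that the parameters are correct
- Spot timeouts clustered at specific times → correlate with API load
- Find network errors → check proxy health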
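The first two checks can be sketched as a small grouping step. This is an illustrative example rather than a fixed recipe: `summarize_failures` is a hypothetical helper, the sample records are made up, and it assumes each record has a `sitekey` and a `timestamp` in seconds since the epoch, as the Python DLQ's `peek_all()` returns (the JavaScript version stores milliseconds).

```python
from collections import Counter
from datetime import datetime, timezone


def summarize_failures(tasks):
    """Count failures per sitekey and per UTC hour of day."""
    by_sitekey = Counter(t["sitekey"] for t in tasks)
    by_hour = Counter(
        datetime.fromtimestamp(t["timestamp"], tz=timezone.utc).hour
        for t in tasks
    )
    return by_sitekey, by_hour


# Made-up sample data in the shape returned by peek_all().
sample = [
    {"sitekey": "key-a", "error": "ERROR_CAPTCHA_UNSOLVABLE", "timestamp": 0},
    {"sitekey": "key-a", "error": "ERROR_CAPTCHA_UNSOLVABLE", "timestamp": 3600},
    {"sitekey": "key-b", "error": "Task 1 timed out", "timestamp": 3700},
]
by_sitekey, by_hour = summarize_failures(sample)
print(by_sitekey.most_common(1))  # [('key-a', 2)]
print(sorted(by_hour.items()))    # [(0, 1), (1, 2)]
```

A sitekey that dominates the first counter points to a parameter problem, while a spike in one hour of the second suggests a load-related cause.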
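Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| DLQ grows without bound | Retries are never processed | Schedule a periodic DLQ drain with retry_dlq() |
| Same task is retried forever | No cap on total attempts | Check task.attempts before re-queuing |
| DLQ file is corrupted | Concurrent writes | Use file locking or switch to Redis or a database |
| Tasks are lost on crash | In-memory-only DLQ | Use a file-based or Redis-backed DLQ |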
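For the file-based option, one common safeguard is to write the queue to a temporary file and rename it over the target, so a crash mid-write cannot leave a half-written file behind. The sketch below is a minimal Python illustration under that assumption; `save_queue_atomically` and `load_queue` are hypothetical helper names. It does not coordinate several processes writing at once (the last writer still wins), so concurrent writers still need a lock or a store such as Redis.

```python
import json
import os
import tempfile


def save_queue_atomically(tasks, path):
    """Write tasks to a temp file in the same directory, then rename it."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(tasks, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes reach the disk
        # os.replace swaps the file in one step on the same filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)  # clean up the partial temp file
        raise


def load_queue(path):
    """Return the saved queue, or an empty list if it is missing or unreadable."""
    try:
        with open(path) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return []
```

Calling `save_queue_atomically(dlq.peek_all(), "captcha-dlq.json")` after each push would give the in-memory Python queue the same persistence as the JavaScript version.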
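FAQ
Should I use an in-memory DLQ or a persistent one?
Use an in-memory DLQ for short-lived scripts. For long-running services, use a file-based or Redis-backed DLQ, because a process restart would otherwise lose the queued tasks.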
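When should I give up on a task permanently?
After 2-3 DLQ retries, on top of the original retries. If a task has failed 6 or more times in total, the parameters are probably wrong: log it and move on.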
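That rule of thumb can be expressed as a simple split of the queue. The sketch below is only an illustration: `partition_tasks` is a hypothetical helper, the default cap of 6 comes from the guidance above, and it assumes each entry's `attempts` field holds the cumulative count across all rounds.

```python
def partition_tasks(tasks, max_total_attempts=6):
    """Split DLQ entries into those worth retrying and those to abandon.

    `tasks` is a list of dicts with a cumulative "attempts" count, in the
    shape produced by peek_all(). The cap is an assumption; tune it.
    """
    retryable, abandoned = [], []
    for task in tasks:
        if task["attempts"] >= max_total_attempts:
            abandoned.append(task)
        else:
            retryable.append(task)
    return retryable, abandoned


queued = [
    {"sitekey": "key-a", "attempts": 4},
    {"sitekey": "key-b", "attempts": 7},
]
retryable, abandoned = partition_tasks(queued)
print(len(retryable), len(abandoned))  # 1 1
```

Abandoned entries are best logged or exported for review rather than deleted, since they often point to a wrong sitekey or page URL.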
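Can I combine this with the circuit breaker pattern?
Yes. A circuit breaker stops requests from being sent during an outage, and the DLQ captures any tasks that failed before the circuit tripped. See the circuit breaker pattern guide.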
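A rough sketch of how the two pieces could fit together follows. It is a simplified illustration, not the implementation from the circuit breaker guide: the `CircuitBreaker` class, the `process` function, the thresholds, and the choice to return skipped tasks in a separate `deferred` list are all assumptions made for this example.

```python
import time


class CircuitBreaker:
    """Opens after N consecutive failures; allows a trial after a cool-down."""

    def __init__(self, failure_threshold=3, reset_after=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # Half-open: let a request through once the cool-down has elapsed.
        return self._clock() - self._opened_at >= self.reset_after

    def record_success(self):
        self._failures = 0
        self._opened_at = None

    def record_failure(self):
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


def process(tasks, solve, breaker, dead_letters):
    """Run solve(task) per task; failures go to dead_letters.

    Tasks skipped while the circuit is open are returned as `deferred`
    so the caller can resubmit them later without spending a retry.
    """
    solved, deferred = [], []
    for task in tasks:
        if not breaker.allow_request():
            deferred.append(task)
            continue
        try:
            solved.append(solve(task))
            breaker.record_success()
        except Exception as exc:
            breaker.record_failure()
            dead_letters.append({"task": task, "error": str(exc)})
    return solved, deferred
```

Keeping deferred tasks out of the DLQ means an outage does not inflate their attempt counts, so only tasks that actually reached the solver count toward the give-up limit.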
Never lose a failed CAPTCHA task again with CaptchaAI
Get your API key from the CaptchaAI website.
Related guides
- Circuit breaker pattern for CAPTCHA API calls
- Implementing retry logic for the CaptchaAI API
- Redis queue + CaptchaAI: distributed processing