当多个 worker 或重试逻辑提交同一个验证码求解请求时,您要为每一次重复请求付费。去重层会拦截相同的请求并返回同一个结果——既节省 API 额度,又降低延迟。
重复是如何发生的
| 场景 | 原因 | 浪费 |
|---|---|---|
| 在结果到达之前重试 | 积极的重试逻辑 | 每个验证码的成本为 2-5 倍 |
| 多个 worker,同一目标 | worker 之间没有协调 | 并行求解造成浪费 |
| 页面刷新重新触发 | 前端超时重试 | 每次刷新额外解决 |
| 队列消息重放 | 至少一次(at-least-once)投递保证 | 每次重放都会重复求解 |
去重键设计
根据请求参数生成唯一的键:
import hashlib
def dedup_key(method, sitekey, pageurl):
    """Build the Redis key that identifies one CAPTCHA solve request.

    The three parameters are joined with ``:`` and hashed with SHA-256; the
    first 16 hex characters of the digest are appended to the
    ``captcha:dedup:`` prefix.
    """
    fingerprint = "{}:{}:{}".format(method, sitekey, pageurl)
    digest = hashlib.sha256(fingerprint.encode()).hexdigest()
    return "captcha:dedup:" + digest[:16]
键的组成:
| 验证码类型 | 键的组成部分 |
|---|---|
| reCAPTCHA v2 | method + sitekey + pageurl |
| reCAPTCHA v3 | method + sitekey + pageurl + action |
| 验证码 | method + sitekey + pageurl |
| Cloudflare Turnstile | method + sitekey + pageurl |
| 图片验证码 | method + body 的哈希值(图像内容) |
基于Redis的重复数据删除
Python实现
import os
import time
import json
import hashlib
import redis
import requests
# Shared Redis connection. Host and port are read from the environment and
# fall back to localhost:6379; responses are decoded to str.
r = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", 6379)),
    decode_responses=True,
)

# CaptchaAI credential; a missing variable raises KeyError right here.
API_KEY = os.environ["CAPTCHAAI_API_KEY"]

# Dedup window: how long (in seconds) a request is considered "in progress".
DEDUP_TTL = 180
def dedup_key(method, sitekey, pageurl, extra=""):
    """Build the Redis dedup key for one CAPTCHA solve request.

    ``method``, ``sitekey``, ``pageurl`` and ``extra`` are joined with ``:``
    (the trailing separator is present even when ``extra`` is empty), hashed
    with SHA-256, and the first 16 hex characters are appended to the
    ``captcha:dedup:`` prefix.
    """
    fingerprint = "{}:{}:{}:{}".format(method, sitekey, pageurl, extra)
    digest = hashlib.sha256(fingerprint.encode()).hexdigest()
    return "captcha:dedup:" + digest[:16]
def _store_dedup_error(key, error):
    """Publish a short-lived (30 s) error marker so waiting workers stop waiting."""
    r.set(key, json.dumps({"status": "error", "error": error}), ex=30)


def solve_with_dedup(sitekey, pageurl, method="userrecaptcha"):
    """Solve a CAPTCHA via CaptchaAI, sharing one solve among duplicate requests.

    The first caller for a given (method, sitekey, pageurl) claims the dedup
    key and performs the solve; concurrent callers either wait for that
    result or reuse a recently cached one.

    Args:
        sitekey: Site key sent to the API as ``googlekey``.
        pageurl: URL of the page hosting the CAPTCHA.
        method: CaptchaAI ``method`` value.

    Returns:
        ``{"solution": ..., "source": "api" | "dedup_cache"}`` on success,
        whatever ``wait_for_result`` returns when another worker is already
        solving, or ``{"error": ...}`` on an API error or polling timeout.

    Raises:
        requests.RequestException, ValueError: A network failure or an
            unparsable response. An error marker is written first so that
            waiters are released instead of waiting out the TTL.
    """
    http_timeout = 30  # seconds; without it a stalled connection blocks forever
    key = dedup_key(method, sitekey, pageurl)
    claim = json.dumps({"status": "solving", "started": time.time()})

    # SET NX makes "check and claim" a single atomic step, so two workers
    # arriving together cannot both decide they are the solver.
    if not r.set(key, claim, ex=DEDUP_TTL, nx=True):
        existing = r.get(key)
        state = json.loads(existing) if existing else {}
        status = state.get("status")
        if status == "solving":
            # Wait for the result produced by the worker holding the claim.
            return wait_for_result(key)
        if status == "solved":
            return {"solution": state["solution"], "source": "dedup_cache"}
        # An error marker (retry is allowed) or a key that expired between
        # the two calls: take the claim over. Two workers retrying the same
        # error at the same instant can still both get here.
        r.set(key, claim, ex=DEDUP_TTL)

    # Submit to CaptchaAI
    try:
        resp = requests.post(
            "https://ocr.captchaai.com/in.php",
            data={
                "key": API_KEY,
                "method": method,
                "googlekey": sitekey,
                "pageurl": pageurl,
                "json": 1,
            },
            timeout=http_timeout,
        )
        data = resp.json()
    except (requests.RequestException, ValueError):
        # Generic text on purpose: exception messages can embed the request
        # URL, and the polling URL carries the API key.
        _store_dedup_error(key, "NETWORK_ERROR")
        raise

    if data.get("status") != 1:
        _store_dedup_error(key, data.get("request"))
        return {"error": data.get("request")}

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        # 60 polls x 5 s can outlast DEDUP_TTL; keep the "solving" marker
        # alive while this worker is still polling so nobody else re-solves.
        r.expire(key, DEDUP_TTL)
        try:
            result = requests.get(
                "https://ocr.captchaai.com/res.php",
                params={
                    "key": API_KEY,
                    "action": "get",
                    "id": captcha_id,
                    "json": 1,
                },
                timeout=http_timeout,
            ).json()
        except (requests.RequestException, ValueError):
            _store_dedup_error(key, "NETWORK_ERROR")
            raise

        if result.get("status") == 1:
            solution = result["request"]
            # Cache the result for other workers (short TTL since tokens expire)
            r.set(
                key,
                json.dumps({
                    "status": "solved",
                    "solution": solution,
                    "solved_at": time.time(),
                }),
                ex=60,
            )
            return {"solution": solution, "source": "api"}

        if result.get("request") != "CAPCHA_NOT_READY":
            _store_dedup_error(key, result.get("request"))
            return {"error": result.get("request")}

    _store_dedup_error(key, "TIMEOUT")
    return {"error": "TIMEOUT"}
def wait_for_result(key, timeout=120):
    """Poll the dedup entry until another worker publishes an outcome.

    The key is re-read every 2 seconds. A "solved" state yields the solution
    tagged with source ``dedup_wait``; an "error" state yields that error
    (``UNKNOWN`` if none was stored). If neither shows up within ``timeout``
    seconds, ``DEDUP_WAIT_TIMEOUT`` is reported.
    """
    began = time.time()
    while True:
        if time.time() - began >= timeout:
            return {"error": "DEDUP_WAIT_TIMEOUT"}

        payload = r.get(key)
        if payload:
            outcome = json.loads(payload)
            if outcome["status"] == "solved":
                return {"solution": outcome["solution"], "source": "dedup_wait"}
            if outcome["status"] == "error":
                return {"error": outcome.get("error", "UNKNOWN")}

        time.sleep(2)
JavaScript 实现
const Redis = require("ioredis");
const axios = require("axios");
const crypto = require("crypto");
// Shared Redis connection; uses REDIS_URL when set, otherwise a local instance.
const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379");
// CaptchaAI credential read from the environment (undefined if the variable is not set).
const API_KEY = process.env.CAPTCHAAI_API_KEY;
// Lifetime in seconds of the "solving" marker written for an in-flight request.
const DEDUP_TTL = 180;
/**
 * Build the Redis dedup key for one CAPTCHA solve request.
 *
 * The three parameters are joined with ":" and hashed with SHA-256; the first
 * 16 hex characters of the digest follow the "captcha:dedup:" prefix.
 */
function dedupKey(method, sitekey, pageurl) {
  const fingerprint = "".concat(method, ":", sitekey, ":", pageurl);
  const hasher = crypto.createHash("sha256");
  hasher.update(fingerprint);
  const digest = hasher.digest("hex");
  return "captcha:dedup:" + digest.substring(0, 16);
}
/**
 * Solve a CAPTCHA via CaptchaAI, sharing one solve among duplicate requests.
 *
 * The first caller for a given (method, sitekey, pageurl) claims the dedup key
 * and performs the solve; concurrent callers wait for that result or reuse a
 * recently cached one.
 *
 * @param {string} sitekey  Site key sent to the API as `googlekey`.
 * @param {string} pageurl  URL of the page hosting the CAPTCHA.
 * @param {string} [method="userrecaptcha"]  CaptchaAI `method` value.
 * @returns {Promise<object>} `{ solution, source }` on success or `{ error }`.
 * @throws Rethrows network/HTTP failures from axios after writing an error
 *         marker, so waiting workers are released instead of waiting out the TTL.
 */
async function solveWithDedup(sitekey, pageurl, method = "userrecaptcha") {
  const HTTP_TIMEOUT_MS = 30000; // without it a stalled connection never settles
  const key = dedupKey(method, sitekey, pageurl);
  const claim = JSON.stringify({ status: "solving", started: Date.now() });

  const storeError = (error) =>
    redis.set(key, JSON.stringify({ status: "error", error }), "EX", 30);

  // SET ... NX makes "check and claim" one atomic step, so two workers
  // arriving together cannot both decide they are the solver.
  const claimed = await redis.set(key, claim, "EX", DEDUP_TTL, "NX");
  if (claimed !== "OK") {
    const existing = await redis.get(key);
    const state = existing ? JSON.parse(existing) : {};
    if (state.status === "solving") return await waitForResult(key);
    if (state.status === "solved") return { solution: state.solution, source: "dedup_cache" };
    // Error marker (retry allowed) or key expired between the two calls:
    // take the claim over. Two workers retrying the same error at the same
    // instant can still both get here.
    await redis.set(key, claim, "EX", DEDUP_TTL);
  }

  // Submit
  let submit;
  try {
    submit = await axios.post("https://ocr.captchaai.com/in.php", null, {
      params: { key: API_KEY, method, googlekey: sitekey, pageurl, json: 1 },
      timeout: HTTP_TIMEOUT_MS,
    });
  } catch (err) {
    // Generic text on purpose: the request URL carries the API key.
    await storeError("NETWORK_ERROR");
    throw err;
  }

  if (submit.data.status !== 1) {
    await storeError(submit.data.request);
    return { error: submit.data.request };
  }

  const captchaId = submit.data.request;

  for (let i = 0; i < 60; i++) {
    await new Promise((resolve) => setTimeout(resolve, 5000));
    // 60 polls x 5 s can outlast DEDUP_TTL; keep the "solving" marker alive
    // while this worker is still polling so nobody else re-solves.
    await redis.expire(key, DEDUP_TTL);

    let poll;
    try {
      poll = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
        timeout: HTTP_TIMEOUT_MS,
      });
    } catch (err) {
      await storeError("NETWORK_ERROR");
      throw err;
    }

    if (poll.data.status === 1) {
      // Short TTL: solved tokens expire quickly.
      await redis.set(key, JSON.stringify({ status: "solved", solution: poll.data.request }), "EX", 60);
      return { solution: poll.data.request, source: "api" };
    }
    if (poll.data.request !== "CAPCHA_NOT_READY") {
      await storeError(poll.data.request);
      return { error: poll.data.request };
    }
  }

  await storeError("TIMEOUT");
  return { error: "TIMEOUT" };
}
/**
 * Poll the dedup entry until another worker publishes an outcome.
 *
 * The key is re-read every 2 seconds. A "solved" state resolves with the
 * solution tagged `dedup_wait`; an "error" state resolves with that error.
 * After `timeout` milliseconds without either, resolves with
 * `DEDUP_WAIT_TIMEOUT`.
 */
async function waitForResult(key, timeout = 120000) {
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const began = Date.now();

  for (;;) {
    if (Date.now() - began >= timeout) {
      return { error: "DEDUP_WAIT_TIMEOUT" };
    }

    const payload = await redis.get(key);
    if (payload) {
      const outcome = JSON.parse(payload);
      if (outcome.status === "solved") {
        return { solution: outcome.solution, source: "dedup_wait" };
      }
      if (outcome.status === "error") {
        return { error: outcome.error };
      }
    }

    await pause(2000);
  }
}
数据库锁定替代方案
对于不使用 Redis 的基于 PostgreSQL 的重复数据删除:
import psycopg2
def solve_with_pg_dedup(conn, sitekey, pageurl):
    """Use PostgreSQL advisory locks to deduplicate CAPTCHA solves.

    The worker that wins the advisory lock solves the CAPTCHA and stores the
    solution in ``captcha_cache``. Workers that lose block on the lock until
    the winner releases it, then read the cached row.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``.
        sitekey: CAPTCHA site key.
        pageurl: URL of the page hosting the CAPTCHA.

    Returns:
        ``{"solution": ...}`` when this worker solved it,
        ``{"solution": ..., "source": "pg_cache"}`` when another worker did,
        ``{"error": "NO_CACHED_RESULT"}`` when the wait found no fresh row, or
        ``{"error": "SOLVE_FAILED"}`` when the solve returned nothing.
    """
    import hashlib  # local import keeps this snippet self-contained

    # The lock id must be identical in every worker process. Built-in hash()
    # on str is salted per process, so derive the id from SHA-256 instead.
    # The 31-bit mask keeps it a non-negative value, as before.
    digest = hashlib.sha256(f"{sitekey}:{pageurl}".encode()).digest()
    lock_id = int.from_bytes(digest[:4], "big") & 0x7FFFFFFF

    cursor = conn.cursor()
    try:
        # Try to acquire advisory lock (non-blocking)
        cursor.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,))
        acquired = cursor.fetchone()[0]

        if not acquired:
            # Another worker is solving: block until it releases the lock.
            cursor.execute("SELECT pg_advisory_lock(%s)", (lock_id,))
            try:
                # Holding the lock means the other worker finished; check cache.
                cursor.execute(
                    "SELECT solution FROM captcha_cache "
                    "WHERE sitekey = %s AND pageurl = %s "
                    "AND created_at > NOW() - INTERVAL '60 seconds'",
                    (sitekey, pageurl)
                )
                row = cursor.fetchone()
            finally:
                # Release even if the cache query raised, so the lock cannot
                # stay held by this session.
                cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
            if row:
                return {"solution": row[0], "source": "pg_cache"}
            return {"error": "NO_CACHED_RESULT"}

        try:
            # Solve the CAPTCHA
            solution = solve_via_api(sitekey, pageurl)
            if solution:
                cursor.execute(
                    "INSERT INTO captcha_cache (sitekey, pageurl, solution) "
                    "VALUES (%s, %s, %s)",
                    (sitekey, pageurl, solution)
                )
                conn.commit()
            return {"solution": solution} if solution else {"error": "SOLVE_FAILED"}
        finally:
            cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
    finally:
        cursor.close()
重复数据删除有效性指标
跟踪重复数据删除节省的费用:
def track_dedup_stats(source):
    """Count one request under ``source`` in today's dedup statistics hash.

    The hash is keyed by the current local date and its expiry is reset to
    seven days on every call.
    """
    stats_key = "dedup:stats:" + time.strftime("%Y-%m-%d")
    r.hincrby(stats_key, source, 1)
    r.expire(stats_key, 7 * 86400)
def get_dedup_report():
    """Summarise today's dedup counters.

    Returns a dict with the total request count, the number served without a
    new API solve (``dedup_cache`` + ``dedup_wait``), that share as a
    percentage string, and the raw per-source counters.
    """
    counters = r.hgetall("dedup:stats:" + time.strftime("%Y-%m-%d"))

    total = 0
    for count in counters.values():
        total += int(count)

    saved = sum(int(counters.get(name, 0)) for name in ("dedup_cache", "dedup_wait"))

    if total:
        savings = f"{saved / total * 100:.1f}%"
    else:
        savings = "0%"

    return {
        "total_requests": total,
        "deduplicated": saved,
        "savings_pct": savings,
        "breakdown": counters,
    }
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 重复数据删除键冲突 | 哈希太短或缺少参数 | 在密钥中包含所有验证码特定的参数;增加哈希长度 |
| 等待中的 worker 超时 | 负责求解的 worker 崩溃 | solving 状态上的 TTL 会自动过期(180 秒) |
| 过时的缓存结果 | 令牌已过期但缓存仍然有效 | 设置结果缓存 TTL 短于令牌生命周期(reCAPTCHA 为 60 秒) |
| 竞态条件 | 两个 worker 同时检查 | 使用 SET NX(set-if-not-exists)进行原子的锁获取 |
常问问题
重复数据删除何时值得如此复杂?
当您有多个工作人员针对同一 sitekey/pageurl 组合时。即使 10% 的重复数据删除率也能大规模节省大量 API 积分,并且消除了浪费的解决时间。
图片验证码也应该去重吗?
是的,但使用图像内容的哈希值作为重复数据删除密钥的一部分。相同的图像返回相同的文本,因此重复数据删除是有效的。
相同的验证码有不同的代理怎么办?
不要在重复数据删除密钥中包含代理。无论使用哪个代理来解决问题,解决方案令牌都有效。包含代理会破坏重复数据删除。
下一步
停止为重复的验证码求解付费——获取您的 CaptchaAI API 密钥并立即实施去重。
相关指南:
- Redis 令牌 TTL 管理
- 会话状态分布式工作人员
- 批量错误恢复