实战教程

通过数据库锁定消除重复验证码解决请求

当多个 worker 或重试逻辑提交相同的验证码求解请求时,您需要为每一个重复请求付费。重复数据删除层会捕获相同的请求并返回相同的结果——既节省 API 积分,又减少延迟。

重复是如何发生的

场景 原因 浪费
在结果到达之前重试 激进的重试逻辑 每个验证码的成本为 2-5 倍
多个 worker,同一目标 worker 之间没有协调 并行求解造成浪费
页面刷新重新触发 前端超时重试 每次刷新多一次求解
队列消息重放 至少一次投递保证 每次重放都会重复求解

重复数据删除键的设计

从请求参数生成唯一密钥:

import hashlib


def dedup_key(method, sitekey, pageurl):
    """Build the deduplication key for one CAPTCHA solve request.

    The three request parameters are joined with ``:`` and hashed with
    SHA-256; the first 16 hex characters of the digest are appended to the
    ``captcha:dedup:`` prefix.
    """
    fingerprint = "{}:{}:{}".format(method, sitekey, pageurl)
    digest = hashlib.sha256(fingerprint.encode()).hexdigest()
    return "captcha:dedup:" + digest[:16]

键的组成:

验证码类型 键的组成部分
reCAPTCHA v2 method + sitekey + pageurl
reCAPTCHA v3 method + sitekey + pageurl + action
验证码 method + sitekey + pageurl
Cloudflare Turnstile method + sitekey + pageurl
图片验证码 method + body 的哈希值(图像内容)

基于Redis的重复数据删除

Python实现

import os
import time
import json
import hashlib
import redis
import requests

# Shared Redis client; host and port come from the environment, falling back
# to a local instance on the default port.
r = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    decode_responses=True,
)

# CaptchaAI API key; a missing variable raises KeyError when this line runs.
API_KEY = os.environ["CAPTCHAAI_API_KEY"]

# Seconds a request stays marked as "in progress" before the marker expires.
DEDUP_TTL = 180


def dedup_key(method, sitekey, pageurl, extra=""):
    """Build the Redis deduplication key for one CAPTCHA solve request.

    ``method``, ``sitekey``, ``pageurl`` and ``extra`` are joined with ``:``
    and hashed with SHA-256; the first 16 hex characters of the digest are
    appended to the ``captcha:dedup:`` prefix.
    """
    fingerprint = "{}:{}:{}:{}".format(method, sitekey, pageurl, extra)
    digest = hashlib.sha256(fingerprint.encode()).hexdigest()
    return "captcha:dedup:" + digest[:16]


def solve_with_dedup(sitekey, pageurl, method="userrecaptcha"):
    """Solve a CAPTCHA through CaptchaAI, sharing one solve among duplicates.

    The first caller for a given (method, sitekey, pageurl) claims the dedup
    key and performs the paid solve; concurrent callers either wait for that
    result or reuse a recently cached one.

    Args:
        sitekey: Site key, submitted to the API as ``googlekey``.
        pageurl: URL of the page the CAPTCHA belongs to.
        method: CaptchaAI method name submitted with the request.

    Returns:
        ``{"solution": ..., "source": ...}`` on success, where ``source`` is
        ``"api"``, ``"dedup_cache"`` or ``"dedup_wait"``; otherwise
        ``{"error": ...}``.

    Raises:
        Any exception raised by the HTTP or Redis calls is re-raised after an
        error state has been published for waiting workers.
    """
    key = dedup_key(method, sitekey, pageurl)
    claim = json.dumps({"status": "solving", "started": time.time()})
    http_timeout = 30  # seconds; requests has no timeout unless one is passed

    # SET NX claims the key atomically. A separate GET followed by SET lets
    # two workers both observe "no key" and both pay for the same solve.
    if not r.set(key, claim, nx=True, ex=DEDUP_TTL):
        existing = r.get(key)
        if existing:
            state = json.loads(existing)
            if state["status"] == "solving":
                # Another worker owns the solve: wait for its result.
                return wait_for_result(key)
            if state["status"] == "solved":
                return {"solution": state["solution"], "source": "dedup_cache"}
        # Previous attempt ended in "error" (or the key expired between the
        # SET NX and the GET): take over and retry. This overwrite is not
        # atomic, so simultaneous retries after an error may still duplicate.
        r.set(key, claim, ex=DEDUP_TTL)

    try:
        # Submit to CaptchaAI
        resp = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": API_KEY,
            "method": method,
            "googlekey": sitekey,
            "pageurl": pageurl,
            "json": 1
        }, timeout=http_timeout)
        data = resp.json()

        if data.get("status") != 1:
            r.set(key, json.dumps({"status": "error", "error": data.get("request")}), ex=30)
            return {"error": data.get("request")}

        captcha_id = data["request"]

        # Poll for result
        for _ in range(60):
            time.sleep(5)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": API_KEY, "action": "get",
                "id": captcha_id, "json": 1
            }, timeout=http_timeout).json()

            if result.get("status") == 1:
                solution = result["request"]
                # Cache the result for other workers (short TTL since tokens expire)
                r.set(key, json.dumps({
                    "status": "solved",
                    "solution": solution,
                    "solved_at": time.time()
                }), ex=60)
                return {"solution": solution, "source": "api"}

            if result.get("request") != "CAPCHA_NOT_READY":
                r.set(key, json.dumps({
                    "status": "error", "error": result.get("request")
                }), ex=30)
                return {"error": result.get("request")}

        r.set(key, json.dumps({"status": "error", "error": "TIMEOUT"}), ex=30)
        return {"error": "TIMEOUT"}
    except Exception:
        # Publish the failure so waiting workers stop polling a stale
        # "solving" marker, then let the original exception propagate.
        try:
            r.set(key, json.dumps({"status": "error", "error": "SOLVER_EXCEPTION"}), ex=30)
        except redis.RedisError:
            pass  # Redis itself is failing; the marker expires via its TTL
        raise


def wait_for_result(key, timeout=120):
    """Poll Redis until the state stored under ``key`` is final.

    Returns the cached solution once the state is ``"solved"``, the recorded
    error once it is ``"error"``, or ``DEDUP_WAIT_TIMEOUT`` when ``timeout``
    seconds pass without either. The key is re-read every 2 seconds.
    """
    began = time.time()
    while True:
        if not (time.time() - began < timeout):
            return {"error": "DEDUP_WAIT_TIMEOUT"}

        raw = r.get(key)
        if raw:
            state = json.loads(raw)
            status = state["status"]
            if status == "solved":
                return {"solution": state["solution"], "source": "dedup_wait"}
            if status == "error":
                return {"error": state.get("error", "UNKNOWN")}

        time.sleep(2)

JavaScript 实现

const Redis = require("ioredis");
const axios = require("axios");
const crypto = require("crypto");

// Redis connection: uses REDIS_URL when it is set, otherwise a local instance.
const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379");
// CaptchaAI API key read from the environment (undefined when the variable is unset).
const API_KEY = process.env.CAPTCHAAI_API_KEY;
// Expiry, in seconds, applied to the "solving" marker written to Redis.
const DEDUP_TTL = 180;

// Builds the deduplication key for one solve request: "captcha:dedup:" plus
// the first 16 hex characters of the SHA-256 of "method:sitekey:pageurl".
function dedupKey(method, sitekey, pageurl) {
  const hasher = crypto.createHash("sha256");
  hasher.update(`${method}:${sitekey}:${pageurl}`);
  const shortDigest = hasher.digest("hex").slice(0, 16);
  return "captcha:dedup:" + shortDigest;
}

/**
 * Solve a CAPTCHA through CaptchaAI, sharing one solve among duplicate requests.
 *
 * The first caller for a given (method, sitekey, pageurl) claims the dedup key
 * and performs the paid solve; concurrent callers wait for that result or
 * reuse a recently cached one.
 *
 * @param {string} sitekey  Site key, submitted to the API as `googlekey`.
 * @param {string} pageurl  URL of the page the CAPTCHA belongs to.
 * @param {string} [method] CaptchaAI method name.
 * @returns {Promise<object>} `{ solution, source }` on success
 *   (`source` is "api", "dedup_cache" or "dedup_wait"), otherwise `{ error }`.
 * @throws Re-throws any HTTP/Redis error after publishing an error state for waiters.
 */
async function solveWithDedup(sitekey, pageurl, method = "userrecaptcha") {
  const key = dedupKey(method, sitekey, pageurl);
  const claim = JSON.stringify({ status: "solving", started: Date.now() });
  const httpTimeoutMs = 30000; // axios applies no timeout unless one is configured

  // SET ... NX claims the key atomically. A separate GET followed by SET lets
  // two workers both observe "no key" and both pay for the same solve.
  const claimed = await redis.set(key, claim, "EX", DEDUP_TTL, "NX");
  if (claimed !== "OK") {
    const existing = await redis.get(key);
    if (existing) {
      const state = JSON.parse(existing);
      if (state.status === "solving") return await waitForResult(key);
      if (state.status === "solved") return { solution: state.solution, source: "dedup_cache" };
    }
    // Previous attempt ended in "error" (or the key expired between the two
    // commands): take over and retry. This overwrite is not atomic, so
    // simultaneous retries after an error may still duplicate.
    await redis.set(key, claim, "EX", DEDUP_TTL);
  }

  try {
    // Submit
    const submit = await axios.post("https://ocr.captchaai.com/in.php", null, {
      params: { key: API_KEY, method, googlekey: sitekey, pageurl, json: 1 },
      timeout: httpTimeoutMs,
    });

    if (submit.data.status !== 1) {
      await redis.set(key, JSON.stringify({ status: "error", error: submit.data.request }), "EX", 30);
      return { error: submit.data.request };
    }

    const captchaId = submit.data.request;

    // Poll for the result every 5 seconds, up to 60 attempts.
    for (let i = 0; i < 60; i++) {
      await new Promise((r) => setTimeout(r, 5000));
      const poll = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
        timeout: httpTimeoutMs,
      });

      if (poll.data.status === 1) {
        // Short TTL: the cached token is only shared briefly with other workers.
        await redis.set(key, JSON.stringify({ status: "solved", solution: poll.data.request }), "EX", 60);
        return { solution: poll.data.request, source: "api" };
      }
      if (poll.data.request !== "CAPCHA_NOT_READY") {
        await redis.set(key, JSON.stringify({ status: "error", error: poll.data.request }), "EX", 30);
        return { error: poll.data.request };
      }
    }

    await redis.set(key, JSON.stringify({ status: "error", error: "TIMEOUT" }), "EX", 30);
    return { error: "TIMEOUT" };
  } catch (err) {
    // Publish the failure so waiting workers stop polling a stale "solving"
    // marker, then let the original error propagate.
    try {
      await redis.set(key, JSON.stringify({ status: "error", error: "SOLVER_EXCEPTION" }), "EX", 30);
    } catch (redisErr) {
      // Redis itself is failing; the marker expires via its TTL.
    }
    throw err;
  }
}

// Polls Redis every 2 seconds until the state under `key` is "solved" or
// "error", or until `timeout` milliseconds have elapsed.
async function waitForResult(key, timeout = 120000) {
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const startedAt = Date.now();

  while (Date.now() - startedAt < timeout) {
    const raw = await redis.get(key);
    if (raw) {
      const state = JSON.parse(raw);
      switch (state.status) {
        case "solved":
          return { solution: state.solution, source: "dedup_wait" };
        case "error":
          return { error: state.error };
      }
    }
    await pause(2000);
  }

  return { error: "DEDUP_WAIT_TIMEOUT" };
}

数据库锁定替代方案

对于不使用 Redis 的基于 PostgreSQL 的重复数据删除:

import psycopg2


def solve_with_pg_dedup(conn, sitekey, pageurl):
    """Use PostgreSQL advisory locks to deduplicate CAPTCHA solves.

    The worker that wins the advisory lock solves the CAPTCHA and stores the
    result in ``captcha_cache``; workers that lose block on the lock and then
    read the cached row.

    Args:
        conn: Open database connection (psycopg2-style: ``cursor()``,
            ``commit()``, ``rollback()``).
        sitekey: CAPTCHA site key.
        pageurl: URL of the page the CAPTCHA belongs to.

    Returns:
        ``{"solution": ...}`` when this worker solved it,
        ``{"solution": ..., "source": "pg_cache"}`` when another worker did,
        or ``{"error": "SOLVE_FAILED" | "NO_CACHED_RESULT"}``.
    """
    import hashlib  # local import: this snippet's file-level imports only bring in psycopg2

    # The lock ID must be identical in every worker process. Built-in hash()
    # is salted per process for str, so it would give each worker a different
    # ID and the locks would never collide. Derive a stable 63-bit ID instead.
    digest = hashlib.sha256(f"{sitekey}:{pageurl}".encode()).digest()
    lock_id = int.from_bytes(digest[:8], "big") & 0x7FFFFFFFFFFFFFFF

    cursor = conn.cursor()

    # Try to acquire advisory lock (non-blocking)
    cursor.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,))
    acquired = cursor.fetchone()[0]

    if not acquired:
        # Another worker is solving: block until it releases the lock.
        cursor.execute("SELECT pg_advisory_lock(%s)", (lock_id,))
        try:
            # Lock acquired means the other worker finished: check the cache.
            cursor.execute(
                "SELECT solution FROM captcha_cache "
                "WHERE sitekey = %s AND pageurl = %s "
                "AND created_at > NOW() - INTERVAL '60 seconds'",
                (sitekey, pageurl)
            )
            row = cursor.fetchone()
        except Exception:
            # Clear the aborted transaction so the unlock below can run.
            conn.rollback()
            raise
        finally:
            cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
        if row:
            return {"solution": row[0], "source": "pg_cache"}
        return {"error": "NO_CACHED_RESULT"}

    try:
        # Solve the CAPTCHA
        solution = solve_via_api(sitekey, pageurl)
        if solution:
            cursor.execute(
                "INSERT INTO captcha_cache (sitekey, pageurl, solution) "
                "VALUES (%s, %s, %s)",
                (sitekey, pageurl, solution)
            )
            conn.commit()
        return {"solution": solution} if solution else {"error": "SOLVE_FAILED"}
    except Exception:
        # A failed statement aborts the transaction; without a rollback the
        # unlock in ``finally`` would fail and the lock would stay held.
        conn.rollback()
        raise
    finally:
        cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))

重复数据删除有效性指标

跟踪重复数据删除节省的费用:

def track_dedup_stats(source):
    """Add one to today's counter for ``source`` in the dedup stats hash.

    The per-day hash ``dedup:stats:<YYYY-MM-DD>`` has its expiry set to
    seven days on every call.
    """
    day = time.strftime("%Y-%m-%d")
    stats_key = f"dedup:stats:{day}"
    r.hincrby(stats_key, source, 1)
    r.expire(stats_key, 7 * 86400)


def get_dedup_report():
    """Summarise today's dedup counters.

    Returns a dict with the total request count, the number served without a
    new solve (``dedup_cache`` + ``dedup_wait``), that share as a percentage
    string, and the raw per-source counters.
    """
    day = time.strftime("%Y-%m-%d")
    counters = r.hgetall(f"dedup:stats:{day}")

    total = 0
    for count in counters.values():
        total += int(count)

    saved = sum(int(counters.get(src, 0)) for src in ("dedup_cache", "dedup_wait"))

    if total:
        savings = f"{saved / total * 100:.1f}%"
    else:
        savings = "0%"

    return {
        "total_requests": total,
        "deduplicated": saved,
        "savings_pct": savings,
        "breakdown": counters,
    }

故障排除

问题 原因 处理方式
重复数据删除键冲突 哈希太短或缺少参数 在密钥中包含所有验证码特定的参数;增加哈希长度
等待中的 worker 超时 负责求解的 worker 已崩溃 solving 状态上的 TTL 会自动过期(180 秒)
过时的缓存结果 令牌已过期但缓存仍然有效 设置结果缓存 TTL 短于令牌生命周期(reCAPTCHA 为 60 秒)
SET 时的竞态条件 两个 worker 同时检查 使用 SET NX(set-if-not-exists)进行原子锁获取

常问问题

重复数据删除何时值得如此复杂?

当您有多个工作人员针对同一 sitekey/pageurl 组合时。即使 10% 的重复数据删除率也能大规模节省大量 API 积分,并且消除了浪费的解决时间。

我应该对图像验证码进行重复数据删除吗?

是的,但使用图像内容的哈希值作为重复数据删除密钥的一部分。相同的图像返回相同的文本,因此重复数据删除是有效的。

相同的验证码有不同的代理怎么办?

不要在重复数据删除密钥中包含代理。无论使用哪个代理来解决问题,解决方案令牌都有效。包含代理会破坏重复数据删除。

下一步

停止为重复的验证码解决付费——获取您的 CaptchaAI API 密钥并立即实施重复数据删除。

相关指南:

该文章已禁用评论。