Tutorials

使用 CaptchaAI 构建客户端验证码管道

当您管理多个客户的抓取或自动化时,每个项目最终都会遇到验证码。不要为每个项目编写一次性的解决代码,而是构建一个可重用的管道。本指南将介绍该架构。


管道架构

┌──────────────┐    ┌───────────────┐    ┌──────────────┐
│  Client A    │──▶ │               │    │              │
│  Client B    │──▶ │  Task Queue   │──▶ │  CaptchaAI   │
│  Client C    │──▶ │               │    │  API         │
└──────────────┘    └───────────────┘    └──────────────┘
                           │                    │
                           ▼                    ▼
                    ┌───────────────┐    ┌──────────────┐
                    │  Result Store │◀── │  Polling      │
                    │  (Redis/DB)   │    │  Workers      │
                    └───────────────┘    └──────────────┘

成分

  1. 任务接收 - 接收来自客户端抓取工具的解决请求
  2. 队列 - 缓冲任务,强制每个客户端的并发限制
  3. 求解器工作人员 – 提交至 CaptchaAI 并投票获取结果
  4. 结果存储 - 保存已解决的令牌以供消费者检索

Python管道

核心求解器类

import requests
import time
from dataclasses import dataclass
from typing import Optional
from collections import deque
from threading import Lock

SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"

@dataclass
class SolveRequest:
    client_id: str
    method: str
    params: dict
    callback: Optional[callable] = None

@dataclass
class SolveResult:
    client_id: str
    task_id: str
    token: Optional[str] = None
    error: Optional[str] = None


class CaptchaPipeline:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.queue = deque()
        self.active = {}
        self.lock = Lock()

    def enqueue(self, request: SolveRequest):
        with self.lock:
            self.queue.append(request)

    def submit_task(self, request: SolveRequest) -> Optional[str]:
        data = {
            "key": self.api_key,
            "method": request.method,
            "json": 1,
            **request.params
        }

        try:
            resp = requests.post(SUBMIT_URL, data=data, timeout=15)
            result = resp.json()

            if result.get("status") == 1:
                return result["request"]
            else:
                print(f"[{request.client_id}] Submit error: {result.get('error_text', result.get('request'))}")
                return None
        except requests.RequestException as e:
            print(f"[{request.client_id}] Network error: {e}")
            return None

    def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
        elapsed = 0
        interval = 5
        while elapsed < max_wait:
            time.sleep(interval)
            elapsed += interval

            try:
                resp = requests.get(RESULT_URL, params={
                    "key": self.api_key,
                    "action": "get",
                    "id": task_id,
                    "json": 1
                }, timeout=10)
                result = resp.json()

                if result.get("status") == 1:
                    return result["request"]
                elif result.get("request") == "CAPCHA_NOT_READY":
                    continue
                else:
                    print(f"Poll error for {task_id}: {result.get('error_text', result.get('request'))}")
                    return None
            except requests.RequestException:
                continue

        return None

    def process_queue(self):
        while self.queue or self.active:
            # Fill active slots
            with self.lock:
                while self.queue and len(self.active) < self.max_concurrent:
                    request = self.queue.popleft()
                    task_id = self.submit_task(request)
                    if task_id:
                        self.active[task_id] = request

            # Poll active tasks
            completed = []
            for task_id, request in list(self.active.items()):
                token = self.poll_result(task_id, max_wait=10)
                if token:
                    result = SolveResult(
                        client_id=request.client_id,
                        task_id=task_id,
                        token=token
                    )
                    if request.callback:
                        request.callback(result)
                    completed.append(task_id)

            with self.lock:
                for task_id in completed:
                    del self.active[task_id]

多客户端使用

pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)

# Client A — reCAPTCHA v2
pipeline.enqueue(SolveRequest(
    client_id="client_a",
    method="userrecaptcha",
    params={
        "googlekey": "6Le-SITEKEY-A",
        "pageurl": "https://client-a-staging.example.com/qa-form"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

# Client B — Turnstile
pipeline.enqueue(SolveRequest(
    client_id="client_b",
    method="turnstile",
    params={
        "sitekey": "0x4AAAA-SITEKEY-B",
        "pageurl": "https://client-b-target.com/login"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

pipeline.process_queue()

Node.js 管道

const axios = require("axios");

const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class CaptchaPipeline {
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.maxConcurrent = maxConcurrent;
    this.queue = [];
    this.activeCount = 0;
  }

  enqueue(clientId, method, params) {
    return new Promise((resolve, reject) => {
      this.queue.push({ clientId, method, params, resolve, reject });
      this._processNext();
    });
  }

  async _processNext() {
    if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;

    this.activeCount++;
    const task = this.queue.shift();

    try {
      const token = await this._solve(task);
      task.resolve({ clientId: task.clientId, token });
    } catch (err) {
      task.reject(err);
    } finally {
      this.activeCount--;
      this._processNext();
    }
  }

  async _solve(task) {
    const submitResp = await axios.post(SUBMIT_URL, null, {
      params: {
        key: this.apiKey,
        method: task.method,
        json: 1,
        ...task.params,
      },
      timeout: 15000,
    });

    if (submitResp.data.status !== 1) {
      throw new Error(submitResp.data.error_text || submitResp.data.request);
    }

    const taskId = submitResp.data.request;
    return this._poll(taskId);
  }

  async _poll(taskId, maxWait = 120000) {
    const interval = 5000;
    let elapsed = 0;

    while (elapsed < maxWait) {
      await new Promise((r) => setTimeout(r, interval));
      elapsed += interval;

      try {
        const resp = await axios.get(RESULT_URL, {
          params: {
            key: this.apiKey,
            action: "get",
            id: taskId,
            json: 1,
          },
          timeout: 10000,
        });

        if (resp.data.status === 1) return resp.data.request;
        if (resp.data.request !== "CAPCHA_NOT_READY") {
          throw new Error(resp.data.error_text || resp.data.request);
        }
      } catch (err) {
        if (err.response) throw err;
      }
    }

    throw new Error(`Timeout waiting for task ${taskId}`);
  }
}

// Usage
(async () => {
  const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);

  const results = await Promise.allSettled([
    pipeline.enqueue("client_a", "userrecaptcha", {
      googlekey: "6Le-SITEKEY-A",
      pageurl: "https://client-a-staging.example.com/qa-form",
    }),
    pipeline.enqueue("client_b", "turnstile", {
      sitekey: "0x4AAAA-SITEKEY-B",
      pageurl: "https://client-b-target.com/login",
    }),
  ]);

  results.forEach((r) => {
    if (r.status === "fulfilled") {
      console.log(`[${r.value.clientId}] Token: ${r.value.token.slice(0, 40)}...`);
    } else {
      console.error(`Failed: ${r.reason.message}`);
    }
  });
})();

每个客户端配置

跟踪每个客户端的设置,例如代理、求解器首选项和速率限制:

CLIENT_CONFIG = {
    "client_a": {
        "proxy": "host:port:user:pass",
        "proxytype": "HTTP",
        "max_concurrent": 5,
        "default_method": "userrecaptcha"
    },
    "client_b": {
        "proxy": None,
        "proxytype": None,
        "max_concurrent": 10,
        "default_method": "turnstile"
    }
}

def build_params(client_id, params):
    config = CLIENT_CONFIG.get(client_id, {})
    if config.get("proxy"):
        params["proxy"] = config["proxy"]
        params["proxytype"] = config["proxytype"]
    return params

错误处理策略

错误 回复
ERROR_ZERO_BALANCE 停止队列,提醒所有客户
ERROR_NO_SLOT_AVAILABLE 延迟重新排队任务
ERROR_WRONG_CAPTCHA_ID 丢弃,记录错误
ERROR_CAPTCHA_UNSOLVABLE 重试一次,然后失败
网络超时 回退重试(最多重试 3 次)

故障排除

问题 原因 处理方式
队列无限增长 活动插槽已满 增加max_concurrent或添加工人
回调未触发 任务默默失败 检查 poll 循环中的错误返回
客户端之间的混合令牌 共享结果存储 client_id + task_id 的主要结果
速率限制错误 (429) 并发提交过多 降低并发,增加提交延迟

常问问题

每个客户端应该运行多少个并发任务?

从 5-10 开始。监控解决时间和错误率,然后进行调整。CaptchaAI 支持高并发,但您的代理池可能是瓶颈。

我应该为每个客户端使用单独的 API 密钥吗?

它简化了计费。如果需要一键跟踪,请使用 CaptchaAI soft_id 参数。

如何处理通宵排队的情况?

持久化队列(Redis 或数据库)。重新启动时,重新加载挂起的任务并恢复处理。


使用 CaptchaAI 构建您的验证码管道

开始构建客户端管道验证码网站


相关指南

该文章已禁用评论。

相关文章

Tutorials 弹出模式中的验证码:检测和token 提交
弹出模式中的验证码分步教程:检测和token 提交,具有可直接重用的示例和清晰的 Captcha AI 工作流程。

弹出模式中的验证码分步教程:检测和token 提交,具有可直接重用的示例和清晰的 Captcha AI 工作流程。

May 05, 2026
Use Cases 高需求 e-commerce checkout 的 CAPTCHA 测试
面向 QA 的指南:在自有 staging 环境中使用虚拟库存、测试支付 token 和 Captcha AI 验证 e-commerce checkout 的 CAPTCHA 集成。

面向 QA 的指南:在自有 staging 环境中使用虚拟库存、测试支付 token 和 Captcha AI 验证 e-commerce checkout 的 CA...

May 04, 2026
Tutorials 在自有 QA 中安全地缓存与复用 CAPTCHA token
在自有 QA 环境中,按 token 类型与有效期合理地缓存与复用 CAPTCHA token 的模式 — 不涉及生产令牌复用。

在自有 QA 环境中,按 token 类型与有效期合理地缓存与复用 CAPTCHA token 的模式 — 不涉及生产令牌复用。

May 01, 2026