当您为多个客户管理抓取或自动化项目时,每个项目最终都会遇到验证码。与其为每个项目单独编写一次性的求解代码,不如构建一条可复用的管道。本指南将介绍这一架构。
管道架构
┌──────────────┐ ┌───────────────┐ ┌──────────────┐
│ Client A │──▶ │ │ │ │
│ Client B │──▶ │ Task Queue │──▶ │ CaptchaAI │
│ Client C │──▶ │ │ │ API │
└──────────────┘ └───────────────┘ └──────────────┘
│ │
▼ ▼
┌───────────────┐ ┌──────────────┐
│ Result Store │◀── │ Polling │
│ (Redis/DB) │ │ Workers │
└───────────────┘ └──────────────┘
组件:
- 任务接收 — 接收来自各客户端抓取程序的求解请求
- 队列 — 缓冲任务,并限制每个客户端的并发数
- 求解工作进程 — 将任务提交给 CaptchaAI 并轮询获取结果
- 结果存储 — 保存已求解的令牌,供使用方检索
Python管道
核心求解器类
import time
from collections import deque
from dataclasses import dataclass
from threading import Lock
from typing import Callable, Optional

import requests
# CaptchaAI endpoints: in.php accepts new tasks, res.php is polled for results.
_CAPTCHAAI_ORIGIN = "https://ocr.captchaai.com"
SUBMIT_URL = f"{_CAPTCHAAI_ORIGIN}/in.php"
RESULT_URL = f"{_CAPTCHAAI_ORIGIN}/res.php"
@dataclass
class SolveRequest:
    """One CAPTCHA solve request submitted on behalf of a client.

    Attributes:
        client_id: Identifier of the client that owns the request.
        method: CaptchaAI ``method`` value, e.g. ``"userrecaptcha"`` or ``"turnstile"``.
        params: Extra submit parameters merged into the request (sitekey, pageurl, ...).
        callback: Optional function invoked with the ``SolveResult`` for this request.
    """

    client_id: str
    method: str
    params: dict
    # ``callable`` is the builtin predicate, not a type; use typing.Callable.
    # "SolveResult" is a forward reference because that class is defined below.
    callback: Optional[Callable[["SolveResult"], None]] = None
@dataclass()
class SolveResult:
    """Outcome of a single solve request, handed to the request's callback.

    ``client_id`` and ``task_id`` identify the request; ``token`` carries the
    solved token and ``error`` a failure description. Both default to None.
    """

    client_id: str  # client that owned the originating request
    task_id: str  # CaptchaAI task id returned on submission
    token: Optional[str] = None  # solved CAPTCHA token, if any
    error: Optional[str] = None  # error description, if any
class CaptchaPipeline:
    """Shared CAPTCHA-solving pipeline that serves many clients through CaptchaAI.

    Requests wait in the FIFO ``queue``. ``process_queue`` submits them while
    fewer than ``max_concurrent`` tasks are in flight (``active`` maps
    task_id -> SolveRequest). It checks every in-flight task once per
    ``poll_interval`` seconds and reports each request's outcome to its
    callback as a ``SolveResult``:

    - ``token`` is set on success.
    - ``error`` is set when submission fails, the solver returns an error, or
      the task exceeds ``task_timeout`` seconds.

    The method returns once ``queue`` and ``active`` are both empty.

    ``lock`` guards ``queue`` and ``active``. HTTP calls are made without
    holding it, so ``enqueue`` is never blocked behind network I/O.
    """

    def __init__(
        self,
        api_key: str,
        max_concurrent: int = 10,
        poll_interval: float = 5,
        task_timeout: float = 120,
    ):
        """Create a pipeline.

        Args:
            api_key: CaptchaAI API key.
            max_concurrent: Maximum number of tasks in flight at once (>= 1).
            poll_interval: Seconds to wait between result checks (> 0).
            task_timeout: Seconds after submission before a task that is still
                pending is reported as timed out.

        Raises:
            ValueError: If ``max_concurrent`` < 1 (the queue could never drain)
                or ``poll_interval`` <= 0 (polling would never advance).
        """
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be at least 1")
        if poll_interval <= 0:
            raise ValueError("poll_interval must be positive")
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.poll_interval = poll_interval
        self.task_timeout = task_timeout
        self.queue = deque()
        self.active = {}  # task_id -> SolveRequest currently being solved
        self._deadlines = {}  # task_id -> time.monotonic() value after which it times out
        self.lock = Lock()

    def enqueue(self, request: "SolveRequest") -> None:
        """Append *request* to the back of the queue."""
        with self.lock:
            self.queue.append(request)

    def _submit(self, request: "SolveRequest") -> "tuple[Optional[str], Optional[str]]":
        """POST *request* to in.php; return ``(task_id, None)`` or ``(None, error_text)``."""
        data = {
            "key": self.api_key,
            "method": request.method,
            "json": 1,
            **request.params,
        }
        try:
            resp = requests.post(SUBMIT_URL, data=data, timeout=15)
            result = resp.json()
        except requests.RequestException as e:
            return None, f"Network error: {e}"
        except ValueError as e:
            # Body was not JSON (e.g. an HTML error page from a proxy).
            return None, f"Invalid response: {e}"
        if result.get("status") == 1:
            return result["request"], None
        return None, f"Submit error: {result.get('error_text', result.get('request'))}"

    def submit_task(self, request: "SolveRequest") -> Optional[str]:
        """Submit *request* and return its task id, or log the failure and return None."""
        task_id, error = self._submit(request)
        if error is not None:
            print(f"[{request.client_id}] {error}")
        return task_id

    def _check_result(self, task_id: str) -> "tuple[bool, Optional[str], Optional[str]]":
        """Query res.php once for *task_id* and return ``(done, token, error)``.

        ``done`` is False while the task is still pending (``CAPCHA_NOT_READY``
        is the API's own spelling) or when the HTTP request failed
        transiently; the caller should check again later. When ``done`` is
        True, ``token`` is set on success and ``error`` otherwise.
        """
        try:
            resp = requests.get(RESULT_URL, params={
                "key": self.api_key,
                "action": "get",
                "id": task_id,
                "json": 1,
            }, timeout=10)
            result = resp.json()
        except (requests.RequestException, ValueError):
            return False, None, None
        if result.get("status") == 1:
            return True, result["request"], None
        if result.get("request") == "CAPCHA_NOT_READY":
            return False, None, None
        return True, None, str(result.get("error_text", result.get("request")))

    def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
        """Block until *task_id* finishes; return its token, or None on error or timeout.

        Sleeps ``poll_interval`` seconds before each check and gives up after
        roughly *max_wait* seconds. Solver errors are logged.
        """
        elapsed = 0
        while elapsed < max_wait:
            time.sleep(self.poll_interval)
            elapsed += self.poll_interval
            done, token, error = self._check_result(task_id)
            if not done:
                continue
            if error is not None:
                print(f"Poll error for {task_id}: {error}")
            return token
        return None

    def process_queue(self) -> None:
        """Submit, poll and report every request until the queue and active set are empty."""
        while True:
            with self.lock:
                if not self.queue and not self.active:
                    return
            self._fill_slots()
            if self.active:
                # Give the solver time to work; this also paces the polling loop.
                time.sleep(self.poll_interval)
                self._poll_active()

    def _fill_slots(self) -> None:
        """Submit queued requests until the queue is empty or every slot is busy.

        A request whose submission fails is reported to its callback right
        away, with ``task_id=""`` because no task was created.
        """
        while True:
            with self.lock:
                if not self.queue or len(self.active) >= self.max_concurrent:
                    return
                request = self.queue.popleft()
            # Network I/O happens without the lock so enqueue() is never blocked on it.
            task_id, error = self._submit(request)
            if task_id is None:
                print(f"[{request.client_id}] {error}")
                self._deliver(request, "", error=error)
                continue
            with self.lock:
                self.active[task_id] = request
                self._deadlines[task_id] = time.monotonic() + self.task_timeout

    def _poll_active(self) -> None:
        """Check every in-flight task once; retire and report those that finished or timed out."""
        with self.lock:
            in_flight = list(self.active.items())
        for task_id, request in in_flight:
            done, token, error = self._check_result(task_id)
            # Tasks placed in ``active`` by other code get a deadline on first sight.
            deadline = self._deadlines.setdefault(task_id, time.monotonic() + self.task_timeout)
            if not done and time.monotonic() >= deadline:
                done, error = True, f"Timed out after {self.task_timeout}s"
            if not done:
                continue
            if error is not None:
                print(f"Poll error for {task_id}: {error}")
            with self.lock:
                self.active.pop(task_id, None)
                self._deadlines.pop(task_id, None)
            self._deliver(request, task_id, token=token, error=error)

    @staticmethod
    def _deliver(request, task_id, token=None, error=None) -> None:
        """Invoke the request's callback (if any) with a SolveResult.

        An exception raised by the callback is logged instead of propagated,
        so one client's faulty handler cannot stop processing for every
        other client.
        """
        if not request.callback:
            return
        result = SolveResult(
            client_id=request.client_id,
            task_id=task_id,
            token=token,
            error=error,
        )
        try:
            request.callback(result)
        except Exception as exc:  # boundary: isolate client callbacks from the pipeline
            print(f"[{request.client_id}] Callback error: {exc!r}")
多客户端使用
def report(result: SolveResult) -> None:
    """Print the outcome of one solve request for its client."""
    if result.token is None:
        # Failed solves carry no token; slicing None would raise TypeError.
        print(f"[{result.client_id}] Failed: {result.error or 'unknown error'}")
        return
    print(f"[{result.client_id}] Solved: {result.token[:40]}...")


# One shared pipeline serves every client.
pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)

# Client A — reCAPTCHA v2
pipeline.enqueue(SolveRequest(
    client_id="client_a",
    method="userrecaptcha",
    params={
        "googlekey": "6Le-SITEKEY-A",
        "pageurl": "https://client-a-staging.example.com/qa-form",
    },
    callback=report,
))

# Client B — Turnstile
pipeline.enqueue(SolveRequest(
    client_id="client_b",
    method="turnstile",
    params={
        "sitekey": "0x4AAAA-SITEKEY-B",
        "pageurl": "https://client-b-target.com/login",
    },
    callback=report,
))

pipeline.process_queue()
Node.js 管道
const axios = require("axios");
// CaptchaAI endpoints: in.php accepts new tasks, res.php is polled for results.
const CAPTCHAAI_ORIGIN = "https://ocr.captchaai.com";
const SUBMIT_URL = `${CAPTCHAAI_ORIGIN}/in.php`;
const RESULT_URL = `${CAPTCHAAI_ORIGIN}/res.php`;
/**
 * Promise-based CAPTCHA pipeline shared by many clients.
 *
 * `enqueue` returns one Promise per request. At most `maxConcurrent` requests
 * are solved at the same time; the rest wait in FIFO order in `queue`.
 */
class CaptchaPipeline {
  /**
   * @param {string} apiKey - CaptchaAI API key.
   * @param {number} [maxConcurrent=10] - Maximum number of requests solved concurrently.
   */
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.maxConcurrent = maxConcurrent;
    this.queue = [];
    this.activeCount = 0;
  }

  /**
   * Queue a solve request for a client.
   *
   * @param {string} clientId - Client the request belongs to.
   * @param {string} method - CaptchaAI `method` value, e.g. "userrecaptcha" or "turnstile".
   * @param {Object} params - Extra submit parameters (sitekey, pageurl, ...).
   * @returns {Promise<{clientId: string, token: string}>} Resolves with the solved token.
   *   Rejects with an Error whose `clientId` property names the failing client.
   */
  enqueue(clientId, method, params) {
    return new Promise((resolve, reject) => {
      this.queue.push({ clientId, method, params, resolve, reject });
      this._processNext();
    });
  }

  /** Start the next queued request if a concurrency slot is free. */
  async _processNext() {
    if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;
    this.activeCount++;
    const task = this.queue.shift();
    try {
      const token = await this._solve(task);
      task.resolve({ clientId: task.clientId, token });
    } catch (err) {
      // Tag the error so callers using Promise.allSettled can tell which client failed.
      if (err instanceof Error && err.clientId === undefined) err.clientId = task.clientId;
      task.reject(err);
    } finally {
      // Free the slot and hand it to the next waiting request.
      this.activeCount--;
      this._processNext();
    }
  }

  /** Submit one task to in.php and wait for its token. */
  async _solve(task) {
    // Send fields as a form-encoded POST body, as the Python pipeline does with
    // `data=`. This keeps the API key and large fields (e.g. base64 images) out of
    // the URL. null/undefined values are skipped, mirroring axios' query-param
    // serializer.
    const fields = { key: this.apiKey, method: task.method, json: 1, ...task.params };
    const body = new URLSearchParams();
    for (const [name, value] of Object.entries(fields)) {
      if (value !== undefined && value !== null) body.append(name, String(value));
    }
    const submitResp = await axios.post(SUBMIT_URL, body, { timeout: 15000 });
    if (submitResp.data.status !== 1) {
      throw new Error(submitResp.data.error_text || submitResp.data.request);
    }
    const taskId = submitResp.data.request;
    return this._poll(taskId);
  }

  /**
   * Poll res.php every 5 s until the task is solved, fails, or `maxWait` ms elapse.
   * Network failures without an HTTP response are retried. HTTP error responses
   * and solver errors reject immediately.
   */
  async _poll(taskId, maxWait = 120000) {
    const interval = 5000;
    let elapsed = 0;
    while (elapsed < maxWait) {
      await new Promise((r) => setTimeout(r, interval));
      elapsed += interval;
      let data;
      try {
        const resp = await axios.get(RESULT_URL, {
          params: {
            key: this.apiKey,
            action: "get",
            id: taskId,
            json: 1,
          },
          timeout: 10000,
        });
        data = resp.data;
      } catch (err) {
        if (err.response) throw err;
        continue; // transient network failure: try again next round
      }
      if (data.status === 1) return data.request;
      // Checked outside the try/catch so solver errors (e.g. ERROR_CAPTCHA_UNSOLVABLE)
      // reject right away. Inside the try, the retry handler would swallow them.
      if (data.request !== "CAPCHA_NOT_READY") {
        throw new Error(data.error_text || data.request);
      }
    }
    throw new Error(`Timeout waiting for task ${taskId}`);
  }
}
// Usage: solve one CAPTCHA for each of two clients concurrently and report every outcome.
(async () => {
  const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);

  const jobs = [
    [
      "client_a",
      "userrecaptcha",
      {
        googlekey: "6Le-SITEKEY-A",
        pageurl: "https://client-a-staging.example.com/qa-form",
      },
    ],
    [
      "client_b",
      "turnstile",
      {
        sitekey: "0x4AAAA-SITEKEY-B",
        pageurl: "https://client-b-target.com/login",
      },
    ],
  ];

  // allSettled: one client's failure must not hide another client's result.
  const outcomes = await Promise.allSettled(
    jobs.map(([clientId, method, params]) => pipeline.enqueue(clientId, method, params))
  );

  for (const outcome of outcomes) {
    if (outcome.status !== "fulfilled") {
      console.error(`Failed: ${outcome.reason.message}`);
      continue;
    }
    const { clientId, token } = outcome.value;
    console.log(`[${clientId}] Token: ${token.slice(0, 40)}...`);
  }
})();
按客户端配置
跟踪每个客户端的设置,例如代理、求解器首选项和速率限制:
# Per-client settings: proxy, solver preference and concurrency limit.
# Per the CaptchaAI (2Captcha-compatible) API docs, ``proxy`` is written as
# ``login:password@host:port`` and ``proxytype`` is HTTP, HTTPS, SOCKS4 or SOCKS5.
# A client without a proxy uses None for both.
CLIENT_CONFIG = {
    "client_a": {
        "proxy": "user:pass@host:port",
        "proxytype": "HTTP",
        "max_concurrent": 5,
        "default_method": "userrecaptcha",
    },
    "client_b": {
        "proxy": None,
        "proxytype": None,
        "max_concurrent": 10,
        "default_method": "turnstile",
    },
}
def build_params(client_id, params, config_map=None):
    """Return a copy of *params* with the client's proxy settings merged in.

    Args:
        client_id: Key into the per-client configuration.
        params: Base submit parameters. This dict is not modified.
        config_map: Optional mapping of client_id -> settings dict. Defaults to
            the module-level ``CLIENT_CONFIG``.

    Returns:
        A new dict. When the client has a truthy ``proxy`` setting, ``proxy`` is
        added, plus ``proxytype`` if that is configured too. Unknown clients
        get an unmodified copy.
    """
    configs = CLIENT_CONFIG if config_map is None else config_map
    config = configs.get(client_id, {})
    # Copy so the caller's dict is not silently mutated.
    merged = dict(params)
    if config.get("proxy"):
        merged["proxy"] = config["proxy"]
        # .get() so an entry without "proxytype" does not raise KeyError.
        if config.get("proxytype"):
            merged["proxytype"] = config["proxytype"]
    return merged
错误处理策略
| 错误 | 处理方式 |
|---|---|
| `ERROR_ZERO_BALANCE` | 停止队列,并通知所有客户 |
| `ERROR_NO_SLOT_AVAILABLE` | 延迟一段时间后将任务重新排队 |
| `ERROR_WRONG_CAPTCHA_ID` | 丢弃任务并记录错误 |
| `ERROR_CAPTCHA_UNSOLVABLE` | 重试一次,仍失败则标记为失败 |
| 网络超时 | 退避后重试(最多 3 次) |
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 队列无限增长 | 活动槽位已满 | 增加 `max_concurrent` 或添加工作进程 |
| 回调未触发 | 任务静默失败 | 检查轮询循环中返回的错误 |
| 不同客户端之间的令牌混淆 | 共用同一个结果存储 | 以 client_id + task_id 作为结果的键 |
| 速率限制错误 (429) | 并发提交过多 | 降低并发,增加提交延迟 |
常问问题
每个客户端应该运行多少个并发任务?
从 5-10 开始。监控解决时间和错误率,然后进行调整。CaptchaAI 支持高并发,但您的代理池可能是瓶颈。
我应该为每个客户端使用单独的 API 密钥吗?
这样可以简化计费。如果只使用一个 API 密钥但仍需跟踪用量,请使用 CaptchaAI 的 soft_id 参数。
如何处理需要保留过夜的队列?
持久化队列(Redis 或数据库)。重新启动时,重新加载挂起的任务并恢复处理。
使用 CaptchaAI 构建您的验证码管道
前往 CaptchaAI 网站,开始构建您的多客户端验证码管道。