当多个工作人员解决同一站点的验证码时,他们会遇到一个问题:每个工作人员都有自己的会话。目标站点会看到不同的 cookie、不同的 IP 和不同的浏览器特征。会话状态管理可同步工作人员之间的上下文,因此解决方案是一致的,并且目标站点可以看到一致的会话。
会话状态问题
Worker 1 → Login → Solve CAPTCHA → Get cookie A → Submit form ✅
Worker 2 → New session → Solve CAPTCHA → Get cookie B → Submit form ✅
Worker 3 → Reuse cookie A? → Cookie expired → Solve CAPTCHA → Fail ❌
如果没有共享状态,工作人员会浪费过期会话的解决方案,并产生目标站点可以检测到的不一致行为。
会话状态包括什么
| 状态组件 | 寿命 | 分享策略 |
|---|---|---|
| 身份验证cookie | 分钟到小时 | 带 TTL 的 Redis |
| 验证码令牌 | 90-300秒 | Redis 列表(短 TTL) |
qa_session_cookie cookie |
〜30分钟 | Redis 哈希 |
| CSRF 代币 | 每页加载 | 不要分享——每个工人都有自己的 |
| 浏览器特征 | 永恒的 | 配置,而不是运行时状态 |
| 代理分配 | 每节课 | Redis 支持的代理池 |
建筑学
┌──────────────────────────────────────┐
│ Session State Store │
│ (Redis) │
│ │
│ cookies:{domain} → Hash │
│ tokens:{sitekey} → List │
│ proxies:pool → Set │
│ locks:{domain}:{worker} → String │
└─────┬──────────┬──────────┬──────────┘
│ │ │
┌───▼───┐ ┌──▼────┐ ┌──▼────┐
│Worker1│ │Worker2│ │Worker3│
└───────┘ └───────┘ └───────┘
Python实现
会话存储
import os
import json
import time
import redis
import requests
from datetime import datetime, timezone
r = redis.Redis(
host=os.environ.get("REDIS_HOST", "localhost"),
port=int(os.environ.get("REDIS_PORT", 6379)),
decode_responses=True
)
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
class SessionStore:
"""Shared session state across distributed workers."""
def __init__(self, domain):
self.domain = domain
self.cookie_key = f"session:cookies:{domain}"
self.token_key = f"session:tokens:{domain}"
def save_cookies(self, cookies, ttl=1800):
"""Store cookies from a successful session."""
cookie_data = {name: value for name, value in cookies.items()}
r.hset(self.cookie_key, mapping=cookie_data)
r.expire(self.cookie_key, ttl)
def get_cookies(self):
"""Retrieve shared cookies."""
cookies = r.hgetall(self.cookie_key)
return cookies if cookies else None
def save_token(self, sitekey, token, ttl=80):
"""Store a solved CAPTCHA token."""
key = f"{self.token_key}:{sitekey}"
r.rpush(key, token)
r.expire(key, ttl)
def get_token(self, sitekey):
"""Pop a cached CAPTCHA token."""
key = f"{self.token_key}:{sitekey}"
return r.lpop(key)
def acquire_session_lock(self, worker_id, ttl=300):
"""Ensure only one worker manages the session at a time."""
lock_key = f"session:lock:{self.domain}"
return r.set(lock_key, worker_id, nx=True, ex=ttl)
def release_session_lock(self, worker_id):
"""Release session lock if this worker holds it."""
lock_key = f"session:lock:{self.domain}"
current = r.get(lock_key)
if current == worker_id:
r.delete(lock_key)
共享状态的工作者
class CaptchaWorker:
def __init__(self, worker_id, domain):
self.worker_id = worker_id
self.store = SessionStore(domain)
self.session = requests.Session()
def setup_session(self):
"""Load shared cookies into this worker's session."""
cookies = self.store.get_cookies()
if cookies:
for name, value in cookies.items():
self.session.cookies.set(name, value)
return True
return False
def solve_captcha(self, sitekey, pageurl):
"""Solve with token cache and session sharing."""
# Check for cached token
cached = self.store.get_token(sitekey)
if cached:
return {"solution": cached, "source": "cache"}
# Solve via CaptchaAI
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": "userrecaptcha",
"googlekey": sitekey,
"pageurl": pageurl,
"json": 1
})
data = resp.json()
if data.get("status") != 1:
return {"error": data.get("request")}
captcha_id = data["request"]
for _ in range(60):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY, "action": "get",
"id": captcha_id, "json": 1
}).json()
if result.get("status") == 1:
token = result["request"]
self.store.save_token(sitekey, token)
return {"solution": token, "source": "api"}
if result.get("request") != "CAPCHA_NOT_READY":
return {"error": result.get("request")}
return {"error": "TIMEOUT"}
def process_page(self, url, sitekey):
"""Full workflow: setup session → solve CAPTCHA → submit."""
# Load shared session
self.setup_session()
# Solve CAPTCHA
result = self.solve_captcha(sitekey, url)
if "error" in result:
return result
# Submit form with token
response = self.session.post(url, data={
"g-recaptcha-response": result["solution"]
})
# Share resulting cookies
self.store.save_cookies(dict(self.session.cookies))
return {"status": response.status_code, "source": result["source"]}
代理池管理
class ProxyPool:
"""Distribute proxies across workers to avoid IP conflicts."""
def __init__(self, proxies):
self.pool_key = "session:proxy_pool"
self.assigned_key = "session:proxy_assigned"
# Initialize pool
for proxy in proxies:
r.sadd(self.pool_key, proxy)
def acquire_proxy(self, worker_id, ttl=600):
"""Assign an unused proxy to a worker."""
# Check if worker already has one
existing = r.hget(self.assigned_key, worker_id)
if existing:
return existing
# Pop from available pool
proxy = r.spop(self.pool_key)
if proxy:
r.hset(self.assigned_key, worker_id, proxy)
r.expire(self.assigned_key, ttl)
return proxy
return None
def release_proxy(self, worker_id):
"""Return proxy to the pool."""
proxy = r.hget(self.assigned_key, worker_id)
if proxy:
r.sadd(self.pool_key, proxy)
r.hdel(self.assigned_key, worker_id)
JavaScript 实现
const Redis = require("ioredis");
const axios = require("axios");
const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379");
const API_KEY = process.env.CAPTCHAAI_API_KEY;
class SessionStore {
constructor(domain) {
this.domain = domain;
this.cookieKey = `session:cookies:${domain}`;
this.tokenKey = `session:tokens:${domain}`;
}
async saveCookies(cookies, ttl = 1800) {
const entries = Object.entries(cookies).flat();
if (entries.length > 0) {
await redis.hset(this.cookieKey, ...entries);
await redis.expire(this.cookieKey, ttl);
}
}
async getCookies() {
return await redis.hgetall(this.cookieKey);
}
async saveToken(sitekey, token, ttl = 80) {
const key = `${this.tokenKey}:${sitekey}`;
await redis.rpush(key, token);
await redis.expire(key, ttl);
}
async getToken(sitekey) {
return await redis.lpop(`${this.tokenKey}:${sitekey}`);
}
async acquireLock(workerId, ttl = 300) {
const result = await redis.set(`session:lock:${this.domain}`, workerId, "NX", "EX", ttl);
return result === "OK";
}
async releaseLock(workerId) {
const current = await redis.get(`session:lock:${this.domain}`);
if (current === workerId) await redis.del(`session:lock:${this.domain}`);
}
}
async function workerSolve(store, sitekey, pageurl) {
const cached = await store.getToken(sitekey);
if (cached) return { solution: cached, source: "cache" };
const submit = await axios.post("https://ocr.captchaai.com/in.php", null, {
params: { key: API_KEY, method: "userrecaptcha", googlekey: sitekey, pageurl, json: 1 },
});
if (submit.data.status !== 1) return { error: submit.data.request };
const captchaId = submit.data.request;
for (let i = 0; i < 60; i++) {
await new Promise((r) => setTimeout(r, 5000));
const poll = await axios.get("https://ocr.captchaai.com/res.php", {
params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
});
if (poll.data.status === 1) {
await store.saveToken(sitekey, poll.data.request);
return { solution: poll.data.request, source: "api" };
}
if (poll.data.request !== "CAPCHA_NOT_READY") return { error: poll.data.request };
}
return { error: "TIMEOUT" };
}
状态管理模式
| 图案 | 何时使用 |
|---|---|
| 会话锁定 | 一名工作人员管理登录,其他工作人员使用 cookie |
| 代币池 | 高吞吐量:QA 预测试和分配代币 |
| Cookie 共享 | 工作人员需要经过身份验证的会话 |
| 代理亲和力 | 目标站点跟踪 IP 会话绑定 |
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 工人获得不同的会话 | Cookie 不通过 Redis 共享 | 验证请求成功后是否调用 save_cookies |
| 令牌在其他工作人员使用之前已过期 | TTL太长或网络延迟 | 降低TTL安全裕度;在检索后 10 秒内使用令牌 |
| 会话锁从未释放 | 工人坠毁 | 锁键自动释放 TTL(默认 300 秒) |
| 目标站点阻止工人 | 所有工作人员使用相同的代理 | 使用具有每个工作人员关联性的代理池 |
常问问题
每个工人都应该共享cookie吗?
仅适用于需要经过身份验证的会话的站点。对于无状态验证码解决(提交 sitekey → 获取令牌),工作人员不需要共享 cookie - 只需要共享令牌。
如何处理会话过期?
将 Redis TTL 设置为比会话生命周期稍短。当 cookie 过期时,一名工作人员将获取会话锁,重新进行身份验证,并为其他工作人员存储新的 cookie。
基于浏览器的会话 (Puppeteer/Playwright) 怎么样?
使用 page.cookies() 序列化浏览器 cookie 并存储在 Redis 中。其他工作人员用 page.setCookie() 加载它们。这适用于不同的浏览器实例和计算机。
下一步
有效地协调分布式验证码工作人员 –”获取您的 CaptchaAI API 密钥。
相关指南:
- Redis 令牌 TTL 管理
- 用于更好解决率的 Cookie
- 浏览器会话持久性