Tutorials

跨分布式工作人员的 CAPTCHA 会话状态管理

当多个工作人员解决同一站点的验证码时,他们会遇到一个问题:每个工作人员都有自己的会话。目标站点会看到不同的 cookie、不同的 IP 和不同的浏览器特征。会话状态管理可同步工作人员之间的上下文,因此解决方案是一致的,并且目标站点可以看到一致的会话。

会话状态问题

Worker 1 → Login → Solve CAPTCHA → Get cookie A → Submit form ✅
Worker 2 → New session → Solve CAPTCHA → Get cookie B → Submit form ✅
Worker 3 → Reuse cookie A? → Cookie expired → Solve CAPTCHA → Fail ❌

如果没有共享状态,工作人员会浪费过期会话的解决方案,并产生目标站点可以检测到的不一致行为。

会话状态包括什么

状态组件 寿命 分享策略
身份验证cookie 分钟到小时 带 TTL 的 Redis
验证码令牌 90-300秒 Redis 列表(短 TTL)
qa_session_cookie cookie 〜30分钟 Redis 哈希
CSRF 代币 每页加载 不要分享——每个工人都有自己的
浏览器特征 永恒的 配置,而不是运行时状态
代理分配 每节课 Redis 支持的代理池

建筑学

┌──────────────────────────────────────┐
│          Session State Store          │
│              (Redis)                  │
│                                      │
│  cookies:{domain} → Hash             │
│  tokens:{sitekey} → List             │
│  proxies:pool → Set                  │
│  locks:{domain}:{worker} → String    │
└─────┬──────────┬──────────┬──────────┘
      │          │          │
  ┌───▼───┐  ┌──▼────┐  ┌──▼────┐
  │Worker1│  │Worker2│  │Worker3│
  └───────┘  └───────┘  └───────┘

Python实现

会话存储

import os
import json
import time
import redis
import requests
from datetime import datetime, timezone

r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", 6379)),
    decode_responses=True
)

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


class SessionStore:
    """Shared session state across distributed workers."""

    def __init__(self, domain):
        self.domain = domain
        self.cookie_key = f"session:cookies:{domain}"
        self.token_key = f"session:tokens:{domain}"

    def save_cookies(self, cookies, ttl=1800):
        """Store cookies from a successful session."""
        cookie_data = {name: value for name, value in cookies.items()}
        r.hset(self.cookie_key, mapping=cookie_data)
        r.expire(self.cookie_key, ttl)

    def get_cookies(self):
        """Retrieve shared cookies."""
        cookies = r.hgetall(self.cookie_key)
        return cookies if cookies else None

    def save_token(self, sitekey, token, ttl=80):
        """Store a solved CAPTCHA token."""
        key = f"{self.token_key}:{sitekey}"
        r.rpush(key, token)
        r.expire(key, ttl)

    def get_token(self, sitekey):
        """Pop a cached CAPTCHA token."""
        key = f"{self.token_key}:{sitekey}"
        return r.lpop(key)

    def acquire_session_lock(self, worker_id, ttl=300):
        """Ensure only one worker manages the session at a time."""
        lock_key = f"session:lock:{self.domain}"
        return r.set(lock_key, worker_id, nx=True, ex=ttl)

    def release_session_lock(self, worker_id):
        """Release session lock if this worker holds it."""
        lock_key = f"session:lock:{self.domain}"
        current = r.get(lock_key)
        if current == worker_id:
            r.delete(lock_key)

共享状态的工作者

class CaptchaWorker:
    def __init__(self, worker_id, domain):
        self.worker_id = worker_id
        self.store = SessionStore(domain)
        self.session = requests.Session()

    def setup_session(self):
        """Load shared cookies into this worker's session."""
        cookies = self.store.get_cookies()
        if cookies:
            for name, value in cookies.items():
                self.session.cookies.set(name, value)
            return True
        return False

    def solve_captcha(self, sitekey, pageurl):
        """Solve with token cache and session sharing."""
        # Check for cached token
        cached = self.store.get_token(sitekey)
        if cached:
            return {"solution": cached, "source": "cache"}

        # Solve via CaptchaAI
        resp = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": API_KEY,
            "method": "userrecaptcha",
            "googlekey": sitekey,
            "pageurl": pageurl,
            "json": 1
        })
        data = resp.json()
        if data.get("status") != 1:
            return {"error": data.get("request")}

        captcha_id = data["request"]

        for _ in range(60):
            time.sleep(5)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": API_KEY, "action": "get",
                "id": captcha_id, "json": 1
            }).json()

            if result.get("status") == 1:
                token = result["request"]
                self.store.save_token(sitekey, token)
                return {"solution": token, "source": "api"}

            if result.get("request") != "CAPCHA_NOT_READY":
                return {"error": result.get("request")}

        return {"error": "TIMEOUT"}

    def process_page(self, url, sitekey):
        """Full workflow: setup session → solve CAPTCHA → submit."""
        # Load shared session
        self.setup_session()

        # Solve CAPTCHA
        result = self.solve_captcha(sitekey, url)
        if "error" in result:
            return result

        # Submit form with token
        response = self.session.post(url, data={
            "g-recaptcha-response": result["solution"]
        })

        # Share resulting cookies
        self.store.save_cookies(dict(self.session.cookies))

        return {"status": response.status_code, "source": result["source"]}

代理池管理

class ProxyPool:
    """Distribute proxies across workers to avoid IP conflicts."""

    def __init__(self, proxies):
        self.pool_key = "session:proxy_pool"
        self.assigned_key = "session:proxy_assigned"
        # Initialize pool
        for proxy in proxies:
            r.sadd(self.pool_key, proxy)

    def acquire_proxy(self, worker_id, ttl=600):
        """Assign an unused proxy to a worker."""
        # Check if worker already has one
        existing = r.hget(self.assigned_key, worker_id)
        if existing:
            return existing

        # Pop from available pool
        proxy = r.spop(self.pool_key)
        if proxy:
            r.hset(self.assigned_key, worker_id, proxy)
            r.expire(self.assigned_key, ttl)
            return proxy
        return None

    def release_proxy(self, worker_id):
        """Return proxy to the pool."""
        proxy = r.hget(self.assigned_key, worker_id)
        if proxy:
            r.sadd(self.pool_key, proxy)
            r.hdel(self.assigned_key, worker_id)

JavaScript 实现

const Redis = require("ioredis");
const axios = require("axios");

const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379");
const API_KEY = process.env.CAPTCHAAI_API_KEY;

class SessionStore {
  constructor(domain) {
    this.domain = domain;
    this.cookieKey = `session:cookies:${domain}`;
    this.tokenKey = `session:tokens:${domain}`;
  }

  async saveCookies(cookies, ttl = 1800) {
    const entries = Object.entries(cookies).flat();
    if (entries.length > 0) {
      await redis.hset(this.cookieKey, ...entries);
      await redis.expire(this.cookieKey, ttl);
    }
  }

  async getCookies() {
    return await redis.hgetall(this.cookieKey);
  }

  async saveToken(sitekey, token, ttl = 80) {
    const key = `${this.tokenKey}:${sitekey}`;
    await redis.rpush(key, token);
    await redis.expire(key, ttl);
  }

  async getToken(sitekey) {
    return await redis.lpop(`${this.tokenKey}:${sitekey}`);
  }

  async acquireLock(workerId, ttl = 300) {
    const result = await redis.set(`session:lock:${this.domain}`, workerId, "NX", "EX", ttl);
    return result === "OK";
  }

  async releaseLock(workerId) {
    const current = await redis.get(`session:lock:${this.domain}`);
    if (current === workerId) await redis.del(`session:lock:${this.domain}`);
  }
}

async function workerSolve(store, sitekey, pageurl) {
  const cached = await store.getToken(sitekey);
  if (cached) return { solution: cached, source: "cache" };

  const submit = await axios.post("https://ocr.captchaai.com/in.php", null, {
    params: { key: API_KEY, method: "userrecaptcha", googlekey: sitekey, pageurl, json: 1 },
  });
  if (submit.data.status !== 1) return { error: submit.data.request };

  const captchaId = submit.data.request;
  for (let i = 0; i < 60; i++) {
    await new Promise((r) => setTimeout(r, 5000));
    const poll = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });
    if (poll.data.status === 1) {
      await store.saveToken(sitekey, poll.data.request);
      return { solution: poll.data.request, source: "api" };
    }
    if (poll.data.request !== "CAPCHA_NOT_READY") return { error: poll.data.request };
  }
  return { error: "TIMEOUT" };
}

状态管理模式

图案 何时使用
会话锁定 一名工作人员管理登录,其他工作人员使用 cookie
代币池 高吞吐量:QA 预测试和分配代币
Cookie 共享 工作人员需要经过身份验证的会话
代理亲和力 目标站点跟踪 IP 会话绑定

故障排除

问题 原因 处理方式
工人获得不同的会话 Cookie 不通过 Redis 共享 验证请求成功后是否调用 save_cookies
令牌在其他工作人员使用之前已过期 TTL太长或网络延迟 降低TTL安全裕度;在检索后 10 秒内使用令牌
会话锁从未释放 工人坠毁 锁键自动释放 TTL(默认 300 秒)
目标站点阻止工人 所有工作人员使用相同的代理 使用具有每个工作人员关联性的代理池

常问问题

仅适用于需要经过身份验证的会话的站点。对于无状态验证码解决(提交 sitekey → 获取令牌),工作人员不需要共享 cookie - 只需要共享令牌。

如何处理会话过期?

将 Redis TTL 设置为比会话生命周期稍短。当 cookie 过期时,一名工作人员将获取会话锁,重新进行身份验证,并为其他工作人员存储新的 cookie。

基于浏览器的会话 (Puppeteer/Playwright) 怎么样?

使用 page.cookies() 序列化浏览器 cookie 并存储在 Redis 中。其他工作人员用 page.setCookie() 加载它们。这适用于不同的浏览器实例和计算机。

下一步

有效地协调分布式验证码工作人员 –”获取您的 CaptchaAI API 密钥

相关指南:

该文章已禁用评论。

相关文章

DevOps & Scaling 用于 CaptchaAI Worker 部署的 Ansible Playbook
使用 Captcha AI Worker 部署 Ansible Playbook 的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

使用 Captcha AI Worker 部署 Ansible Playbook 的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注...

Apr 19, 2026
DevOps & Scaling AWS Lambda + CaptchaAI:无服务器验证码解决
AWS Lambda + Captcha AI 的开发运营指南:无服务器验证码解决方案,包含生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

AWS Lambda + Captcha AI 的开发运营指南:无服务器验证码解决方案,包含生产中 Captcha AI 工作流程的架构决策、操作...

Apr 21, 2026
DevOps & Scaling 使用 AWS SNS 和 CaptchaAI 构建事件驱动的验证码解决方案
使用 AWS SNS 和 Captcha AI 构建事件驱动的验证码解决方案的开发运营指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

使用 AWS SNS 和 Captcha AI 构建事件驱动的验证码解决方案的开发运营指南,包括生产中 Captcha AI 工作流程的架构决...

Apr 22, 2026