DevOps & Scaling

负载均衡器背后的 CaptchaAI:架构模式

当您的抓取基础设施发送数千个验证码解决请求时,单个工作进程就会出现瓶颈。负载均衡器将请求分配给多个工作人员,从而提高吞吐量、实现故障转移并让您水平扩展。

架构概述

[Scraper 1] ──┐                      ┌── [Worker 1] ──→ CaptchaAI API
[Scraper 2] ──┤── [Load Balancer] ──┤── [Worker 2] ──→ CaptchaAI API
[Scraper 3] ──┘                      └── [Worker 3] ──→ CaptchaAI API

NGINX 配置

循环(默认)

upstream captcha_workers {
    server 10.0.1.10:8080;
    server 10.0.1.11:8080;
    server 10.0.1.12:8080;
}

server {
    listen 80;
    server_name captcha.internal;

    location /solve {
        proxy_pass http://captcha_workers;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_connect_timeout 10s;
        proxy_read_timeout 300s;  # CAPTCHA solving can take minutes
    }

    location /health {
        proxy_pass http://captcha_workers;
        proxy_connect_timeout 5s;
        proxy_read_timeout 5s;
    }
}

最少连接(更适合验证码解决)

upstream captcha_workers {
    least_conn;  # Route to worker with fewest active connections
    server 10.0.1.10:8080;
    server 10.0.1.11:8080;
    server 10.0.1.12:8080 weight=2;  # Higher capacity worker

    # Health checks
    server 10.0.1.10:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.11:8080 max_fails=3 fail_timeout=30s;
    server 10.0.1.12:8080 max_fails=3 fail_timeout=30s;
}

与备份人员

upstream captcha_workers {
    least_conn;
    server 10.0.1.10:8080;
    server 10.0.1.11:8080;
    server 10.0.1.12:8080 backup;  # Only used when others are down
}

工作API服务器

Python(烧瓶)

import os
import time
import threading
import requests
from flask import Flask, request, jsonify

API_KEY = os.environ["CAPTCHAAI_API_KEY"]
app = Flask(__name__)

# Track active tasks for load reporting
active_tasks = 0
tasks_lock = threading.Lock()
max_concurrent = int(os.environ.get("MAX_CONCURRENT", "20"))


@app.route("/solve", methods=["POST"])
def solve():
    global active_tasks
    with tasks_lock:
        if active_tasks >= max_concurrent:
            return jsonify({"error": "WORKER_AT_CAPACITY"}), 503
        active_tasks += 1

    try:
        data = request.json
        result = solve_captcha(data)
        return jsonify(result)
    finally:
        with tasks_lock:
            active_tasks -= 1


@app.route("/health")
def health():
    with tasks_lock:
        load = active_tasks / max_concurrent
    return jsonify({
        "status": "healthy" if load < 0.9 else "overloaded",
        "active_tasks": active_tasks,
        "max_concurrent": max_concurrent,
        "load_pct": round(load * 100, 1)
    }), 200 if load < 0.9 else 503


def solve_captcha(data):
    session = requests.Session()
    payload = {
        "key": API_KEY,
        "method": data.get("method", "userrecaptcha"),
        "googlekey": data.get("sitekey"),
        "pageurl": data.get("pageurl"),
        "json": 1
    }

    if data.get("proxy"):
        payload["proxy"] = data["proxy"]
        payload["proxytype"] = data.get("proxytype", "HTTP")

    resp = session.post("https://ocr.captchaai.com/in.php", data=payload)
    result = resp.json()
    if result.get("status") != 1:
        return {"error": result.get("request")}

    captcha_id = result["request"]
    for _ in range(60):
        time.sleep(5)
        poll = session.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get", "id": captcha_id, "json": 1
        }).json()
        if poll.get("status") == 1:
            return {"solution": poll["request"], "captcha_id": captcha_id}
        if poll.get("request") != "CAPCHA_NOT_READY":
            return {"error": poll.get("request")}

    return {"error": "TIMEOUT"}


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080, threaded=True)

JavaScript(快速)

const express = require("express");
const axios = require("axios");

const API_KEY = process.env.CAPTCHAAI_API_KEY;
const MAX_CONCURRENT = parseInt(process.env.MAX_CONCURRENT || "20", 10);
const PORT = parseInt(process.env.PORT || "8080", 10);

let activeTasks = 0;
const app = express();
app.use(express.json());

app.post("/solve", async (req, res) => {
  if (activeTasks >= MAX_CONCURRENT) {
    return res.status(503).json({ error: "WORKER_AT_CAPACITY" });
  }
  activeTasks++;

  try {
    const result = await solveCaptcha(req.body);
    res.json(result);
  } catch (err) {
    res.status(500).json({ error: err.message });
  } finally {
    activeTasks--;
  }
});

app.get("/health", (req, res) => {
  const load = activeTasks / MAX_CONCURRENT;
  const status = load < 0.9 ? "healthy" : "overloaded";
  res
    .status(load < 0.9 ? 200 : 503)
    .json({ status, activeTasks, maxConcurrent: MAX_CONCURRENT, loadPct: Math.round(load * 100) });
});

async function solveCaptcha(data) {
  const submitResp = await axios.post("https://ocr.captchaai.com/in.php", null, {
    params: {
      key: API_KEY,
      method: data.method || "userrecaptcha",
      googlekey: data.sitekey,
      pageurl: data.pageurl,
      json: 1,
    },
  });

  if (submitResp.data.status !== 1) {
    return { error: submitResp.data.request };
  }

  const captchaId = submitResp.data.request;
  for (let i = 0; i < 60; i++) {
    await new Promise((r) => setTimeout(r, 5000));
    const pollResp = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (pollResp.data.status === 1) {
      return { solution: pollResp.data.request, captchaId };
    }
    if (pollResp.data.request !== "CAPCHA_NOT_READY") {
      return { error: pollResp.data.request };
    }
  }
  return { error: "TIMEOUT" };
}

app.listen(PORT, () => console.log(`Worker listening on port ${PORT}`));

路由策略比较

战略 它是如何运作的 最适合
循环赛 顺序轮换 同等能力的工人
最少连接数 路由至负载最少的路径 验证码解决(可变任务持续时间)
加权 与体重成正比 混合能力工人
IP哈希值 同一个客户→同一个工人 需要会话关联性
随机的 随机选择 简单、均匀分布负载

建议: 使用最少连接进行验证码解决。任务持续时间各不相同(5 秒到 120 秒),因此循环会造成负载不均匀。

客户端负载平衡

当无法使用外部负载均衡器时,请在客户端中实现路由:

import random
import requests

class ClientLoadBalancer:
    def __init__(self, workers):
        self.workers = [
            {"url": url, "healthy": True, "active": 0}
            for url in workers
        ]

    def get_worker(self):
        healthy = [w for w in self.workers if w["healthy"]]
        if not healthy:
            raise Exception("No healthy workers")
        return min(healthy, key=lambda w: w["active"])

    def solve(self, task):
        worker = self.get_worker()
        worker["active"] += 1
        try:
            resp = requests.post(
                f"{worker['url']}/solve",
                json=task,
                timeout=300
            )
            if resp.status_code == 503:
                worker["healthy"] = False
                return self.solve(task)  # Retry on another worker
            return resp.json()
        except requests.RequestException:
            worker["healthy"] = False
            return self.solve(task)
        finally:
            worker["active"] -= 1


lb = ClientLoadBalancer([
    "http://10.0.1.10:8080",
    "http://10.0.1.11:8080",
    "http://10.0.1.12:8080"
])
result = lb.solve({"sitekey": "6Le-wvkS...", "pageurl": "https://example.com"})

Algorithm selection matrix

  • 当工作人员同质且解决时间保持在较窄的延迟范围内时,请使用循环法。
  • 当解决持续时间变化时,请使用最少的连接,否则长时间运行的挑战将堆积在一名工作人员身上。
  • 仅针对故障隔离或会话敏感目标保留备份路由和源关联性。

故障排除

问题 原因 处理方式
502错误的网关 Worker 崩溃或未启动 检查工人日志;验证端口绑定
负载分布不均匀 具有可变任务持续时间的循环法 切换到最少连接
健康检查误报 检查通行证,但工人已满负荷 在健康响应中包含负载百分比
连接超时 proxy_read_timeout 太短 设置为 300s+ 验证码解决

常问问题

我需要为 2-3 名工作人员配备负载均衡器吗?

客户端负载平衡非常适合小型设置。当您有 5 个以上工作人员或需要 SSL 终止和运行状况检查等功能时,请使用专用负载均衡器(NGINX、HAProxy)。

我应该使用粘性会话吗?

不会。验证码解决请求是无状态的——任何工作人员都可以处理任何任务。粘性会话会造成负载分布不均匀。

不同地区的员工如何处理?

使用路由到最近的健康区域的全局负载均衡器(AWS Global Accelerator、Cloudflare Load Balancing)。每个区域都为该区域的工作人员运行自己的本地负载均衡器。

相关文章

下一步

扩展您的验证码解决吞吐量 -”获取您的 CaptchaAI API 密钥并部署在负载均衡器后面。

相关指南:

该文章已禁用评论。