DevOps & Scaling

自动缩放验证码解决工作人员

静态工作池在安静时段会浪费金钱,并在高峰时段会造成瓶颈。自动扩展将工作人员数量与实际需求相匹配,从而优化成本和吞吐量。


缩放信号

信号 扩大规模何时 缩小规模何时
队列深度 > 20 个待处理任务 < 5 个待处理任务
工人利用率 > 80% 忙碌 < 20% 忙碌
解决延迟 P95 > 60 秒 P95 < 20 秒
错误率 > 5%(需要新员工) 稳定<1%
平衡 N/A 余额 < $1(停止缩放)

基于线程的自动缩放器

在单个进程中扩展工作线程:

import os
import time
import threading
import requests
import json
import redis


class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads."""

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []
        self.active_count = 0
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Start scaler in background
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add a worker thread."""
        if len(self.workers) >= self.max_workers:
            return
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()
        self.workers.append(t)

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal)."""
        if len(self.workers) <= self.min_workers:
            return
        self.workers.pop()  # Thread will exit on next idle cycle

    def _worker_loop(self):
        """Worker loop: fetch and process tasks."""
        while self.running and threading.current_thread() in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                continue

            _, raw = result
            task = json.loads(raw)
            task_id = task["id"]

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Periodically adjust worker count."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            current = len(self.workers)
            utilization = (
                self.active_count / current * 100 if current > 0 else 0
            )

            # Scale up: queue growing and workers busy
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue empty and workers idle
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=data, timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        return {
            "workers": len(self.workers),
            "active": self.active_count,
            "queue": self.redis.llen(self.queue_key),
        }


# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor
while True:
    print(pool.stats())
    time.sleep(30)

基于流程的自动缩放器

缩放工作进程以实现 CPU 隔离:

import multiprocessing
import time
import redis
import os


class ProcessScaler:
    """Scale worker processes based on queue depth."""

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    p.terminate()
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        self.processes = [p for p in self.processes if p.is_alive()]
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()

平衡感知缩放

当资金不足时停止扩容:

def check_balance(api_key, min_balance=2.0):
    """Check if balance is sufficient for scaling."""
    resp = requests.get("https://ocr.captchaai.com/res.php", params={
        "key": api_key,
        "action": "getbalance",
        "json": 1,
    }, timeout=15)
    balance = float(resp.json()["request"])

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

集成到缩放循环中:

# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up
        ...
    else:
        print("Scaling paused — low balance")

扩展策略比较

战略 最适合 延迟 复杂
线程池 I/O-bound(API 调用) 低的 低的
进程池 CPU 密集型预处理 中等的 中等的
Kubernetes HPA 云原生部署 更高 高的
科达 事件驱动的扩展 中等的 中等的

故障排除

问题 原因 处理方式
工人们不断扩大规模 队列永远不会耗尽 检查工人是否真正在加工
缩减规模过于激进 门槛低 将缩小延迟增加到 30 秒以上
僵尸进程 进程未清理 定期使用_cleanup_dead()
余额快速流失 工人太多 为缩放逻辑添加余额检查

常问问题

正确的工人与队列的比例是多少?

目标是每 5-10 个排队任务有 1 个工作人员。每个工作人员每分钟处理约 3-6 个验证码,具体取决于类型。

我应该使用线程还是进程?

用于纯 API 调用的线程(CaptchaAI 是 I/O-bound)。当您在求解的同时还进行图像预处理或大量计算时进行处理。

我应该多快扩大规模?

快速扩大规模(每 10-15 秒检查一次),缓慢缩小规模(低负载时等待 30-60 秒)。这可以防止状态之间的颠簸。


相关指南

  • Kubernetes 作业队列
  • Prometheus/Grafana 监控

智能规模——获取您的 CaptchaAI 密钥今天。

该文章已禁用评论。

相关文章

DevOps & Scaling 用于 CaptchaAI Worker 部署的 Ansible Playbook
使用 Captcha AI Worker 部署 Ansible Playbook 的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

使用 Captcha AI Worker 部署 Ansible Playbook 的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注...

Apr 19, 2026
DevOps & Scaling AWS Lambda + CaptchaAI:无服务器验证码解决
AWS Lambda + Captcha AI 的开发运营指南:无服务器验证码解决方案,包含生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

AWS Lambda + Captcha AI 的开发运营指南:无服务器验证码解决方案,包含生产中 Captcha AI 工作流程的架构决策、操作...

Apr 21, 2026
DevOps & Scaling 使用 AWS SNS 和 CaptchaAI 构建事件驱动的验证码解决方案
使用 AWS SNS 和 Captcha AI 构建事件驱动的验证码解决方案的开发运营指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

使用 AWS SNS 和 Captcha AI 构建事件驱动的验证码解决方案的开发运营指南,包括生产中 Captcha AI 工作流程的架构决...

Apr 22, 2026