DevOps 与扩展

用于大规模验证码解决的 Kubernetes 作业队列

Kubernetes 为大容量验证码解决提供自动扩展、自我修复的基础设施。本指南部署工作单元,从 Redis 队列中提取任务并根据需求进行扩展。


建筑学

Producer → Redis Queue → Worker Pods (auto-scaled) → CaptchaAI API
                              ↓
                       Results Store (Redis)

工人部署

# k8s/worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: captcha-worker
  labels:
    app: captcha-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: captcha-worker
  template:
    metadata:
      labels:
        app: captcha-worker
    spec:
      containers:

        - name: worker
          image: your-registry/captcha-worker:latest
          env:

            - name: CAPTCHAAI_KEY
              valueFrom:
                secretKeyRef:
                  name: captchaai-secret
                  key: api-key

            - name: REDIS_URL
              value: "redis://redis-service:6379"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "250m"

库伯内特的秘密

kubectl create secret generic captchaai-secret \
  --from-literal=api-key=YOUR_API_KEY

Redis部署

# k8s/redis.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:

        - name: redis
          image: redis:7-alpine
          ports:

            - containerPort: 6379
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:

    - port: 6379

工人代码

# worker.py
import os
import json
import time
import redis
import requests


class CaptchaWorker:
    """Kubernetes worker that processes CAPTCHA tasks from Redis."""

    def __init__(self):
        self.api_key = os.environ["CAPTCHAAI_KEY"]
        self.redis = redis.from_url(
            os.environ.get("REDIS_URL", "redis://localhost:6379"),
        )
        self.base = "https://ocr.captchaai.com"

    def run(self):
        """Main worker loop."""
        hostname = os.environ.get("HOSTNAME", "unknown")
        print(f"Worker {hostname} started")

        while True:
            result = self.redis.blpop("captcha:queue", timeout=30)
            if result is None:
                continue

            _, raw = result
            task = json.loads(raw)
            task_id = task.get("id", "unknown")

            print(f"[{hostname}] Processing {task_id}")
            start = time.time()

            try:
                token = self._solve(task["method"], task["params"])
                duration = time.time() - start
                self.redis.hset("captcha:results", task_id, json.dumps({
                    "status": "success",
                    "token": token,
                    "duration": f"{duration:.1f}s",
                    "worker": hostname,
                }))
                print(f"[{hostname}] {task_id} solved in {duration:.1f}s")

            except Exception as e:
                self.redis.hset("captcha:results", task_id, json.dumps({
                    "status": "error",
                    "error": str(e),
                    "worker": hostname,
                }))
                print(f"[{hostname}] {task_id} failed: {e}")

            # Update queue length metric
            queue_len = self.redis.llen("captcha:queue")
            self.redis.set("captcha:queue_length", queue_len)

    def _solve(self, method, params, timeout=120):
        resp = requests.post(f"{self.base}/in.php", data={
            "key": self.api_key,
            "method": method,
            "json": 1,
            **params,
        }, timeout=30)
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]

        start = time.time()
        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")


if __name__ == "__main__":
    CaptchaWorker().run()

水平 Pod 自动缩放器

根据队列深度缩放工作人员:

# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: captcha-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: captcha-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:

    - type: External
      external:
        metric:
          name: redis_queue_length
          selector:
            matchLabels:
              queue: captcha
        target:
          type: AverageValue
          averageValue: "10"

任务制作者

import json
import uuid
import redis


def submit_tasks(redis_url, tasks):
    """Submit CAPTCHA tasks to the queue."""
    r = redis.from_url(redis_url)
    task_ids = []

    for task in tasks:
        task_id = str(uuid.uuid4())[:8]
        task["id"] = task_id
        r.rpush("captcha:queue", json.dumps(task))
        task_ids.append(task_id)

    return task_ids


def get_results(redis_url, task_ids, timeout=180):
    """Wait for and collect results."""
    r = redis.from_url(redis_url)
    results = {}
    deadline = time.time() + timeout

    while len(results) < len(task_ids) and time.time() < deadline:
        for tid in task_ids:
            if tid in results:
                continue
            raw = r.hget("captcha:results", tid)
            if raw:
                results[tid] = json.loads(raw)
        time.sleep(1)

    return results

故障排除

问题 原因 处理方式
工人未开始工作 未创建秘密 运行 kubectl create secret 命令
CrashLoopBackOff 中的 Pod 缺少环境变量或 Redis 使用 kubectl logs 检查日志
HPA 不缩放 未配置自定义指标 安装指标适配器 (KEDA)
队列不断增长但没有处理 工人闲置/crashed 检查 Pod 运行状况并重新启动

常问问题

我应该从多少个工作单元开始?

从 3 个副本开始,让 HPA 根据队列深度进行扩展。每个 pod 可处理约 5-10 个并发解决问题,具体取决于验证码类型。

我应该使用作业还是部署?

对处理共享队列的连续工作人员使用部署。将作业用于具有固定数量任务的批处理工作负载。

我可以使用 KEDA 代替 HPA 吗?

是的。 KEDA(Kubernetes 事件驱动自动缩放)本身支持 Redis 队列长度作为缩放触发器,并且比自定义指标更容易配置。


相关指南


规模扩大至数千——获取CaptchaAI对于 Kubernetes。

该文章已禁用评论。