Kubernetes 为大容量验证码解决提供自动扩展、自我修复的基础设施。本指南部署工作单元,从 Redis 队列中提取任务并根据需求进行扩展。
建筑学
Producer → Redis Queue → Worker Pods (auto-scaled) → CaptchaAI API
↓
Results Store (Redis)
工人部署
# k8s/worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: captcha-worker
labels:
app: captcha-worker
spec:
replicas: 3
selector:
matchLabels:
app: captcha-worker
template:
metadata:
labels:
app: captcha-worker
spec:
containers:
- name: worker
image: your-registry/captcha-worker:latest
env:
- name: CAPTCHAAI_KEY
valueFrom:
secretKeyRef:
name: captchaai-secret
key: api-key
- name: REDIS_URL
value: "redis://redis-service:6379"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "250m"
库伯内特的秘密
kubectl create secret generic captchaai-secret \
--from-literal=api-key=YOUR_API_KEY
Redis部署
# k8s/redis.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis
spec:
replicas: 1
selector:
matchLabels:
app: redis
template:
metadata:
labels:
app: redis
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
resources:
requests:
memory: "128Mi"
cpu: "100m"
---
apiVersion: v1
kind: Service
metadata:
name: redis-service
spec:
selector:
app: redis
ports:
- port: 6379
工人代码
# worker.py
import os
import json
import time
import redis
import requests
class CaptchaWorker:
"""Kubernetes worker that processes CAPTCHA tasks from Redis."""
def __init__(self):
self.api_key = os.environ["CAPTCHAAI_KEY"]
self.redis = redis.from_url(
os.environ.get("REDIS_URL", "redis://localhost:6379"),
)
self.base = "https://ocr.captchaai.com"
def run(self):
"""Main worker loop."""
hostname = os.environ.get("HOSTNAME", "unknown")
print(f"Worker {hostname} started")
while True:
result = self.redis.blpop("captcha:queue", timeout=30)
if result is None:
continue
_, raw = result
task = json.loads(raw)
task_id = task.get("id", "unknown")
print(f"[{hostname}] Processing {task_id}")
start = time.time()
try:
token = self._solve(task["method"], task["params"])
duration = time.time() - start
self.redis.hset("captcha:results", task_id, json.dumps({
"status": "success",
"token": token,
"duration": f"{duration:.1f}s",
"worker": hostname,
}))
print(f"[{hostname}] {task_id} solved in {duration:.1f}s")
except Exception as e:
self.redis.hset("captcha:results", task_id, json.dumps({
"status": "error",
"error": str(e),
"worker": hostname,
}))
print(f"[{hostname}] {task_id} failed: {e}")
# Update queue length metric
queue_len = self.redis.llen("captcha:queue")
self.redis.set("captcha:queue_length", queue_len)
def _solve(self, method, params, timeout=120):
resp = requests.post(f"{self.base}/in.php", data={
"key": self.api_key,
"method": method,
"json": 1,
**params,
}, timeout=30)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(result.get("request"))
captcha_id = result["request"]
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{self.base}/res.php", params={
"key": self.api_key,
"action": "get",
"id": captcha_id,
"json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
if data.get("status") == 1:
return data["request"]
raise RuntimeError(data["request"])
raise TimeoutError("Solve timeout")
if __name__ == "__main__":
CaptchaWorker().run()
水平 Pod 自动缩放器
根据队列深度缩放工作人员:
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: captcha-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: captcha-worker
minReplicas: 2
maxReplicas: 20
metrics:
- type: External
external:
metric:
name: redis_queue_length
selector:
matchLabels:
queue: captcha
target:
type: AverageValue
averageValue: "10"
任务制作者
import json
import uuid
import redis
def submit_tasks(redis_url, tasks):
"""Submit CAPTCHA tasks to the queue."""
r = redis.from_url(redis_url)
task_ids = []
for task in tasks:
task_id = str(uuid.uuid4())[:8]
task["id"] = task_id
r.rpush("captcha:queue", json.dumps(task))
task_ids.append(task_id)
return task_ids
def get_results(redis_url, task_ids, timeout=180):
"""Wait for and collect results."""
r = redis.from_url(redis_url)
results = {}
deadline = time.time() + timeout
while len(results) < len(task_ids) and time.time() < deadline:
for tid in task_ids:
if tid in results:
continue
raw = r.hget("captcha:results", tid)
if raw:
results[tid] = json.loads(raw)
time.sleep(1)
return results
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 工人未开始工作 | 未创建秘密 | 运行 kubectl create secret 命令 |
| CrashLoopBackOff 中的 Pod | 缺少环境变量或 Redis | 使用 kubectl logs 检查日志 |
| HPA 不缩放 | 未配置自定义指标 | 安装指标适配器 (KEDA) |
| 队列不断增长但没有处理 | 工人闲置/crashed | 检查 Pod 运行状况并重新启动 |
常问问题
我应该从多少个工作单元开始?
从 3 个副本开始,让 HPA 根据队列深度进行扩展。每个 pod 可处理约 5-10 个并发解决问题,具体取决于验证码类型。
我应该使用作业还是部署?
对处理共享队列的连续工作人员使用部署。将作业用于具有固定数量任务的批处理工作负载。
我可以使用 KEDA 代替 HPA 吗?
是的。 KEDA(Kubernetes 事件驱动自动缩放)本身支持 Redis 队列长度作为缩放触发器,并且比自定义指标更容易配置。
相关指南
- Docker + CaptchaAI
- Redis队列+CaptchaAI
规模扩大至数千——获取CaptchaAI对于 Kubernetes。