网络故障、临时过载和暂时性错误随时都可能发生。正确的重试逻辑可以让您的管道无需人工干预即可自动从这些问题中恢复。
重试哪些错误
| 错误 | 重试? | 为什么 |
|---|---|---|
| ERROR_NO_SLOT_AVAILABLE | ✅ 是 | 队列暂时已满 |
| HTTP 429 | ✅ 是 | 触发速率限制 |
| HTTP 500/502/503 | ✅ 是 | 服务器临时错误 |
| 连接超时 | ✅ 是 | 网络故障 |
| CAPCHA_NOT_READY | 🔄 继续轮询 | 仍在处理中 |
| ERROR_WRONG_USER_KEY | ❌ 否 | 配置错误——请修正密钥 |
| ERROR_KEY_DOES_NOT_EXIST | ❌ 否 | 密钥无效 |
| ERROR_ZERO_BALANCE | ❌ 否 | 请先充值 |
| ERROR_CAPTCHA_UNSOLVABLE | ⚠️ 视情况 | 使用新图像重新提交 |
带指数退避的基本重试
import requests
import time
import random
# Credentials and endpoint used by the module-level submit/poll helpers.
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"

# Response codes treated as permanent: the helpers raise immediately
# instead of retrying when they see one of these.
PERMANENT_ERRORS = {
    "ERROR_BAD_PARAMETERS",
    "ERROR_KEY_DOES_NOT_EXIST",
    "ERROR_WRONG_CAPTCHA_ID",
    "ERROR_WRONG_USER_KEY",
    "ERROR_ZERO_BALANCE",
}

# Response codes treated as transient: the submit helpers back off and
# try again when they see one of these.
TRANSIENT_ERRORS = {
    "ERROR_NO_SLOT_AVAILABLE",
    "ERROR_TOO_MUCH_REQUESTS",
}
def submit_with_retry(method, max_retries=5, **params):
    """Submit a task to ``in.php``, retrying transient failures with backoff.

    Args:
        method: Value sent as the ``method`` form field.
        max_retries: Maximum number of submission attempts.
        **params: Extra form fields merged into the request payload.

    Returns:
        The ``request`` field of the first response whose ``status`` is 1.

    Raises:
        RuntimeError: If the response carries a code from PERMANENT_ERRORS,
            if the response is neither a success nor a known transient
            error, or if every attempt has been used up.
    """
    data = {"key": API_KEY, "method": method, "json": 1}
    data.update(params)

    for attempt in range(max_retries):
        try:
            resp = requests.post(
                f"{BASE_URL}/in.php", data=data, timeout=30,
            )
        except (requests.ConnectionError, requests.Timeout) as exc:
            # ConnectionError is tested first so an exception that is both
            # (a connect timeout) is reported as a connection error.
            if isinstance(exc, requests.ConnectionError):
                reason = "Connection error"
            else:
                reason = "Timeout"
        else:
            if resp.status_code in (429, 500, 502, 503):
                # HTTP-level error: retry without parsing the body.
                reason = f"HTTP {resp.status_code}"
            else:
                result = resp.json()
                code = result.get("request")
                # Permanent errors: retrying cannot help.
                if code in PERMANENT_ERRORS:
                    raise RuntimeError(f"Permanent error: {code}")
                if code in TRANSIENT_ERRORS:
                    reason = code
                elif result.get("status") == 1:
                    return code
                else:
                    raise RuntimeError(f"Unknown error: {code}")

        # Only wait when another attempt will follow; sleeping after the
        # last attempt would just delay the failure below.
        if attempt + 1 < max_retries:
            wait = _backoff(attempt)
            print(f"{reason}, retry in {wait:.1f}s")
            time.sleep(wait)

    raise RuntimeError(f"Failed after {max_retries} retries")
def _backoff(attempt, base=2, max_wait=60):
    """Return ``base ** attempt`` capped at ``max_wait``, plus random jitter.

    The jitter is drawn uniformly between 0 and half of the capped delay,
    so the returned value can exceed ``max_wait`` by up to 50%.
    """
    delay = base ** attempt
    if delay > max_wait:
        delay = max_wait
    return delay + random.uniform(0, delay * 0.5)
轮询并重试
def poll_with_retry(task_id, timeout=120, max_poll_errors=3):
    """Poll ``res.php`` until the task is solved, tolerating transient errors.

    Args:
        task_id: Identifier sent as the ``id`` query parameter.
        timeout: Overall polling budget in seconds.
        max_poll_errors: Number of consecutive HTTP/connection failures
            after which polling is abandoned.

    Returns:
        The ``request`` field of the first response whose ``status`` is 1.

    Raises:
        RuntimeError: On too many consecutive poll failures, or when the
            service reports any error other than ``CAPCHA_NOT_READY``.
        TimeoutError: If no result arrives within ``timeout`` seconds.
    """
    # A monotonic clock keeps the budget immune to wall-clock adjustments.
    start = time.monotonic()
    consecutive_errors = 0

    while time.monotonic() - start < timeout:
        time.sleep(5)
        try:
            resp = requests.get(f"{BASE_URL}/res.php", params={
                "key": API_KEY, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15)
        except (requests.ConnectionError, requests.Timeout) as exc:
            consecutive_errors += 1
            if consecutive_errors >= max_poll_errors:
                raise RuntimeError("Too many poll connection errors") from exc
            time.sleep(_backoff(consecutive_errors))
            continue

        if resp.status_code in (429, 500, 502, 503):
            consecutive_errors += 1
            if consecutive_errors >= max_poll_errors:
                raise RuntimeError("Too many poll errors")
            time.sleep(_backoff(consecutive_errors))
            continue

        data = resp.json()
        consecutive_errors = 0  # Reset on any parsed response
        answer = data.get("request")

        if answer == "CAPCHA_NOT_READY":
            continue
        if data.get("status") == 1:
            return answer
        # Anything else is an error code. Returning it (as happened for
        # codes missing from PERMANENT_ERRORS) would hand the caller an
        # error string in place of a solution.
        raise RuntimeError(f"Solve error: {answer}")

    raise TimeoutError(f"Task {task_id} timeout after {timeout}s")
完整的重试感知解算器
class RetrySolver:
    """Client that submits a task and polls for its result, with retries.

    Counters describing what happened are kept in ``self.stats``.
    """

    def __init__(self, api_key, max_submit_retries=5, max_poll_retries=3,
                 poll_timeout=120):
        """Store the configuration and zero the counters.

        Args:
            api_key: Sent as the ``key`` field of every request.
            max_submit_retries: Maximum number of submission attempts.
            max_poll_retries: Consecutive poll failures tolerated before
                polling is abandoned.
            poll_timeout: Overall polling budget in seconds.
        """
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        self.max_submit_retries = max_submit_retries
        self.max_poll_retries = max_poll_retries
        self.poll_timeout = poll_timeout
        self.stats = {
            "total": 0, "success": 0, "retry": 0,
            "permanent_error": 0, "timeout": 0,
        }

    def solve(self, method, **params):
        """Submit a task and return its result.

        Raises:
            RuntimeError: Submission failed or the service reported an error.
            TimeoutError: No result arrived within ``poll_timeout`` seconds.
        """
        self.stats["total"] += 1
        task_id = self._submit(method, **params)
        try:
            token = self._poll(task_id)
        except TimeoutError:
            self.stats["timeout"] += 1
            raise
        self.stats["success"] += 1
        return token

    def _note_submit_retry(self, attempt):
        """Count a failed submit attempt and back off if another follows."""
        self.stats["retry"] += 1
        # Sleeping after the final attempt would only delay the failure.
        if attempt + 1 < self.max_submit_retries:
            time.sleep(_backoff(attempt))

    def _submit(self, method, **params):
        """POST the task to ``in.php`` and return the ``request`` field.

        Raises:
            RuntimeError: On a code from PERMANENT_ERRORS, on an
                unrecognised response, or once all attempts are used up.
        """
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        for attempt in range(self.max_submit_retries):
            try:
                resp = requests.post(
                    f"{self.base}/in.php", data=data, timeout=30,
                )
            except (requests.ConnectionError, requests.Timeout):
                self._note_submit_retry(attempt)
                continue

            if resp.status_code in (429, 500, 502, 503):
                self._note_submit_retry(attempt)
                continue

            result = resp.json()
            code = result.get("request")
            if code in PERMANENT_ERRORS:
                self.stats["permanent_error"] += 1
                raise RuntimeError(f"Permanent: {code}")
            if code in TRANSIENT_ERRORS:
                self._note_submit_retry(attempt)
                continue
            if result.get("status") == 1:
                return code
            # An unrecognised response previously fell through and was
            # re-sent immediately with no backoff; fail loudly instead.
            raise RuntimeError(f"Unknown error: {code}")

        raise RuntimeError("Submit failed after retries")

    def _poll(self, task_id):
        """Poll ``res.php`` until the task is solved or the budget runs out.

        Raises:
            RuntimeError: Too many consecutive HTTP errors, or the service
                reported an error other than ``CAPCHA_NOT_READY``.
            requests.ConnectionError, requests.Timeout: Re-raised once
                ``max_poll_retries`` consecutive connection failures occur.
            TimeoutError: No result within ``poll_timeout`` seconds.
        """
        # A monotonic clock keeps the budget immune to wall-clock changes.
        start = time.monotonic()
        errors = 0

        while time.monotonic() - start < self.poll_timeout:
            time.sleep(5)
            try:
                resp = requests.get(f"{self.base}/res.php", params={
                    "key": self.api_key, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15)
            except (requests.ConnectionError, requests.Timeout):
                errors += 1
                if errors >= self.max_poll_retries:
                    raise
                # Back off here too, as the HTTP-error branch below does.
                time.sleep(_backoff(errors))
                continue

            if resp.status_code in (429, 500, 502, 503):
                errors += 1
                if errors >= self.max_poll_retries:
                    raise RuntimeError("Poll errors exceeded limit")
                time.sleep(_backoff(errors))
                continue

            data = resp.json()
            errors = 0
            if data["request"] == "CAPCHA_NOT_READY":
                continue
            if data.get("status") == 1:
                return data["request"]
            raise RuntimeError(f"Solve error: {data['request']}")

        raise TimeoutError("Poll timeout")

    def get_stats(self):
        """Return the counters dictionary (the live object, not a copy)."""
        return self.stats
# Usage: solve one task, then print the counters collected along the way.
solver = RetrySolver("YOUR_API_KEY")
token = solver.solve(
    "userrecaptcha", googlekey="SITE_KEY", pageurl="https://example.com",
)
print(solver.get_stats())
断路器模式
连续多次失败后停止发送请求:
class CircuitBreaker:
    """Stop sending requests once the service looks unavailable.

    States:
        closed: normal operation; every request may proceed.
        open: ``failure_threshold`` failures were recorded since the last
            success; requests are refused until ``recovery_time`` seconds
            have passed since the most recent failure.
        half-open: the recovery window has elapsed and requests are let
            through again. A success closes the circuit; a failure
            re-opens it, because the failure count was never reset.
    """

    def __init__(self, failure_threshold=5, recovery_time=60):
        """Create a closed breaker.

        Args:
            failure_threshold: Failures since the last success that open
                the circuit.
            recovery_time: Seconds to keep refusing requests after the
                most recent failure.
        """
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        # time.monotonic() reading of the latest failure (0 = none yet).
        self.last_failure = 0
        self.state = "closed"  # closed=normal, open=blocking

    def can_proceed(self):
        """Return True if a request may be sent right now."""
        if self.state == "closed":
            return True
        # A monotonic clock is used so that system clock adjustments
        # cannot stretch or shorten the recovery window.
        if time.monotonic() - self.last_failure > self.recovery_time:
            self.state = "half-open"
            return True
        return False

    def record_success(self):
        """Reset the failure count and close the circuit."""
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        """Count a failure and open the circuit at the threshold."""
        self.failures += 1
        self.last_failure = time.monotonic()
        if self.failures >= self.failure_threshold:
            self.state = "open"
            print(f"Circuit OPEN — pausing for {self.recovery_time}s")
# Integrate with solver
breaker = CircuitBreaker(failure_threshold=5, recovery_time=60)


def solve_with_breaker(method, **params):
    """Run ``solver.solve`` behind the module-level circuit breaker.

    Args:
        method: Forwarded to ``solver.solve``.
        **params: Forwarded to ``solver.solve``.

    Returns:
        Whatever ``solver.solve`` returns.

    Raises:
        RuntimeError: Immediately, without calling the solver, while the
            circuit is open. Exceptions raised by the solver propagate
            unchanged after being recorded as a failure.
    """
    if not breaker.can_proceed():
        raise RuntimeError("Circuit open — API appears unavailable")
    try:
        token = solver.solve(method, **params)
    except (RuntimeError, TimeoutError, requests.RequestException):
        # Poll timeouts (builtin TimeoutError) and network errors from
        # requests are not RuntimeError subclasses; without listing them
        # an unreachable service would never trip the breaker.
        breaker.record_failure()
        raise
    breaker.record_success()
    return token
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 重试永久性错误 | 不过滤错误类型 | 检查 PERMANENT_ERRORS 集 |
| 无限重试循环 | 无最大重试限制 | 始终设置 max_retries |
| 退避太慢 | 使用了固定延迟 | 使用带有抖动的指数退避 |
| 所有重试结果相同 | 根本问题不是暂时的 | 检查API密钥、余额、参数 |
常见问题
我应该重试多少次?
提交重试 3-5 次,轮询错误重试 2-3 次。重试超过 5 次几乎没有帮助——问题可能不是暂时的。
我应该重试 ERROR_CAPTCHA_UNSOLVABLE 吗?
您可以重新提交任务(新请求)。相同的任务 ID 不会给出不同的结果。
最佳的退避策略是什么?
指数退避(等待 2^attempt 秒,attempt 为尝试序号),再加上 0-50% 的随机抖动。当许多客户端同时重试时,这可以防止惊群(thundering herd)问题。
相关指南
构建有弹性的自动化流程——试用具备生产级可靠性的 CaptchaAI。