API 教程

使用 CaptchaAI API 实现稳健的重试逻辑

网络故障、临时过载和暂时性错误都会发生。正确的重试逻辑可以让您的管道无需人工干预,就能从这些问题中自动恢复并继续运行。


重试哪些错误

| 错误 | 是否重试? | 原因 |
| --- | --- | --- |
| ERROR_NO_SLOT_AVAILABLE | 是 | 队列暂时已满 |
| HTTP 429 | 是 | 触发速率限制 |
| HTTP 500/502/503 | 是 | 服务器临时错误 |
| 连接超时 | 是 | 网络故障 |
| CAPCHA_NOT_READY | 继续轮询 | 仍在处理中 |
| ERROR_WRONG_USER_KEY | 否 | 配置错误——请修正密钥 |
| ERROR_KEY_DOES_NOT_EXIST | 否 | 密钥无效 |
| ERROR_ZERO_BALANCE | 否 | 请先充值 |
| ERROR_CAPTCHA_UNSOLVABLE | 视情况 | 使用新图像重新提交 |

带指数退避的基本重试

import requests
import time
import random

# Service endpoint and placeholder credential shared by the helpers below.
BASE_URL = "https://ocr.captchaai.com"
API_KEY = "YOUR_API_KEY"

# Error codes that are worth another attempt after a backoff delay.
TRANSIENT_ERRORS = {
    "ERROR_TOO_MUCH_REQUESTS",
    "ERROR_NO_SLOT_AVAILABLE",
}

# Error codes that must fail immediately: retrying the same request
# cannot change the outcome.
PERMANENT_ERRORS = {
    "ERROR_BAD_PARAMETERS",
    "ERROR_KEY_DOES_NOT_EXIST",
    "ERROR_WRONG_CAPTCHA_ID",
    "ERROR_WRONG_USER_KEY",
    "ERROR_ZERO_BALANCE",
}


def submit_with_retry(method, max_retries=5, **params):
    """Submit a task to ``in.php``, retrying on transient failures.

    Args:
        method: Value sent as the ``method`` form field.
        max_retries: Maximum number of POST attempts.
        **params: Extra form fields merged into the request payload.

    Returns:
        The ``request`` field of the first response whose ``status`` is 1.

    Raises:
        RuntimeError: On an error code listed in ``PERMANENT_ERRORS``, on an
            unrecognised error response, or when every attempt failed.
    """
    data = {"key": API_KEY, "method": method, "json": 1}
    data.update(params)

    def pause_before_retry(reason, attempt):
        # Sleeping after the final attempt only delays the inevitable
        # failure, so back off only when another attempt will follow.
        if attempt >= max_retries - 1:
            return
        wait = _backoff(attempt)
        print(f"{reason}, retry in {wait:.1f}s")
        time.sleep(wait)

    for attempt in range(max_retries):
        try:
            resp = requests.post(
                f"{BASE_URL}/in.php", data=data, timeout=30,
            )
        except requests.ConnectionError:
            pause_before_retry("Connection error", attempt)
            continue
        except requests.Timeout:
            pause_before_retry("Timeout", attempt)
            continue

        # HTTP-level errors
        if resp.status_code in (429, 500, 502, 503):
            pause_before_retry(f"HTTP {resp.status_code}", attempt)
            continue

        try:
            result = resp.json()
        except ValueError:
            # A body that is not JSON (e.g. a gateway error page) is treated
            # as a transient failure instead of escaping as a ValueError.
            pause_before_retry("Invalid JSON response", attempt)
            continue

        code = result.get("request")

        # Permanent errors — don't retry
        if code in PERMANENT_ERRORS:
            raise RuntimeError(f"Permanent error: {code}")

        # Transient errors — retry
        if code in TRANSIENT_ERRORS:
            pause_before_retry(code, attempt)
            continue

        # Success
        if result.get("status") == 1:
            return result["request"]

        # Unknown error
        raise RuntimeError(f"Unknown error: {code}")

    raise RuntimeError(f"Failed after {max_retries} retries")


def _backoff(attempt, base=2, max_wait=60):
    """Exponential backoff with jitter."""
    wait = min(base ** attempt, max_wait)
    jitter = random.uniform(0, wait * 0.5)
    return wait + jitter

带重试的轮询

def poll_with_retry(task_id, timeout=120, max_poll_errors=3):
    """Poll ``res.php`` for a task result, retrying transient failures.

    Args:
        task_id: Identifier sent as the ``id`` query parameter.
        timeout: Overall polling budget in seconds.
        max_poll_errors: Number of consecutive HTTP or connection failures
            after which polling is abandoned.

    Returns:
        The ``request`` field of the first response whose ``status`` is 1.

    Raises:
        RuntimeError: When the service reports any error for the task, or
            after ``max_poll_errors`` consecutive poll failures.
        TimeoutError: When no final answer arrives within ``timeout`` seconds.
    """
    # A monotonic clock keeps the budget correct if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    consecutive_errors = 0

    while time.monotonic() < deadline:
        time.sleep(5)

        try:
            resp = requests.get(f"{BASE_URL}/res.php", params={
                "key": API_KEY, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15)

            if resp.status_code in (429, 500, 502, 503):
                consecutive_errors += 1
                if consecutive_errors >= max_poll_errors:
                    raise RuntimeError("Too many poll errors")
                time.sleep(_backoff(consecutive_errors))
                continue

            data = resp.json()
            consecutive_errors = 0  # Reset on success

            outcome = data.get("request")

            if outcome == "CAPCHA_NOT_READY":
                continue

            # Only a status of 1 carries a solved answer. Every other
            # response is an error code and must not be returned to the
            # caller as if it were the token, including codes that are not
            # listed in PERMANENT_ERRORS (e.g. ERROR_CAPTCHA_UNSOLVABLE).
            if data.get("status") == 1:
                return outcome

            raise RuntimeError(f"Solve error: {outcome}")

        except (requests.ConnectionError, requests.Timeout):
            consecutive_errors += 1
            if consecutive_errors >= max_poll_errors:
                raise RuntimeError("Too many poll connection errors")
            time.sleep(_backoff(consecutive_errors))

    raise TimeoutError(f"Task {task_id} timeout after {timeout}s")

完整的支持重试的求解器

class RetrySolver:
    """Production-grade solver with comprehensive retry logic.

    Submits a task to ``in.php``, polls ``res.php`` for the answer, and
    records what happened in the ``stats`` counters.
    """

    def __init__(self, api_key, max_submit_retries=5, max_poll_retries=3,
                 poll_timeout=120):
        """Store the configuration and zero the counters.

        Args:
            api_key: Value sent as the ``key`` field of every request.
            max_submit_retries: Maximum number of submit attempts.
            max_poll_retries: Consecutive poll failures tolerated.
            poll_timeout: Polling budget per task, in seconds.
        """
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        self.max_submit_retries = max_submit_retries
        self.max_poll_retries = max_poll_retries
        self.poll_timeout = poll_timeout
        self.stats = {
            "total": 0, "success": 0, "retry": 0,
            "permanent_error": 0, "timeout": 0,
        }

    def solve(self, method, **params):
        """Submit a task and wait for its result.

        Args:
            method: Value sent as the ``method`` form field.
            **params: Extra form fields for the submit request.

        Returns:
            The ``request`` field of the successful poll response.

        Raises:
            RuntimeError: When submitting fails or the service reports an
                error for the task.
            TimeoutError: When polling exceeds ``poll_timeout`` (also counted
                in ``stats["timeout"]``).
        """
        self.stats["total"] += 1

        # Submit with retry
        task_id = self._submit(method, **params)

        # Poll with retry
        try:
            token = self._poll(task_id)
            self.stats["success"] += 1
            return token
        except TimeoutError:
            self.stats["timeout"] += 1
            raise

    def _submit(self, method, **params):
        """POST the task, retrying transient failures with backoff.

        Returns the ``request`` field of the first response whose ``status``
        is 1. Raises ``RuntimeError`` on a permanent or unrecognised error
        response, or when all attempts are used up.
        """
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        for attempt in range(self.max_submit_retries):
            try:
                resp = requests.post(
                    f"{self.base}/in.php", data=data, timeout=30,
                )

                if resp.status_code in (429, 500, 502, 503):
                    self.stats["retry"] += 1
                    time.sleep(_backoff(attempt))
                    continue

                result = resp.json()
                code = result.get("request")

                if code in PERMANENT_ERRORS:
                    self.stats["permanent_error"] += 1
                    raise RuntimeError(f"Permanent: {code}")

                if code in TRANSIENT_ERRORS:
                    self.stats["retry"] += 1
                    time.sleep(_backoff(attempt))
                    continue

                if result.get("status") == 1:
                    return result["request"]

                # An unrecognised response used to fall through and re-post
                # immediately with no delay; fail fast instead of hammering
                # the API with a request that is not known to be retryable.
                raise RuntimeError(f"Unknown error: {code}")

            except (requests.ConnectionError, requests.Timeout):
                self.stats["retry"] += 1
                time.sleep(_backoff(attempt))

        raise RuntimeError("Submit failed after retries")

    def _poll(self, task_id):
        """Poll for the task result until solved, failed, or timed out.

        Returns the ``request`` field once ``status`` is 1. Raises
        ``RuntimeError`` on a reported error or too many HTTP-level failures,
        re-raises the ``requests`` exception after too many connection
        failures, and raises ``TimeoutError`` when ``poll_timeout`` elapses.
        """
        # A monotonic clock keeps the budget correct if the wall clock moves.
        deadline = time.monotonic() + self.poll_timeout
        errors = 0

        while time.monotonic() < deadline:
            time.sleep(5)
            try:
                resp = requests.get(f"{self.base}/res.php", params={
                    "key": self.api_key, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15)

                if resp.status_code in (429, 500, 502, 503):
                    errors += 1
                    if errors >= self.max_poll_retries:
                        raise RuntimeError("Poll errors exceeded limit")
                    time.sleep(_backoff(errors))
                    continue

                data = resp.json()
                errors = 0

                # .get() avoids a KeyError on a response with no "request".
                outcome = data.get("request")

                if outcome == "CAPCHA_NOT_READY":
                    continue
                if data.get("status") == 1:
                    return outcome
                raise RuntimeError(f"Solve error: {outcome}")

            except (requests.ConnectionError, requests.Timeout):
                errors += 1
                if errors >= self.max_poll_retries:
                    raise

        raise TimeoutError("Poll timeout")

    def get_stats(self):
        """Return a snapshot copy of the counters.

        A copy is returned so that callers mutating the result cannot
        corrupt the solver's internal bookkeeping.
        """
        return dict(self.stats)


# Usage: build one solver, solve a single task, then show the counters.
solver = RetrySolver("YOUR_API_KEY")

# Extra keyword arguments are forwarded as form fields of the submit request.
token = solver.solve(method="userrecaptcha", googlekey="SITE_KEY", pageurl="https://example.com")

print(solver.get_stats())

断路器模式

连续多次失败后停止发送请求:

class CircuitBreaker:
    """Block further requests once failures pile up, then allow a retry later.

    ``state`` is "closed" while requests are allowed, "open" while they are
    blocked, and "half-open" once the recovery time has elapsed and requests
    are allowed through again.
    """

    def __init__(self, failure_threshold=5, recovery_time=60):
        self.state = "closed"  # closed=normal, open=blocking
        self.failures = 0
        self.last_failure = 0
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time

    def can_proceed(self):
        """Return True when a request may be sent right now."""
        if self.state != "closed":
            since_last_failure = time.time() - self.last_failure
            if since_last_failure <= self.recovery_time:
                return False
            # Recovery time has passed: let requests through again.
            self.state = "half-open"
        return True

    def record_success(self):
        """Clear the failure count and close the circuit."""
        self.state = "closed"
        self.failures = 0

    def record_failure(self):
        """Count one failure and open the circuit at the threshold."""
        self.last_failure = time.time()
        self.failures += 1
        if self.failures < self.failure_threshold:
            return
        self.state = "open"
        print(f"Circuit OPEN — pausing for {self.recovery_time}s")


# Integrate with solver
breaker = CircuitBreaker(failure_threshold=5, recovery_time=60)


def solve_with_breaker(method, **params):
    """Solve a task through ``solver`` unless the circuit breaker is open.

    Args:
        method: Passed through to ``solver.solve``.
        **params: Passed through to ``solver.solve``.

    Returns:
        The token returned by ``solver.solve``.

    Raises:
        RuntimeError: When the breaker is open, or when the solver fails.
        TimeoutError: When the solver times out while polling.
        requests.RequestException: When the solver re-raises a network error.
    """
    if not breaker.can_proceed():
        raise RuntimeError("Circuit open — API appears unavailable")

    try:
        token = solver.solve(method, **params)
    except (RuntimeError, TimeoutError, requests.RequestException):
        # solver.solve also raises TimeoutError and re-raises requests
        # exceptions. Those are exactly the signals that the service is
        # down, so they must count as failures too; catching RuntimeError
        # alone would never trip the breaker on them.
        breaker.record_failure()
        raise

    breaker.record_success()
    return token

故障排除

| 问题 | 原因 | 处理方式 |
| --- | --- | --- |
| 永久性错误被重试 | 未按错误类型过滤 | 检查 PERMANENT_ERRORS 集合 |
| 无限重试循环 | 未设置最大重试次数 | 始终设置 max_retries |
| 退避太慢 | 固定延迟 | 使用带抖动的指数退避 |
| 所有重试结果相同 | 根本问题并非暂时性的 | 检查 API 密钥、余额和参数 |

常见问题

我应该重试多少次?

提交重试 3-5 次,轮询错误重试 2-3 次。重试超过 5 次几乎没有帮助——问题可能不是暂时的。

我应该重试 ERROR_CAPTCHA_UNSOLVABLE 吗?

您可以重新提交任务(新请求)。相同的任务 ID 不会给出不同的结果。

最佳的退避策略是什么?

指数退避(等待 2^attempt 秒,attempt 为已尝试次数),再加上 0-50% 的随机抖动。当许多客户端同时重试时,这可以防止惊群(thundering herd)问题。


相关指南


构建具有弹性的自动化流程——试用 CaptchaAI,获得生产级的可靠性。

该文章已禁用评论。