API Tutorials

CaptchaAI Pingback 和任务通知模式

CaptchaAI 的 pingback 参数允许您通过 HTTP 回调(而不是轮询)接收已求解的令牌。本指南介绍适用于生产系统的高级通知模式。


Pingback 如何运作


1. Submit task with pingback=YOUR_CALLBACK_URL
2. CaptchaAI solves the CAPTCHA
3. CaptchaAI sends GET request to your callback:
   YOUR_CALLBACK_URL?id=TASK_ID&code=TOKEN

4. Your server processes the result

模式 1:即发即忘(配合结果存储)

提交任务并让回调将结果存储在线程安全字典中:

import requests
import threading
import time
from flask import Flask, request


class PingbackStore:
    """Thread-safe store for tokens delivered by pingback callbacks.

    ``results`` maps task ID -> token and ``events`` maps task ID -> the
    ``threading.Event`` that waiters block on.  Mutations happen under
    ``lock``.  Entries are never removed by this class, so both dicts grow
    for the lifetime of the store.
    """

    def __init__(self):
        self.results = {}
        self.events = {}
        self.lock = threading.Lock()

    def register(self, task_id):
        """Register a task ID we expect a result for.

        The callback can arrive before the submitter gets around to calling
        this method.  In that case the token is already in ``results``, so
        the event is set right away; otherwise ``wait()`` would block for the
        full timeout even though the answer is available.
        """
        with self.lock:
            # setdefault keeps an existing Event so a thread already blocked
            # in wait() is not orphaned by a repeated register() call.
            event = self.events.setdefault(task_id, threading.Event())
            if task_id in self.results:
                event.set()

    def store(self, task_id, token):
        """Store a result from the pingback callback and wake any waiter."""
        with self.lock:
            self.results[task_id] = token
            event = self.events.get(task_id)
            if event is not None:
                event.set()

    def wait(self, task_id, timeout=120):
        """Block until the result for ``task_id`` arrives or ``timeout`` elapses.

        Returns the token, or ``None`` when the task was never registered or
        no callback arrived within ``timeout`` seconds.
        """
        with self.lock:
            event = self.events.get(task_id)
        if event is None:
            return None
        # Wait outside the lock so store() can run and set the event.
        event.wait(timeout=timeout)
        with self.lock:
            return self.results.get(task_id)

    def get(self, task_id):
        """Return the stored token without waiting, or ``None`` if absent."""
        with self.lock:
            return self.results.get(task_id)


# Global store: written by the Flask callback route and read by the code
# that submits tasks and waits for their tokens
store = PingbackStore()

# Flask app for receiving callbacks
app = Flask(__name__)


@app.route("/pingback")
def receive_pingback():
    """Flask view that accepts the CaptchaAI pingback request.

    Reads the ``id`` and ``code`` query parameters.  When both are present
    and non-empty the pair is recorded in the module-level ``store`` and the
    view answers ``200 OK``; otherwise it answers ``400 Bad request``.
    """
    query = request.args
    solved_id, solved_token = query.get("id"), query.get("code")

    if solved_id and solved_token:
        store.store(solved_id, solved_token)
        return "OK", 200

    return "Bad request", 400


def submit_with_pingback(api_key, method, callback_url, **params):
    """Submit a CAPTCHA task that reports its result via pingback.

    Posts the task to the CaptchaAI ``in.php`` endpoint with ``pingback`` set
    to ``callback_url``.  Extra keyword arguments are merged into the form
    data last, so they override the base fields on a name clash.

    Returns the task ID from the response after registering it with the
    module-level ``store``.

    Raises:
        RuntimeError: the response ``status`` field is not ``1``.
    """
    form = {
        "key": api_key,
        "method": method,
        "pingback": callback_url,
        "json": 1,
        **params,
    }

    result = requests.post(
        "https://ocr.captchaai.com/in.php", data=form, timeout=30
    ).json()

    if result.get("status") == 1:
        new_task_id = result["request"]
        # Register the ID so store.wait() has an Event to wait on
        store.register(new_task_id)
        return new_task_id

    raise RuntimeError(f"Submit error: {result.get('request')}")


# Usage
# Start Flask server in background thread (daemon, so it does not keep the
# process alive once the main thread finishes)
server = threading.Thread(
    target=lambda: app.run(port=8080, debug=False),
    daemon=True,
)
server.start()

# Submit task
task_id = submit_with_pingback(
    "YOUR_API_KEY",
    "userrecaptcha",
    "https://yourserver.com/pingback",
    googlekey="SITE_KEY",
    pageurl="https://example.com",
)

# Wait for result via pingback
token = store.wait(task_id, timeout=120)
if token is None:
    # store.wait() returns None when no callback arrived in time; slicing
    # None would raise TypeError, so report the timeout instead.
    print("No pingback received within 120 seconds")
else:
    print(f"Token: {token[:50]}...")

模式 2:多任务扇出

提交多个任务并在结果到达时收集结果:

import requests
import threading
import time


class FanOutSolver:
    """Submit many tasks and collect their results via pingback.

    Args:
        api_key: CaptchaAI API key sent with every submission.
        callback_url: URL passed as the ``pingback`` parameter.
        store: Optional result store exposing ``register(task_id)`` and
            ``wait(task_id, timeout=...)``.  Pass the same store instance the
            pingback HTTP handler writes to; a store nobody writes to never
            receives a result.  When omitted, a fresh ``PingbackStore`` is
            created (the original behaviour).
    """

    def __init__(self, api_key, callback_url, store=None):
        self.api_key = api_key
        self.callback_url = callback_url
        self.store = PingbackStore() if store is None else store
        self.pending = []

    def submit(self, method, **params):
        """Submit a single task and track it as pending.

        Returns the task ID reported by the API.

        Raises:
            RuntimeError: the response ``status`` field is not ``1``.
        """
        data = {
            "key": self.api_key,
            "method": method,
            "pingback": self.callback_url,
            "json": 1,
        }
        data.update(params)

        resp = requests.post(
            "https://ocr.captchaai.com/in.php",
            data=data,
            timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(f"Submit error: {result.get('request')}")

        task_id = result["request"]
        self.store.register(task_id)
        self.pending.append(task_id)
        return task_id

    def submit_batch(self, tasks):
        """Submit multiple tasks and return their IDs in submission order.

        tasks: list of dicts with 'method' and params.  The dicts are copied
        before ``method`` is removed, so the caller's data is left intact and
        the same task list can be submitted again.

        Raises:
            KeyError: a task dict has no ``method`` key.
        """
        task_ids = []
        for task in tasks:
            params = dict(task)
            method = params.pop("method")
            task_ids.append(self.submit(method, **params))
            time.sleep(0.1)  # Avoid rate limits
        return task_ids

    def collect_all(self, timeout=180):
        """Wait for all pending results within one shared deadline.

        Returns a dict mapping each pending task ID to its token, or to
        ``None`` when no result arrived in time.  The pending list is cleared
        afterwards.
        """
        results = {}
        deadline = time.time() + timeout

        for task_id in self.pending:
            # Clamp at 0 rather than 1: once the deadline has passed, each
            # remaining task is only checked for an already-delivered result,
            # so the call cannot overrun by a second per pending task.
            remaining = max(0.0, deadline - time.time())
            results[task_id] = self.store.wait(task_id, timeout=remaining)

        self.pending.clear()
        return results


# Usage
solver = FanOutSolver("YOUR_API_KEY", "https://yourserver.com/pingback")

# Share the store that the Flask /pingback route writes to.  The solver
# otherwise keeps a private store that never receives a callback, and every
# task would be reported as failed after the timeout.
solver.store = store

# Submit 5 tasks
tasks = [
    {
        "method": "userrecaptcha",
        "googlekey": "SITE_KEY",
        "pageurl": f"https://example.com/page{i}",
    }
    for i in range(5)
]

task_ids = solver.submit_batch(tasks)
print(f"Submitted {len(task_ids)} tasks")

# Wait for all results (a token of None means no callback arrived in time)
results = solver.collect_all(timeout=180)
for tid, token in results.items():
    status = "solved" if token else "failed"
    print(f"  {tid}: {status}")

模式 3:通知路由器

根据任务元数据将结果路由到不同的处理程序:

import threading
from collections import defaultdict


class NotificationRouter:
    """Deliver pingback results to per-task handler callbacks.

    A task ID is mapped to a handler *name* with ``route()``, and a name is
    mapped to a callable with ``register_handler()``.  ``dispatch()`` looks
    up both and calls the handler with ``(task_id, token)``.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.task_routes = {}
        self.handlers = {}
        self.default_handler = None

    def register_handler(self, name, handler_fn):
        """Bind ``handler_fn`` to the handler name ``name``."""
        self.handlers[name] = handler_fn

    def set_default(self, handler_fn):
        """Choose the handler used when a task has no usable route."""
        self.default_handler = handler_fn

    def route(self, task_id, handler_name):
        """Remember that results for ``task_id`` go to ``handler_name``."""
        with self.lock:
            self.task_routes[task_id] = handler_name

    def dispatch(self, task_id, token):
        """Call the handler routed for ``task_id`` with the result.

        Falls back to the default handler when the task has no route or its
        route names an unregistered handler.  Does nothing when there is no
        default handler either.
        """
        routed_name = self.task_routes.get(task_id)
        has_named_handler = bool(routed_name) and routed_name in self.handlers

        if has_named_handler:
            callback = self.handlers[routed_name]
            callback(task_id, token)
            return

        if self.default_handler:
            self.default_handler(task_id, token)


# Usage
router = NotificationRouter()


# Handlers: each receives the task ID and the solved token
def login_handler(task_id, token):
    """Handle a token solved for the login flow."""
    # Submit token to login form
    print(f"Login flow got token from {task_id}")


def scraping_handler(task_id, token):
    """Handle a token solved for the scraping pipeline."""
    # Continue scraping with token
    print(f"Scraping pipeline got token from {task_id}")


router.register_handler("login", login_handler)
router.register_handler("scraping", scraping_handler)

# When submitting: record which flow the new task belongs to
task_id = submit_with_pingback(
    api_key="YOUR_API_KEY",
    method="userrecaptcha",
    callback_url="https://yourserver.com/pingback",
    googlekey="KEY",
    pageurl="https://example.com",
)
router.route(task_id, "login")

# In pingback handler
# router.dispatch(task_id, token)

保护您的 Pingback 端点

import hmac
import hashlib
from flask import Flask, request, abort

# Flask app that serves the hardened /pingback route defined below
app = Flask(__name__)
# NOTE(review): API_KEY is not referenced by secure_pingback() in this snippet
API_KEY = "YOUR_API_KEY"


@app.route("/pingback")
def secure_pingback():
    """Validate a pingback request before storing its result.

    Checks, in order: both ``id`` and ``code`` are present (else 400), the
    caller's address is inside one of the allowed networks (else 403), and
    the task ID is numeric (else 400).  A request that passes is recorded in
    the module-level ``store`` and answered with ``200 OK``.
    """
    # Local import keeps this snippet self-contained.
    import ipaddress

    task_id = request.args.get("id")
    code = request.args.get("code")
    ip = request.remote_addr

    # Validate required parameters
    if not task_id or not code:
        abort(400)

    # Validate IP (CaptchaAI server IPs)
    # Add actual CaptchaAI IPs to allowlist.  The placeholder networks below
    # match every IPv4 and IPv6 address, so nothing is filtered until they
    # are replaced with real addresses or CIDR ranges.
    # NOTE(review): behind a reverse proxy, remote_addr is the proxy's
    # address unless the app is configured to trust forwarding headers.
    ALLOWED_IPS = {"0.0.0.0/0", "::/0"}  # Replace with real IPs
    try:
        caller = ipaddress.ip_address(ip or "")
    except ValueError:
        # Missing or unparsable peer address: refuse rather than guess.
        abort(403)
    if not any(
        caller in ipaddress.ip_network(network, strict=False)
        for network in ALLOWED_IPS
    ):
        abort(403)

    # Validate task ID format (numeric)
    if not task_id.isdigit():
        abort(400)

    # Store result
    store.store(task_id, code)
    return "OK", 200

何时使用 Pingback 与 Polling

因素 | Pingback(回调) | 轮询
基础设施 | 需要公共端点 | 无需服务器
延迟 | 即时通知 | 存在 5 秒轮询间隔带来的延迟
规模 | 更适合 100+ 并发 | 适合 <50 个并发
可靠性 | 需要重试处理 | 简单的重试循环
防火墙 | 需要开放入站端口 | 仅需出站连接
复杂度 | 设置成本较高 | 设置成本较低

故障排除

问题 | 原因 | 处理方式
没有收到回调 | 端点无法访问 | 确认服务器可公开访问;检查防火墙
重复回调 | CaptchaAI 重试 | 使处理程序幂等
回调中的任务 ID 不匹配 | 服务器状态过时 | 检查任务注册的时机
验证码已解决但仍超时 | 回调地址无法访问 | 先用 curl 测试端点

常见问题

我可以对所有验证码类型使用 pingback 吗?

是的。 pingback 参数适用于 reCAPTCHA、Turnstile、GeeTest、Image、BLS 和所有其他支持的方法。

如果回调到达时我的服务器已关闭,会发生什么情况?

CaptchaAI 可能会重试回调。您还应该为在超时内未收到回调的任务实现后备轮询机制。

我可以使用 localhost 进行测试吗?

不可以。回调 URL 必须可公开访问。使用 ngrok 或类似的隧道进行本地测试。


相关指南


构建事件驱动的工作流程——立即获取您的 CaptchaAI 密钥。

该文章已禁用评论。