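The CaptchaAI pingback parameter lets you receive solved tokens through an HTTP callback instead of polling. This guide covers advanced notification patterns for production systems.
How Pingback Works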
1. Submit task with pingback=YOUR_CALLBACK_URL
2. CaptchaAI solves the CAPTCHA
3. CaptchaAI sends GET request to your callback:
YOUR_CALLBACK_URL?id=TASK_ID&code=TOKEN
4. Your server processes the result
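The callback in step 3 is a plain GET request, so the result can be read from the query string with the standard library. The sketch below is a minimal illustration: `parse_pingback` is a hypothetical helper name, and it assumes the callback carries only the `id` and `code` parameters shown above.

```python
from urllib.parse import urlsplit, parse_qs


def parse_pingback(url):
    """Return (task_id, token) from a pingback URL, or None if incomplete."""
    query = parse_qs(urlsplit(url).query)
    task_id = query.get("id", [None])[0]
    token = query.get("code", [None])[0]
    if not task_id or not token:
        return None
    return task_id, token


if __name__ == "__main__":
    print(parse_pingback("https://yourserver.com/pingback?id=123&code=TOKEN"))
```

A web framework normally does this parsing for you; the helper is mainly useful in tests or log analysis.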
Pattern 1: Fire-and-Forget with a Result Store
Submit the task and let the callback store the result in a thread-safe dictionary:
import requests
import threading
from flask import Flask, request


class PingbackStore:
    """Store for results received via pingback."""

    def __init__(self):
        self.results = {}
        self.events = {}
        self.lock = threading.Lock()

    def register(self, task_id):
        """Register a task ID we expect results for."""
        with self.lock:
            event = self.events.setdefault(task_id, threading.Event())
            # The callback may arrive before registration; do not block on it
            if task_id in self.results:
                event.set()

    def store(self, task_id, token):
        """Store result from pingback callback."""
        with self.lock:
            self.results[task_id] = token
            if task_id in self.events:
                self.events[task_id].set()

    def wait(self, task_id, timeout=120):
        """Wait for a specific result."""
        with self.lock:
            event = self.events.get(task_id)
        if not event:
            return None
        event.wait(timeout=timeout)
        return self.get(task_id)

    def get(self, task_id):
        """Get result without waiting (non-blocking)."""
        with self.lock:
            return self.results.get(task_id)

# Global store
store = PingbackStore()

# Flask app for receiving callbacks
app = Flask(__name__)


@app.route("/pingback")
def receive_pingback():
    """Handle CaptchaAI pingback callback."""
    task_id = request.args.get("id")
    code = request.args.get("code")
    if not task_id or not code:
        return "Bad request", 400
    store.store(task_id, code)
    return "OK", 200

def submit_with_pingback(api_key, method, callback_url, **params):
    """Submit a task with pingback enabled."""
    data = {
        "key": api_key,
        "method": method,
        "pingback": callback_url,
        "json": 1,
    }
    data.update(params)
    resp = requests.post(
        "https://ocr.captchaai.com/in.php",
        data=data,
        timeout=30,
    )
    resp.raise_for_status()
    result = resp.json()
    if result.get("status") != 1:
        raise RuntimeError(f"Submit error: {result.get('request')}")
    # Callback IDs arrive as query-string text, so keep the key a string
    task_id = str(result["request"])
    store.register(task_id)
    return task_id

# Usage
# Start Flask server in background thread
server = threading.Thread(
    target=lambda: app.run(port=8080, debug=False),
    daemon=True,
)
server.start()

# Submit task
task_id = submit_with_pingback(
    "YOUR_API_KEY",
    "userrecaptcha",
    "https://yourserver.com/pingback",
    googlekey="SITE_KEY",
    pageurl="https://example.com",
)

# Wait for result via pingback
token = store.wait(task_id, timeout=120)
if token:
    print(f"Token: {token[:50]}...")
else:
    print("No pingback received before the timeout")
Pattern 2: Multi-Task Fan-Out
Submit multiple tasks and collect the results as they arrive:
import requests
import time


class FanOutSolver:
    """Submit many tasks, collect results via pingback."""

    def __init__(self, api_key, callback_url, result_store=None):
        self.api_key = api_key
        self.callback_url = callback_url
        # Share the store the Flask handler writes to (the global `store`
        # from Pattern 1); a private PingbackStore would never see callbacks.
        self.store = result_store if result_store is not None else store
        self.pending = []

    def submit(self, method, **params):
        """Submit a task and track it."""
        data = {
            "key": self.api_key,
            "method": method,
            "pingback": self.callback_url,
            "json": 1,
        }
        data.update(params)
        resp = requests.post(
            "https://ocr.captchaai.com/in.php",
            data=data,
            timeout=30,
        )
        result = resp.json()
        if result.get("status") != 1:
            raise RuntimeError(f"Submit error: {result.get('request')}")
        task_id = str(result["request"])  # Match the string ID in the callback
        self.store.register(task_id)
        self.pending.append(task_id)
        return task_id

    def submit_batch(self, tasks):
        """Submit multiple tasks.

        tasks: list of dicts with 'method' and params
        """
        task_ids = []
        for task in tasks:
            params = dict(task)  # Copy so the caller's dict is not mutated
            method = params.pop("method")
            task_ids.append(self.submit(method, **params))
            time.sleep(0.1)  # Avoid rate limits
        return task_ids

    def collect_all(self, timeout=180):
        """Wait for all pending results."""
        results = {}
        deadline = time.monotonic() + timeout
        for task_id in self.pending:
            # Past the deadline this becomes a non-blocking check
            remaining = max(0, deadline - time.monotonic())
            token = self.store.wait(task_id, timeout=remaining)
            results[task_id] = token
        self.pending.clear()
        return results

# Usage
solver = FanOutSolver("YOUR_API_KEY", "https://yourserver.com/pingback")

# Submit 5 tasks
tasks = [
    {
        "method": "userrecaptcha",
        "googlekey": "SITE_KEY",
        "pageurl": f"https://example.com/page{i}",
    }
    for i in range(5)
]
task_ids = solver.submit_batch(tasks)
print(f"Submitted {len(task_ids)} tasks")

# Wait for all results
results = solver.collect_all(timeout=180)
for tid, token in results.items():
    status = "solved" if token else "failed"
    print(f"  {tid}: {status}")
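`collect_all` waits on tasks in submission order, so a slow first task delays results that have already arrived. One possible alternative, sketched below, pushes each callback result onto a `queue.Queue` and yields results in arrival order. `ArrivalQueue` and its method names are hypothetical and not part of the code above; the pingback handler would call `put` in addition to (or instead of) `store.store`.

```python
import queue
import time


class ArrivalQueue:
    """Hand out pingback results in the order the callbacks arrive."""

    def __init__(self):
        self._queue = queue.Queue()

    def put(self, task_id, token):
        """Called from the pingback handler for each callback."""
        self._queue.put((task_id, token))

    def iter_results(self, expected, timeout=180):
        """Yield up to `expected` (task_id, token) pairs until the deadline."""
        deadline = time.monotonic() + timeout
        for _ in range(expected):
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            try:
                yield self._queue.get(timeout=remaining)
            except queue.Empty:
                return  # Deadline hit; the missing tasks need other handling


if __name__ == "__main__":
    arrivals = ArrivalQueue()
    arrivals.put("2", "token-b")  # Task 2 happens to finish first
    arrivals.put("1", "token-a")
    for task_id, token in arrivals.iter_results(expected=2, timeout=1):
        print(task_id, token)
```

The generator stops early when the deadline passes, so the caller can compare the yielded IDs with the submitted ones to find tasks that never reported back.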
Pattern 3: Notification Router
Route results to different handlers based on task metadata:
import threading


class NotificationRouter:
    """Route pingback results to registered handlers."""

    def __init__(self):
        self.handlers = {}
        self.default_handler = None
        self.task_routes = {}
        self.lock = threading.Lock()

    def register_handler(self, name, handler_fn):
        """Register a named handler function."""
        self.handlers[name] = handler_fn

    def set_default(self, handler_fn):
        """Set a default handler for unrouted tasks."""
        self.default_handler = handler_fn

    def route(self, task_id, handler_name):
        """Route a task ID to a specific handler."""
        with self.lock:
            self.task_routes[task_id] = handler_name

    def dispatch(self, task_id, token):
        """Dispatch a result to the correct handler."""
        with self.lock:
            handler_name = self.task_routes.get(task_id)
        # Fall back to the default handler for unknown or unrouted tasks
        handler = self.handlers.get(handler_name, self.default_handler)
        if handler:
            handler(task_id, token)

# Usage
router = NotificationRouter()


# Register handlers
def login_handler(task_id, token):
    print(f"Login flow got token from {task_id}")
    # Submit token to login form


def scraping_handler(task_id, token):
    print(f"Scraping pipeline got token from {task_id}")
    # Continue scraping with token


router.register_handler("login", login_handler)
router.register_handler("scraping", scraping_handler)

# When submitting
task_id = submit_with_pingback(
    "YOUR_API_KEY", "userrecaptcha",
    "https://yourserver.com/pingback",
    googlekey="KEY", pageurl="https://example.com",
)
router.route(task_id, "login")

# In pingback handler
# router.dispatch(task_id, token)
Securing Your Pingback Endpoint
import ipaddress
from flask import Flask, request, abort

app = Flask(__name__)

# Networks allowed to call the endpoint. This placeholder matches every
# IPv4 address; replace it with the actual CaptchaAI server IPs.
ALLOWED_NETWORKS = [ipaddress.ip_network("0.0.0.0/0")]


@app.route("/pingback")
def secure_pingback():
    """Validate pingback requests."""
    task_id = request.args.get("id")
    code = request.args.get("code")

    # Validate required parameters
    if not task_id or not code:
        abort(400)

    # Validate IP against the allowlist
    ip = ipaddress.ip_address(request.remote_addr)
    if not any(ip in network for network in ALLOWED_NETWORKS):
        abort(403)

    # Validate task ID format (numeric)
    if not task_id.isdigit():
        abort(400)

    # Store result (`store` is the PingbackStore instance from Pattern 1)
    store.store(task_id, code)
    return "OK", 200
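An IP allowlist can be complemented by a secret embedded in the callback URL, so requests that do not know the secret are rejected. The sketch below assumes you register a callback such as `https://yourserver.com/pingback/<secret>`; whether CaptchaAI calls such a path unchanged is an assumption to verify, and `PINGBACK_SECRET` and `is_authorized` are hypothetical names.

```python
import hmac

# Hypothetical shared secret; in practice load it from configuration.
PINGBACK_SECRET = "change-me"


def is_authorized(path_secret, expected=PINGBACK_SECRET):
    """Compare the secret taken from the URL path in constant time."""
    if not path_secret:
        return False
    return hmac.compare_digest(path_secret.encode(), expected.encode())


if __name__ == "__main__":
    print(is_authorized("change-me"))  # Matches the configured secret
    print(is_authorized("guess"))      # Does not match
```

In a Flask route, the check would run on the path variable before any parameters are processed, returning 403 when it fails. `hmac.compare_digest` is used instead of `==` to avoid leaking information through comparison timing.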
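When to Use Pingback vs. Polling
| Factor | Pingback | Polling |
|---|---|---|
| Infrastructure | Requires a public endpoint | No server needed |
| Latency | Immediate notification | Delayed by the 5-second polling interval |
| Scale | Better for 100+ concurrent tasks | Fine for <50 concurrent tasks |
| Reliability | Needs retry handling | Simple retry loop |
| Firewall | Requires an inbound port | Outbound only |
| Complexity | Higher setup effort | Lower setup effort |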
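Troubleshooting
| Issue | Cause | Fix |
|---|---|---|
| No callback received | Endpoint unreachable | Verify the server is publicly reachable; check the firewall |
| Duplicate callbacks | CaptchaAI retries | Make the handler idempotent |
| Wrong task ID in callback | Stale server state | Check when tasks are registered |
| Timeout despite a solve | Callback URL not accessible | Test the endpoint with curl first |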
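One way to make the handler idempotent is to remember which task IDs have already been processed and ignore repeats. The following is a minimal in-memory sketch; `IdempotentReceiver` is a hypothetical name, and a multi-process deployment would need a shared store instead of a local set.

```python
import threading


class IdempotentReceiver:
    """Process each task ID at most once, even if callbacks repeat."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()
        self._lock = threading.Lock()

    def receive(self, task_id, token):
        """Return True if the callback was processed, False if a duplicate."""
        with self._lock:
            if task_id in self._seen:
                return False
            self._seen.add(task_id)
        # Run the handler outside the lock so slow handlers do not block others
        self._handler(task_id, token)
        return True


if __name__ == "__main__":
    receiver = IdempotentReceiver(lambda tid, tok: print("handled", tid))
    receiver.receive("123", "TOKEN")
    receiver.receive("123", "TOKEN")  # Duplicate, silently ignored
```

The set grows without bound, so a long-running service would also need to expire old IDs.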
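FAQ
Can I use pingback with every CAPTCHA type?
Yes. The pingback parameter works with reCAPTCHA, Turnstile, GeeTest, Image, BLS, and all other supported methods.
What happens if my server is down when the callback arrives?
CaptchaAI may retry the callback. You should also implement a fallback polling mechanism for tasks that do not receive a callback within the timeout.
Can I use localhost for testing?
No. The callback URL must be publicly accessible. Use ngrok or a similar tunnel for local testing.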
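A fallback could look roughly like the sketch below: wait for the callback first, then switch to polling if nothing arrives. Both callables are assumptions: `wait_for_callback` could be `store.wait` from Pattern 1, while `poll_result` stands for a hypothetical function that queries CaptchaAI for the task result, which this guide does not show.

```python
import time


def wait_with_fallback(wait_for_callback, poll_result, task_id,
                       callback_timeout=120, poll_interval=5, max_polls=12):
    """Wait for the pingback first, then poll if it never arrives.

    wait_for_callback(task_id, timeout) -> token or None
    poll_result(task_id) -> token or None (hypothetical polling helper)
    """
    token = wait_for_callback(task_id, callback_timeout)
    if token:
        return token
    for _ in range(max_polls):
        token = poll_result(task_id)
        if token:
            return token
        time.sleep(poll_interval)
    return None  # Neither the callback nor polling produced a result
```

Keeping the two callables as parameters makes the function easy to test with fakes and independent of any particular HTTP client.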
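Before exposing the endpoint through a tunnel, the handler logic itself can be exercised locally by sending a fake callback. The sketch below uses only the standard library instead of Flask; `PingbackHandler` and `simulate_pingback` are hypothetical names, and the request imitates the `id` and `code` query parameters described earlier.

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlsplit, parse_qs
from urllib.request import ProxyHandler, build_opener

received = {}


class PingbackHandler(BaseHTTPRequestHandler):
    """Minimal stand-in for the real callback endpoint."""

    def do_GET(self):
        query = parse_qs(urlsplit(self.path).query)
        task_id = query.get("id", [""])[0]
        code = query.get("code", [""])[0]
        if task_id and code:
            received[task_id] = code
            status, body = 200, b"OK"
        else:
            status, body = 400, b"Bad request"
        self.send_response(status)
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # Keep the console quiet


def simulate_pingback(task_id, code):
    """Start a local server, send one fake callback, return the HTTP status."""
    server = HTTPServer(("127.0.0.1", 0), PingbackHandler)  # Port 0: any free port
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        url = f"http://127.0.0.1:{server.server_port}/pingback?id={task_id}&code={code}"
        opener = build_opener(ProxyHandler({}))  # Bypass any configured proxy
        with opener.open(url, timeout=5) as resp:
            return resp.status
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(simulate_pingback("123", "TOKEN"), received)
```

This only checks the handler; a real callback from CaptchaAI still needs the publicly reachable URL.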
Related Guides
Building event-driven workflows