当抓取数百或数千页时,一次解决一个验证码的速度很慢。并行提交多个任务,同时轮询它们,并在结果到达时对其进行处理。
顺序与并行
Sequential (slow):
Submit #1 → Poll → Result (15s)
Submit #2 → Poll → Result (15s)
Submit #3 → Poll → Result (15s)
Total: ~45s for 3 solves
Parallel (fast):
Submit #1 ─┐
Submit #2 ─┤→ Poll all → Results arrive
Submit #3 ─┘
Total: ~15s for 3 solves
基本并发求解
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
API_KEY = "YOUR_API_KEY"  # replace with your CaptchaAI account key
BASE_URL = "https://ocr.captchaai.com"  # API base URL (in.php / res.php endpoints)
def submit_task(method, **params):
    """Send one CAPTCHA job to the in.php endpoint and return its task id.

    Args:
        method: API method name (e.g. "userrecaptcha").
        **params: Extra form fields forwarded to the API verbatim.

    Returns:
        The task id string assigned by the service.

    Raises:
        RuntimeError: If the API rejects the submission (status != 1).
    """
    payload = {"key": API_KEY, "method": method, "json": 1, **params}
    response = requests.post(f"{BASE_URL}/in.php", data=payload, timeout=30)
    body = response.json()
    if body.get("status") == 1:
        return body["request"]
    raise RuntimeError(f"Submit error: {body.get('request')}")
def poll_result(task_id, timeout=120):
    """Poll res.php until the task is solved.

    Args:
        task_id: Task id returned by submit_task.
        timeout: Maximum seconds to keep polling before giving up.

    Returns:
        The solved token string.

    Raises:
        RuntimeError: If the API reports a failure (an ERROR_* response).
        TimeoutError: If no result arrives within `timeout` seconds.
    """
    start = time.time()
    while time.time() - start < timeout:
        time.sleep(5)  # solving takes seconds; don't hammer the API
        resp = requests.get(f"{BASE_URL}/res.php", params={
            "key": API_KEY, "action": "get",
            "id": task_id, "json": 1,
        }, timeout=15)
        answer = resp.json()["request"]
        if answer == "CAPCHA_NOT_READY":
            continue
        # BUG FIX: the API reports failures (e.g. ERROR_CAPTCHA_UNSOLVABLE)
        # through the same "request" field; previously any non-pending value
        # was returned as if it were a valid token.
        if isinstance(answer, str) and answer.startswith("ERROR"):
            raise RuntimeError(f"Solve error for task {task_id}: {answer}")
        return answer
    raise TimeoutError(f"Task {task_id} timeout")
def solve_one(sitekey, pageurl):
    """Run the full submit-then-poll cycle for a single reCAPTCHA task."""
    token = poll_result(
        submit_task("userrecaptcha", googlekey=sitekey, pageurl=pageurl)
    )
    return {"url": pageurl, "token": token}
def batch_solve(tasks, max_workers=10):
    """Solve multiple CAPTCHAs in parallel with a thread pool.

    Args:
        tasks: Iterable of dicts with "sitekey" and "url" keys.
        max_workers: Size of the thread pool.

    Returns:
        A list of result dicts; failed tasks carry token=None plus an
        "error" message. Order follows completion, not submission.
    """
    outcomes = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {}
        for spec in tasks:
            fut = pool.submit(solve_one, spec["sitekey"], spec["url"])
            pending[fut] = spec
        # Process results as they finish rather than in submission order.
        for done in as_completed(pending):
            spec = pending[done]
            try:
                solved = done.result()
            except Exception as e:
                print(f"Failed: {spec['url']} - {e}")
                outcomes.append({"url": spec["url"], "token": None, "error": str(e)})
            else:
                outcomes.append(solved)
                print(f"Solved: {solved['url']}")
    return outcomes
# Usage
# Each task pairs a reCAPTCHA sitekey with the page it appears on.
tasks = [
    {"sitekey": "SITE_KEY_1", "url": "https://example.com/page1"},
    {"sitekey": "SITE_KEY_2", "url": "https://example.com/page2"},
    {"sitekey": "SITE_KEY_3", "url": "https://example.com/page3"},
]
results = batch_solve(tasks, max_workers=5)
# Count entries that actually carry a token (failures have token=None).
print(f"Solved {sum(1 for r in results if r.get('token'))}/{len(tasks)}")
异步批量求解器
对于更高的并发,请使用 asyncio:
import asyncio
import aiohttp
import time
API_KEY = "YOUR_API_KEY"  # replace with your CaptchaAI account key
BASE_URL = "https://ocr.captchaai.com"  # API base URL (in.php / res.php endpoints)
async def submit_task_async(session, method, **params):
    """Submit one CAPTCHA job through the shared aiohttp session.

    Returns the task id string; raises RuntimeError on API rejection.
    """
    payload = {"key": API_KEY, "method": method, "json": 1, **params}
    async with session.post(f"{BASE_URL}/in.php", data=payload) as resp:
        body = await resp.json()
    if body.get("status") == 1:
        return body["request"]
    raise RuntimeError(f"Submit error: {body.get('request')}")
async def poll_result_async(session, task_id, timeout=120):
    """Poll res.php asynchronously until the task is solved.

    Args:
        session: Shared aiohttp.ClientSession.
        task_id: Task id returned by submit_task_async.
        timeout: Maximum seconds to keep polling.

    Returns:
        The solved token string.

    Raises:
        RuntimeError: If the API reports a failure (an ERROR_* response).
        TimeoutError: If no result arrives within `timeout` seconds.
    """
    start = time.time()
    while time.time() - start < timeout:
        await asyncio.sleep(5)  # yields to other coroutines while waiting
        params = {
            "key": API_KEY, "action": "get",
            "id": task_id, "json": 1,
        }
        async with session.get(f"{BASE_URL}/res.php", params=params) as resp:
            data = await resp.json()
        answer = data["request"]
        if answer == "CAPCHA_NOT_READY":
            continue
        # BUG FIX: ERROR_* responses come through the same field and were
        # previously returned as if they were valid tokens.
        if isinstance(answer, str) and answer.startswith("ERROR"):
            raise RuntimeError(f"Solve error for task {task_id}: {answer}")
        return answer
    raise TimeoutError(f"Task {task_id} timeout")
async def solve_one_async(session, sitekey, pageurl):
    """Submit one reCAPTCHA task and await its token."""
    tid = await submit_task_async(
        session, "userrecaptcha",
        googlekey=sitekey, pageurl=pageurl,
    )
    return {"url": pageurl, "token": await poll_result_async(session, tid)}
async def batch_solve_async(tasks, max_concurrent=20):
    """Solve many CAPTCHAs concurrently with asyncio.

    Args:
        tasks: Iterable of dicts with "sitekey" and "url" keys.
        max_concurrent: Semaphore limit on in-flight solves.

    Returns:
        A list of result dicts in the same order as `tasks`; failed tasks
        carry token=None plus an "error" message.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    # FIX: pass the session explicitly instead of closing over a variable
    # that is only bound later in the enclosing scope (fragile late binding).
    async def solve_with_limit(session, task):
        async with semaphore:
            try:
                return await solve_one_async(
                    session, task["sitekey"], task["url"],
                )
            except Exception as e:
                return {"url": task["url"], "token": None, "error": str(e)}

    # One session for all requests: reuses connections across tasks.
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(solve_with_limit(session, t) for t in tasks)
        )
# Usage
# Generate 50 tasks against the same sitekey, one per page.
tasks = [
    {"sitekey": "KEY", "url": f"https://example.com/page{i}"}
    for i in range(50)
]
results = asyncio.run(batch_solve_async(tasks, max_concurrent=20))
# Failures have token=None, so this counts only successful solves.
solved = sum(1 for r in results if r.get("token"))
print(f"Solved: {solved}/{len(tasks)}")
先提交后轮询模式
为了获得最大吞吐量,请将提交与轮询分开:
import requests
import time
API_KEY = "YOUR_API_KEY"  # replace with your CaptchaAI account key
BASE_URL = "https://ocr.captchaai.com"  # API base URL (in.php / res.php endpoints)
def batch_submit(tasks):
    """Submit all tasks first, return the ids of those accepted.

    Args:
        tasks: Iterable of dicts with "sitekey" and "url" keys.

    Returns:
        A list of {"task_id", "url"} dicts for successfully submitted tasks.
        Rejected or failed submissions are logged and skipped.
    """
    submitted = []
    for task in tasks:
        try:
            data = {
                "key": API_KEY,
                "method": "userrecaptcha",
                "googlekey": task["sitekey"],
                "pageurl": task["url"],
                "json": 1,
            }
            resp = requests.post(f"{BASE_URL}/in.php", data=data, timeout=30)
            result = resp.json()
            if result.get("status") == 1:
                submitted.append({
                    "task_id": result["request"],
                    "url": task["url"],
                })
            else:
                # BUG FIX: API-level rejections (status != 1) were silently
                # dropped; surface them like transport-level failures.
                print(f"Submit rejected for {task['url']}: {result.get('request')}")
            time.sleep(0.1)  # Brief delay between submits to avoid 429 rate limits
        except Exception as e:
            print(f"Submit failed for {task['url']}: {e}")
    return submitted
def batch_poll(submitted, timeout=120):
    """Poll all submitted tasks until complete or the overall timeout expires.

    Args:
        submitted: List of {"task_id", "url"} dicts from batch_submit.
        timeout: Overall deadline in seconds for the whole batch.

    Returns:
        One result dict per task: {"url", "token"} on success, or
        {"url", "token": None, "error"} for API errors and timeouts.
    """
    pending = {s["task_id"]: s for s in submitted}
    results = []
    start = time.time()
    while pending and time.time() - start < timeout:
        time.sleep(5)
        # Snapshot the keys: we pop from `pending` while iterating.
        for task_id in list(pending.keys()):
            try:
                resp = requests.get(f"{BASE_URL}/res.php", params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15)
                answer = resp.json()["request"]
                if answer == "CAPCHA_NOT_READY":
                    continue
                info = pending.pop(task_id)
                if isinstance(answer, str) and answer.startswith("ERROR"):
                    # BUG FIX: ERROR_* responses were previously recorded as
                    # valid tokens; record them as failures instead.
                    results.append({
                        "url": info["url"],
                        "token": None,
                        "error": answer,
                    })
                else:
                    results.append({
                        "url": info["url"],
                        "token": answer,
                    })
            except Exception:
                # Transient network error: leave the task pending and
                # retry it on the next polling cycle.
                pass
    # Mark remaining as failed
    for task_id, info in pending.items():
        results.append({"url": info["url"], "token": None, "error": "timeout"})
    return results
# Usage
# Generate 20 tasks against the same sitekey, one per page.
tasks = [
    {"sitekey": "KEY", "url": f"https://example.com/page{i}"}
    for i in range(20)
]
submitted = batch_submit(tasks)
print(f"Submitted {len(submitted)} tasks")
results = batch_poll(submitted)
# Failures have token=None, so this counts only successful solves.
solved = sum(1 for r in results if r.get("token"))
print(f"Solved: {solved}/{len(tasks)}")
吞吐量指南
| 并发任务 | 大约速度 | 最适合 |
|---|---|---|
| 1-5 | 3-5 次/分钟 | 测试、轻量抓取 |
| 5-20 | 15-60 次/分钟 | 生产环境抓取 |
| 20-50 | 60-150 次/分钟 | 大批量流水线 |
| 50-100 | 150-300 次/分钟 | 企业级规模 |
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 速率限制 (429) | 每秒提交次数过多 | 在提交之间添加 100 毫秒的延迟 |
| 多次超时 | 轮询超时时间太短 | 增加到120-180秒 |
| 超过 50 个并发时收益递减 | 网络瓶颈 | 使用异步 (aiohttp) 代替线程 |
| 结果好坏参半 | 任务ID跟踪问题 | 使用以task_id为键的字典 |
常见问题
我可以一次提交多少个任务?
并发任务没有硬性限制。从 10-20 开始,然后根据您的成功率和速度需求增加。
我应该使用线程还是异步?
对于中小批量(最多 50 个),线程 (ThreadPoolExecutor) 更简单。对于 100+ 并发任务,async (aiohttp) 效率更高。
批量求解成本更高吗?
不会。无论是单独提交还是批量提交,每个任务的成本都是相同的。批处理只是节省时间。
相关指南
- 速率限制和 429 响应
- 重试逻辑实现
扩展您的验证码求解能力——试用 CaptchaAI 进行高吞吐量批处理。