API Tutorials

批量验证码求解:高效提交多个任务

当抓取数百或数千页时,一次解决一个验证码的速度很慢。并行提交多个任务,同时轮询它们,并在结果到达时对其进行处理。


顺序与并行

Sequential (slow):
  Submit #1 → Poll → Result (15s)
  Submit #2 → Poll → Result (15s)
  Submit #3 → Poll → Result (15s)
  Total: ~45s for 3 solves

Parallel (fast):
  Submit #1 ─┐
  Submit #2 ─┤→ Poll all → Results arrive
  Submit #3 ─┘
  Total: ~15s for 3 solves

基本并发求解

import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"


def submit_task(method, **params):
    """Send one CAPTCHA task to the submit endpoint and return its task ID.

    Args:
        method: Value sent as the ``method`` form field.
        **params: Extra form fields merged into the request payload.

    Returns:
        The ``request`` field of the reply (the task ID on success).

    Raises:
        RuntimeError: The reply's ``status`` field is not ``1``.
    """
    payload = {"key": API_KEY, "method": method, "json": 1, **params}
    reply = requests.post(f"{BASE_URL}/in.php", data=payload, timeout=30).json()
    if reply.get("status") == 1:
        return reply["request"]
    raise RuntimeError(f"Submit error: {reply.get('request')}")


def poll_result(task_id, timeout=120):
    """Poll the result endpoint until the task is solved and return the token.

    Args:
        task_id: Task ID to look up.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The ``request`` field of the first reply whose ``status`` is ``1``.

    Raises:
        RuntimeError: The API answered with a non-success reply other than
            ``CAPCHA_NOT_READY`` (for example an ``ERROR_*`` code).
        TimeoutError: No final answer arrived within ``timeout`` seconds.
    """
    # A monotonic clock keeps the deadline correct if the wall clock changes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Wait before every request, including the first one.
        time.sleep(5)
        resp = requests.get(f"{BASE_URL}/res.php", params={
            "key": API_KEY, "action": "get",
            "id": task_id, "json": 1,
        }, timeout=15)
        data = resp.json()
        if data.get("status") == 1:
            return data["request"]
        if data.get("request") != "CAPCHA_NOT_READY":
            # Anything that is neither success nor "still working" is an API
            # error code; surface it instead of returning it as a token.
            raise RuntimeError(f"Solve error: {data.get('request')}")
    raise TimeoutError(f"Task {task_id} timeout")


def solve_one(sitekey, pageurl):
    """Submit one reCAPTCHA task, wait for it, and pair the token with its URL.

    Returns:
        A dict with keys ``url`` (the given page URL) and ``token`` (whatever
        ``poll_result`` returned).
    """
    token = poll_result(
        submit_task("userrecaptcha", googlekey=sitekey, pageurl=pageurl)
    )
    return dict(url=pageurl, token=token)


def batch_solve(tasks, max_workers=10):
    """Run ``solve_one`` for every task on a thread pool.

    Args:
        tasks: Iterable of dicts with ``sitekey`` and ``url`` keys.
        max_workers: Size of the thread pool.

    Returns:
        A list with one dict per task, in completion order. Successful entries
        are the dicts returned by ``solve_one``; failed entries carry
        ``token=None`` and an ``error`` message.
    """
    outcomes = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the task that produced it.
        in_flight = {}
        for spec in tasks:
            in_flight[pool.submit(solve_one, spec["sitekey"], spec["url"])] = spec

        for finished in as_completed(in_flight):
            spec = in_flight[finished]
            try:
                solved = finished.result()
                outcomes.append(solved)
                print(f"Solved: {solved['url']}")
            except Exception as exc:
                print(f"Failed: {spec['url']} - {exc}")
                outcomes.append(
                    {"url": spec["url"], "token": None, "error": str(exc)}
                )

    return outcomes


# Usage: solve three example pages with a pool of five worker threads.
tasks = [
    dict(sitekey="SITE_KEY_1", url="https://example.com/page1"),
    dict(sitekey="SITE_KEY_2", url="https://example.com/page2"),
    dict(sitekey="SITE_KEY_3", url="https://example.com/page3"),
]

results = batch_solve(tasks, max_workers=5)
# Count the entries that carry a non-empty token.
solved = sum(bool(r.get("token")) for r in results)
print(f"Solved {solved}/{len(tasks)}")

异步批量求解器

对于更高的并发,请使用 asyncio:

import asyncio
import aiohttp
import time

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"


async def submit_task_async(session, method, **params):
    """Send one CAPTCHA task through ``session`` and return its task ID.

    Args:
        session: Client session used for the POST request.
        method: Value sent as the ``method`` form field.
        **params: Extra form fields merged into the request payload.

    Returns:
        The ``request`` field of the reply (the task ID on success).

    Raises:
        RuntimeError: The reply's ``status`` field is not ``1``.
    """
    payload = {"key": API_KEY, "method": method, "json": 1, **params}
    async with session.post(f"{BASE_URL}/in.php", data=payload) as resp:
        reply = await resp.json()
        if reply.get("status") == 1:
            return reply["request"]
        raise RuntimeError(f"Submit error: {reply.get('request')}")


async def poll_result_async(session, task_id, timeout=120):
    """Poll the result endpoint until the task is solved and return the token.

    Args:
        session: Client session used for the GET requests.
        task_id: Task ID to look up.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The ``request`` field of the first reply whose ``status`` is ``1``.

    Raises:
        RuntimeError: The API answered with a non-success reply other than
            ``CAPCHA_NOT_READY`` (for example an ``ERROR_*`` code).
        TimeoutError: No final answer arrived within ``timeout`` seconds.
    """
    # A monotonic clock keeps the deadline correct if the wall clock changes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Wait before every request, including the first one.
        await asyncio.sleep(5)
        params = {
            "key": API_KEY, "action": "get",
            "id": task_id, "json": 1,
        }
        async with session.get(f"{BASE_URL}/res.php", params=params) as resp:
            data = await resp.json()
        if data.get("status") == 1:
            return data["request"]
        if data.get("request") != "CAPCHA_NOT_READY":
            # Anything that is neither success nor "still working" is an API
            # error code; surface it instead of returning it as a token.
            raise RuntimeError(f"Solve error: {data.get('request')}")
    raise TimeoutError(f"Task {task_id} timeout")


async def solve_one_async(session, sitekey, pageurl):
    """Submit one reCAPTCHA task, await its result, and pair it with the URL.

    Returns:
        A dict with keys ``url`` (the given page URL) and ``token`` (whatever
        ``poll_result_async`` returned).
    """
    submit_fields = {"googlekey": sitekey, "pageurl": pageurl}
    task_id = await submit_task_async(session, "userrecaptcha", **submit_fields)
    return dict(url=pageurl, token=await poll_result_async(session, task_id))


async def batch_solve_async(tasks, max_concurrent=20):
    """Solve many CAPTCHAs concurrently with asyncio.

    Args:
        tasks: Iterable of dicts with ``sitekey`` and ``url`` keys.
        max_concurrent: Maximum number of tasks being solved at the same time.

    Returns:
        A list with one dict per task, in the same order as ``tasks``.
        Successful entries come from ``solve_one_async``; failed entries carry
        ``token=None`` and an ``error`` message.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def solve_with_limit(session, task):
        # The semaphore caps how many solves are in flight at once.
        async with semaphore:
            try:
                return await solve_one_async(
                    session, task["sitekey"], task["url"],
                )
            except Exception as e:
                # One failed task must not abort the whole batch.
                return {"url": task["url"], "token": None, "error": str(e)}

    async with aiohttp.ClientSession() as session:
        # The session is passed explicitly rather than captured by closure.
        return await asyncio.gather(
            *(solve_with_limit(session, t) for t in tasks)
        )


# Usage: solve 50 example pages with at most 20 tasks in flight.
tasks = [
    dict(sitekey="KEY", url=f"https://example.com/page{page}")
    for page in range(50)
]

results = asyncio.run(batch_solve_async(tasks, max_concurrent=20))
# Count the entries that carry a non-empty token.
solved = sum(bool(r.get("token")) for r in results)
print(f"Solved: {solved}/{len(tasks)}")

先提交后轮询模式

为了获得最大吞吐量,请将提交与轮询分开:

import requests
import time

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"


def batch_submit(tasks):
    """Submit all tasks first and return the accepted task IDs.

    Args:
        tasks: Iterable of dicts with ``sitekey`` and ``url`` keys.

    Returns:
        A list of ``{"task_id": ..., "url": ...}`` dicts, one per task the API
        accepted. Rejected or failed submissions are reported on stdout and
        left out of the list.
    """
    submitted = []
    for task in tasks:
        try:
            data = {
                "key": API_KEY,
                "method": "userrecaptcha",
                "googlekey": task["sitekey"],
                "pageurl": task["url"],
                "json": 1,
            }
            resp = requests.post(f"{BASE_URL}/in.php", data=data, timeout=30)
            result = resp.json()
            if result.get("status") == 1:
                submitted.append({
                    "task_id": result["request"],
                    "url": task["url"],
                })
            else:
                # Report API-level rejections instead of dropping them silently.
                print(
                    f"Submit rejected for {task['url']}: "
                    f"{result.get('request')}"
                )
        except Exception as e:
            # Best effort: one failed submit must not stop the batch.
            print(f"Submit failed for {task['url']}: {e}")
        # Brief delay between submits, applied after failures too so a
        # failing endpoint is not hit again immediately.
        time.sleep(0.1)
    return submitted


def batch_poll(submitted, timeout=120):
    """Poll all submitted tasks until each one finishes or time runs out.

    Args:
        submitted: Iterable of dicts with ``task_id`` and ``url`` keys.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        A list with one dict per submitted task. Solved tasks have ``url`` and
        ``token``. Tasks the API reported as failed have ``token=None`` and the
        API's reply as ``error``. Tasks still pending at the deadline have
        ``token=None`` and ``error="timeout"``.
    """
    pending = {s["task_id"]: s for s in submitted}
    results = []
    # A monotonic clock keeps the deadline correct if the wall clock changes.
    deadline = time.monotonic() + timeout

    while pending and time.monotonic() < deadline:
        time.sleep(5)
        # Iterate over a snapshot because finished tasks are removed below.
        for task_id in list(pending):
            try:
                resp = requests.get(f"{BASE_URL}/res.php", params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15)
                data = resp.json()
                answer = data["request"]
            except Exception as e:
                # Best effort: keep the task pending and retry next round,
                # but make the failure visible.
                print(f"Poll failed for task {task_id}: {e}")
                continue

            if answer == "CAPCHA_NOT_READY":
                continue

            info = pending.pop(task_id)
            if data.get("status") == 1:
                results.append({"url": info["url"], "token": answer})
            else:
                # A final non-success reply is an API error code, not a token.
                results.append({
                    "url": info["url"], "token": None, "error": answer,
                })

    # Mark remaining as failed
    for info in pending.values():
        results.append({"url": info["url"], "token": None, "error": "timeout"})

    return results


# Usage: submit 20 example pages up front, then poll them together.
tasks = [
    dict(sitekey="KEY", url=f"https://example.com/page{page}")
    for page in range(20)
]

submitted = batch_submit(tasks)
print(f"Submitted {len(submitted)} tasks")

results = batch_poll(submitted)
# Count the entries that carry a non-empty token.
solved = sum(bool(r.get("token")) for r in results)
print(f"Solved: {solved}/{len(tasks)}")

吞吐量指南

并发任务 大约速度 最适合
1-5 3-5 次/分钟 测试、轻量抓取
5-20 15-60 次/分钟 生产环境抓取
20-50 60-150 次/分钟 大容量管道
50-100 150-300 次/分钟 企业规模

故障排除

问题 原因 处理方式
速率限制 (429) 每秒提交次数过多 在提交之间添加 100 毫秒的延迟
多次超时 轮询超时时间太短 增加到120-180秒
超过 50 个并发时收益递减 网络瓶颈 使用异步 (aiohttp) 代替线程
结果与任务对应错乱 任务ID跟踪问题 使用以task_id为键的字典

常见问题

我可以一次提交多少个任务?

并发任务没有硬性限制。从 10-20 开始,然后根据您的成功率和速度需求增加。

我应该使用线程还是异步?

对于中小批量(最多 50 个),线程 (ThreadPoolExecutor) 更简单。对于 100+ 并发任务,async (aiohttp) 效率更高。

批量求解成本更高吗?

不会。无论是单独提交还是批量提交,每个任务的成本都是相同的。批处理只是节省时间。


相关指南


扩展您的验证码求解方案——试试 CaptchaAI 的高吞吐量批量处理。

该文章已禁用评论。