当大规模抓取时,您需要的不仅仅是一次一个验证码解决。队列系统将验证码提交与结果检索分离,从而实现数百个任务的并行解决。
为什么使用队列?
单请求验证码解决会浪费等待时间。队列系统:
- 立即提交所有验证码
- 并行轮询多个任务 ID
- 重试失败自动解决
- 控制并发以遵守 API 速率限制
- 提供进度跟踪和回调
基本线程队列
import time
import threading
import requests
from queue import Queue, Empty
API_KEY = "YOUR_API_KEY"
class CaptchaQueue:
"""Thread-based CAPTCHA solving queue."""
def __init__(self, api_key, max_workers=10):
self.api_key = api_key
self.task_queue = Queue()
self.result_queue = Queue()
self.max_workers = max_workers
self.workers = []
def submit(self, method, callback=None, **params):
"""Add a CAPTCHA task to the queue."""
task = {
"method": method,
"params": params,
"callback": callback,
}
self.task_queue.put(task)
def start(self):
"""Start worker threads."""
for _ in range(self.max_workers):
t = threading.Thread(target=self._worker, daemon=True)
t.start()
self.workers.append(t)
def wait(self):
"""Wait for all tasks to complete."""
self.task_queue.join()
def get_results(self):
"""Get all available results."""
results = []
while not self.result_queue.empty():
try:
results.append(self.result_queue.get_nowait())
except Empty:
break
return results
def _worker(self):
while True:
try:
task = self.task_queue.get(timeout=1)
except Empty:
continue
try:
result = self._solve(task["method"], **task["params"])
entry = {"status": "solved", "result": result, "task": task}
self.result_queue.put(entry)
if task["callback"]:
task["callback"](result)
except Exception as e:
entry = {"status": "error", "error": str(e), "task": task}
self.result_queue.put(entry)
finally:
self.task_queue.task_done()
def _solve(self, method, **params):
submit = requests.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}, timeout=30).json()
if submit.get("status") != 1:
raise Exception(f"Submit error: {submit.get('request')}")
task_id = submit["request"]
for _ in range(30):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}, timeout=30).json()
if result.get("status") == 1:
return result["request"]
if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
raise Exception("CAPTCHA unsolvable")
raise TimeoutError("Solve timed out")
# Usage
queue = CaptchaQueue(API_KEY, max_workers=5)
queue.start()
# Submit multiple CAPTCHAs
urls_and_sitekeys = [
("https://example.com/page1", "SITEKEY_1"),
("https://example.com/page2", "SITEKEY_2"),
("https://example.com/page3", "SITEKEY_3"),
]
for url, sitekey in urls_and_sitekeys:
queue.submit("userrecaptcha", googlekey=sitekey, pageurl=url)
queue.wait()
results = queue.get_results()
print(f"Solved {len(results)} CAPTCHAs")
for r in results:
print(f" {r['status']}: {r.get('result', r.get('error', ''))[:50]}")
使用 asyncio 的异步队列
import asyncio
import aiohttp
API_KEY = "YOUR_API_KEY"
class AsyncCaptchaQueue:
"""Async CAPTCHA solving queue with concurrency control."""
def __init__(self, api_key, max_concurrent=10):
self.api_key = api_key
self.semaphore = asyncio.Semaphore(max_concurrent)
self.results = []
async def solve_batch(self, tasks):
"""Solve a batch of CAPTCHA tasks concurrently."""
coros = [self._solve_task(task) for task in tasks]
self.results = await asyncio.gather(*coros, return_exceptions=True)
return self.results
async def _solve_task(self, task):
async with self.semaphore:
return await self._solve(task["method"], **task["params"])
async def _solve(self, method, **params):
async with aiohttp.ClientSession() as session:
# Submit
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(f"Submit error: {data.get('request')}")
task_id = data["request"]
# Poll
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
raise Exception("CAPTCHA unsolvable")
raise TimeoutError("Solve timed out")
# Usage
async def main():
queue = AsyncCaptchaQueue(API_KEY, max_concurrent=5)
tasks = [
{"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
for i in range(10)
]
results = await queue.solve_batch(tasks)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i}: ERROR — {result}")
else:
print(f"Task {i}: {result[:50]}...")
asyncio.run(main())
生产者-消费者模式
对于动态发现页面的连续抓取工作负载:
import asyncio
import aiohttp
API_KEY = "YOUR_API_KEY"
class ProducerConsumerQueue:
"""Continuous CAPTCHA solving with producer-consumer pattern."""
def __init__(self, api_key, queue_size=100, num_consumers=5):
self.api_key = api_key
self.queue = asyncio.Queue(maxsize=queue_size)
self.num_consumers = num_consumers
self.solved_count = 0
self.error_count = 0
self.running = True
async def produce(self, tasks):
"""Producer: feed CAPTCHA tasks into the queue."""
for task in tasks:
await self.queue.put(task)
# Signal consumers to stop
for _ in range(self.num_consumers):
await self.queue.put(None)
async def consume(self, result_handler):
"""Consumer: solve CAPTCHAs and call result handler."""
async with aiohttp.ClientSession() as session:
while True:
task = await self.queue.get()
if task is None:
self.queue.task_done()
break
try:
result = await self._solve(session, task["method"], **task["params"])
self.solved_count += 1
if result_handler:
await result_handler(task, result)
except Exception as e:
self.error_count += 1
print(f"Error: {e}")
finally:
self.queue.task_done()
async def run(self, tasks, result_handler=None):
"""Run the producer-consumer pipeline."""
# Start producer
producer = asyncio.create_task(self.produce(tasks))
# Start consumers
consumers = [
asyncio.create_task(self.consume(result_handler))
for _ in range(self.num_consumers)
]
# Wait for everything to finish
await producer
await asyncio.gather(*consumers)
print(f"Complete: {self.solved_count} solved, {self.error_count} errors")
async def _solve(self, session, method, **params):
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(f"Submit: {data.get('request')}")
task_id = data["request"]
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
raise TimeoutError("Timed out")
# Usage
async def handle_result(task, token):
url = task["params"]["pageurl"]
print(f"Solved for {url}: {token[:30]}...")
async def main():
queue = ProducerConsumerQueue(API_KEY, num_consumers=5)
tasks = [
{"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
for i in range(20)
]
await queue.run(tasks, result_handler=handle_result)
asyncio.run(main())
优先队列
当某些验证码比其他验证码更重要时:
import asyncio
from dataclasses import dataclass, field
API_KEY = "YOUR_API_KEY"
@dataclass(order=True)
class PriorityTask:
priority: int
task: dict = field(compare=False)
class PriorityCaptchaQueue:
"""CAPTCHA queue with priority levels."""
def __init__(self, api_key, num_workers=5):
self.api_key = api_key
self.queue = asyncio.PriorityQueue()
self.num_workers = num_workers
self.results = {}
async def submit(self, task_id, method, priority=5, **params):
"""Submit with priority (lower number = higher priority)."""
await self.queue.put(PriorityTask(
priority=priority,
task={"id": task_id, "method": method, "params": params},
))
async def process(self):
"""Process all queued tasks by priority."""
workers = [asyncio.create_task(self._worker()) for _ in range(self.num_workers)]
# Wait for queue to drain
await self.queue.join()
# Cancel workers
for w in workers:
w.cancel()
return self.results
async def _worker(self):
import aiohttp
async with aiohttp.ClientSession() as session:
while True:
item = await self.queue.get()
task = item.task
try:
result = await self._solve(session, task["method"], **task["params"])
self.results[task["id"]] = {"status": "solved", "token": result}
except Exception as e:
self.results[task["id"]] = {"status": "error", "error": str(e)}
finally:
self.queue.task_done()
async def _solve(self, session, method, **params):
import aiohttp
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(data.get("request"))
task_id = data["request"]
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
raise TimeoutError()
# Usage
async def main():
pq = PriorityCaptchaQueue(API_KEY, num_workers=3)
# High priority — checkout pages
await pq.submit("checkout_1", "turnstile", priority=1, sitekey="KEY", pageurl="https://shop.com/checkout")
# Normal priority — product pages
for i in range(5):
await pq.submit(f"product_{i}", "userrecaptcha", priority=5, googlekey="KEY", pageurl=f"https://shop.com/p/{i}")
# Low priority — info pages
for i in range(3):
await pq.submit(f"info_{i}", "userrecaptcha", priority=10, googlekey="KEY", pageurl=f"https://shop.com/info/{i}")
results = await pq.process()
for task_id, result in results.items():
print(f"{task_id}: {result['status']}")
asyncio.run(main())
监控和报告
import time
from dataclasses import dataclass, field
@dataclass
class QueueMetrics:
submitted: int = 0
solved: int = 0
failed: int = 0
total_solve_time: float = 0.0
start_time: float = field(default_factory=time.time)
@property
def avg_solve_time(self):
return self.total_solve_time / self.solved if self.solved else 0
@property
def success_rate(self):
total = self.solved + self.failed
return (self.solved / total * 100) if total else 0
@property
def throughput(self):
elapsed = time.time() - self.start_time
return self.solved / elapsed * 60 if elapsed > 0 else 0
def report(self):
return (
f"Submitted: {self.submitted} | "
f"Solved: {self.solved} | "
f"Failed: {self.failed} | "
f"Avg time: {self.avg_solve_time:.1f}s | "
f"Success: {self.success_rate:.1f}% | "
f"Throughput: {self.throughput:.0f}/min"
)
故障排除
| 症状 | 原因 | 处理方式 |
|---|---|---|
| 队列增长但任务未完成 | 太多的工作人员压垮了 API | 减少 max_workers / max_concurrent |
ERROR_NO_SLOT_AVAILABLE |
API并发限制达到 | 在提交之间添加延迟 |
| 任务卡在队列中 | 工作线程因异常而死亡 | 将工作循环包装在 try/except 中 |
| 记忆随着时间的推移而增长 | 结果未消耗 | 定期调用get_results() |
| 异步队列块 | 缺少 await |
确保等待所有异步调用 |
常见问题
我可以运行多少个并发求解?
CaptchaAI 处理服务器端的并发。从 10 个并发工作人员开始,然后根据您的计划限制增加。检查 ERROR_NO_SLOT_AVAILABLE 以了解何时限制。
我应该使用线程还是异步?
将 asyncio 用于新项目 - 它可以更有效地处理 I/O-bound CAPTCHA 解决方案。如果集成到现有同步代码中,请使用线程。
如何处理 API 速率限制?
使用信号量(异步)或有界队列(线程)来限制并发请求。如果您点击 ERROR_NO_SLOT_AVAILABLE,请在提交之间添加短暂的延迟。
概括
验证码解决队列将提交与轮询分离,从而实现并行解决CaptchaAI。为同步代码选择线程,为现代 Python 选择异步,为连续工作负载选择生产者-消费者。
相关文章
- Python 剧作家 Captchaai 实用指南
- 构建客户端验证码管道 Captchaai
- 构建负责任的自动化 Captchaai