当验证码解决量达到每小时数千个任务时,您需要的不仅仅是一个简单的队列。 Apache Kafka 提供持久、有序、高吞吐量的消息流,非常适合将验证码任务提交与大规模结果处理解耦。
建筑学
[Scrapers] → Produce → [Kafka: captcha-tasks topic]
↓
[CAPTCHA Worker Group]
(consume tasks, solve via CaptchaAI)
↓
Produce → [Kafka: captcha-results topic]
↓
[Result Consumer Group]
(process solutions, update database)
两个 Kafka 主题分别关注:
captcha-tasks– 等待解决的验证码参数captcha-results– 已解决的代币可供下游使用
先决条件
# Python
pip install kafka-python requests
# Node.js
npm install kafkajs axios
在 localhost:9092(或您的集群地址)上运行的 Kafka 代理。
第1步:创建主题
kafka-topics.sh --create --topic captcha-tasks \
--partitions 6 --replication-factor 1 \
--bootstrap-server localhost:9092
kafka-topics.sh --create --topic captcha-results \
--partitions 6 --replication-factor 1 \
--bootstrap-server localhost:9092
六个分区允许每组最多六个并行消费者。
第2步:任务生成器(Scraper端)
Python
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
acks="all", # Wait for all replicas to confirm
retries=3
)
def enqueue_captcha(task_id, sitekey, pageurl, captcha_type="userrecaptcha"):
"""Send a CAPTCHA task to Kafka."""
task = {
"task_id": task_id,
"method": captcha_type,
"sitekey": sitekey,
"pageurl": pageurl,
"submitted_at": __import__("time").time()
}
future = producer.send(
"captcha-tasks",
key=task_id, # Key ensures same task goes to same partition
value=task
)
future.get(timeout=10) # Block until confirmed
return task_id
# Submit tasks
enqueue_captcha("task_001", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
enqueue_captcha("task_002", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
producer.flush()
JavaScript
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "captcha-producer",
brokers: ["localhost:9092"],
});
const producer = kafka.producer();
async function enqueueCaptcha(taskId, sitekey, pageurl) {
await producer.connect();
const task = {
task_id: taskId,
method: "userrecaptcha",
sitekey: sitekey,
pageurl: pageurl,
submitted_at: Date.now(),
};
await producer.send({
topic: "captcha-tasks",
messages: [{ key: taskId, value: JSON.stringify(task) }],
});
}
(async () => {
await enqueueCaptcha(
"task_001",
"6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
"https://example.com"
);
await producer.disconnect();
})();
第3步:CAPTCHA Worker(消费者+求解器)
Python
import json
import os
import time
import requests
from kafka import KafkaConsumer, KafkaProducer
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
consumer = KafkaConsumer(
"captcha-tasks",
bootstrap_servers=["localhost:9092"],
group_id="captcha-workers",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
auto_offset_reset="earliest",
enable_auto_commit=False, # Manual commit after processing
max_poll_records=10
)
result_producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
def solve_captcha(task):
"""Submit to CaptchaAI and poll for result."""
# Submit
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": task["method"],
"googlekey": task["sitekey"],
"pageurl": task["pageurl"],
"json": 1
})
data = resp.json()
if data.get("status") != 1:
return {"error": data.get("request")}
captcha_id = data["request"]
# Poll for result
for _ in range(60):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY,
"action": "get",
"id": captcha_id,
"json": 1
}).json()
if result.get("status") == 1:
return {"solution": result["request"]}
if result.get("request") != "CAPCHA_NOT_READY":
return {"error": result.get("request")}
return {"error": "TIMEOUT"}
# Main consumer loop
print("CAPTCHA worker started. Waiting for tasks...")
for message in consumer:
task = message.value
print(f"Processing {task['task_id']}...")
result = solve_captcha(task)
result["task_id"] = task["task_id"]
result["solved_at"] = time.time()
# Publish result
result_producer.send("captcha-results", value=result)
result_producer.flush()
# Commit offset after successful processing
consumer.commit()
print(f" → {task['task_id']}: {'solved' if 'solution' in result else result.get('error')}")
JavaScript
const { Kafka } = require("kafkajs");
const axios = require("axios");
const API_KEY = process.env.CAPTCHAAI_API_KEY;
const kafka = new Kafka({
clientId: "captcha-worker",
brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "captcha-workers" });
const producer = kafka.producer();
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function solveCaptcha(task) {
const submitResp = await axios.post(
"https://ocr.captchaai.com/in.php",
null,
{
params: {
key: API_KEY,
method: task.method,
googlekey: task.sitekey,
pageurl: task.pageurl,
json: 1,
},
}
);
if (submitResp.data.status !== 1) {
return { error: submitResp.data.request };
}
const captchaId = submitResp.data.request;
for (let i = 0; i < 60; i++) {
await sleep(5000);
const result = await axios.get("https://ocr.captchaai.com/res.php", {
params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
});
if (result.data.status === 1) return { solution: result.data.request };
if (result.data.request !== "CAPCHA_NOT_READY")
return { error: result.data.request };
}
return { error: "TIMEOUT" };
}
async function run() {
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: "captcha-tasks", fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const task = JSON.parse(message.value.toString());
console.log(`Processing ${task.task_id}...`);
const result = await solveCaptcha(task);
result.task_id = task.task_id;
result.solved_at = Date.now();
await producer.send({
topic: "captcha-results",
messages: [{ value: JSON.stringify(result) }],
});
console.log(
` → ${task.task_id}: ${result.solution ? "solved" : result.error}`
);
},
});
}
run();
扩展工人
Kafka 消费者组自动在工作人员之间分配分区:
# 6 partitions, 3 workers → each worker gets 2 partitions
Worker-1: partitions 0, 1
Worker-2: partitions 2, 3
Worker-3: partitions 4, 5
# Add Worker-4 → rebalance
Worker-1: partitions 0, 1
Worker-2: partitions 2
Worker-3: partitions 3, 4
Worker-4: partition 5
扩展到分区数量。除此之外,添加更多分区。
监控
通过 Kafka 消费者滞后跟踪关键指标:
kafka-consumer-groups.sh --describe --group captcha-workers \
--bootstrap-server localhost:9092
| 公制 | 健康 | 警告 |
|---|---|---|
| 消费者滞后 | <100 | > 1000(添加工人) |
| 消息/sec 中 | 匹配刮刀率 | 尖峰表示爆发 |
| 消息/sec 输出 | 匹配率 | 落后=瓶颈 |
生产者验证规则
- 在触及主题之前拒绝错过求解器类型、目标元数据或响应路由详细信息的消息。
- 添加幂等键,以便重试不会为同一挑战创建重复的解决尝试。
- 将格式错误的记录发送到具有足够上下文的死信路径以供以后重播。
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 消费者增长滞后 | 工人无法跟上任务进度 | 添加更多工作实例(最多达到分区数) |
| 重复结果 | 工作线程在提交偏移量之前崩溃 | 在结果消费者中添加对task_id的幂等性检查 |
| 重新平衡过于频繁 | 工人崩溃/restarting | 增加session.timeout.ms;检查 OOM |
| 任务分配不均匀 | 密钥分配不佳 | 使用随机键或更多分区 |
常问问题
为什么使用 Kafka 而不是 Redis 或 RabbitMQ?
当您需要消息持久性(重放能力)、高吞吐量(100K+ 消息/sec)和消费者组扩展时,Kafka 是理想的选择。对于 1,000 个任务 /hour 下的简单设置,Redis 或 RabbitMQ 就足够了。
我应该使用一个主题还是两个主题?
两个主题(任务+结果)清楚地区分生产者和消费者。任务生产者不需要了解结果消费者,反之亦然。
如何处理有害消息(无法解析的验证码)?
在工作线程中设置重试限制。最大重试次数后,发布到 captcha-dead-letter 主题以进行手动检查。不要通过无限重试来阻塞分区。
相关文章
- 流式批量验证码结果处理
下一步
构建流式验证码管道 –”获取您的 CaptchaAI API 密钥并连接Kafka进行高吞吐量处理。
相关指南:
- 用于分布式处理的 Redis 队列
- RabbitMQ 消息队列集成
- 每小时解决 10,000 个任务