DevOps 与扩展

Apache Kafka + CaptchaAI:流式验证码任务处理

当验证码解决量达到每小时数千个任务时,您需要的不仅仅是一个简单的队列。 Apache Kafka 提供持久、有序、高吞吐量的消息流,非常适合将验证码任务提交与大规模结果处理解耦。

建筑学

[Scrapers] → Produce → [Kafka: captcha-tasks topic]
                              ↓
                    [CAPTCHA Worker Group]
                    (consume tasks, solve via CaptchaAI)
                              ↓
                    Produce → [Kafka: captcha-results topic]
                              ↓
                    [Result Consumer Group]
                    (process solutions, update database)

两个 Kafka 主题分别关注:

  • captcha-tasks – 等待解决的验证码参数
  • captcha-results – 已解决的代币可供下游使用

先决条件

# Python
pip install kafka-python requests

# Node.js
npm install kafkajs axios

localhost:9092(或您的集群地址)上运行的 Kafka 代理。

第1步:创建主题

kafka-topics.sh --create --topic captcha-tasks \
  --partitions 6 --replication-factor 1 \
  --bootstrap-server localhost:9092

kafka-topics.sh --create --topic captcha-results \
  --partitions 6 --replication-factor 1 \
  --bootstrap-server localhost:9092

六个分区允许每组最多六个并行消费者。

第2步:任务生成器(Scraper端)

Python

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8") if k else None,
    acks="all",  # Wait for all replicas to confirm
    retries=3
)


def enqueue_captcha(task_id, sitekey, pageurl, captcha_type="userrecaptcha"):
    """Send a CAPTCHA task to Kafka."""
    task = {
        "task_id": task_id,
        "method": captcha_type,
        "sitekey": sitekey,
        "pageurl": pageurl,
        "submitted_at": __import__("time").time()
    }

    future = producer.send(
        "captcha-tasks",
        key=task_id,  # Key ensures same task goes to same partition
        value=task
    )
    future.get(timeout=10)  # Block until confirmed
    return task_id


# Submit tasks
enqueue_captcha("task_001", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
enqueue_captcha("task_002", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
producer.flush()

JavaScript

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: "captcha-producer",
  brokers: ["localhost:9092"],
});

const producer = kafka.producer();

async function enqueueCaptcha(taskId, sitekey, pageurl) {
  await producer.connect();

  const task = {
    task_id: taskId,
    method: "userrecaptcha",
    sitekey: sitekey,
    pageurl: pageurl,
    submitted_at: Date.now(),
  };

  await producer.send({
    topic: "captcha-tasks",
    messages: [{ key: taskId, value: JSON.stringify(task) }],
  });
}

(async () => {
  await enqueueCaptcha(
    "task_001",
    "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
    "https://example.com"
  );
  await producer.disconnect();
})();

第3步:CAPTCHA Worker(消费者+求解器)

Python

import json
import os
import time
import requests
from kafka import KafkaConsumer, KafkaProducer

API_KEY = os.environ["CAPTCHAAI_API_KEY"]

consumer = KafkaConsumer(
    "captcha-tasks",
    bootstrap_servers=["localhost:9092"],
    group_id="captcha-workers",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest",
    enable_auto_commit=False,  # Manual commit after processing
    max_poll_records=10
)

result_producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)


def solve_captcha(task):
    """Submit to CaptchaAI and poll for result."""
    # Submit
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": task["method"],
        "googlekey": task["sitekey"],
        "pageurl": task["pageurl"],
        "json": 1
    })
    data = resp.json()

    if data.get("status") != 1:
        return {"error": data.get("request")}

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }).json()

        if result.get("status") == 1:
            return {"solution": result["request"]}
        if result.get("request") != "CAPCHA_NOT_READY":
            return {"error": result.get("request")}

    return {"error": "TIMEOUT"}


# Main consumer loop
print("CAPTCHA worker started. Waiting for tasks...")
for message in consumer:
    task = message.value
    print(f"Processing {task['task_id']}...")

    result = solve_captcha(task)
    result["task_id"] = task["task_id"]
    result["solved_at"] = time.time()

    # Publish result
    result_producer.send("captcha-results", value=result)
    result_producer.flush()

    # Commit offset after successful processing
    consumer.commit()
    print(f"  → {task['task_id']}: {'solved' if 'solution' in result else result.get('error')}")

JavaScript

const { Kafka } = require("kafkajs");
const axios = require("axios");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

const kafka = new Kafka({
  clientId: "captcha-worker",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "captcha-workers" });
const producer = kafka.producer();

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function solveCaptcha(task) {
  const submitResp = await axios.post(
    "https://ocr.captchaai.com/in.php",
    null,
    {
      params: {
        key: API_KEY,
        method: task.method,
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    }
  );

  if (submitResp.data.status !== 1) {
    return { error: submitResp.data.request };
  }

  const captchaId = submitResp.data.request;

  for (let i = 0; i < 60; i++) {
    await sleep(5000);
    const result = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (result.data.status === 1) return { solution: result.data.request };
    if (result.data.request !== "CAPCHA_NOT_READY")
      return { error: result.data.request };
  }

  return { error: "TIMEOUT" };
}

async function run() {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: "captcha-tasks", fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const task = JSON.parse(message.value.toString());
      console.log(`Processing ${task.task_id}...`);

      const result = await solveCaptcha(task);
      result.task_id = task.task_id;
      result.solved_at = Date.now();

      await producer.send({
        topic: "captcha-results",
        messages: [{ value: JSON.stringify(result) }],
      });

      console.log(
        `  → ${task.task_id}: ${result.solution ? "solved" : result.error}`
      );
    },
  });
}

run();

扩展工人

Kafka 消费者组自动在工作人员之间分配分区:

# 6 partitions, 3 workers → each worker gets 2 partitions
Worker-1: partitions 0, 1
Worker-2: partitions 2, 3
Worker-3: partitions 4, 5

# Add Worker-4 → rebalance
Worker-1: partitions 0, 1
Worker-2: partitions 2
Worker-3: partitions 3, 4
Worker-4: partition 5

扩展到分区数量。除此之外,添加更多分区。

监控

通过 Kafka 消费者滞后跟踪关键指标:

kafka-consumer-groups.sh --describe --group captcha-workers \
  --bootstrap-server localhost:9092
公制 健康 警告
消费者滞后 <100 > 1000(添加工人)
消息/sec 中 匹配刮刀率 尖峰表示爆发
消息/sec 输出 匹配率 落后=瓶颈

生产者验证规则

  • 在触及主题之前拒绝错过求解器类型、目标元数据或响应路由详细信息的消息。
  • 添加幂等键,以便重试不会为同一挑战创建重复的解决尝试。
  • 将格式错误的记录发送到具有足够上下文的死信路径以供以后重播。

故障排除

问题 原因 处理方式
消费者增长滞后 工人无法跟上任务进度 添加更多工作实例(最多达到分区数)
重复结果 工作线程在提交偏移量之前崩溃 在结果消费者中添加对task_id的幂等性检查
重新平衡过于频繁 工人崩溃/restarting 增加session.timeout.ms;检查 OOM
任务分配不均匀 密钥分配不佳 使用随机键或更多分区

常问问题

为什么使用 Kafka 而不是 Redis 或 RabbitMQ?

当您需要消息持久性(重放能力)、高吞吐量(100K+ 消息/sec)和消费者组扩展时,Kafka 是理想的选择。对于 1,000 个任务 /hour 下的简单设置,Redis 或 RabbitMQ 就足够了。

我应该使用一个主题还是两个主题?

两个主题(任务+结果)清楚地区分生产者和消费者。任务生产者不需要了解结果消费者,反之亦然。

如何处理有害消息(无法解析的验证码)?

在工作线程中设置重试限制。最大重试次数后,发布到 captcha-dead-letter 主题以进行手动检查。不要通过无限重试来阻塞分区。

相关文章

  • 流式批量验证码结果处理

下一步

构建流式验证码管道 –”获取您的 CaptchaAI API 密钥并连接Kafka进行高吞吐量处理。

相关指南:

该文章已禁用评论。