DevOps & Scaling

使用 AWS SNS 和 CaptchaAI 构建事件驱动的验证码解决方案

轮询验证码结果会占用线程并在抓取器和求解管道之间创建紧密耦合。 AWS SNS(简单通知服务)解耦了这些问题 - CaptchaAI 将结果发送到您的回调,回调发布到 SNS,并且任意数量的下游消费者独立做出反应。

架构概述

[Scraper] → Submit CAPTCHA → [CaptchaAI API]
                                    ↓
                            Solve completes
                                    ↓
                            Callback → [API Gateway + Lambda]
                                    ↓
                            Publish → [SNS Topic]
                                    ↓
                    ┌───────────────┼───────────────┐
                    ↓               ↓               ↓
            [SQS Queue]      [Lambda Logger]   [Email Alert]
            (result store)   (audit trail)     (on failure)

SNS 提供扇出:一个 CAPTCHA 结果触发多个使用者,而回调处理程序不知道它们。

第 1 步:创建 SNS 主题

AWS CLI

aws sns create-topic --name captcha-results --output text
# Returns: arn:aws:sns:us-east-1:123456789:captcha-results

Python(boto3)

import boto3

sns = boto3.client("sns", region_name="us-east-1")

response = sns.create_topic(Name="captcha-results")
topic_arn = response["TopicArn"]
print(f"Topic ARN: {topic_arn}")

第 2 步:构建回调接收器

此 Lambda 函数接收 CaptchaAI 回调结果并将其发布到 SNS。

Python(Lambda 处理程序)

import json
import os
import boto3

sns = boto3.client("sns")
TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]


def lambda_handler(event, context):
    """Receive CaptchaAI callback and publish to SNS."""
    # Parse query parameters from API Gateway
    params = event.get("queryStringParameters", {}) or {}
    task_id = params.get("id", "")
    solution = params.get("code", "")

    if not task_id or not solution:
        return {"statusCode": 400, "body": "Missing id or code"}

    # Publish to SNS
    message = {
        "task_id": task_id,
        "solution": solution,
        "status": "solved"
    }

    sns.publish(
        TopicArn=TOPIC_ARN,
        Message=json.dumps(message),
        Subject="captcha-solved",
        MessageAttributes={
            "task_id": {
                "DataType": "String",
                "StringValue": task_id
            }
        }
    )

    return {"statusCode": 200, "body": "OK"}

JavaScript(Lambda 处理程序)

const { SNSClient, PublishCommand } = require("@aws-sdk/client-sns");

const sns = new SNSClient({ region: "us-east-1" });
const TOPIC_ARN = process.env.SNS_TOPIC_ARN;

exports.handler = async (event) => {
  const params = event.queryStringParameters || {};
  const taskId = params.id;
  const solution = params.code;

  if (!taskId || !solution) {
    return { statusCode: 400, body: "Missing id or code" };
  }

  const message = {
    task_id: taskId,
    solution: solution,
    status: "solved",
  };

  await sns.send(
    new PublishCommand({
      TopicArn: TOPIC_ARN,
      Message: JSON.stringify(message),
      Subject: "captcha-solved",
      MessageAttributes: {
        task_id: { DataType: "String", StringValue: taskId },
      },
    })
  );

  return { statusCode: 200, body: "OK" };
};

第 3 步:使用回调 URL 提交验证码

将 CaptchaAI 的 pingback 指向您的 API 网关端点:

Python

import os
import requests

API_KEY = os.environ["CAPTCHAAI_API_KEY"]
CALLBACK_URL = os.environ["CALLBACK_GATEWAY_URL"]  # API Gateway URL


def submit_captcha(sitekey, pageurl):
    """Submit CAPTCHA with SNS-backed callback."""
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "pingback": CALLBACK_URL,
        "json": 1
    })
    data = resp.json()

    if data.get("status") == 1:
        return data["request"]  # task_id
    raise RuntimeError(f"Submit failed: {data.get('request')}")

第四步:订阅消费者

SQS队列(结果存储)

# Subscribe an SQS queue to receive all results
sqs_arn = "arn:aws:sqs:us-east-1:123456789:captcha-results-queue"

sns.subscribe(
    TopicArn=topic_arn,
    Protocol="sqs",
    Endpoint=sqs_arn
)

Lambda(审计记录器)

# Subscribe a Lambda for audit logging
lambda_arn = "arn:aws:lambda:us-east-1:123456789:function:captcha-audit-logger"

sns.subscribe(
    TopicArn=topic_arn,
    Protocol="lambda",
    Endpoint=lambda_arn
)

电子邮件(失败警报)

# Subscribe email for error notifications with filter
sns.subscribe(
    TopicArn=topic_arn,
    Protocol="email",
    Endpoint="ops@example.com"
)

第 5 步:使用 SQS 的结果

您的抓取工具从 SQS 读取解决方案,而不是轮询 CaptchaAI:

Python

import json
import boto3

sqs = boto3.client("sqs", region_name="us-east-1")
QUEUE_URL = os.environ["SQS_QUEUE_URL"]


def get_solved_captcha(timeout=30):
    """Wait for a CAPTCHA solution from the SQS queue."""
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=1,
        WaitTimeSeconds=min(timeout, 20)  # Long polling (max 20s)
    )

    messages = response.get("Messages", [])
    if not messages:
        return None

    msg = messages[0]
    # SNS wraps the message — unwrap it
    sns_envelope = json.loads(msg["Body"])
    result = json.loads(sns_envelope["Message"])

    # Delete message after processing
    sqs.delete_message(
        QueueUrl=QUEUE_URL,
        ReceiptHandle=msg["ReceiptHandle"]
    )

    return result

JavaScript

const {
  SQSClient,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} = require("@aws-sdk/client-sqs");

const sqs = new SQSClient({ region: "us-east-1" });
const QUEUE_URL = process.env.SQS_QUEUE_URL;

async function getSolvedCaptcha(timeout = 30) {
  const response = await sqs.send(
    new ReceiveMessageCommand({
      QueueUrl: QUEUE_URL,
      MaxNumberOfMessages: 1,
      WaitTimeSeconds: Math.min(timeout, 20),
    })
  );

  const messages = response.Messages || [];
  if (messages.length === 0) return null;

  const msg = messages[0];
  const snsEnvelope = JSON.parse(msg.Body);
  const result = JSON.parse(snsEnvelope.Message);

  await sqs.send(
    new DeleteMessageCommand({
      QueueUrl: QUEUE_URL,
      ReceiptHandle: msg.ReceiptHandle,
    })
  );

  return result;
}

SNS消息过滤

将不同的结果路由给不同的消费者:

# Only send failures to the ops queue
sns.subscribe(
    TopicArn=topic_arn,
    Protocol="sqs",
    Endpoint=failure_queue_arn,
    Attributes={
        "FilterPolicy": json.dumps({
            "status": ["failed", "error"]
        })
    }
)

故障排除

问题 原因 处理方式
回调返回403 API网关身份验证阻止CaptchaAI 禁用回调路由上的身份验证;使用基于令牌的验证代替
SQS 消息未到达 SNS → SQS 权限缺失 SQS队列策略添加sns:Publish权限
处理重复结果 SNS 至少传递一次 实现幂等性——处理前检查task_id
Lambda 冷启动延迟回调 未设置预配置并发数 为回调 Lambda 启用预配置并发

常问问题

为什么使用 SNS 而不是直接在回调 Lambda 中处理结果?

SNS 将回调处理程序与下游逻辑解耦。您可以添加新的使用者(日志记录、警报、分析),而无需修改回调 Lambda。回调保持简单和快速。

SNS 层增加的延迟是多少?

SNS 每条消息增加 10-50 毫秒。由于 CAPTCHA 解决需要 5-30 秒,因此这个开销可以忽略不计。

我可以使用 SNS FIFO 进行有序处理吗?

是的。如果您需要有序结果,请使用带有 SQS FIFO 队列的 SNS FIFO 主题。将 MessageGroupId 设置为每个任务排序的任务 ID。

相关文章

下一步

构建事件驱动的验证码解决方案 -获取您的 CaptchaAI API 密钥并将其连接到您的 AWS 事件管道。

相关指南:

该文章已禁用评论。

相关文章

DevOps & Scaling AWS Lambda + CaptchaAI:无服务器验证码解决
AWS Lambda + Captcha AI 的开发运营指南:无服务器验证码解决方案,包含生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

AWS Lambda + Captcha AI 的开发运营指南:无服务器验证码解决方案,包含生产中 Captcha AI 工作流程的架构决策、操作...

Apr 21, 2026
DevOps & Scaling 用于 CaptchaAI Worker 部署的 Ansible Playbook
使用 Captcha AI Worker 部署 Ansible Playbook 的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

使用 Captcha AI Worker 部署 Ansible Playbook 的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注...

Apr 19, 2026
DevOps & Scaling 验证码解决基础设施的蓝绿部署
验证码解决基础设施的蓝绿部署的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模式。

验证码解决基础设施的蓝绿部署的 Dev Ops 指南,包括生产中 Captcha AI 工作流程的架构决策、操作注意事项和自动化模...

Apr 26, 2026