轮询验证码结果会占用线程并在抓取器和求解管道之间创建紧密耦合。 AWS SNS(简单通知服务)解耦了这些问题 - CaptchaAI 将结果发送到您的回调,回调发布到 SNS,并且任意数量的下游消费者独立做出反应。
架构概述
[Scraper] → Submit CAPTCHA → [CaptchaAI API]
↓
Solve completes
↓
Callback → [API Gateway + Lambda]
↓
Publish → [SNS Topic]
↓
┌───────────────┼───────────────┐
↓ ↓ ↓
[SQS Queue] [Lambda Logger] [Email Alert]
(result store) (audit trail) (on failure)
SNS 提供扇出:一个 CAPTCHA 结果触发多个使用者,而回调处理程序不知道它们。
第 1 步:创建 SNS 主题
AWS CLI
aws sns create-topic --name captcha-results --output text
# Returns: arn:aws:sns:us-east-1:123456789:captcha-results
Python(boto3)
import boto3
sns = boto3.client("sns", region_name="us-east-1")
response = sns.create_topic(Name="captcha-results")
topic_arn = response["TopicArn"]
print(f"Topic ARN: {topic_arn}")
第 2 步:构建回调接收器
此 Lambda 函数接收 CaptchaAI 回调结果并将其发布到 SNS。
Python(Lambda 处理程序)
import json
import os
import boto3
sns = boto3.client("sns")
TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]
def lambda_handler(event, context):
"""Receive CaptchaAI callback and publish to SNS."""
# Parse query parameters from API Gateway
params = event.get("queryStringParameters", {}) or {}
task_id = params.get("id", "")
solution = params.get("code", "")
if not task_id or not solution:
return {"statusCode": 400, "body": "Missing id or code"}
# Publish to SNS
message = {
"task_id": task_id,
"solution": solution,
"status": "solved"
}
sns.publish(
TopicArn=TOPIC_ARN,
Message=json.dumps(message),
Subject="captcha-solved",
MessageAttributes={
"task_id": {
"DataType": "String",
"StringValue": task_id
}
}
)
return {"statusCode": 200, "body": "OK"}
JavaScript(Lambda 处理程序)
const { SNSClient, PublishCommand } = require("@aws-sdk/client-sns");
const sns = new SNSClient({ region: "us-east-1" });
const TOPIC_ARN = process.env.SNS_TOPIC_ARN;
exports.handler = async (event) => {
const params = event.queryStringParameters || {};
const taskId = params.id;
const solution = params.code;
if (!taskId || !solution) {
return { statusCode: 400, body: "Missing id or code" };
}
const message = {
task_id: taskId,
solution: solution,
status: "solved",
};
await sns.send(
new PublishCommand({
TopicArn: TOPIC_ARN,
Message: JSON.stringify(message),
Subject: "captcha-solved",
MessageAttributes: {
task_id: { DataType: "String", StringValue: taskId },
},
})
);
return { statusCode: 200, body: "OK" };
};
第 3 步:使用回调 URL 提交验证码
将 CaptchaAI 的 pingback 指向您的 API 网关端点:
Python
import os
import requests
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
CALLBACK_URL = os.environ["CALLBACK_GATEWAY_URL"] # API Gateway URL
def submit_captcha(sitekey, pageurl):
"""Submit CAPTCHA with SNS-backed callback."""
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": "userrecaptcha",
"googlekey": sitekey,
"pageurl": pageurl,
"pingback": CALLBACK_URL,
"json": 1
})
data = resp.json()
if data.get("status") == 1:
return data["request"] # task_id
raise RuntimeError(f"Submit failed: {data.get('request')}")
第四步:订阅消费者
SQS队列(结果存储)
# Subscribe an SQS queue to receive all results
sqs_arn = "arn:aws:sqs:us-east-1:123456789:captcha-results-queue"
sns.subscribe(
TopicArn=topic_arn,
Protocol="sqs",
Endpoint=sqs_arn
)
Lambda(审计记录器)
# Subscribe a Lambda for audit logging
lambda_arn = "arn:aws:lambda:us-east-1:123456789:function:captcha-audit-logger"
sns.subscribe(
TopicArn=topic_arn,
Protocol="lambda",
Endpoint=lambda_arn
)
电子邮件(失败警报)
# Subscribe email for error notifications with filter
sns.subscribe(
TopicArn=topic_arn,
Protocol="email",
Endpoint="ops@example.com"
)
第 5 步:使用 SQS 的结果
您的抓取工具从 SQS 读取解决方案,而不是轮询 CaptchaAI:
Python
import json
import boto3
sqs = boto3.client("sqs", region_name="us-east-1")
QUEUE_URL = os.environ["SQS_QUEUE_URL"]
def get_solved_captcha(timeout=30):
"""Wait for a CAPTCHA solution from the SQS queue."""
response = sqs.receive_message(
QueueUrl=QUEUE_URL,
MaxNumberOfMessages=1,
WaitTimeSeconds=min(timeout, 20) # Long polling (max 20s)
)
messages = response.get("Messages", [])
if not messages:
return None
msg = messages[0]
# SNS wraps the message — unwrap it
sns_envelope = json.loads(msg["Body"])
result = json.loads(sns_envelope["Message"])
# Delete message after processing
sqs.delete_message(
QueueUrl=QUEUE_URL,
ReceiptHandle=msg["ReceiptHandle"]
)
return result
JavaScript
const {
SQSClient,
ReceiveMessageCommand,
DeleteMessageCommand,
} = require("@aws-sdk/client-sqs");
const sqs = new SQSClient({ region: "us-east-1" });
const QUEUE_URL = process.env.SQS_QUEUE_URL;
async function getSolvedCaptcha(timeout = 30) {
const response = await sqs.send(
new ReceiveMessageCommand({
QueueUrl: QUEUE_URL,
MaxNumberOfMessages: 1,
WaitTimeSeconds: Math.min(timeout, 20),
})
);
const messages = response.Messages || [];
if (messages.length === 0) return null;
const msg = messages[0];
const snsEnvelope = JSON.parse(msg.Body);
const result = JSON.parse(snsEnvelope.Message);
await sqs.send(
new DeleteMessageCommand({
QueueUrl: QUEUE_URL,
ReceiptHandle: msg.ReceiptHandle,
})
);
return result;
}
SNS消息过滤
将不同的结果路由给不同的消费者:
# Only send failures to the ops queue
sns.subscribe(
TopicArn=topic_arn,
Protocol="sqs",
Endpoint=failure_queue_arn,
Attributes={
"FilterPolicy": json.dumps({
"status": ["failed", "error"]
})
}
)
故障排除
| 问题 | 原因 | 处理方式 |
|---|---|---|
| 回调返回403 | API网关身份验证阻止CaptchaAI | 禁用回调路由上的身份验证;使用基于令牌的验证代替 |
| SQS 消息未到达 | SNS → SQS 权限缺失 | SQS队列策略添加sns:Publish权限 |
| 处理重复结果 | SNS 至少传递一次 | 实现幂等性——处理前检查task_id |
| Lambda 冷启动延迟回调 | 未设置预配置并发数 | 为回调 Lambda 启用预配置并发 |
常问问题
为什么使用 SNS 而不是直接在回调 Lambda 中处理结果?
SNS 将回调处理程序与下游逻辑解耦。您可以添加新的使用者(日志记录、警报、分析),而无需修改回调 Lambda。回调保持简单和快速。
SNS 层增加的延迟是多少?
SNS 每条消息增加 10-50 毫秒。由于 CAPTCHA 解决需要 5-30 秒,因此这个开销可以忽略不计。
我可以使用 SNS FIFO 进行有序处理吗?
是的。如果您需要有序结果,请使用带有 SQS FIFO 队列的 SNS FIFO 主题。将 MessageGroupId 设置为每个任务排序的任务 ID。
相关文章
- 构建客户端验证码管道 Captchaai
- 构建负责任的自动化 Captchaai
- 构建 Captchaai 使用仪表板监控
下一步
构建事件驱动的验证码解决方案 -获取您的 CaptchaAI API 密钥并将其连接到您的 AWS 事件管道。
相关指南:
- AWS Lambda + CaptchaAI 无服务器集成
- 回调 URL 和 Webhook 指南
- Webhook 安全性:验证回调