AI 任务进队列的第一刀,是把「用户正在等的结果」和「后台慢慢跑也可以的工作」分开。注册后发欢迎邮件、付款后写权益、长文本摘要、embedding 重建、批量图片描述,都不该和一次 HTTP request 绑死。
Cloudflare Queues 给的是任务缓冲、重试和 consumer 执行入口,不是无限扩展承诺。2026-05-27 查看官方文档时,Queues 仍然有消息大小、batch、吞吐、backlog、consumer wall time 和 Workers CPU 限制;价格和限制可能之后调整,具体数字要以 Queues Pricing、Queues Limits、Workers Pricing 和 Workers Limits 为准。
哪些 AI 任务该进 Cloudflare Queues?
用户必须立刻看到的结果,可以留在同步接口里;用户只需要看到「已接收」和任务状态,就该进队列。这个判断比「AI 任务都异步」更实用,因为短任务进队列也会增加状态管理成本。
| 产品动作 | 默认放哪里 | 前端返回什么 | 为什么 |
|---|---|---|---|
| 一句话标签分类 | 同步 Worker | 结果 JSON | 输入短,失败成本低 |
| 长文本摘要 | Queue | jobId + queued | 用户输入长度不可控 |
| 批量 embedding | Queue | 任务状态页 | 可以分片、重跑、限速 |
| 第三方回调后处理 | Queue | handler 快速返回 | 把外部回调和内部权益刷新拆开 |
| 多模型评审 | Queue | running / failed | 成本高,retry 要可控 |
| 图片描述生成 | Queue | jobId + 轮询入口 | 文件读取和模型调用都可能慢 |
我的默认结构是:HTTP handler 只做鉴权、写入状态表、发送 Queue message;Workers consumer 负责调用 AI、写结果、更新状态;前端轮询 /api/jobs/:jobId。这样用户刷新页面不会重复创建任务,后台也能按 jobId 做幂等。
队列结构怎么拆:主队列、状态表和 DLQ?
最小生产结构是三块:主队列 ai-jobs、一张 job 状态表、一个 Dead Letter Queue。不要只靠 Queues 保存业务状态;队列消息会过期,用户状态、错误原因和重跑记录要落在数据库里。
| 组件 | 存什么 | 失败时怎么用 |
|---|---|---|
ai-jobs | jobId、任务类型、输入引用、用户 id | consumer 拉取后执行 AI 调用 |
jobs 表 | queued/running/succeeded/failed、结果引用、错误类别 | 前端查状态、后台判断是否可重跑 |
ai-jobs-dlq | 达到 retry 上限的消息 | 人工检查、修参数、重新投递 |
| 日志 / Analytics | attempt、耗时、模型、错误码、billable operations | 找出高 retry 任务和成本异常 |
Cloudflare Queues 默认是 at-least-once delivery,极少数情况下同一条消息可能被投递超过一次。AI 任务必须带 idempotency key,比如 jobId + stepName,写结果时用唯一键挡重复提交。
Wrangler 和 Worker consumer 怎么写?
下面这个配置把 HTTP Worker 作为 producer,把同一个或另一个 Worker 作为 consumer。max_batch_size 我故意写成 5,不是因为平台只能 5,而是 AI 上游常有并发和限流;早期产品先让失败面小一点,比把 batch 拉满更好排查。
[[queues.producers]]
queue = "ai-jobs"
binding = "AI_JOBS"
[[queues.consumers]]
queue = "ai-jobs"
max_batch_size = 5
max_batch_timeout = 10
max_retries = 3
dead_letter_queue = "ai-jobs-dlq"
[limits]
cpu_ms = 300_000
TypeScript 里不要把完整 prompt、用户原文和大文件塞进消息体。Queues 单条消息有大小限制,大输入放 R2、D1 或你的数据库,message 里只传引用。
type Env = {
AI_JOBS: Queue<AiJob>;
JOBS: D1Database;
};
type AiJob = {
jobId: string;
userId: string;
kind: "summary" | "embedding" | "image_caption";
inputRef: string;
};
export default {
async fetch(req: Request, env: Env) {
const body = await req.json<Pick<AiJob, "kind" | "inputRef">>();
const jobId = crypto.randomUUID();
await env.JOBS.prepare(
"insert into jobs (job_id, status, kind, input_ref) values (?, ?, ?, ?)"
).bind(jobId, "queued", body.kind, body.inputRef).run();
await env.AI_JOBS.send({
jobId,
userId: "current-user-id",
kind: body.kind,
inputRef: body.inputRef,
});
return Response.json({ jobId, status: "queued" }, { status: 202 });
},
async queue(batch: MessageBatch<AiJob>, env: Env) {
for (const msg of batch.messages) {
try {
await markRunning(msg.body.jobId, env);
await runAiJob(msg.body, env);
await markSucceeded(msg.body.jobId, env);
msg.ack();
} catch (error) {
const retryable = isRetryableAiError(error);
await markAttempt(msg.body.jobId, retryable, env);
if (retryable) {
msg.retry({ delaySeconds: 60 });
} else {
await markFailed(msg.body.jobId, error, env);
msg.ack();
}
}
}
},
} satisfies ExportedHandler<Env>;
这里最重要的是逐条 ack() 和 retry()。如果整批抛错,已经成功的消息也可能被再次投递;调用外部 AI、写数据库、发邮件这类有副作用的任务,逐条确认更稳。
失败重试该设几次?
max_retries 默认是 3 次,平台上限是 100 次。AI 任务不要把上限当目标;每一次 retry 都会多一次读取操作,也可能重复消耗上游模型额度。
| 错误类型 | 是否 retry | 推荐动作 | 成本信号 |
|---|---|---|---|
| 502 / 503 / 网络短抖 | 是 | 退避后重投,保留同一 jobId | retryCount 升高但成功率恢复 |
| 429 / 容量不足 | 是,延迟更长 | 降低 consumer 并发或缩小 batch | backlog 上升、lagTime 变长 |
| prompt 过长 / 输入过大 | 否 | 切片、压缩或让用户改输入 | 同类任务持续失败 |
| 401 / 403 | 否 | 暂停 consumer,修 secret 或权限 | 全量任务失败 |
| JSON 解析 / schema 错 | 否 | 修 producer payload,再重放受影响任务 | DLQ 里错误高度一致 |
我会把自动 retry 上限放在 2-3 次。超过以后进 Dead Letter Queue,由后台按钮或脚本重跑。凌晨上游模型抖动时,最怕的不是失败,而是没有刹车的 retry 把第二天预算烧掉。
DLQ 里要留哪些字段?
Dead Letter Queue 不是垃圾桶。Cloudflare 文档写得很直接:配置了 DLQ,消息达到 consumer 的 retry 上限后会送进 DLQ;没有配置 DLQ,反复失败的消息会被丢弃。DLQ 没有 active consumer 时,消息会保留 4 天。
主消息体至少带 jobId、kind、inputRef、userId。状态表里再存 attempts、last_error_class、last_error_message、model、started_at、finished_at。DLQ consumer 只做两件事:把失败写到后台可见列表,必要时给值班人发告警。
不要让 DLQ consumer 自动无限重投主队列。更好的做法是把重跑分成三类:参数修好后批量重跑、单个高价值客户人工重跑、低价值或明显坏输入标记为 failed_final。
成本怎么估:Queue 操作数 + Worker CPU + AI 调用?
Queues 的账单按 operation 计,不按「一个 batch」计。Cloudflare 当前口径是每 64KB 写、读、删各算 operation;一条小于 64KB 的消息正常投递通常是 1 次写、1 次读、1 次删。batch 能减少 consumer invocation,不会把 10 条消息变成 1 条消息收费。
| 成本项 | 2026-05-27 官方口径 | 估算方式 |
|---|---|---|
| Queues operation | Free 每天 10,000 operations;Workers Paid 每月含 1,000,000 operations,超出 $0.40 / 百万 | messages * 3 + retry_reads + dlq_writes |
| Message size | 每 64KB 分块计费,单条消息大小上限 128KB | 大输入只放引用,不放全文 |
| Worker request / CPU | Paid 最低 $5/月,含 10M requests 和 30M CPU ms,超出后请求与 CPU ms 分开计费 | consumer 每次 invocation 的 CPU 也要算 |
| AI 模型 | 按所选模型的 token / image 单价 | 记录 model、input、output、attempt |
| 存储 | D1/R2/KV 按各自口径 | 状态表、结果文件、日志分开算 |
一个小例子:每月 20,000 个 AI job,消息都小于 64KB,95% 一次成功,5% retry 一次,1% 最后进 DLQ。Queues operation 粗算是 20,000 * 3 + 1,000 + 200 = 61,200。这还没算 Workers CPU、D1 写入、R2 结果文件和模型调用,所以 AI SaaS 的大头通常不是队列本身。
后台值班时怎么避免误判?
观测至少看 6 个指标:backlog messages、backlog bytes、lagTime、retryCount、billableOperations、DLQ outcome。Cloudflare Dashboard 能看一部分,GraphQL Analytics API 可以把 backlog、consumer concurrency 和 message operations 拉进自己的监控。
产品侧还要有自己的 job 状态,不然用户只会看到「一直处理中」。我建议状态只保留 5 个:queued、running、succeeded、failed_retryable、failed_final。状态太多,客服解释不清;状态太少,工程师找不到卡点。
队列事故排查最怕 GitHub Actions 日志、Cloudflare Dashboard、AI provider billing 三个页面各说各话。一个人值班时,把浏览器 profile、2FA 设备、部署入口、回滚命令和 DLQ 重放脚本写进同一份 runbook;它不能替代 token 权限、队列告警和 DLQ 处理,但能减少临场误判。
什么时候不要用 Queues 硬扛?
Cloudflare Queues 的限制要写进设计里。官方 Limits 页面当前列出的关键点包括:单条消息 128KB、最大 consumer batch size 100、sendBatch 最多 100 条或总计 256KB、单队列吞吐 5,000 messages/s、单队列 backlog 25GB、push-based concurrent consumer invocations 250、Queue consumer wall time 15 分钟。
如果你的任务要跑 40 分钟、单输入动辄几 MB、需要强 exactly-once 语义,或者一个 job 会拆出上千个子任务,就不要硬塞进一条 Queue message。把大输入放存储,把长流程拆成多个 step,把需要人工审核的失败留在状态表里。
更具体一点:长报告生成可以切成「抓取 -> 清洗 -> 摘要 -> 合并」四个 job;embedding 重建可以按文档分页;图片批处理可以每张图一个 job,再用数据库聚合进度。Queues 管分发和 retry,业务状态仍然由你自己的表负责。