AI 任务进队列的第一刀,是把「用户正在等的结果」和「后台慢慢跑也可以的工作」分开。注册后发欢迎邮件、付款后写权益、长文本摘要、embedding 重建、批量图片描述,都不该和一次 HTTP request 绑死。

Cloudflare Queues 给的是任务缓冲、重试和 consumer 执行入口,不是无限扩展承诺。2026-05-27 查看官方文档时,Queues 仍然有消息大小、batch、吞吐、backlog、consumer wall time 和 Workers CPU 限制;价格和限制可能之后调整,具体数字要以 Queues PricingQueues LimitsWorkers PricingWorkers Limits 为准。

哪些 AI 任务该进 Cloudflare Queues?

用户必须立刻看到的结果,可以留在同步接口里;用户只需要看到「已接收」和任务状态,就该进队列。这个判断比「AI 任务都异步」更实用,因为短任务进队列也会增加状态管理成本。

产品动作默认放哪里前端返回什么为什么
一句话标签分类同步 Worker结果 JSON输入短,失败成本低
长文本摘要QueuejobId + queued用户输入长度不可控
批量 embeddingQueue任务状态页可以分片、重跑、限速
第三方回调后处理Queuehandler 快速返回把外部回调和内部权益刷新拆开
多模型评审Queuerunning / failed成本高,retry 要可控
图片描述生成QueuejobId + 轮询入口文件读取和模型调用都可能慢

我的默认结构是:HTTP handler 只做鉴权、写入状态表、发送 Queue message;Workers consumer 负责调用 AI、写结果、更新状态;前端轮询 /api/jobs/:jobId。这样用户刷新页面不会重复创建任务,后台也能按 jobId 做幂等。

队列结构怎么拆:主队列、状态表和 DLQ?

最小生产结构是三块:主队列 ai-jobs、一张 job 状态表、一个 Dead Letter Queue。不要只靠 Queues 保存业务状态;队列消息会过期,用户状态、错误原因和重跑记录要落在数据库里。

组件存什么失败时怎么用
ai-jobsjobId、任务类型、输入引用、用户 idconsumer 拉取后执行 AI 调用
jobsqueued/running/succeeded/failed、结果引用、错误类别前端查状态、后台判断是否可重跑
ai-jobs-dlq达到 retry 上限的消息人工检查、修参数、重新投递
日志 / Analyticsattempt、耗时、模型、错误码、billable operations找出高 retry 任务和成本异常

Cloudflare Queues 默认是 at-least-once delivery,极少数情况下同一条消息可能被投递超过一次。AI 任务必须带 idempotency key,比如 jobId + stepName,写结果时用唯一键挡重复提交。

Wrangler 和 Worker consumer 怎么写?

下面这个配置把 HTTP Worker 作为 producer,把同一个或另一个 Worker 作为 consumer。max_batch_size 我故意写成 5,不是因为平台只能 5,而是 AI 上游常有并发和限流;早期产品先让失败面小一点,比把 batch 拉满更好排查。

[[queues.producers]]
queue = "ai-jobs"
binding = "AI_JOBS"

[[queues.consumers]]
queue = "ai-jobs"
max_batch_size = 5
max_batch_timeout = 10
max_retries = 3
dead_letter_queue = "ai-jobs-dlq"

[limits]
cpu_ms = 300_000

TypeScript 里不要把完整 prompt、用户原文和大文件塞进消息体。Queues 单条消息有大小限制,大输入放 R2、D1 或你的数据库,message 里只传引用。

type Env = {
  AI_JOBS: Queue<AiJob>;
  JOBS: D1Database;
};

type AiJob = {
  jobId: string;
  userId: string;
  kind: "summary" | "embedding" | "image_caption";
  inputRef: string;
};

export default {
  async fetch(req: Request, env: Env) {
    const body = await req.json<Pick<AiJob, "kind" | "inputRef">>();
    const jobId = crypto.randomUUID();

    await env.JOBS.prepare(
      "insert into jobs (job_id, status, kind, input_ref) values (?, ?, ?, ?)"
    ).bind(jobId, "queued", body.kind, body.inputRef).run();

    await env.AI_JOBS.send({
      jobId,
      userId: "current-user-id",
      kind: body.kind,
      inputRef: body.inputRef,
    });

    return Response.json({ jobId, status: "queued" }, { status: 202 });
  },

  async queue(batch: MessageBatch<AiJob>, env: Env) {
    for (const msg of batch.messages) {
      try {
        await markRunning(msg.body.jobId, env);
        await runAiJob(msg.body, env);
        await markSucceeded(msg.body.jobId, env);
        msg.ack();
      } catch (error) {
        const retryable = isRetryableAiError(error);
        await markAttempt(msg.body.jobId, retryable, env);

        if (retryable) {
          msg.retry({ delaySeconds: 60 });
        } else {
          await markFailed(msg.body.jobId, error, env);
          msg.ack();
        }
      }
    }
  },
} satisfies ExportedHandler<Env>;

这里最重要的是逐条 ack()retry()。如果整批抛错,已经成功的消息也可能被再次投递;调用外部 AI、写数据库、发邮件这类有副作用的任务,逐条确认更稳。

失败重试该设几次?

max_retries 默认是 3 次,平台上限是 100 次。AI 任务不要把上限当目标;每一次 retry 都会多一次读取操作,也可能重复消耗上游模型额度。

错误类型是否 retry推荐动作成本信号
502 / 503 / 网络短抖退避后重投,保留同一 jobIdretryCount 升高但成功率恢复
429 / 容量不足是,延迟更长降低 consumer 并发或缩小 batchbacklog 上升、lagTime 变长
prompt 过长 / 输入过大切片、压缩或让用户改输入同类任务持续失败
401 / 403暂停 consumer,修 secret 或权限全量任务失败
JSON 解析 / schema 错修 producer payload,再重放受影响任务DLQ 里错误高度一致

我会把自动 retry 上限放在 2-3 次。超过以后进 Dead Letter Queue,由后台按钮或脚本重跑。凌晨上游模型抖动时,最怕的不是失败,而是没有刹车的 retry 把第二天预算烧掉。

DLQ 里要留哪些字段?

Dead Letter Queue 不是垃圾桶。Cloudflare 文档写得很直接:配置了 DLQ,消息达到 consumer 的 retry 上限后会送进 DLQ;没有配置 DLQ,反复失败的消息会被丢弃。DLQ 没有 active consumer 时,消息会保留 4 天。

主消息体至少带 jobIdkindinputRefuserId。状态表里再存 attemptslast_error_classlast_error_messagemodelstarted_atfinished_at。DLQ consumer 只做两件事:把失败写到后台可见列表,必要时给值班人发告警。

不要让 DLQ consumer 自动无限重投主队列。更好的做法是把重跑分成三类:参数修好后批量重跑、单个高价值客户人工重跑、低价值或明显坏输入标记为 failed_final

成本怎么估:Queue 操作数 + Worker CPU + AI 调用?

Queues 的账单按 operation 计,不按「一个 batch」计。Cloudflare 当前口径是每 64KB 写、读、删各算 operation;一条小于 64KB 的消息正常投递通常是 1 次写、1 次读、1 次删。batch 能减少 consumer invocation,不会把 10 条消息变成 1 条消息收费。

成本项2026-05-27 官方口径估算方式
Queues operationFree 每天 10,000 operations;Workers Paid 每月含 1,000,000 operations,超出 $0.40 / 百万messages * 3 + retry_reads + dlq_writes
Message size每 64KB 分块计费,单条消息大小上限 128KB大输入只放引用,不放全文
Worker request / CPUPaid 最低 $5/月,含 10M requests 和 30M CPU ms,超出后请求与 CPU ms 分开计费consumer 每次 invocation 的 CPU 也要算
AI 模型按所选模型的 token / image 单价记录 model、input、output、attempt
存储D1/R2/KV 按各自口径状态表、结果文件、日志分开算

一个小例子:每月 20,000 个 AI job,消息都小于 64KB,95% 一次成功,5% retry 一次,1% 最后进 DLQ。Queues operation 粗算是 20,000 * 3 + 1,000 + 200 = 61,200。这还没算 Workers CPU、D1 写入、R2 结果文件和模型调用,所以 AI SaaS 的大头通常不是队列本身。

后台值班时怎么避免误判?

观测至少看 6 个指标:backlog messages、backlog bytes、lagTime、retryCount、billableOperations、DLQ outcome。Cloudflare Dashboard 能看一部分,GraphQL Analytics API 可以把 backlog、consumer concurrency 和 message operations 拉进自己的监控。

产品侧还要有自己的 job 状态,不然用户只会看到「一直处理中」。我建议状态只保留 5 个:queuedrunningsucceededfailed_retryablefailed_final。状态太多,客服解释不清;状态太少,工程师找不到卡点。

队列事故排查最怕 GitHub Actions 日志、Cloudflare Dashboard、AI provider billing 三个页面各说各话。一个人值班时,把浏览器 profile、2FA 设备、部署入口、回滚命令和 DLQ 重放脚本写进同一份 runbook;它不能替代 token 权限、队列告警和 DLQ 处理,但能减少临场误判。

什么时候不要用 Queues 硬扛?

Cloudflare Queues 的限制要写进设计里。官方 Limits 页面当前列出的关键点包括:单条消息 128KB、最大 consumer batch size 100、sendBatch 最多 100 条或总计 256KB、单队列吞吐 5,000 messages/s、单队列 backlog 25GB、push-based concurrent consumer invocations 250、Queue consumer wall time 15 分钟。

如果你的任务要跑 40 分钟、单输入动辄几 MB、需要强 exactly-once 语义,或者一个 job 会拆出上千个子任务,就不要硬塞进一条 Queue message。把大输入放存储,把长流程拆成多个 step,把需要人工审核的失败留在状态表里。

更具体一点:长报告生成可以切成「抓取 -> 清洗 -> 摘要 -> 合并」四个 job;embedding 重建可以按文档分页;图片批处理可以每张图一个 job,再用数据库聚合进度。Queues 管分发和 retry,业务状态仍然由你自己的表负责。

相关阅读