[ PROMPT_NODE_24224 ]
Queues 设计模式
[ SKILL_DOCUMENTATION ]
# 队列模式与最佳实践
## 异步任务处理
typescript
// 生产者:接收请求,将任务加入队列
export default {
async fetch(request: Request, env: Env): Promise {
const { userId, reportType } = await request.json();
await env.REPORT_QUEUE.send({ userId, reportType, requestedAt: Date.now() });
return Response.json({ message: 'Report queued', status: 'pending' });
}
};
// 消费者:处理报告
export default {
async queue(batch: MessageBatch, env: Env): Promise {
for (const msg of batch.messages) {
const { userId, reportType } = msg.body;
const report = await generateReport(userId, reportType, env);
await env.REPORTS_BUCKET.put(`${userId}/${reportType}.pdf`, report);
msg.ack();
}
}
};
## 缓冲 API 调用
typescript
// 生产者:将日志条目加入队列
ctx.waitUntil(env.LOGS_QUEUE.send({
method: request.method,
url: request.url,
timestamp: Date.now()
}));
// 消费者:批量写入外部 API
async queue(batch: MessageBatch, env: Env): Promise {
const logs = batch.messages.map(m => m.body);
await fetch(env.LOG_ENDPOINT, { method: 'POST', body: JSON.stringify({ logs }) });
batch.ackAll();
}
## 上游限流
typescript
async queue(batch: MessageBatch, env: Env): Promise {
for (const msg of batch.messages) {
try {
await callRateLimitedAPI(msg.body);
msg.ack();
} catch (error) {
if (error.status === 429) {
const retryAfter = parseInt(error.headers.get('Retry-After') || '60');
msg.retry({ delaySeconds: retryAfter });
} else throw error;
}
}
}
## 事件驱动工作流
typescript
// R2 事件 → 队列 → Worker
export default {
async queue(batch: MessageBatch, env: Env): Promise {
for (const msg of batch.messages) {
const event = msg.body;
if (event.action === 'PutObject') {
await processNewFile(event.object.key, env);
} else if (event.action === 'DeleteObject') {
await cleanupReferences(event.object.key, env);
}
msg.ack();
}
}
};
## 死信队列模式
typescript
// 主队列:达到 max_retries 后,自动进入死信队列 (DLQ)
export default {
async queue(batch: MessageBatch, env: Env): Promise {
for (const msg of batch.messages) {
try {
await riskyOperation(msg.body);
msg.ack();
} catch (error) {
console.error(`Failed after ${msg.attempts}