[ PROMPT_NODE_24198 ]
Pipelines API 参考
[ SKILL_DOCUMENTATION ]
# Pipelines API 参考
## 管道绑定接口
typescript
// 来自 @cloudflare/workers-types
interface Pipeline {
send(data: object | object[]): Promise;
}
interface Env {
STREAM: Pipeline;
}
export default {
async fetch(request: Request, env: Env): Promise {
// send() 返回 Promise - 无结果数据
await env.STREAM.send([event]);
return new Response('OK');
}
} satisfies ExportedHandler;
**关键点:**
- `send()` 接受单个对象或数组
- 始终返回 `Promise`(无确认数据)
- 网络/验证错误时抛出异常(请使用 try/catch 包装)
- 使用 `ctx.waitUntil()` 实现即发即弃模式
## 写入事件
### 单个事件
typescript
await env.STREAM.send([{
user_id: "12345",
event_type: "purchase",
product_id: "widget-001",
amount: 29.99
}]);
### 批量事件
typescript
const events = [
{ user_id: "user1", event_type: "view" },
{ user_id: "user2", event_type: "purchase", amount: 50 }
];
await env.STREAM.send(events);
**限制:**
- 每个请求最大 1 MB
- 每个流 5 MB/s
### 即发即弃模式
typescript
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise {
const event = { /* ... */ };
// 不阻塞响应发送
ctx.waitUntil(env.STREAM.send([event]));
return new Response('OK');
}
};
### 错误处理
typescript
try {
await env.STREAM.send([event]);
} catch (error) {
console.error('Pipeline 发送失败:', error);
// 记录到其他系统、重试或返回错误响应
return new Response('事件追踪失败', { status: 500 });
}
## HTTP 摄取 API
### 端点格式
https://{stream-id}.ingest.cloudflare.com
通过以下命令获取 `{stream-id}`: `npx wrangler pipelines streams list`
### 请求格式
**关键:** 必须发送数组,不能是单个对象
bash
# ✅ 正确
curl -X POST https://{stream-id}.ingest.cloudflare.com
-H "Content-Type: application/json"
-d '[{"user_id": "123", "event_type": "purchase"}]'
# ❌ 错误 - 将会失败
curl -X POST https://{stream-id}.ingest.cloudflare.com
-H "Content-Type: application/json"
-d '{"user_id": "123", "event_type": "purchase"}'
### 身份验证
bash
curl -X POST https://{stream-id}.ingest.cloudflare.com
-H "Content-Type: application/json"
-H "Authorization: Bearer YOUR_API_TOKEN"
-d '[{"event": "data"}]'
**所需权限:** Workers Pipeline Send