[ PROMPT_NODE_22202 ]
Agents Crewai 流程
[ SKILL_DOCUMENTATION ]
# CrewAI Flows 指南
## 概述
Flows(工作流)提供了事件驱动的编排能力,支持对执行路径、状态管理和条件分支的精确控制。当你需要比 Crew(智能体组)更细粒度的控制时,请使用 Flows。
## 何时使用 Flows 与 Crews
| 场景 | 使用 Crews | 使用 Flows |
|----------|-----------|-----------|
| 简单的多智能体协作 | ✅ | |
| 顺序/层级化任务 | ✅ | |
| 条件分支 | | ✅ |
| 复杂状态管理 | | ✅ |
| 事件驱动的工作流 | | ✅ |
| 混合模式(在 Flow 步骤中嵌入 Crews) | | ✅ |
## Flow 基础
### 创建 Flow
python
from crewai.flow.flow import Flow, listen, start, router, or_, and_
from pydantic import BaseModel
# 定义状态模型
class MyState(BaseModel):
counter: int = 0
data: str = ""
results: list = []
# 创建带有类型化状态的 Flow
class MyFlow(Flow[MyState]):
@start()
def initialize(self):
"""入口点 - 最先执行"""
self.state.counter = 1
return {"initialized": True}
@listen(initialize)
def process(self, data):
"""在 initialize 完成后运行"""
self.state.counter += 1
return f"Processed: {data}"
# 运行 Flow
flow = MyFlow()
result = flow.kickoff()
print(flow.state.counter) # 访问最终状态
### Flow 装饰器
#### @start() - 入口点
python
@start()
def begin(self):
"""最先执行的方法"""
return {"status": "started"}
# 多个入口点(并行运行)
@start()
def start_a(self):
return "A"
@start()
def start_b(self):
return "B"
#### @listen() - 事件触发器
python
# 监听单个方法
@listen(initialize)
def after_init(self, result):
"""在 initialize 完成时运行"""
return process(result)
# 监听字符串名称
@listen("high_confidence")
def handle_high(self):
"""当路由返回 'high_confidence' 时运行"""
pass
#### @router() - 条件分支
python
@router(analyze)
def decide_path(self):
"""返回字符串以路由到特定的监听器"""
if self.state.confidence > 0.8:
return "high_confidence"
elif self.state.confidence > 0.5:
return "medium_confidence"
return "low_confidence"
@listen("high_confidence")
def handle_high(self):
pass
@listen("medium_confidence")
def handle_medium(self):
pass
@listen("low_confidence")
def handle_low(self):
pass
#### or_() 和 and_() - 条件组合
python
from crewai.flow.flow import or_, and_
# 当满足任一条件时触发
@listen(or_("success", "partial_success"))
def handle_any_success(self):
pass
# 当同时满足两个条件时触发
@listen(and_(task_a, task_b))
def after_both_complete(self):
pass
## 状态管理
### Pydantic 状态模型
python
from pydantic import BaseModel, Field
from typing import Optional
class WorkflowState(BaseModel):
# 必填字段
input_data: str
# 可选字段及默认值
processed: bool = False