[ PROMPT_NODE_24534 ]
flow_patterns
[ SKILL_DOCUMENTATION ]
# CocoIndex 工作流模式
本参考指南提供了构建 CocoIndex 工作流的常见模式和示例。
## 基础工作流模式
python
import cocoindex
@cocoindex.flow_def(name="FlowName")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    """Template for a basic CocoIndex flow.

    Shows the four canonical steps: import a source, create a collector,
    transform rows, and export the collected rows to a target.
    The ``...`` placeholders are to be filled in by the flow author.
    """
    # 1. Import source data into the flow's data scope
    data_scope["source_data"] = flow_builder.add_source(...)

    # 2. Create an output collector
    my_collector = data_scope.add_collector()

    # 3. Transform the data row by row and collect the results
    with data_scope["source_data"].row() as item:
        item["transformed"] = item["field"].transform(...)
        my_collector.collect(...)

    # 4. Export collected rows to a target (primary key required)
    my_collector.export("target_name", ..., primary_key_fields=[...])
## 常见工作流模式
### 模式 1:简单文本嵌入
将本地文件中的文档嵌入到向量数据库中。
python
@cocoindex.flow_def(name="TextEmbedding")
def text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    """Embed local markdown documents into a Postgres vector index.

    Pipeline: read files from the ``documents`` directory, split each
    document into overlapping chunks, embed every chunk with a
    SentenceTransformer model, and export (id, filename, text, embedding)
    rows to Postgres with a cosine-similarity vector index.
    """
    # Import documents from the local filesystem
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="documents")
    )

    doc_embeddings = data_scope.add_collector()

    with data_scope["documents"].row() as doc:
        # Split each document into chunks; overlap preserves context
        # across chunk boundaries
        doc["chunks"] = doc["content"].transform(
            cocoindex.functions.SplitRecursively(),
            language="markdown",
            chunk_size=2000,
            chunk_overlap=500,
        )
        with doc["chunks"].row() as chunk:
            # Embed each chunk with a compact general-purpose model
            chunk["embedding"] = chunk["text"].transform(
                cocoindex.functions.SentenceTransformerEmbed(
                    model="sentence-transformers/all-MiniLM-L6-v2"
                )
            )
            doc_embeddings.collect(
                id=cocoindex.GeneratedField.UUID,
                filename=doc["filename"],
                text=chunk["text"],
                embedding=chunk["embedding"],
            )

    # Export to Postgres with a vector index on the embedding column
    doc_embeddings.export(
        "doc_embeddings",
        cocoindex.targets.Postgres(),
        primary_key_fields=["id"],
        vector_indexes=[
            cocoindex.VectorIndexDef(
                field_name="embedding",
                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
            )
        ],
    )
### 模式 2:带语言检测的代码嵌入
python
@cocoindex.flow_def(name="CodeEmbedding")
def code_embedding_flow(flow_bui