[ PROMPT_NODE_26424 ]
futures
[ SKILL_DOCUMENTATION ]
# Dask Futures
## 概述
Dask futures 扩展了 Python 的 `concurrent.futures` 接口,支持即时(非惰性)的任务执行。与延迟计算(用于 DataFrames、Arrays 和 Bags)不同,futures 在计算随时间演变或需要动态构建工作流的情况下提供了更大的灵活性。
## 核心概念
Futures 代表实时任务执行:
- 任务在提交时立即执行(非惰性)
- 每个 future 代表一个远程计算结果
- 在 futures 之间自动进行依赖跟踪
- 支持动态、演进式的工作流
- 直接控制任务调度和数据放置
## 关键能力
### 实时执行
- 任务提交后立即运行
- 无需显式调用 `.compute()`
- 使用 `.result()` 方法获取结果
### 自动依赖管理
当你提交带有 future 输入的任务时,Dask 会自动处理依赖跟踪。一旦所有输入 futures 完成,它们将被移动到单个 worker 上进行高效计算。
### 动态工作流
构建基于中间结果演变的计算:
- 基于先前结果提交新任务
- 条件执行路径
- 具有不同结构的迭代算法
## 何时使用 Futures
**在以下情况使用 Futures**:
- 构建动态、演进式的工作流
- 需要即时任务执行(非惰性)
- 计算依赖于运行时条件
- 需要精细控制任务放置
- 实现自定义并行算法
- 需要有状态计算(使用 actors)
**在以下情况使用其他集合**:
- 静态、预定义的计算图(使用 delayed, DataFrames, Arrays)
- 大规模集合上的简单数据并行(使用 Bags, DataFrames)
- 标准数组/数据框操作已足够
## 设置 Client
Futures 需要一个分布式 client:
python
from dask.distributed import Client
# 本地集群(单机)
client = Client()
# 或指定资源
client = Client(n_workers=4, threads_per_worker=2)
# 或连接到现有集群
client = Client('scheduler-address:8786')
## 提交任务
### 基本提交
python
from dask.distributed import Client
client = Client()
# 提交单个任务
def add(x, y):
return x + y
future = client.submit(add, 1, 2)
# 获取结果
result = future.result() # 阻塞直到完成
print(result) # 3
### 多个任务
python
# 提交多个独立任务
futures = []
for i in range(10):
future = client.submit(add,