[ PROMPT_NODE_26426 ]
schedulers
[ SKILL_DOCUMENTATION ]
# Dask 调度器
## 概述
Dask 提供了多种任务调度器,每种都适用于不同的工作负载。调度器决定了任务如何执行:顺序执行、并行线程执行、并行进程执行或跨集群分布式执行。
## 调度器类型
### 单机调度器
#### 1. 本地线程(默认)
**描述**:线程调度器使用本地 `concurrent.futures.ThreadPoolExecutor` 执行计算。
**何时使用**:
- NumPy, Pandas, scikit-learn 中的数值计算
- 释放 GIL(全局解释器锁)的库
- 从共享内存访问中受益的操作
- Dask Arrays 和 DataFrames 的默认设置
**特性**:
- 低开销
- 线程间共享内存
- 最适合释放 GIL 的操作
- 不适合纯 Python 代码(GIL 争用)
**示例**:
python
import dask.array as da
# 默认使用线程
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = x.mean().compute() # 使用线程计算
**显式配置**:
python
import dask
# 全局设置
dask.config.set(scheduler='threads')
# 或针对单次计算
result = x.mean().compute(scheduler='threads')
#### 2. 本地进程
**描述**:使用 `concurrent.futures.ProcessPoolExecutor` 的多进程调度器。
**何时使用**:
- 存在 GIL 争用的纯 Python 代码
- 文本处理和 Python 集合
- 从进程隔离中受益的操作
- CPU 密集型 Python 代码
**特性**:
- 绕过 GIL 限制
- 产生进程间数据传输成本
- 比线程开销更高
- 适用于输入/输出较小的线性工作流
**示例**:
python
import dask.bag as db
# 适用于 Python 对象处理
bag = db.read_text('data/*.txt')
result = bag.map(complex_python_function).compute(scheduler='processes')
**显式配置**:
python
import dask
# 全局设置
dask.config.set(scheduler='processes')
# 或针对单次计算
result = computation.compute(scheduler='processes')
**限制**:
- 数据必须可序列化 (pickle)
- 进程创建带来的开销
- 数据复制带来的内存开销
#### 3. 单线程(同步)
**描述**:单线程同步调度器在本地线程中执行所有计算,没有任何并行性。
**何时使用**:
- 使用 pdb 进行调试
- 使用标准 Python 工具进行性能分析
- 详细了解错误
- 开发和测试
**特性**:
- 无并行性
-