[ PROMPT_NODE_22252 ]
ray-data
[ SKILL_DOCUMENTATION ]
# Ray Data - 可扩展的机器学习数据处理
用于机器学习和 AI 工作负载的分布式数据处理库。
## 何时使用 Ray Data
**在以下情况使用 Ray Data:**
- 为机器学习训练处理大型数据集 (>100GB)
- 需要在集群上进行分布式数据预处理
- 构建批处理推理工作流
- 加载多模态数据(图像、音频、视频)
- 将数据处理从笔记本电脑扩展到集群
**主要特性**:
- **流式执行**:处理超过内存大小的数据
- **GPU 支持**:使用 GPU 加速转换
- **框架集成**:PyTorch, TensorFlow, HuggingFace
- **多模态**:图像、Parquet、CSV、JSON、音频、视频
**可替代方案**:
- **Pandas**:单机上的小数据 (<1GB)
- **Dask**:表格数据,类 SQL 操作
- **Spark**:企业级 ETL,SQL 查询
## 快速入门
### 安装
bash
pip install -U 'ray[data]'
### 加载与转换数据
python
import ray
# 读取 Parquet 文件
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# 转换数据(惰性执行)
ds = ds.map_batches(lambda batch: {"processed": batch["text"].str.lower()})
# 消费数据
for batch in ds.iter_batches(batch_size=100):
print(batch)
### 与 Ray Train 集成
python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
# 创建数据集
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")
def train_func(config):
# 在训练中访问数据集
train_ds = ray.train.get_dataset_shard("train")
for epoch in range(10):
for batch in train_ds.iter_batches(batch_size=32):
# 在批次上进行训练
pass
# 使用 Ray 进行训练
trainer = TorchTrainer(
train_func,
datasets={"train": train_ds},
scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
trainer.fit()
## 读取数据
### 从云存储读取
python
import ray
# Parquet (机器学习推荐)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")
# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")
# 图像
ds = ray.data.read_images("s3://bucket/images/")
### 从 Python 对象读取
python
# 从列表
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])
# 从范围
ds = ray.data.range(1000000) # 合成数据
# 从 pandas
import pandas as pd
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
## 转换