Getting Started with the Distributed Execution Engine Ray (2): Ray Data


    Table of Contents

    1. Overview
        Basic code
        Core APIs
    2. Core Concepts
        2.1 Loading data
            Read from S3
            Read from local disk
            Read compressed files
            Other ways to create datasets
            Read distributed data (Spark)
            Read from ML libraries (parallel reads not supported)
            Read from SQL
        2.2 Transforming data
            map
            flat_map
            Transforming batches
            Shuffling rows
            Repartitioning data
        2.3 Consuming data
            1) Iterate over rows
            2) Iterate over batches
            3) Shuffle while iterating over batches
            4) Split data for distributed parallel training
        2.4 Saving data
            Saving files
            Changing the number of partitions
            Converting data to Python objects
            Converting data to distributed data (Spark)

    Today let's walk through how Ray handles data; the API is quite concise.

    1. Overview

    Basic code

    from typing import Dict

    import numpy as np
    import ray

    # Create datasets from on-disk files, Python objects, and cloud storage like S3.
    ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

    # Apply functions to transform data. Ray Data executes transformations in parallel.
    def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        length = batch["petal length (cm)"]
        width = batch["petal width (cm)"]
        batch["petal area (cm^2)"] = length * width
        return batch

    transformed_ds = ds.map_batches(compute_area)

    # Iterate over batches of data.
    for batch in transformed_ds.iter_batches(batch_size=4):
        print(batch)

    # Save dataset contents to on-disk files or cloud storage.
    transformed_ds.write_parquet("local:///tmp/iris/")

    With ray.data you can easily read data from local disk, Python objects, and cloud storage such as S3, transform it in parallel, and finally write the results back to the cloud.

    Core APIs:

    At a high level the API surface splits into four groups, which the rest of this post walks through: loading (ray.data.read_* / from_*), transforming (map, flat_map, map_batches), consuming (iter_rows, iter_batches, streaming_split), and saving (write_*, to_pandas, to_spark).

    2. Core Concepts

    2.1 Loading data

    • Read from S3

    import ray

    # Load a CSV file
    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    print(ds.schema())
    ds.show(limit=1)

    # Load Parquet files
    ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

    # Load images
    ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages/")

    # Text
    ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")

    # Binary files
    ds = ray.data.read_binary_files("s3://anonymous@ray-example-data/documents")

    # TFRecords
    ds = ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords")
    • Read from local disk:

    ds = ray.data.read_parquet("local:///tmp/iris.parquet")
    • Read compressed files
    ds = ray.data.read_csv(
        "s3://anonymous@ray-example-data/iris.csv.gz",
        arrow_open_stream_args={"compression": "gzip"},
    )
    • Other ways to create datasets

    import numpy as np
    import pandas as pd
    import pyarrow as pa
    import ray

    # From Python objects
    ds = ray.data.from_items([
        {"food": "spam", "price": 9.34},
        {"food": "ham", "price": 5.37},
        {"food": "eggs", "price": 0.94}
    ])
    ds = ray.data.from_items([1, 2, 3, 4, 5])

    # From NumPy
    array = np.ones((3, 2, 2))
    ds = ray.data.from_numpy(array)

    # From pandas
    df = pd.DataFrame({
        "food": ["spam", "ham", "eggs"],
        "price": [9.34, 5.37, 0.94]
    })
    ds = ray.data.from_pandas(df)

    # From PyArrow
    table = pa.table({
        "food": ["spam", "ham", "eggs"],
        "price": [9.34, 5.37, 0.94]
    })
    ds = ray.data.from_arrow(table)

    Read distributed data (Spark)

    import ray
    import raydp

    spark = raydp.init_spark(app_name="Spark -> Datasets Example",
                             num_executors=2,
                             executor_cores=2,
                             executor_memory="500MB")
    df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
    ds = ray.data.from_spark(df)
    ds.show(3)

    Read from ML libraries (parallel reads not supported)

    import ray
    import ray.data
    from datasets import load_dataset

    # Read from Hugging Face (parallel reads not supported)
    hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
    ray_ds = ray.data.from_huggingface(hf_ds["train"])
    ray_ds.take(2)

    # Read from TensorFlow (parallel reads not supported)
    import tensorflow_datasets as tfds

    tf_ds, _ = tfds.load("cifar10", split=["train", "test"])
    ds = ray.data.from_tf(tf_ds)
    print(ds)

    Read from SQL

    import mysql.connector
    import ray

    def create_connection():
        return mysql.connector.connect(
            user="admin",
            password=...,
            host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
            connection_timeout=30,
            database="example",
        )

    # Get all movies
    dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)

    # Get movies after the year 1980
    dataset = ray.data.read_sql(
        "SELECT title, score FROM movie WHERE year >= 1980", create_connection
    )

    # Get the number of movies per year
    dataset = ray.data.read_sql(
        "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
    )

    Ray also supports reading from BigQuery and MongoDB; for space reasons we won't cover them in detail, but a rough sketch is shown below.
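
    A rough, hedged sketch only: the connection strings, database, and table names are placeholders, and the exact argument names of these connectors may differ across Ray versions, so check the Ray Data docs for your release.

    import ray

    # MongoDB: read one collection into a Dataset.
    ds = ray.data.read_mongo(
        uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
        database="my_db",       # placeholder database name
        collection="my_coll",   # placeholder collection name
    )

    # BigQuery: read a table ("dataset.table") from a GCP project.
    ds = ray.data.read_bigquery(
        project_id="my-project",          # placeholder GCP project
        dataset="my_dataset.my_table",    # placeholder dataset.table
    )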

    2.2 Transforming data

    Transformations are lazy by default: they aren't executed until you iterate over, save, or inspect the dataset, as the small example below illustrates.
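
    A minimal sketch of the lazy behavior (the dataset and lambda here are illustrative, not from the original post):

    import ray

    ds = ray.data.range(1000)
    # Defining the transformation returns immediately; no batches are processed yet.
    doubled = ds.map_batches(lambda batch: {"id": batch["id"] * 2})
    # Execution is triggered only when the result is consumed, e.g. by take(),
    # show(), iter_batches(), write_*() or an explicit materialize().
    print(doubled.take(3))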

    map

    import os
    from typing import Any, Dict

    import ray

    def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
        row["filename"] = os.path.basename(row["path"])
        return row

    ds = (
        ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
        .map(parse_filename)
    )

    flat_map

    from typing import Any, Dict, List

    import ray

    def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
        return [row] * 2

    print(
        ray.data.range(3)
        .flat_map(duplicate_row)
        .take_all()
    )
    # Result:
    # [{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]
    # Each original row now appears twice.

    Transforming batches

    from typing import Dict

    import numpy as np
    import ray

    def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        batch["image"] = np.clip(batch["image"] + 4, 0, 255)
        return batch

    # batch_format specifies the batch type passed to the function; it can be omitted.
    ds = (
        ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
        .map_batches(increase_brightness, batch_format="numpy")
    )

    If initialization is expensive, use a callable class instead of a function: the class is constructed once per worker and then reused across many batches, so the expensive setup isn't repeated for every call. Classes can hold state, whereas functions are stateless.

    The degree of parallelism can be tuned by specifying a (min, max) range; see the sketch below.
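
    A minimal sketch of a stateful transform (the class name and offset are illustrative; this assumes a Ray version whose map_batches accepts concurrency=(min, max), while older releases use compute=ray.data.ActorPoolStrategy(...) instead):

    from typing import Dict

    import numpy as np
    import ray

    class BrightnessAdjuster:
        def __init__(self):
            # Expensive setup (e.g. loading a model) goes here; it runs once
            # per actor in the pool, not once per batch.
            self.offset = 4

        def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
            batch["image"] = np.clip(batch["image"] + self.offset, 0, 255)
            return batch

    ds = (
        ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
        # Autoscale the actor pool between 1 and 4 workers.
        .map_batches(BrightnessAdjuster, batch_format="numpy", concurrency=(1, 4))
    )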

    Shuffling rows

    import ray

    ds = (
        ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
        .random_shuffle()
    )

    Repartitioning data

    import ray

    ds = ray.data.range(10000, parallelism=1000)

    # Repartition the data into 100 blocks. Since shuffle=False, Ray Data will minimize
    # data movement during this operation by merging adjacent blocks.
    ds = ds.repartition(100, shuffle=False).materialize()

    # Repartition the data into 200 blocks, and force a full data shuffle.
    # This operation will be more expensive.
    ds = ds.repartition(200, shuffle=True).materialize()

    2.3 Consuming data

    1) Iterate over rows

    import ray

    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    for row in ds.iter_rows():
        print(row)

    2) Iterate over batches

    NumPy, pandas, Torch, and TensorFlow each use a different API to iterate over batches:

    # NumPy
    import ray

    ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):
        print(batch)

    # pandas
    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    for batch in ds.iter_batches(batch_size=2, batch_format="pandas"):
        print(batch)

    # Torch
    ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    for batch in ds.iter_torch_batches(batch_size=2):
        print(batch)

    # TensorFlow
    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    tf_dataset = ds.to_tf(
        feature_columns="sepal length (cm)",
        label_columns="target",
        batch_size=2
    )
    for features, labels in tf_dataset:
        print(features, labels)

    3) Shuffle while iterating over batches

    Just pass the local_shuffle_buffer_size argument when iterating over batches. This is not a global shuffle, but it performs better.

    import ray

    ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    for batch in ds.iter_batches(
        batch_size=2,
        batch_format="numpy",
        local_shuffle_buffer_size=250,
    ):
        print(batch)

    4) Split data for distributed parallel training

    import ray

    @ray.remote
    class Worker:
        def train(self, data_iterator):
            for batch in data_iterator.iter_batches(batch_size=8):
                pass

    ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    workers = [Worker.remote() for _ in range(4)]
    shards = ds.streaming_split(n=4, equal=True)
    ray.get([w.train.remote(s) for w, s in zip(workers, shards)])

    2.4 Saving data

    Saving files

    This works very much like saving files with pandas; the only difference is that writing to the local filesystem requires the local:// prefix.

    Note: without the local:// prefix, Ray will write different partitions of the data to different nodes.

    import ray

    ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
    # Local filesystem
    ds.write_parquet("local:///tmp/iris/")
    # S3
    ds.write_parquet("s3://my-bucket/my-folder")

    Changing the number of partitions

    import os
    import ray

    ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
    ds.repartition(2).write_csv("/tmp/two_files/")
    print(os.listdir("/tmp/two_files/"))

    Converting data to Python objects

    import ray

    ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
    df = ds.to_pandas()
    print(df)

    Converting data to distributed data (Spark)

    import ray
    import raydp

    spark = raydp.init_spark(
        app_name="example",
        num_executors=1,
        executor_cores=4,
        executor_memory="512M"
    )
    ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
    df = ds.to_spark(spark)

    Original post: https://blog.csdn.net/qq_17246605/article/details/136601935