From 9c0836c8d07be3efc4e82246ec06fbf7bdfbcdec Mon Sep 17 00:00:00 2001
From: Remi Cadene
Date: Sat, 19 Apr 2025 19:27:14 +0530
Subject: [PATCH] Remove legacy from datasets/utils.py

---
 lerobot/common/datasets/utils.py              | 120 +-----------------
 .../v30/convert_dataset_v21_to_v30.py         |  40 ++++--
 2 files changed, 33 insertions(+), 127 deletions(-)

diff --git a/lerobot/common/datasets/utils.py b/lerobot/common/datasets/utils.py
index 487e4331..e8f76b14 100644
--- a/lerobot/common/datasets/utils.py
+++ b/lerobot/common/datasets/utils.py
@@ -27,7 +27,6 @@ from types import SimpleNamespace
 from typing import Any, Tuple
 
 import datasets
-import jsonlines
 import numpy as np
 import packaging.version
 import pandas
@@ -53,15 +52,6 @@ from lerobot.configs.types import DictLike, FeatureType, PolicyFeature
 
 DEFAULT_CHUNK_SIZE = 1000  # Max number of files per chunk
 DEFAULT_FILE_SIZE_IN_MB = 500.0  # Max size per file
 
-# Keep legacy for `convert_dataset_v21_to_v30.py`
-LEGACY_EPISODES_PATH = "meta/episodes.jsonl"
-LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
-LEGACY_TASKS_PATH = "meta/tasks.jsonl"
-LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
-LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
-
-DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
-
 INFO_PATH = "meta/info.json"
 STATS_PATH = "meta/stats.json"
@@ -74,6 +64,7 @@ DEFAULT_TASKS_PATH = "meta/tasks.parquet"
 DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
 DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
 DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
+DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
 
 DATASET_CARD_TEMPLATE = """
 ---
@@ -105,19 +96,11 @@ def get_hf_dataset_size_in_mb(hf_ds: Dataset) -> int:
 
 
 def get_pd_dataframe_size_in_mb(df: pandas.DataFrame) -> int:
+    # TODO(rcadene): unused?
     memory_usage_bytes = df.memory_usage(deep=True).sum()
     return memory_usage_bytes / (1024**2)
 
 
-def get_chunk_file_indices(path: Path) -> Tuple[int, int]:
-    if not path.stem.startswith("file-") or not path.parent.name.startswith("chunk-"):
-        raise ValueError(f"Path does not follow {CHUNK_FILE_PATTERN}: '{path}'")
-
-    chunk_index = int(path.parent.replace("chunk-", ""))
-    file_index = int(path.stem.replace("file-", ""))
-    return chunk_index, file_index
-
-
 def update_chunk_file_indices(chunk_idx: int, file_idx: int, chunks_size: int):
     if file_idx == chunks_size - 1:
         file_idx = 0
@@ -142,14 +125,6 @@ def load_nested_dataset(pq_dir: Path) -> Dataset:
     )
 
 
-def get_latest_parquet_path(pq_dir: Path) -> Path:
-    return sorted(pq_dir.glob("*/*.parquet"))[-1]
-
-
-def get_latest_video_path(pq_dir: Path, video_key: str) -> Path:
-    return sorted(pq_dir.glob(f"{video_key}/*/*.mp4"))[-1]
-
-
 def get_parquet_num_frames(parquet_path):
     metadata = pq.read_metadata(parquet_path)
     return metadata.num_rows
@@ -161,7 +136,8 @@ def get_video_size_in_mb(mp4_path: Path):
     return file_size_mb
 
 
-def concat_video_files(paths_to_cat, root, video_key, chunk_idx, file_idx):
+def concat_video_files(paths_to_cat: list[Path], root: Path, video_key: str, chunk_idx: int, file_idx: int):
+    # TODO(rcadene): add docstring
     tmp_dir = Path(tempfile.mkdtemp(dir=root))
     # Create a text file with the list of files to concatenate
     path_concat_video_files = tmp_dir / "concat_video_files.txt"
@@ -244,18 +220,6 @@ def unflatten_dict(d: dict, sep: str = "/") -> dict:
     return outdict
 
 
-def get_nested_item(obj: DictLike, flattened_key: str, sep: str = "/") -> Any:
-    split_keys = flattened_key.split(sep)
-    getter = obj[split_keys[0]]
-    if len(split_keys) == 1:
-        return getter
-
-    for key in split_keys[1:]:
-        getter = getter[key]
-
-    return getter
-
-
 def serialize_dict(stats: dict[str, torch.Tensor | np.ndarray | dict]) -> dict:
     serialized_dict = {}
     for key, value in flatten_dict(stats).items():
@@ -285,58 +249,34 @@ def load_json(fpath: Path) -> Any:
     with open(fpath) as f:
         return json.load(f)
 
-
 def write_json(data: dict, fpath: Path) -> None:
     fpath.parent.mkdir(exist_ok=True, parents=True)
     with open(fpath, "w") as f:
         json.dump(data, f, indent=4, ensure_ascii=False)
 
-
-def load_jsonlines(fpath: Path) -> list[Any]:
-    with jsonlines.open(fpath, "r") as reader:
-        return list(reader)
-
-
-def write_jsonlines(data: dict, fpath: Path) -> None:
-    fpath.parent.mkdir(exist_ok=True, parents=True)
-    with jsonlines.open(fpath, "w") as writer:
-        writer.write_all(data)
-
-
-def append_jsonlines(data: dict, fpath: Path) -> None:
-    fpath.parent.mkdir(exist_ok=True, parents=True)
-    with jsonlines.open(fpath, "a") as writer:
-        writer.write(data)
-
-
 def write_info(info: dict, local_dir: Path):
     write_json(info, local_dir / INFO_PATH)
 
-
 def load_info(local_dir: Path) -> dict:
     info = load_json(local_dir / INFO_PATH)
     for ft in info["features"].values():
         ft["shape"] = tuple(ft["shape"])
     return info
 
-
 def write_stats(stats: dict, local_dir: Path):
     serialized_stats = serialize_dict(stats)
     write_json(serialized_stats, local_dir / STATS_PATH)
 
-
 def cast_stats_to_numpy(stats) -> dict[str, dict[str, np.ndarray]]:
     stats = {key: np.array(value) for key, value in flatten_dict(stats).items()}
     return unflatten_dict(stats)
 
-
 def load_stats(local_dir: Path) -> dict[str, dict[str, np.ndarray]]:
     if not (local_dir / STATS_PATH).exists():
         return None
     stats = load_json(local_dir / STATS_PATH)
     return cast_stats_to_numpy(stats)
 
-
 def write_hf_dataset(hf_dataset: Dataset, local_dir: Path):
     if get_hf_dataset_size_in_mb(hf_dataset) > DEFAULT_FILE_SIZE_IN_MB:
         raise NotImplementedError("Contact a maintainer.")
 
@@ -352,30 +292,12 @@ def write_tasks(tasks: pandas.DataFrame, local_dir: Path):
     tasks.to_parquet(path)
 
 
-def legacy_write_task(task_index: int, task: dict, local_dir: Path):
-    task_dict = {
-        "task_index": task_index,
-        "task": task,
-    }
-    append_jsonlines(task_dict, local_dir / LEGACY_TASKS_PATH)
-
-
-def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
-    tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
-    tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
-    task_to_task_index = {task: task_index for task_index, task in tasks.items()}
-    return tasks, task_to_task_index
-
 def load_tasks(local_dir: Path):
     tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_PATH)
     return tasks
 
 
-def write_episode(episode: dict, local_dir: Path):
-    append_jsonlines(episode, local_dir / LEGACY_EPISODES_PATH)
-
-
 def write_episodes(episodes: Dataset, local_dir: Path):
     if get_hf_dataset_size_in_mb(episodes) > DEFAULT_FILE_SIZE_IN_MB:
         raise NotImplementedError("Contact a maintainer.")
@@ -385,45 +307,11 @@
     episodes.to_parquet(fpath)
 
 
-def legacy_load_episodes(local_dir: Path) -> dict:
-    episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
-    return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}
-
-
 def load_episodes(local_dir: Path):
     hf_dataset = load_nested_dataset(local_dir / EPISODES_DIR)
     return hf_dataset
 
 
-def legacy_write_episode_stats(episode_index: int, episode_stats: dict, local_dir: Path):
-    # We wrap episode_stats in a dictionary since `episode_stats["episode_index"]`
-    # is a dictionary of stats and not an integer.
-    episode_stats = {"episode_index": episode_index, "stats": serialize_dict(episode_stats)}
-    append_jsonlines(episode_stats, local_dir / LEGACY_EPISODES_STATS_PATH)
-
-
-# def write_episodes_stats(episodes_stats: Dataset, local_dir: Path):
-#     if get_hf_dataset_size_in_mb(episodes_stats) > DEFAULT_FILE_SIZE_IN_MB:
-#         raise NotImplementedError("Contact a maintainer.")
-
-#     fpath = local_dir / DEFAULT_EPISODES_STATS_PATH.format(chunk_index=0, file_index=0)
-#     fpath.parent.mkdir(parents=True, exist_ok=True)
-#     episodes_stats.to_parquet(fpath)
-
-
-def legacy_load_episodes_stats(local_dir: Path) -> dict:
-    episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
-    return {
-        item["episode_index"]: cast_stats_to_numpy(item["stats"])
-        for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
-    }
-
-
-# def load_episodes_stats(local_dir: Path):
-#     hf_dataset = load_nested_dataset(local_dir / EPISODES_STATS_DIR)
-#     return hf_dataset
-
-
 def backward_compatible_episodes_stats(
     stats: dict[str, dict[str, np.ndarray]], episodes: list[int]
 ) -> dict[str, dict[str, np.ndarray]]:
diff --git a/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py b/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py
index 83a6145c..66137122 100644
--- a/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py
+++ b/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py
@@ -20,7 +20,9 @@ python lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py \
 import argparse
 import shutil
 from pathlib import Path
+from typing import Any
 
+import jsonlines
 import pandas as pd
 import tqdm
 from datasets import Dataset
@@ -34,6 +36,7 @@ from lerobot.common.datasets.utils import (
     DEFAULT_DATA_PATH,
     DEFAULT_FILE_SIZE_IN_MB,
     DEFAULT_VIDEO_PATH,
+    cast_stats_to_numpy,
     concat_video_files,
     flatten_dict,
     get_parquet_file_size_in_mb,
@@ -44,6 +47,5 @@ from lerobot.common.datasets.utils import (
-    legacy_load_episodes_stats,
-    legacy_load_tasks,
     load_info,
+    serialize_dict,
     update_chunk_file_indices,
     write_episodes,
     write_info,
@@ -51,6 +53,12 @@ from lerobot.common.datasets.utils import (
     write_tasks,
 )
 
+LEGACY_EPISODES_PATH = "meta/episodes.jsonl"
+LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
+LEGACY_TASKS_PATH = "meta/tasks.jsonl"
+LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
+LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
+
 V21 = "v2.1"
 
 
@@ -97,16 +105,26 @@ meta/info.json
 """
 
 
-# def generate_flat_ep_stats(episodes_stats):
-#     for ep_idx, ep_stats in episodes_stats.items():
-#         flat_ep_stats = flatten_dict(ep_stats)
-#         flat_ep_stats["episode_index"] = ep_idx
-#         yield flat_ep_stats
+def load_jsonlines(fpath: Path) -> list[Any]:
+    with jsonlines.open(fpath, "r") as reader:
+        return list(reader)
 
-# def convert_episodes_stats(root, new_root):
-#     episodes_stats = legacy_load_episodes_stats(root)
-#     ds_episodes_stats = Dataset.from_generator(lambda: generate_flat_ep_stats(episodes_stats))
-#     write_episodes_stats(ds_episodes_stats, new_root)
+def legacy_load_episodes(local_dir: Path) -> dict:
+    episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
+    return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}
+
+def legacy_load_episodes_stats(local_dir: Path) -> dict:
+    episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
+    return {
+        item["episode_index"]: cast_stats_to_numpy(item["stats"])
+        for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
+    }
+
+def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
+    tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
+    tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
+    task_to_task_index = {task: task_index for task_index, task in tasks.items()}
+    return tasks, task_to_task_index
 
 
 def convert_tasks(root, new_root):
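
Note for reviewers: a minimal usage sketch of the relocated legacy loaders, assuming a local copy of a v2.1 dataset (the `root` path below is hypothetical). The record shapes follow the legacy jsonl writers removed above (e.g. `legacy_write_task` wrote `{"task_index": ..., "task": ...}` records).

    from pathlib import Path

    from lerobot.common.datasets.v30.convert_dataset_v21_to_v30 import (
        legacy_load_episodes_stats,
        legacy_load_tasks,
    )

    # Hypothetical local copy of a v2.1 dataset.
    root = Path("outputs/my_v21_dataset")

    # meta/tasks.jsonl holds one record per line, e.g. {"task_index": 0, "task": "pick up the cube"}.
    tasks, task_to_task_index = legacy_load_tasks(root)
    assert task_to_task_index[tasks[0]] == 0  # assumes the dataset defines task_index 0

    # meta/episodes_stats.jsonl records are keyed by episode_index, with their
    # per-feature stats cast back to nested numpy arrays by cast_stats_to_numpy.
    episodes_stats = legacy_load_episodes_stats(root)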