Remove legacy from datasets/utils.py

Remi Cadene 2025-04-19 19:27:14 +05:30
parent b0cca75e5e
commit 9c0836c8d0
2 changed files with 33 additions and 125 deletions

lerobot/common/datasets/utils.py

@@ -27,7 +27,6 @@ from types import SimpleNamespace
from typing import Any, Tuple
import datasets
import jsonlines
import numpy as np
import packaging.version
import pandas
@@ -53,15 +52,6 @@ from lerobot.configs.types import DictLike, FeatureType, PolicyFeature
DEFAULT_CHUNK_SIZE = 1000  # Max number of files per chunk
DEFAULT_FILE_SIZE_IN_MB = 500.0  # Max size per file
# Keep legacy for `convert_dataset_v21_to_v30.py`
LEGACY_EPISODES_PATH = "meta/episodes.jsonl"
LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
LEGACY_TASKS_PATH = "meta/tasks.jsonl"
LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
INFO_PATH = "meta/info.json"
STATS_PATH = "meta/stats.json"
@@ -74,6 +64,7 @@ DEFAULT_TASKS_PATH = "meta/tasks.parquet"
DEFAULT_EPISODES_PATH = EPISODES_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"
DEFAULT_IMAGE_PATH = "images/{image_key}/episode-{episode_index:06d}/frame-{frame_index:06d}.png"
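
For reference, a minimal sketch of how the chunked path templates above resolve. CHUNK_FILE_PATTERN, DATA_DIR, and VIDEO_DIR are defined outside the hunks shown here, so their values below are assumptions, and the video key is illustrative:

CHUNK_FILE_PATTERN = "chunk-{chunk_index:03d}/file-{file_index:03d}"  # assumed, defined elsewhere in this module
DATA_DIR, VIDEO_DIR = "data", "videos"                                # assumed, defined elsewhere in this module
DEFAULT_DATA_PATH = DATA_DIR + "/" + CHUNK_FILE_PATTERN + ".parquet"
DEFAULT_VIDEO_PATH = VIDEO_DIR + "/{video_key}/" + CHUNK_FILE_PATTERN + ".mp4"

DEFAULT_DATA_PATH.format(chunk_index=0, file_index=1)
# -> "data/chunk-000/file-001.parquet"
DEFAULT_VIDEO_PATH.format(video_key="observation.images.top", chunk_index=0, file_index=1)
# -> "videos/observation.images.top/chunk-000/file-001.mp4"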
DATASET_CARD_TEMPLATE = """
---
@@ -105,19 +96,11 @@ def get_hf_dataset_size_in_mb(hf_ds: Dataset) -> int:
def get_pd_dataframe_size_in_mb(df: pandas.DataFrame) -> int:
    # TODO(rcadene): unused?
    memory_usage_bytes = df.memory_usage(deep=True).sum()
    return memory_usage_bytes / (1024**2)
def get_chunk_file_indices(path: Path) -> Tuple[int, int]:
    if not path.stem.startswith("file-") or not path.parent.name.startswith("chunk-"):
        raise ValueError(f"Path does not follow {CHUNK_FILE_PATTERN}: '{path}'")
    chunk_index = int(path.parent.name.replace("chunk-", ""))
    file_index = int(path.stem.replace("file-", ""))
    return chunk_index, file_index
def update_chunk_file_indices(chunk_idx: int, file_idx: int, chunks_size: int):
    if file_idx == chunks_size - 1:
        file_idx = 0
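
Only the first lines of update_chunk_file_indices appear in this hunk; below is a hedged sketch of the full rollover behaviour implied by the visible context (the else branch and the return are assumptions):

def update_chunk_file_indices(chunk_idx: int, file_idx: int, chunks_size: int):
    # Roll over to the next chunk once the current chunk holds `chunks_size` files.
    if file_idx == chunks_size - 1:
        chunk_idx += 1  # assumed: advance the chunk counter on rollover
        file_idx = 0
    else:
        file_idx += 1  # assumed: otherwise just advance the file counter
    return chunk_idx, file_idx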
@@ -142,14 +125,6 @@ def load_nested_dataset(pq_dir: Path) -> Dataset:
    )
def get_latest_parquet_path(pq_dir: Path) -> Path:
    return sorted(pq_dir.glob("*/*.parquet"))[-1]


def get_latest_video_path(pq_dir: Path, video_key: str) -> Path:
    return sorted(pq_dir.glob(f"{video_key}/*/*.mp4"))[-1]
def get_parquet_num_frames(parquet_path):
    metadata = pq.read_metadata(parquet_path)
    return metadata.num_rows
@@ -161,7 +136,8 @@ def get_video_size_in_mb(mp4_path: Path):
    return file_size_mb


def concat_video_files(paths_to_cat, root, video_key, chunk_idx, file_idx):
def concat_video_files(paths_to_cat: list[Path], root: Path, video_key: str, chunk_idx: int, file_idx: int):
    # TODO(rcadene): add docstring
    tmp_dir = Path(tempfile.mkdtemp(dir=root))
    # Create a text file with the list of files to concatenate
    path_concat_video_files = tmp_dir / "concat_video_files.txt"
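
concat_video_files builds a text file listing the videos to merge, which is the input format of ffmpeg's concat demuxer. A hedged sketch of how such a list is typically consumed (the actual ffmpeg invocation sits outside the lines shown in this hunk, and the helper name here is illustrative):

import subprocess
from pathlib import Path

def run_ffmpeg_concat(list_file: Path, output: Path) -> None:
    # Each line of `list_file` is expected to read: file '/path/to/part.mp4'
    # `-c copy` concatenates the parts without re-encoding.
    subprocess.run(
        ["ffmpeg", "-f", "concat", "-safe", "0", "-i", str(list_file), "-c", "copy", str(output)],
        check=True,
    )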
@@ -244,18 +220,6 @@ def unflatten_dict(d: dict, sep: str = "/") -> dict:
    return outdict
def get_nested_item(obj: DictLike, flattened_key: str, sep: str = "/") -> Any:
    split_keys = flattened_key.split(sep)
    getter = obj[split_keys[0]]
    if len(split_keys) == 1:
        return getter
    for key in split_keys[1:]:
        getter = getter[key]
    return getter
def serialize_dict(stats: dict[str, torch.Tensor | np.ndarray | dict]) -> dict:
    serialized_dict = {}
    for key, value in flatten_dict(stats).items():
@@ -285,58 +249,34 @@ def load_json(fpath: Path) -> Any:
    with open(fpath) as f:
        return json.load(f)


def write_json(data: dict, fpath: Path) -> None:
    fpath.parent.mkdir(exist_ok=True, parents=True)
    with open(fpath, "w") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
def load_jsonlines(fpath: Path) -> list[Any]:
    with jsonlines.open(fpath, "r") as reader:
        return list(reader)


def write_jsonlines(data: dict, fpath: Path) -> None:
    fpath.parent.mkdir(exist_ok=True, parents=True)
    with jsonlines.open(fpath, "w") as writer:
        writer.write_all(data)


def append_jsonlines(data: dict, fpath: Path) -> None:
    fpath.parent.mkdir(exist_ok=True, parents=True)
    with jsonlines.open(fpath, "a") as writer:
        writer.write(data)
def write_info(info: dict, local_dir: Path):
    write_json(info, local_dir / INFO_PATH)


def load_info(local_dir: Path) -> dict:
    info = load_json(local_dir / INFO_PATH)
    for ft in info["features"].values():
        ft["shape"] = tuple(ft["shape"])
    return info


def write_stats(stats: dict, local_dir: Path):
    serialized_stats = serialize_dict(stats)
    write_json(serialized_stats, local_dir / STATS_PATH)


def cast_stats_to_numpy(stats) -> dict[str, dict[str, np.ndarray]]:
    stats = {key: np.array(value) for key, value in flatten_dict(stats).items()}
    return unflatten_dict(stats)


def load_stats(local_dir: Path) -> dict[str, dict[str, np.ndarray]]:
    if not (local_dir / STATS_PATH).exists():
        return None
    stats = load_json(local_dir / STATS_PATH)
    return cast_stats_to_numpy(stats)


def write_hf_dataset(hf_dataset: Dataset, local_dir: Path):
    if get_hf_dataset_size_in_mb(hf_dataset) > DEFAULT_FILE_SIZE_IN_MB:
        raise NotImplementedError("Contact a maintainer.")
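
A short usage sketch of the stats round trip kept by this module (the feature key and directory are illustrative):

import numpy as np
from pathlib import Path
from lerobot.common.datasets.utils import write_stats, load_stats

local_dir = Path("/tmp/lerobot_stats_demo")  # illustrative location
stats = {"observation.state": {"mean": np.zeros(6), "std": np.ones(6)}}
write_stats(stats, local_dir)   # serialized via serialize_dict into meta/stats.json
restored = load_stats(local_dir)  # rebuilt as numpy arrays via cast_stats_to_numpy
assert np.allclose(restored["observation.state"]["mean"], stats["observation.state"]["mean"])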
@@ -352,30 +292,12 @@ def write_tasks(tasks: pandas.DataFrame, local_dir: Path):
    tasks.to_parquet(path)
def legacy_write_task(task_index: int, task: dict, local_dir: Path):
    task_dict = {
        "task_index": task_index,
        "task": task,
    }
    append_jsonlines(task_dict, local_dir / LEGACY_TASKS_PATH)


def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
    tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
    tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
    task_to_task_index = {task: task_index for task_index, task in tasks.items()}
    return tasks, task_to_task_index
def load_tasks(local_dir: Path):
    tasks = pd.read_parquet(local_dir / DEFAULT_TASKS_PATH)
    return tasks
def write_episode(episode: dict, local_dir: Path):
    append_jsonlines(episode, local_dir / LEGACY_EPISODES_PATH)
def write_episodes(episodes: Dataset, local_dir: Path):
    if get_hf_dataset_size_in_mb(episodes) > DEFAULT_FILE_SIZE_IN_MB:
        raise NotImplementedError("Contact a maintainer.")
@@ -385,45 +307,11 @@ def write_episodes(episodes: Dataset, local_dir: Path):
    episodes.to_parquet(fpath)
def legacy_load_episodes(local_dir: Path) -> dict:
    episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
    return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}


def load_episodes(local_dir: Path):
    hf_dataset = load_nested_dataset(local_dir / EPISODES_DIR)
    return hf_dataset
def legacy_write_episode_stats(episode_index: int, episode_stats: dict, local_dir: Path):
    # We wrap episode_stats in a dictionary since `episode_stats["episode_index"]`
    # is a dictionary of stats and not an integer.
    episode_stats = {"episode_index": episode_index, "stats": serialize_dict(episode_stats)}
    append_jsonlines(episode_stats, local_dir / LEGACY_EPISODES_STATS_PATH)


# def write_episodes_stats(episodes_stats: Dataset, local_dir: Path):
#     if get_hf_dataset_size_in_mb(episodes_stats) > DEFAULT_FILE_SIZE_IN_MB:
#         raise NotImplementedError("Contact a maintainer.")
#     fpath = local_dir / DEFAULT_EPISODES_STATS_PATH.format(chunk_index=0, file_index=0)
#     fpath.parent.mkdir(parents=True, exist_ok=True)
#     episodes_stats.to_parquet(fpath)


def legacy_load_episodes_stats(local_dir: Path) -> dict:
    episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
    return {
        item["episode_index"]: cast_stats_to_numpy(item["stats"])
        for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
    }


# def load_episodes_stats(local_dir: Path):
#     hf_dataset = load_nested_dataset(local_dir / EPISODES_STATS_DIR)
#     return hf_dataset
def backward_compatible_episodes_stats(
    stats: dict[str, dict[str, np.ndarray]], episodes: list[int]
) -> dict[str, dict[str, np.ndarray]]:

lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py

@@ -20,7 +20,9 @@ python lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py \
import argparse
import shutil
from pathlib import Path
from typing import Any
import jsonlines
import pandas as pd
import tqdm
from datasets import Dataset
@@ -34,6 +36,7 @@ from lerobot.common.datasets.utils import (
    DEFAULT_DATA_PATH,
    DEFAULT_FILE_SIZE_IN_MB,
    DEFAULT_VIDEO_PATH,
    cast_stats_to_numpy,
    concat_video_files,
    flatten_dict,
    get_parquet_file_size_in_mb,
@@ -44,6 +47,7 @@ from lerobot.common.datasets.utils import (
    legacy_load_episodes_stats,
    legacy_load_tasks,
    load_info,
    serialize_dict,
    update_chunk_file_indices,
    write_episodes,
    write_info,
@@ -51,6 +55,12 @@ from lerobot.common.datasets.utils import (
    write_tasks,
)
LEGACY_EPISODES_PATH = "meta/episodes.jsonl"
LEGACY_EPISODES_STATS_PATH = "meta/episodes_stats.jsonl"
LEGACY_TASKS_PATH = "meta/tasks.jsonl"
LEGACY_DEFAULT_VIDEO_PATH = "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
LEGACY_DEFAULT_PARQUET_PATH = "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet"
V21 = "v2.1"
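
For reference, the legacy v2.1 templates above resolve to episode-based paths, in contrast to the chunk/file layout used by v3.0 (the values below are illustrative):

LEGACY_DEFAULT_PARQUET_PATH.format(episode_chunk=0, episode_index=3)
# -> "data/chunk-000/episode_000003.parquet"
LEGACY_DEFAULT_VIDEO_PATH.format(episode_chunk=0, video_key="observation.images.top", episode_index=3)
# -> "videos/chunk-000/observation.images.top/episode_000003.mp4"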
@@ -97,16 +107,26 @@ meta/info.json
"""
# def generate_flat_ep_stats(episodes_stats):
#     for ep_idx, ep_stats in episodes_stats.items():
#         flat_ep_stats = flatten_dict(ep_stats)
#         flat_ep_stats["episode_index"] = ep_idx
#         yield flat_ep_stats


# def convert_episodes_stats(root, new_root):
#     episodes_stats = legacy_load_episodes_stats(root)
#     ds_episodes_stats = Dataset.from_generator(lambda: generate_flat_ep_stats(episodes_stats))
#     write_episodes_stats(ds_episodes_stats, new_root)


def load_jsonlines(fpath: Path) -> list[Any]:
    with jsonlines.open(fpath, "r") as reader:
        return list(reader)


def legacy_load_episodes(local_dir: Path) -> dict:
    episodes = load_jsonlines(local_dir / LEGACY_EPISODES_PATH)
    return {item["episode_index"]: item for item in sorted(episodes, key=lambda x: x["episode_index"])}
def legacy_load_episodes_stats(local_dir: Path) -> dict:
    episodes_stats = load_jsonlines(local_dir / LEGACY_EPISODES_STATS_PATH)
    return {
        item["episode_index"]: cast_stats_to_numpy(item["stats"])
        for item in sorted(episodes_stats, key=lambda x: x["episode_index"])
    }


def legacy_load_tasks(local_dir: Path) -> tuple[dict, dict]:
    tasks = load_jsonlines(local_dir / LEGACY_TASKS_PATH)
    tasks = {item["task_index"]: item["task"] for item in sorted(tasks, key=lambda x: x["task_index"])}
    task_to_task_index = {task: task_index for task_index, task in tasks.items()}
    return tasks, task_to_task_index
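
A hedged sketch of how these legacy loaders are typically combined during conversion (the root path is illustrative; the real entry points are the conversion functions defined below):

root = Path("/path/to/v21/dataset")                  # illustrative
tasks, task_to_task_index = legacy_load_tasks(root)  # task_index -> task, and its inverse
episodes = legacy_load_episodes(root)                # episode_index -> episode metadata
episodes_stats = legacy_load_episodes_stats(root)    # episode_index -> per-episode numpy stats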
def convert_tasks(root, new_root):