From d518b036d006f9ac8967bb40e53fc16864b388a7 Mon Sep 17 00:00:00 2001
From: Remi Cadene
Date: Tue, 22 Apr 2025 14:46:33 +0000
Subject: [PATCH] Faster self.meta.episodes[...]

switch back to set_transform instead of set_format
Add video_files_size_in_mb
pre-commit run --all-files
---
 lerobot/common/datasets/lerobot_dataset.py | 83 ++++++++-----------
 lerobot/common/datasets/utils.py           | 23 +++--
 .../v30/convert_dataset_v21_to_v30.py      | 11 +--
 lerobot/common/datasets/video_utils.py     |  1 +
 tests/fixtures/dataset_factories.py        | 12 ++-
 5 files changed, 66 insertions(+), 64 deletions(-)

diff --git a/lerobot/common/datasets/lerobot_dataset.py b/lerobot/common/datasets/lerobot_dataset.py
index 544f965b..7c61e486 100644
--- a/lerobot/common/datasets/lerobot_dataset.py
+++ b/lerobot/common/datasets/lerobot_dataset.py
@@ -56,6 +56,7 @@ from lerobot.common.datasets.utils import (
     get_safe_version,
     get_video_duration_in_s,
     get_video_size_in_mb,
+    hf_transform_to_torch,
     is_valid_version,
     load_episodes,
     load_info,
@@ -136,14 +137,16 @@ class LeRobotDatasetMetadata:
         return packaging.version.parse(self.info["codebase_version"])
 
     def get_data_file_path(self, ep_index: int) -> Path:
-        chunk_idx = self.episodes["data/chunk_index"][ep_index]
-        file_idx = self.episodes["data/file_index"][ep_index]
+        ep = self.episodes[ep_index]
+        chunk_idx = ep["data/chunk_index"]
+        file_idx = ep["data/file_index"]
         fpath = self.data_path.format(chunk_index=chunk_idx, file_index=file_idx)
         return Path(fpath)
 
     def get_video_file_path(self, ep_index: int, vid_key: str) -> Path:
-        chunk_idx = self.episodes[f"videos/{vid_key}/chunk_index"][ep_index]
-        file_idx = self.episodes[f"videos/{vid_key}/file_index"][ep_index]
+        ep = self.episodes[ep_index]
+        chunk_idx = ep[f"videos/{vid_key}/chunk_index"]
+        file_idx = ep[f"videos/{vid_key}/file_index"]
         fpath = self.video_path.format(video_key=vid_key, chunk_index=chunk_idx, file_index=file_idx)
         return Path(fpath)
 
@@ -218,9 +221,14 @@ class LeRobotDatasetMetadata:
         return self.info["chunks_size"]
 
     @property
-    def files_size_in_mb(self) -> int:
-        """Max size of file in mega bytes."""
-        return self.info["files_size_in_mb"]
+    def data_files_size_in_mb(self) -> int:
+        """Max size of a data file in megabytes."""
+        return self.info["data_files_size_in_mb"]
+
+    @property
+    def video_files_size_in_mb(self) -> int:
+        """Max size of a video file in megabytes."""
+        return self.info["video_files_size_in_mb"]
 
     def get_task_index(self, task: str) -> int | None:
         """
@@ -278,23 +286,14 @@ class LeRobotDatasetMetadata:
             df["dataset_to_index"] = [len(df)]
         else:
             # Retrieve information from the latest parquet file
-            latest_ep = self.episodes.with_format(
-                columns=[
-                    "meta/episodes/chunk_index",
-                    "meta/episodes/file_index",
-                    "dataset_from_index",
-                    "dataset_to_index",
-                ]
-            )[-1]
-            chunk_idx, file_idx = (
-                latest_ep["meta/episodes/chunk_index"],
-                latest_ep["meta/episodes/file_index"],
-            )
+            latest_ep = self.episodes[-1]
+            chunk_idx = latest_ep["meta/episodes/chunk_index"]
+            file_idx = latest_ep["meta/episodes/file_index"]
 
             latest_path = self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
             latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
 
-            if latest_size_in_mb + ep_size_in_mb >= self.files_size_in_mb:
+            if latest_size_in_mb + ep_size_in_mb >= self.data_files_size_in_mb:
                 # Size limit is reached, prepare new parquet file
                 chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.chunks_size)
 
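The access pattern above is the core of the speed-up named in the subject line: indexing a
Hugging Face datasets.Dataset by row returns a single-row dict, whereas indexing by column
materializes the whole column before one entry can be picked out. A minimal sketch, independent
of the patch (column names and sizes are illustrative only):

    from datasets import Dataset

    episodes = Dataset.from_dict({
        "data/chunk_index": list(range(100_000)),
        "data/file_index": list(range(100_000)),
    })

    row = episodes[42]                     # decodes a single row
    chunk_idx = row["data/chunk_index"]

    column = episodes["data/chunk_index"]  # decodes all 100_000 values first
    chunk_idx_slow = column[42]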
@@ -304,7 +303,7 @@ class LeRobotDatasetMetadata:
             df["dataset_from_index"] = [latest_ep["dataset_to_index"]]
             df["dataset_to_index"] = [latest_ep["dataset_to_index"] + len(df)]
 
-            if latest_size_in_mb + ep_size_in_mb < self.files_size_in_mb:
+            if latest_size_in_mb + ep_size_in_mb < self.data_files_size_in_mb:
                 # Size limit wasnt reached, concatenate latest dataframe with new one
                 latest_df = pd.read_parquet(latest_path)
                 df = pd.concat([latest_df, df], ignore_index=True)
@@ -339,6 +338,7 @@ class LeRobotDatasetMetadata:
         # Update info
         self.info["total_episodes"] += 1
         self.info["total_frames"] += episode_length
+        self.info["total_tasks"] = len(self.tasks)
         self.info["splits"] = {"train": f"0:{self.info['total_episodes']}"}
         if len(self.video_keys) > 0:
             self.update_video_info()
@@ -674,14 +674,14 @@ class LeRobotDataset(torch.utils.data.Dataset):
     def load_hf_dataset(self) -> datasets.Dataset:
         """hf_dataset contains all the observations, states, actions, rewards, etc."""
         hf_dataset = load_nested_dataset(self.root / "data")
-        hf_dataset.set_format("torch")
+        hf_dataset.set_transform(hf_transform_to_torch)
         return hf_dataset
 
     def create_hf_dataset(self) -> datasets.Dataset:
         features = get_hf_features_from_features(self.features)
         ft_dict = {col: [] for col in features}
         hf_dataset = datasets.Dataset.from_dict(ft_dict, features=features, split="train")
-        hf_dataset.set_format("torch")
+        hf_dataset.set_transform(hf_transform_to_torch)
         return hf_dataset
 
     @property
@@ -712,8 +712,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
         return get_hf_features_from_features(self.features)
 
     def _get_query_indices(self, idx: int, ep_idx: int) -> tuple[dict[str, list[int | bool]]]:
-        ep_start = self.meta.episodes["dataset_from_index"][ep_idx]
-        ep_end = self.meta.episodes["dataset_to_index"][ep_idx]
+        ep = self.meta.episodes[ep_idx]
+        ep_start = ep["dataset_from_index"]
+        ep_end = ep["dataset_to_index"]
         query_indices = {
             key: [max(ep_start, min(ep_end - 1, idx + delta)) for delta in delta_idx]
             for key, delta_idx in self.delta_indices.items()
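The switch from set_format("torch") back to set_transform is the second change named in the
commit message: set_transform registers a function that is applied lazily to whatever batch is
accessed, instead of eagerly fixing an output format for every column. A rough sketch in the
spirit of hf_transform_to_torch (the transform body below is an assumption for illustration,
not the lerobot implementation):

    import torch
    from datasets import Dataset

    def to_torch(batch: dict) -> dict:
        # Convert each accessed value to a torch tensor; leave strings untouched.
        return {
            key: [val if isinstance(val, str) else torch.as_tensor(val) for val in values]
            for key, values in batch.items()
        }

    ds = Dataset.from_dict({"observation.state": [[0.0, 1.0], [2.0, 3.0]]})
    ds.set_transform(to_torch)  # applied on __getitem__, nothing is converted up front
    item = ds[0]                # {"observation.state": tensor([0., 1.])}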
@@ -754,12 +755,13 @@ class LeRobotDataset(torch.utils.data.Dataset):
         Segmentation Fault. This probably happens because a memory reference to the video loader
         is created in the main process and a subprocess fails to access it.
         """
+        ep = self.meta.episodes[ep_idx]
         item = {}
         for vid_key, query_ts in query_timestamps.items():
             # Episodes are stored sequentially on a single mp4 to reduce the number of files.
             # Thus we load the start timestamp of the episode on this mp4 and
             # shift the query timestamp accordingly.
-            from_timestamp = self.meta.episodes[f"videos/{vid_key}/from_timestamp"][ep_idx]
+            from_timestamp = ep[f"videos/{vid_key}/from_timestamp"]
             shifted_query_ts = [from_timestamp + ts for ts in query_ts]
 
             video_path = self.root / self.meta.get_video_file_path(ep_idx, vid_key)
@@ -984,15 +986,16 @@ class LeRobotDataset(torch.utils.data.Dataset):
             latest_num_frames = 0
         else:
             # Retrieve information from the latest parquet file
-            latest_ep = self.meta.episodes.with_format(columns=["data/chunk_index", "data/file_index"])[-1]
-            chunk_idx, file_idx = latest_ep["data/chunk_index"], latest_ep["data/file_index"]
+            latest_ep = self.meta.episodes[-1]
+            chunk_idx = latest_ep["data/chunk_index"]
+            file_idx = latest_ep["data/file_index"]
             latest_path = self.root / self.meta.data_path.format(chunk_index=chunk_idx, file_index=file_idx)
             latest_size_in_mb = get_parquet_file_size_in_mb(latest_path)
             latest_num_frames = get_parquet_num_frames(latest_path)
 
         # Determine if a new parquet file is needed
-        if latest_size_in_mb + ep_size_in_mb >= self.meta.files_size_in_mb:
+        if latest_size_in_mb + ep_size_in_mb >= self.meta.data_files_size_in_mb:
             # Size limit is reached, prepare new parquet file
             chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
             latest_num_frames = 0
@@ -1039,13 +1042,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
             shutil.move(str(ep_path), str(new_path))
         else:
             # Retrieve information from the latest video file
-            latest_ep = self.meta.episodes.with_format(
-                columns=[f"videos/{video_key}/chunk_index", f"videos/{video_key}/file_index"]
-            )[-1]
-            chunk_idx, file_idx = (
-                latest_ep[f"videos/{video_key}/chunk_index"],
-                latest_ep[f"videos/{video_key}/file_index"],
-            )
+            latest_ep = self.meta.episodes[-1]
+            chunk_idx = latest_ep[f"videos/{video_key}/chunk_index"]
+            file_idx = latest_ep[f"videos/{video_key}/file_index"]
 
             latest_path = self.root / self.meta.video_path.format(
                 video_key=video_key, chunk_index=chunk_idx, file_index=file_idx
@@ -1053,7 +1052,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
             )
             latest_size_in_mb = get_video_size_in_mb(latest_path)
             latest_duration_in_s = get_video_duration_in_s(latest_path)
-            if latest_size_in_mb + ep_size_in_mb >= self.meta.files_size_in_mb:
+            if latest_size_in_mb + ep_size_in_mb >= self.meta.video_files_size_in_mb:
                 # Move temporary episode video to a new video file in the dataset
                 chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size)
                 new_path = self.root / self.meta.video_path.format(
@@ -1115,16 +1114,6 @@ class LeRobotDataset(torch.utils.data.Dataset):
         if self.image_writer is not None:
             self.image_writer.wait_until_done()
 
-    # TODO(rcadene): this method is currently not used
-    # def encode_videos(self) -> None:
-    #     """
-    #     Use ffmpeg to convert frames stored as png into mp4 videos.
-    #     Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding,
-    #     since video encoding with ffmpeg is already using multithreading.
-    #     """
-    #     for ep_idx in range(self.meta.total_episodes):
-    #         self.encode_episode_videos(ep_idx)
-
     def _encode_temporary_episode_video(self, video_key: str, episode_index: int) -> dict:
         """
         Use ffmpeg to convert frames stored as png into mp4 videos.
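Both saving paths above follow the same rollover rule, now with separate thresholds for data
parquet files and concatenated mp4 files (100 MB and 500 MB by default). A condensed sketch of
that decision, with the chunk/file bookkeeping folded into one illustrative helper (the real
code uses update_chunk_file_indices; this signature and the default chunks_size are assumptions):

    def next_file_location(
        chunk_idx: int, file_idx: int, latest_mb: float, ep_mb: float,
        max_file_mb: float, chunks_size: int = 1000,
    ) -> tuple[int, int, bool]:
        # Returns (chunk_idx, file_idx, append_to_latest) for the episode being saved.
        if latest_mb + ep_mb < max_file_mb:
            return chunk_idx, file_idx, True       # keep appending to the latest file
        if file_idx + 1 < chunks_size:             # limit reached: open a new file in this chunk
            return chunk_idx, file_idx + 1, False
        return chunk_idx + 1, 0, False             # or start a new chunk when this one is full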
diff --git a/lerobot/common/datasets/utils.py b/lerobot/common/datasets/utils.py
index f404113e..6edf4652 100644
--- a/lerobot/common/datasets/utils.py
+++ b/lerobot/common/datasets/utils.py
@@ -50,7 +50,8 @@ from lerobot.common.utils.utils import is_valid_numpy_dtype_string
 from lerobot.configs.types import FeatureType, PolicyFeature
 
 DEFAULT_CHUNK_SIZE = 1000  # Max number of files per chunk
-DEFAULT_FILE_SIZE_IN_MB = 100.0  # Max size per file
+DEFAULT_DATA_FILE_SIZE_IN_MB = 100  # Max size per data file
+DEFAULT_VIDEO_FILE_SIZE_IN_MB = 500  # Max size per video file
 
 INFO_PATH = "meta/info.json"
 STATS_PATH = "meta/stats.json"
@@ -142,6 +143,7 @@ def get_video_size_in_mb(mp4_path: Path):
 
 
 def concat_video_files(paths_to_cat: list[Path], root: Path, video_key: str, chunk_idx: int, file_idx: int):
+    # TODO(rcadene): move to video_utils.py
     # TODO(rcadene): add docstring
     tmp_dir = Path(tempfile.mkdtemp(dir=root))
     # Create a text file with the list of files to concatenate
@@ -175,6 +177,7 @@ def concat_video_files(paths_to_cat: list[Path], root: Path, video_key: str, chu
 
 
 def get_video_duration_in_s(mp4_file: Path):
+    # TODO(rcadene): move to video_utils.py
     command = [
         "ffprobe",
         "-v",
@@ -290,7 +293,7 @@ def load_stats(local_dir: Path) -> dict[str, dict[str, np.ndarray]]:
 
 
 def write_hf_dataset(hf_dataset: Dataset, local_dir: Path):
-    if get_hf_dataset_size_in_mb(hf_dataset) > DEFAULT_FILE_SIZE_IN_MB:
+    if get_hf_dataset_size_in_mb(hf_dataset) > DEFAULT_DATA_FILE_SIZE_IN_MB:
         raise NotImplementedError("Contact a maintainer.")
 
     path = local_dir / DEFAULT_DATA_PATH.format(chunk_index=0, file_index=0)
@@ -310,7 +313,7 @@ def load_tasks(local_dir: Path):
 
 
 def write_episodes(episodes: Dataset, local_dir: Path):
-    if get_hf_dataset_size_in_mb(episodes) > DEFAULT_FILE_SIZE_IN_MB:
+    if get_hf_dataset_size_in_mb(episodes) > DEFAULT_DATA_FILE_SIZE_IN_MB:
         raise NotImplementedError("Contact a maintainer.")
 
     fpath = local_dir / DEFAULT_EPISODES_PATH.format(chunk_index=0, file_index=0)
@@ -318,9 +321,13 @@ def write_episodes(episodes: Dataset, local_dir: Path):
     episodes.to_parquet(fpath)
 
 
-def load_episodes(local_dir: Path):
-    hf_dataset = load_nested_dataset(local_dir / EPISODES_DIR)
-    return hf_dataset
+def load_episodes(local_dir: Path) -> datasets.Dataset:
+    episodes = load_nested_dataset(local_dir / EPISODES_DIR)
+    # Select episode features/columns containing references to episode data and videos
+    # (e.g. tasks, dataset_from_index, dataset_to_index, data/chunk_index, data/file_index, etc.)
+    # This is to speed up access to these data, instead of having to load episode stats.
+    episodes = episodes.select_columns([key for key in episodes.features if not key.startswith("stats/")])
+    return episodes
 
 
 def backward_compatible_episodes_stats(
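Episode metadata is stored flat, so per-feature statistics end up as many "stats/..." columns
next to the few bookkeeping columns the runtime actually needs; dropping them at load time keeps
row access cheap. A small self-contained example of the same select_columns call (the column
names below are illustrative, not the full lerobot schema):

    from datasets import Dataset

    episodes = Dataset.from_dict({
        "episode_index": [0, 1],
        "dataset_from_index": [0, 100],
        "dataset_to_index": [100, 220],
        "data/chunk_index": [0, 0],
        "stats/observation.state/mean": [[0.1, 0.2], [0.3, 0.4]],
        "stats/observation.state/std": [[1.0, 1.0], [1.0, 1.0]],
    })
    episodes = episodes.select_columns(
        [key for key in episodes.features if not key.startswith("stats/")]
    )
    print(episodes.column_names)
    # ['episode_index', 'dataset_from_index', 'dataset_to_index', 'data/chunk_index']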
@@ -528,9 +535,9 @@ def create_empty_dataset_info(
         "total_episodes": 0,
         "total_frames": 0,
         "total_tasks": 0,
-        "total_videos": 0,
         "chunks_size": DEFAULT_CHUNK_SIZE,
-        "files_size_in_mb": DEFAULT_FILE_SIZE_IN_MB,
+        "data_files_size_in_mb": DEFAULT_DATA_FILE_SIZE_IN_MB,
+        "video_files_size_in_mb": DEFAULT_VIDEO_FILE_SIZE_IN_MB,
         "fps": fps,
         "splits": {},
         "data_path": DEFAULT_DATA_PATH,
diff --git a/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py b/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py
index 2c9e08a6..a58a6271 100644
--- a/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py
+++ b/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py
@@ -34,8 +34,9 @@ from lerobot.common.datasets.compute_stats import aggregate_stats
 from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset
 from lerobot.common.datasets.utils import (
     DEFAULT_CHUNK_SIZE,
+    DEFAULT_DATA_FILE_SIZE_IN_MB,
     DEFAULT_DATA_PATH,
-    DEFAULT_FILE_SIZE_IN_MB,
+    DEFAULT_VIDEO_FILE_SIZE_IN_MB,
     DEFAULT_VIDEO_PATH,
     cast_stats_to_numpy,
     concat_video_files,
@@ -174,7 +175,7 @@ def convert_data(root, new_root):
         episodes_metadata.append(ep_metadata)
         ep_idx += 1
 
-        if size_in_mb < DEFAULT_FILE_SIZE_IN_MB:
+        if size_in_mb < DEFAULT_DATA_FILE_SIZE_IN_MB:
             paths_to_cat.append(ep_path)
             continue
 
@@ -263,7 +264,7 @@ def convert_videos_of_camera(root: Path, new_root: Path, video_key):
         episodes_metadata.append(ep_metadata)
         ep_idx += 1
 
-        if size_in_mb < DEFAULT_FILE_SIZE_IN_MB:
+        if size_in_mb < DEFAULT_VIDEO_FILE_SIZE_IN_MB:
             paths_to_cat.append(ep_path)
             continue
 
@@ -337,8 +338,8 @@ def convert_info(root, new_root):
     info["codebase_version"] = "v3.0"
     del info["total_chunks"]
     del info["total_videos"]
-    info["files_size_in_mb"] = DEFAULT_FILE_SIZE_IN_MB
-    # TODO(rcadene): chunk- or chunk_ or file- or file_
+    info["data_files_size_in_mb"] = DEFAULT_DATA_FILE_SIZE_IN_MB
+    info["video_files_size_in_mb"] = DEFAULT_VIDEO_FILE_SIZE_IN_MB
     info["data_path"] = DEFAULT_DATA_PATH
     info["video_path"] = DEFAULT_VIDEO_PATH
     info["fps"] = float(info["fps"])
diff --git a/lerobot/common/datasets/video_utils.py b/lerobot/common/datasets/video_utils.py
index a0dd2544..b0f6c15c 100644
--- a/lerobot/common/datasets/video_utils.py
+++ b/lerobot/common/datasets/video_utils.py
@@ -155,6 +155,7 @@ def decode_video_frames_torchvision(
     )
 
     # get closest frames to the query timestamps
+    # TODO(rcadene): remove torch.stack
     closest_frames = torch.stack([loaded_frames[idx] for idx in argmin_])
     closest_ts = loaded_ts[argmin_]
 
diff --git a/tests/fixtures/dataset_factories.py b/tests/fixtures/dataset_factories.py
index 256952bd..16547645 100644
--- a/tests/fixtures/dataset_factories.py
+++ b/tests/fixtures/dataset_factories.py
@@ -28,12 +28,14 @@ from datasets import Dataset
 from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset, LeRobotDatasetMetadata
 from lerobot.common.datasets.utils import (
     DEFAULT_CHUNK_SIZE,
+    DEFAULT_DATA_FILE_SIZE_IN_MB,
     DEFAULT_DATA_PATH,
     DEFAULT_FEATURES,
-    DEFAULT_FILE_SIZE_IN_MB,
+    DEFAULT_VIDEO_FILE_SIZE_IN_MB,
     DEFAULT_VIDEO_PATH,
     flatten_dict,
     get_hf_features_from_features,
+    hf_transform_to_torch,
 )
 from tests.fixtures.constants import (
     DEFAULT_FPS,
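After conversion, meta/info.json carries one size limit per file kind in place of the single
files_size_in_mb entry. An excerpt of the affected keys with the defaults introduced here,
written as the equivalent Python dict (all other keys omitted):

    info_excerpt = {
        "codebase_version": "v3.0",
        "chunks_size": 1000,             # DEFAULT_CHUNK_SIZE
        "data_files_size_in_mb": 100,    # DEFAULT_DATA_FILE_SIZE_IN_MB
        "video_files_size_in_mb": 500,   # DEFAULT_VIDEO_FILE_SIZE_IN_MB
    }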
@@ -121,7 +123,8 @@ def info_factory(features_factory):
         total_tasks: int = 0,
         total_videos: int = 0,
         chunks_size: int = DEFAULT_CHUNK_SIZE,
-        files_size_in_mb: float = DEFAULT_FILE_SIZE_IN_MB,
+        data_files_size_in_mb: float = DEFAULT_DATA_FILE_SIZE_IN_MB,
+        video_files_size_in_mb: float = DEFAULT_VIDEO_FILE_SIZE_IN_MB,
         data_path: str = DEFAULT_DATA_PATH,
         video_path: str = DEFAULT_VIDEO_PATH,
         motor_features: dict = DUMMY_MOTOR_FEATURES,
@@ -137,7 +140,8 @@ def info_factory(features_factory):
             "total_tasks": total_tasks,
             "total_videos": total_videos,
             "chunks_size": chunks_size,
-            "files_size_in_mb": files_size_in_mb,
+            "data_files_size_in_mb": data_files_size_in_mb,
+            "video_files_size_in_mb": video_files_size_in_mb,
             "fps": fps,
             "splits": {},
             "data_path": data_path,
@@ -352,7 +356,7 @@ def hf_dataset_factory(features_factory, tasks_factory, episodes_factory, img_ar
             },
             features=hf_features,
         )
-        dataset.set_format("torch")
+        dataset.set_transform(hf_transform_to_torch)
         return dataset
 
     return _create_hf_dataset
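Downstream code reads the two limits through the new metadata properties instead of the old
files_size_in_mb. A usage sketch, assuming a dataset that has already been converted to the
v3.0 layout (the repo id is a placeholder and loading it will fetch metadata from the hub):

    from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata

    meta = LeRobotDatasetMetadata("lerobot/pusht")
    print(meta.chunks_size)             # 1000 with the defaults above
    print(meta.data_files_size_in_mb)   # 100
    print(meta.video_files_size_in_mb)  # 500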