diff --git a/examples/port_datasets/droid_rlds/README.md b/examples/port_datasets/droid_rlds/README.md
new file mode 100644
index 00000000..b4da4986
--- /dev/null
+++ b/examples/port_datasets/droid_rlds/README.md
@@ -0,0 +1,144 @@
+# Port DROID 1.0.1 dataset to LeRobotDataset
+
+## Download
+
+TODO
+
+It will take about 2 TB of local disk space.
+
+## Port on a single computer
+
+First, install the TensorFlow Datasets utilities needed to read the raw files:
+```bash
+pip install tensorflow
+pip install tensorflow_datasets
+```
+
+Then run this script to start porting the dataset:
+```bash
+python examples/port_datasets/droid_rlds/port_droid.py \
+    --raw-dir /your/data/droid/1.0.1 \
+    --repo-id your_id/droid_1.0.1 \
+    --push-to-hub
+```
+
+The ported dataset will take about 400 GB of local disk space.
+
+As usual, your LeRobotDataset will be stored in your huggingface/lerobot cache folder.
+
+WARNING: porting the full dataset locally takes about 7 days, and uploading it another 3 days, so you will want to parallelize the work over multiple nodes of a SLURM cluster.
+
+NOTE: For development, run this script to port a single shard:
+```bash
+python examples/port_datasets/droid_rlds/port_droid.py \
+    --raw-dir /your/data/droid/1.0.1 \
+    --repo-id your_id/droid_1.0.1 \
+    --num-shards 2048 \
+    --shard-index 0
+```
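+
+Once the port is done (or while developing on a single shard), you can sanity check the result by loading it back with `LeRobotDataset`. The snippet below is a minimal sketch, not part of the porting scripts: adjust the repo id to whatever you passed with `--repo-id`, and pass `root=` explicitly if your dataset is not in the default cache folder.
+```python
+from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
+
+# Load the locally ported dataset from your lerobot cache folder.
+dataset = LeRobotDataset("your_id/droid_1.0.1")
+print(dataset.meta.total_episodes, dataset.meta.total_frames)
+
+# A frame is a dict containing "observation.state", "action", the camera views, etc.
+frame = dataset[0]
+print(frame["observation.state"].shape, frame["action"].shape)
+```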
+
+## Port over SLURM
+
+Install datatrove, the Hugging Face library providing the SLURM pipeline utilities used below:
+```bash
+pip install datatrove
+```
+
+
+### 1. Port one shard per job
+
+Run this script to start porting shards of the dataset:
+```bash
+python examples/port_datasets/droid_rlds/slurm_port_shards.py \
+    --raw-dir /your/data/droid/1.0.1 \
+    --repo-id your_id/droid_1.0.1 \
+    --logs-dir /your/logs \
+    --job-name port_droid \
+    --partition your_partition \
+    --workers 2048 \
+    --cpus-per-task 8 \
+    --mem-per-cpu 1950M
+```
+
+**Notes on how to set your command-line arguments**
+
+Regarding `--partition`, find yours by running:
+```bash
+sinfo --format="%R"
+```
+and select a CPU partition if you have one. No GPU is needed.
+
+Regarding `--workers`, it is the number of SLURM jobs you will launch in parallel. 2048 is the maximum, since there are 2048 shards in DROID. Such a large number will most likely max out your cluster.
+
+Regarding `--cpus-per-task` and `--mem-per-cpu`, the defaults give each job ~16 GB of RAM (8 * 1950M), which is recommended for loading the raw frames, and 8 CPUs, which helps parallelize the encoding of the frames.
+
+Find the number of CPUs and the memory of the nodes of your partition by running:
+```bash
+sinfo -N -p your_partition -h -o "%N cpus=%c mem=%m"
+```
+
+**Useful commands to check progress and debug**
+
+Check if your jobs are running:
+```bash
+squeue -u $USER
+```
+
+You should see a list of job indices like `15125385_155`, where `15125385` is the index of the run and `155` is the worker index. The output of this worker is written in real time to `/your/logs/job_name/slurm_jobs/15125385_155.out`. For instance, you can inspect the content of this file by running `less /your/logs/job_name/slurm_jobs/15125385_155.out`.
+
+Check the progress of your jobs by running:
+```bash
+jobs_status /your/logs
+```
+
+If it is not at 100% and no SLURM jobs are running anymore, it means some of them failed. Inspect the logs by running:
+```bash
+failed_logs /your/logs/job_name
+```
+
+If there is an issue in the code, you can fix it and debug it with `--slurm 0`, which runs sequentially and lets you set breakpoints:
+```bash
+python examples/port_datasets/droid_rlds/slurm_port_shards.py --slurm 0 ...
+```
+
+You can then relaunch the same command, which will skip the completed jobs:
+```bash
+python examples/port_datasets/droid_rlds/slurm_port_shards.py --slurm 1 ...
+```
+
+Once all jobs are completed, you will have one dataset per shard (e.g. `droid_1.0.1_world_2048_rank_1594`) saved on disk in your `/lerobot/home/dir/your_id` directory. You can find your `/lerobot/home/dir` by running:
+```bash
+python -c "from lerobot.common.constants import HF_LEROBOT_HOME;print(HF_LEROBOT_HOME)"
+```
+
+
+### 2. Aggregate all shards
+
+Run this script to start aggregation:
+```bash
+python examples/port_datasets/droid_rlds/slurm_aggregate_shards.py \
+    --repo-id your_id/droid_1.0.1 \
+    --logs-dir /your/logs \
+    --job-name aggr_droid \
+    --partition your_partition \
+    --workers 2048 \
+    --cpus-per-task 8 \
+    --mem-per-cpu 1950M
+```
+
+Once all jobs are completed, you will have one aggregated dataset in your `/lerobot/home/dir/your_id/droid_1.0.1` directory.
+
+
+### 3. Upload dataset
+
+Run this script to start uploading:
+```bash
+python examples/port_datasets/droid_rlds/slurm_upload.py \
+    --repo-id your_id/droid_1.0.1 \
+    --logs-dir /your/logs \
+    --job-name upload_droid \
+    --partition your_partition \
+    --workers 50 \
+    --cpus-per-task 4 \
+    --mem-per-cpu 1950M
+```
diff --git a/examples/port_datasets/droid_rlds/port_droid.py b/examples/port_datasets/droid_rlds/port_droid.py
new file mode 100644
index 00000000..b92cd1f7
--- /dev/null
+++ b/examples/port_datasets/droid_rlds/port_droid.py
@@ -0,0 +1,411 @@
+#!/usr/bin/env python
+
+# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
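+
+"""
+Port the DROID 1.0.1 RLDS dataset to the LeRobotDataset format.
+
+Example usage (see examples/port_datasets/droid_rlds/README.md for more details):
+
+    python examples/port_datasets/droid_rlds/port_droid.py \
+        --raw-dir /your/data/droid/1.0.1 \
+        --repo-id your_id/droid_1.0.1 \
+        --push-to-hub
+
+For development, a single RLDS shard can be ported by passing
+`--num-shards 2048 --shard-index 0` instead of `--push-to-hub`.
+"""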
+ +import argparse +import logging +import time +from pathlib import Path + +import numpy as np +import tensorflow_datasets as tfds + +from lerobot.common.datasets.lerobot_dataset import LeRobotDataset +from lerobot.common.utils.utils import get_elapsed_time_in_days_hours_minutes_seconds + +DROID_SHARDS = 2048 +DROID_FPS = 15 +DROID_ROBOT_TYPE = "Franka" + +# Dataset schema slightly adapted from: https://droid-dataset.github.io/droid/the-droid-dataset.html#-dataset-schema +DROID_FEATURES = { + # true on first step of the episode + "is_first": { + "dtype": "bool", + "shape": (1,), + "names": None, + }, + # true on last step of the episode + "is_last": { + "dtype": "bool", + "shape": (1,), + "names": None, + }, + # true on last step of the episode if it is a terminal step, True for demos + "is_terminal": { + "dtype": "bool", + "shape": (1,), + "names": None, + }, + # language_instruction is also stored as "task" to follow LeRobot standard + "language_instruction": { + "dtype": "string", + "shape": (1,), + "names": None, + }, + "language_instruction_2": { + "dtype": "string", + "shape": (1,), + "names": None, + }, + "language_instruction_3": { + "dtype": "string", + "shape": (1,), + "names": None, + }, + "observation.state.gripper_position": { + "dtype": "float32", + "shape": (1,), + "names": { + "axes": ["gripper"], + }, + }, + "observation.state.cartesian_position": { + "dtype": "float32", + "shape": (6,), + "names": { + "axes": ["x", "y", "z", "roll", "pitch", "yaw"], + }, + }, + "observation.state.joint_position": { + "dtype": "float32", + "shape": (7,), + "names": { + "axes": ["x", "y", "z", "roll", "pitch", "yaw"], + }, + }, + # Add this new feature to follow LeRobot standard of using joint position + gripper + "observation.state": { + "dtype": "float32", + "shape": (8,), + "names": { + "axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "gripper"], + }, + }, + # Initially called wrist_image_left + "observation.images.wrist_left": { + "dtype": "video", + "shape": (180, 320, 3), + "names": [ + "height", + "width", + "channels", + ], + }, + # Initially called exterior_image_1_left + "observation.images.exterior_1_left": { + "dtype": "video", + "shape": (180, 320, 3), + "names": [ + "height", + "width", + "channels", + ], + }, + # Initially called exterior_image_2_left + "observation.images.exterior_2_left": { + "dtype": "video", + "shape": (180, 320, 3), + "names": [ + "height", + "width", + "channels", + ], + }, + "action.gripper_position": { + "dtype": "float32", + "shape": (1,), + "names": { + "axes": ["gripper"], + }, + }, + "action.gripper_velocity": { + "dtype": "float32", + "shape": (1,), + "names": { + "axes": ["gripper"], + }, + }, + "action.cartesian_position": { + "dtype": "float32", + "shape": (6,), + "names": { + "axes": ["x", "y", "z", "roll", "pitch", "yaw"], + }, + }, + "action.cartesian_velocity": { + "dtype": "float32", + "shape": (6,), + "names": { + "axes": ["x", "y", "z", "roll", "pitch", "yaw"], + }, + }, + "action.joint_position": { + "dtype": "float32", + "shape": (7,), + "names": { + "axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"], + }, + }, + "action.joint_velocity": { + "dtype": "float32", + "shape": (7,), + "names": { + "axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"], + }, + }, + # This feature was called "action" in RLDS dataset and consists of [6x joint velocities, 1x gripper position] + "action.original": { + "dtype": "float32", + "shape": (7,), + 
"names": { + "axes": ["x", "y", "z", "roll", "pitch", "yaw", "gripper"], + }, + }, + # Add this new feature to follow LeRobot standard of using joint position + gripper + "action": { + "dtype": "float32", + "shape": (8,), + "names": { + "axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "gripper"], + }, + }, + "discount": { + "dtype": "float32", + "shape": (1,), + "names": None, + }, + "reward": { + "dtype": "float32", + "shape": (1,), + "names": None, + }, + # Meta data that are the same for all frames in the episode + "task_category": { + "dtype": "string", + "shape": (1,), + "names": None, + }, + "building": { + "dtype": "string", + "shape": (1,), + "names": None, + }, + "collector_id": { + "dtype": "string", + "shape": (1,), + "names": None, + }, + "date": { + "dtype": "string", + "shape": (1,), + "names": None, + }, + "camera_extrinsics.wrist_left": { + "dtype": "float32", + "shape": (6,), + "names": { + "axes": ["x", "y", "z", "roll", "pitch", "yaw"], + }, + }, + "camera_extrinsics.exterior_1_left": { + "dtype": "float32", + "shape": (6,), + "names": { + "axes": ["x", "y", "z", "roll", "pitch", "yaw"], + }, + }, + "camera_extrinsics.exterior_2_left": { + "dtype": "float32", + "shape": (6,), + "names": { + "axes": ["x", "y", "z", "roll", "pitch", "yaw"], + }, + }, + "is_episode_successful": { + "dtype": "bool", + "shape": (1,), + "names": None, + }, +} + + +def is_episode_successful(tf_episode_metadata): + # Adapted from: https://github.com/droid-dataset/droid_policy_learning/blob/dd1020eb20d981f90b5ff07dc80d80d5c0cb108b/robomimic/utils/rlds_utils.py#L8 + return "/success/" in tf_episode_metadata["file_path"].numpy().decode() + + +def generate_lerobot_frames(tf_episode): + m = tf_episode["episode_metadata"] + frame_meta = { + "task_category": m["building"].numpy().decode(), + "building": m["building"].numpy().decode(), + "collector_id": m["collector_id"].numpy().decode(), + "date": m["date"].numpy().decode(), + "camera_extrinsics.wrist_left": m["extrinsics_wrist_cam"].numpy(), + "camera_extrinsics.exterior_1_left": m["extrinsics_exterior_cam_1"].numpy(), + "camera_extrinsics.exterior_2_left": m["extrinsics_exterior_cam_2"].numpy(), + "is_episode_successful": np.array([is_episode_successful(m)]), + } + for f in tf_episode["steps"]: + # Dataset schema slightly adapted from: https://droid-dataset.github.io/droid/the-droid-dataset.html#-dataset-schema + frame = { + "is_first": np.array([f["is_first"].numpy()]), + "is_last": np.array([f["is_last"].numpy()]), + "is_terminal": np.array([f["is_terminal"].numpy()]), + "language_instruction": f["language_instruction"].numpy().decode(), + "language_instruction_2": f["language_instruction_2"].numpy().decode(), + "language_instruction_3": f["language_instruction_3"].numpy().decode(), + "observation.state.gripper_position": f["observation"]["gripper_position"].numpy(), + "observation.state.cartesian_position": f["observation"]["cartesian_position"].numpy(), + "observation.state.joint_position": f["observation"]["joint_position"].numpy(), + "observation.images.wrist_left": f["observation"]["wrist_image_left"].numpy(), + "observation.images.exterior_1_left": f["observation"]["exterior_image_1_left"].numpy(), + "observation.images.exterior_2_left": f["observation"]["exterior_image_2_left"].numpy(), + "action.gripper_position": f["action_dict"]["gripper_position"].numpy(), + "action.gripper_velocity": f["action_dict"]["gripper_velocity"].numpy(), + "action.cartesian_position": 
f["action_dict"]["cartesian_position"].numpy(), + "action.cartesian_velocity": f["action_dict"]["cartesian_velocity"].numpy(), + "action.joint_position": f["action_dict"]["joint_position"].numpy(), + "action.joint_velocity": f["action_dict"]["joint_velocity"].numpy(), + "discount": np.array([f["discount"].numpy()]), + "reward": np.array([f["reward"].numpy()]), + "action.original": f["action"].numpy(), + } + + # language_instruction is also stored as "task" to follow LeRobot standard + frame["task"] = frame["language_instruction"] + + # Add this new feature to follow LeRobot standard of using joint position + gripper + frame["observation.state"] = np.concatenate( + [frame["observation.state.joint_position"], frame["observation.state.gripper_position"]] + ) + frame["action"] = np.concatenate([frame["action.joint_position"], frame["action.gripper_position"]]) + + # Meta data that are the same for all frames in the episode + frame.update(frame_meta) + + # Cast fp64 to fp32 + for key in frame: + if isinstance(frame[key], np.ndarray) and frame[key].dtype == np.float64: + frame[key] = frame[key].astype(np.float32) + + yield frame + + +def port_droid( + raw_dir: Path, + repo_id: str, + push_to_hub: bool = False, + num_shards: int | None = None, + shard_index: int | None = None, +): + dataset_name = raw_dir.parent.name + version = raw_dir.name + data_dir = raw_dir.parent.parent + + builder = tfds.builder(f"{dataset_name}/{version}", data_dir=data_dir, version="") + + if num_shards is not None: + tfds_num_shards = builder.info.splits["train"].num_shards + if tfds_num_shards != DROID_SHARDS: + raise ValueError( + f"Number of shards of Droid dataset is expected to be {DROID_SHARDS} but is {tfds_num_shards}." + ) + if num_shards != tfds_num_shards: + raise ValueError( + f"We only shard over the fixed number of shards provided by tensorflow dataset ({tfds_num_shards}), but {num_shards} shards provided instead." + ) + if shard_index >= tfds_num_shards: + raise ValueError( + f"Shard index is greater than the num of shards ({shard_index} >= {num_shards})." + ) + + raw_dataset = builder.as_dataset(split=f"train[{shard_index}shard]") + else: + raw_dataset = builder.as_dataset(split="train") + + lerobot_dataset = LeRobotDataset.create( + repo_id=repo_id, + robot_type=DROID_ROBOT_TYPE, + fps=DROID_FPS, + features=DROID_FEATURES, + ) + + start_time = time.time() + num_episodes = raw_dataset.cardinality().numpy().item() + logging.info(f"Number of episodes {num_episodes}") + + for episode_index, episode in enumerate(raw_dataset): + elapsed_time = time.time() - start_time + d, h, m, s = get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time) + + logging.info( + f"{episode_index} / {num_episodes} episodes processed (after {d} days, {h} hours, {m} minutes, {s:.3f} seconds)" + ) + + for frame in generate_lerobot_frames(episode): + lerobot_dataset.add_frame(frame) + + lerobot_dataset.save_episode() + logging.info("Save_episode") + + if push_to_hub: + lerobot_dataset.push_to_hub( + # Add openx tag, since it belongs to the openx collection of datasets + tags=["openx"], + private=False, + ) + + +def main(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "--raw-dir", + type=Path, + required=True, + help="Directory containing input raw datasets (e.g. 
`path/to/dataset` or `path/to/dataset/version).", + ) + parser.add_argument( + "--repo-id", + type=str, + help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True", + ) + parser.add_argument( + "--push-to-hub", + action="store_true", + help="Upload to hub.", + ) + parser.add_argument( + "--num-shards", + type=int, + default=None, + help="Number of shards. Can be either None to load the full dataset, or 2048 to load one of the 2048 tensorflow dataset files.", + ) + parser.add_argument( + "--shard-index", + type=int, + default=None, + help="Index of the shard. Can be either None to load the full dataset, or in [0,2047] to load one of the 2048 tensorflow dataset files.", + ) + + args = parser.parse_args() + + port_droid(**vars(args)) + + +if __name__ == "__main__": + main() diff --git a/examples/port_datasets/droid_rlds/slurm_aggregate_shards.py b/examples/port_datasets/droid_rlds/slurm_aggregate_shards.py new file mode 100644 index 00000000..d2f2dfdf --- /dev/null +++ b/examples/port_datasets/droid_rlds/slurm_aggregate_shards.py @@ -0,0 +1,288 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
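+
+"""
+Aggregate the per-shard LeRobotDatasets produced by slurm_port_shards.py
+(e.g. `your_id/droid_1.0.1_world_2048_rank_1594`) into a single `your_id/droid_1.0.1` dataset.
+
+Example usage (see examples/port_datasets/droid_rlds/README.md for more details):
+
+    python examples/port_datasets/droid_rlds/slurm_aggregate_shards.py \
+        --repo-id your_id/droid_1.0.1 \
+        --logs-dir /your/logs \
+        --job-name aggr_droid \
+        --partition your_partition \
+        --workers 2048 \
+        --cpus-per-task 8 \
+        --mem-per-cpu 1950M
+"""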
+ +import argparse +import logging +from pathlib import Path + +import tqdm +from datatrove.executor import LocalPipelineExecutor +from datatrove.executor.slurm import SlurmPipelineExecutor +from datatrove.pipeline.base import PipelineStep + +from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS +from lerobot.common.datasets.aggregate import validate_all_metadata +from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata +from lerobot.common.datasets.utils import write_episode, write_episode_stats, write_info, write_task +from lerobot.common.utils.utils import init_logging + + +class AggregateDatasets(PipelineStep): + def __init__( + self, + repo_ids: list[str], + aggregated_repo_id: str, + ): + super().__init__() + self.repo_ids = repo_ids + self.aggr_repo_id = aggregated_repo_id + + self.create_aggr_dataset() + + def create_aggr_dataset(self): + init_logging() + + logging.info("Start aggregate_datasets") + + all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in self.repo_ids] + + fps, robot_type, features = validate_all_metadata(all_metadata) + + # Create resulting dataset folder + aggr_meta = LeRobotDatasetMetadata.create( + repo_id=self.aggr_repo_id, + fps=fps, + robot_type=robot_type, + features=features, + ) + + logging.info("Find all tasks") + # find all tasks, deduplicate them, create new task indices for each dataset + # indexed by dataset index + datasets_task_index_to_aggr_task_index = {} + aggr_task_index = 0 + for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Find all tasks")): + task_index_to_aggr_task_index = {} + + for task_index, task in meta.tasks.items(): + if task not in aggr_meta.task_to_task_index: + # add the task to aggr tasks mappings + aggr_meta.tasks[aggr_task_index] = task + aggr_meta.task_to_task_index[task] = aggr_task_index + aggr_task_index += 1 + + # add task_index anyway + task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task] + + datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index + + logging.info("Prepare copy data and videos") + datasets_ep_idx_to_aggr_ep_idx = {} + datasets_aggr_episode_index_shift = {} + aggr_episode_index_shift = 0 + for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Prepare copy data and videos")): + ep_idx_to_aggr_ep_idx = {} + + for episode_index in range(meta.total_episodes): + aggr_episode_index = episode_index + aggr_episode_index_shift + ep_idx_to_aggr_ep_idx[episode_index] = aggr_episode_index + + datasets_ep_idx_to_aggr_ep_idx[dataset_index] = ep_idx_to_aggr_ep_idx + datasets_aggr_episode_index_shift[dataset_index] = aggr_episode_index_shift + + # populate episodes + for episode_index, episode_dict in meta.episodes.items(): + aggr_episode_index = episode_index + aggr_episode_index_shift + episode_dict["episode_index"] = aggr_episode_index + aggr_meta.episodes[aggr_episode_index] = episode_dict + + # populate episodes_stats + for episode_index, episode_stats in meta.episodes_stats.items(): + aggr_episode_index = episode_index + aggr_episode_index_shift + aggr_meta.episodes_stats[aggr_episode_index] = episode_stats + + # populate info + aggr_meta.info["total_episodes"] += meta.total_episodes + aggr_meta.info["total_frames"] += meta.total_frames + aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes + + aggr_episode_index_shift += meta.total_episodes + + logging.info("Write meta data") + aggr_meta.info["total_tasks"] = len(aggr_meta.tasks) + 
aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1) + aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"} + + # create a new episodes jsonl with updated episode_index using write_episode + for episode_dict in tqdm.tqdm(aggr_meta.episodes.values(), desc="Write episodes"): + write_episode(episode_dict, aggr_meta.root) + + # create a new episode_stats jsonl with updated episode_index using write_episode_stats + for episode_index, episode_stats in tqdm.tqdm( + aggr_meta.episodes_stats.items(), desc="Write episodes stats" + ): + write_episode_stats(episode_index, episode_stats, aggr_meta.root) + + # create a new task jsonl with updated episode_index using write_task + for task_index, task in tqdm.tqdm(aggr_meta.tasks.items(), desc="Write tasks"): + write_task(task_index, task, aggr_meta.root) + + write_info(aggr_meta.info, aggr_meta.root) + + self.datasets_task_index_to_aggr_task_index = datasets_task_index_to_aggr_task_index + self.datasets_ep_idx_to_aggr_ep_idx = datasets_ep_idx_to_aggr_ep_idx + self.datasets_aggr_episode_index_shift = datasets_aggr_episode_index_shift + + logging.info("Meta data done writing!") + + def run(self, data=None, rank: int = 0, world_size: int = 1): + import logging + import shutil + + import pandas as pd + + from lerobot.common.datasets.aggregate import get_update_episode_and_task_func + from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata + from lerobot.common.utils.utils import init_logging + + init_logging() + + aggr_meta = LeRobotDatasetMetadata(self.aggr_repo_id) + all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in self.repo_ids] + + if world_size != len(all_metadata): + raise ValueError() + + dataset_index = rank + meta = all_metadata[dataset_index] + aggr_episode_index_shift = self.datasets_aggr_episode_index_shift[dataset_index] + + logging.info("Copy data") + for episode_index in range(meta.total_episodes): + aggr_episode_index = self.datasets_ep_idx_to_aggr_ep_idx[dataset_index][episode_index] + data_path = meta.root / meta.get_data_file_path(episode_index) + aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index) + + # update episode_index and task_index + df = pd.read_parquet(data_path) + update_row_func = get_update_episode_and_task_func( + aggr_episode_index_shift, self.datasets_task_index_to_aggr_task_index[dataset_index] + ) + df = df.apply(update_row_func, axis=1) + + aggr_data_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(aggr_data_path) + + logging.info("Copy videos") + for episode_index in range(meta.total_episodes): + aggr_episode_index = episode_index + aggr_episode_index_shift + for vid_key in meta.video_keys: + video_path = meta.root / meta.get_video_file_path(episode_index, vid_key) + aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key) + aggr_video_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(video_path, aggr_video_path) + + # copy_command = f"cp {video_path} {aggr_video_path} &" + # subprocess.Popen(copy_command, shell=True) + + logging.info("Done!") + + +def make_aggregate_executor( + repo_ids, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True +): + kwargs = { + "pipeline": [ + AggregateDatasets(repo_ids, repo_id), + ], + "logging_dir": str(logs_dir / job_name), + } + + if slurm: + kwargs.update( + { + "job_name": job_name, + "tasks": DROID_SHARDS, + "workers": workers, + "time": "08:00:00", + 
"partition": partition, + "cpus_per_task": cpus_per_task, + "sbatch_args": {"mem-per-cpu": mem_per_cpu}, + } + ) + executor = SlurmPipelineExecutor(**kwargs) + else: + kwargs.update( + { + "tasks": DROID_SHARDS, + "workers": 1, + } + ) + executor = LocalPipelineExecutor(**kwargs) + + return executor + + +def main(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "--repo-id", + type=str, + help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.", + ) + parser.add_argument( + "--logs-dir", + type=Path, + help="Path to logs directory for `datatrove`.", + ) + parser.add_argument( + "--job-name", + type=str, + default="aggr_droid", + help="Job name used in slurm, and name of the directory created inside the provided logs directory.", + ) + parser.add_argument( + "--slurm", + type=int, + default=1, + help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).", + ) + parser.add_argument( + "--workers", + type=int, + default=2048, + help="Number of slurm workers. It should be less than the maximum number of shards.", + ) + parser.add_argument( + "--partition", + type=str, + help="Slurm partition. Ideally a CPU partition. No need for GPU partition.", + ) + parser.add_argument( + "--cpus-per-task", + type=int, + default=8, + help="Number of cpus that each slurm worker will use.", + ) + parser.add_argument( + "--mem-per-cpu", + type=str, + default="1950M", + help="Memory per cpu that each worker will use.", + ) + + args = parser.parse_args() + kwargs = vars(args) + kwargs["slurm"] = kwargs.pop("slurm") == 1 + + repo_ids = [f"{args.repo_id}_world_{DROID_SHARDS}_rank_{rank}" for rank in range(DROID_SHARDS)] + aggregate_executor = make_aggregate_executor(repo_ids, **kwargs) + aggregate_executor.run() + + +if __name__ == "__main__": + main() diff --git a/examples/port_datasets/droid_rlds/slurm_port_shards.py b/examples/port_datasets/droid_rlds/slurm_port_shards.py new file mode 100644 index 00000000..08e36bc3 --- /dev/null +++ b/examples/port_datasets/droid_rlds/slurm_port_shards.py @@ -0,0 +1,161 @@ +import argparse +from pathlib import Path + +from datatrove.executor import LocalPipelineExecutor +from datatrove.executor.slurm import SlurmPipelineExecutor +from datatrove.pipeline.base import PipelineStep + +from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS +from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata + + +def validate_shard(repo_id): + """Sanity check that ensure meta data can be loaded and all files are present.""" + meta = LeRobotDatasetMetadata(repo_id) + + if meta.total_episodes == 0: + raise ValueError("Number of episodes is 0.") + + for ep_idx in range(meta.total_episodes): + data_path = meta.root / meta.get_data_file_path(ep_idx) + + if not data_path.exists(): + raise ValueError(f"Parquet file is missing in: {data_path}") + + for vid_key in meta.video_keys: + vid_path = meta.root / meta.get_video_file_path(ep_idx, vid_key) + if not vid_path.exists(): + raise ValueError(f"Video file is missing in: {vid_path}") + + +class PortDroidShards(PipelineStep): + def __init__( + self, + raw_dir: Path | str, + repo_id: str = None, + ): + super().__init__() + self.raw_dir = Path(raw_dir) + self.repo_id = repo_id + + def run(self, data=None, rank: int = 0, world_size: int = 1): + from datasets.utils.tqdm import disable_progress_bars + + from examples.port_datasets.droid_rlds.port_droid import port_droid + from lerobot.common.utils.utils 
import init_logging + + init_logging() + disable_progress_bars() + + shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}" + + port_droid( + self.raw_dir, + shard_repo_id, + push_to_hub=False, + num_shards=world_size, + shard_index=rank, + ) + + validate_shard(shard_repo_id) + + +def make_port_executor( + raw_dir, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True +): + kwargs = { + "pipeline": [ + PortDroidShards(raw_dir, repo_id), + ], + "logging_dir": str(logs_dir / job_name), + } + + if slurm: + kwargs.update( + { + "job_name": job_name, + "tasks": DROID_SHARDS, + "workers": workers, + "time": "08:00:00", + "partition": partition, + "cpus_per_task": cpus_per_task, + "sbatch_args": {"mem-per-cpu": mem_per_cpu}, + } + ) + executor = SlurmPipelineExecutor(**kwargs) + else: + kwargs.update( + { + "tasks": 1, + "workers": 1, + } + ) + executor = LocalPipelineExecutor(**kwargs) + + return executor + + +def main(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "--raw-dir", + type=Path, + required=True, + help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).", + ) + parser.add_argument( + "--repo-id", + type=str, + help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.", + ) + parser.add_argument( + "--logs-dir", + type=Path, + help="Path to logs directory for `datatrove`.", + ) + parser.add_argument( + "--job-name", + type=str, + default="port_droid", + help="Job name used in slurm, and name of the directory created inside the provided logs directory.", + ) + parser.add_argument( + "--slurm", + type=int, + default=1, + help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).", + ) + parser.add_argument( + "--workers", + type=int, + default=2048, + help="Number of slurm workers. It should be less than the maximum number of shards.", + ) + parser.add_argument( + "--partition", + type=str, + help="Slurm partition. Ideally a CPU partition. 
No need for GPU partition.", + ) + parser.add_argument( + "--cpus-per-task", + type=int, + default=8, + help="Number of cpus that each slurm worker will use.", + ) + parser.add_argument( + "--mem-per-cpu", + type=str, + default="1950M", + help="Memory per cpu that each worker will use.", + ) + + args = parser.parse_args() + kwargs = vars(args) + kwargs["slurm"] = kwargs.pop("slurm") == 1 + port_executor = make_port_executor(**kwargs) + port_executor.run() + + +if __name__ == "__main__": + main() diff --git a/examples/port_datasets/droid_rlds/slurm_upload.py b/examples/port_datasets/droid_rlds/slurm_upload.py new file mode 100644 index 00000000..8dec7c20 --- /dev/null +++ b/examples/port_datasets/droid_rlds/slurm_upload.py @@ -0,0 +1,263 @@ +import argparse +import logging +import os +from pathlib import Path + +from datatrove.executor import LocalPipelineExecutor +from datatrove.executor.slurm import SlurmPipelineExecutor +from datatrove.pipeline.base import PipelineStep +from huggingface_hub import HfApi +from huggingface_hub.constants import REPOCARD_NAME + +from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS +from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDatasetMetadata +from lerobot.common.datasets.utils import create_lerobot_dataset_card +from lerobot.common.utils.utils import init_logging + + +class UploadDataset(PipelineStep): + def __init__( + self, + repo_id: str, + branch: str | None = None, + revision: str | None = None, + tags: list | None = None, + license: str | None = "apache-2.0", + private: bool = False, + distant_repo_id: str | None = None, + **card_kwargs, + ): + super().__init__() + self.repo_id = repo_id + self.distant_repo_id = self.repo_id if distant_repo_id is None else distant_repo_id + self.branch = branch + self.tags = tags + self.license = license + self.private = private + self.card_kwargs = card_kwargs + self.revision = revision if revision else CODEBASE_VERSION + + if os.environ.get("HF_HUB_ENABLE_HF_TRANSFER", "0") != "1": + logging.warning( + 'HF_HUB_ENABLE_HF_TRANSFER is not set to "1". 
Install hf_transfer and set the env ' + "variable for faster uploads:\npip install hf-transfer\nexport HF_HUB_ENABLE_HF_TRANSFER=1" + ) + + self.create_repo() + + def create_repo(self): + logging.info(f"Loading meta data from {self.repo_id}...") + meta = LeRobotDatasetMetadata(self.repo_id) + + logging.info(f"Creating repo {self.distant_repo_id}...") + hub_api = HfApi() + hub_api.create_repo( + repo_id=self.distant_repo_id, + private=self.private, + repo_type="dataset", + exist_ok=True, + ) + if self.branch: + hub_api.create_branch( + repo_id=self.distant_repo_id, + branch=self.branch, + revision=self.revision, + repo_type="dataset", + exist_ok=True, + ) + + if not hub_api.file_exists( + self.distant_repo_id, REPOCARD_NAME, repo_type="dataset", revision=self.branch + ): + card = create_lerobot_dataset_card( + tags=self.tags, dataset_info=meta.info, license=self.license, **self.card_kwargs + ) + card.push_to_hub(repo_id=self.distant_repo_id, repo_type="dataset", revision=self.branch) + + def list_files_recursively(directory): + base_path = Path(directory) + return [str(file.relative_to(base_path)) for file in base_path.rglob("*") if file.is_file()] + + logging.info(f"Listing all local files from {self.repo_id}...") + self.file_paths = list_files_recursively(meta.root) + self.file_paths = sorted(self.file_paths) + + def create_chunks(self, lst, n): + from itertools import islice + + it = iter(lst) + return [list(islice(it, size)) for size in [len(lst) // n + (i < len(lst) % n) for i in range(n)]] + + def create_commits(self, additions): + import logging + import math + import random + import time + + from huggingface_hub import create_commit + from huggingface_hub.utils import HfHubHTTPError + + FILES_BETWEEN_COMMITS = 10 # noqa: N806 + BASE_DELAY = 0.1 # noqa: N806 + MAX_RETRIES = 12 # noqa: N806 + + # Split the files into smaller chunks for faster commit + # and avoiding "A commit has happened since" error + num_chunks = math.ceil(len(additions) / FILES_BETWEEN_COMMITS) + chunks = self.create_chunks(additions, num_chunks) + + for chunk in chunks: + retries = 0 + while True: + try: + create_commit( + self.distant_repo_id, + repo_type="dataset", + operations=chunk, + commit_message=f"DataTrove upload ({len(chunk)} files)", + revision=self.branch, + ) + # TODO: every 100 chunks super_squach_commits() + logging.info("create_commit completed!") + break + except HfHubHTTPError as e: + if "A commit has happened since" in e.server_message: + if retries >= MAX_RETRIES: + logging.error(f"Failed to create commit after {MAX_RETRIES=}. Giving up.") + raise e + logging.info("Commit creation race condition issue. 
Waiting...") + time.sleep(BASE_DELAY * 2**retries + random.uniform(0, 2)) + retries += 1 + else: + raise e + + def run(self, data=None, rank: int = 0, world_size: int = 1): + import logging + + from datasets.utils.tqdm import disable_progress_bars + from huggingface_hub import CommitOperationAdd, preupload_lfs_files + + from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata + from lerobot.common.utils.utils import init_logging + + init_logging() + disable_progress_bars() + + chunks = self.create_chunks(self.file_paths, world_size) + file_paths = chunks[rank] + + if len(file_paths) == 0: + raise ValueError(file_paths) + + logging.info("Pre-uploading LFS files...") + for i, path in enumerate(file_paths): + logging.info(f"{i}: {path}") + + meta = LeRobotDatasetMetadata(self.repo_id) + additions = [ + CommitOperationAdd(path_in_repo=path, path_or_fileobj=meta.root / path) for path in file_paths + ] + preupload_lfs_files( + repo_id=self.distant_repo_id, repo_type="dataset", additions=additions, revision=self.branch + ) + + logging.info("Creating commits...") + self.create_commits(additions) + logging.info("Done!") + + +def make_upload_executor( + repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True +): + kwargs = { + "pipeline": [ + UploadDataset(repo_id), + ], + "logging_dir": str(logs_dir / job_name), + } + + if slurm: + kwargs.update( + { + "job_name": job_name, + "tasks": DROID_SHARDS, + "workers": workers, + "time": "08:00:00", + "partition": partition, + "cpus_per_task": cpus_per_task, + "sbatch_args": {"mem-per-cpu": mem_per_cpu}, + } + ) + executor = SlurmPipelineExecutor(**kwargs) + else: + kwargs.update( + { + "tasks": DROID_SHARDS, + "workers": 1, + } + ) + executor = LocalPipelineExecutor(**kwargs) + + return executor + + +def main(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "--repo-id", + type=str, + help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.", + ) + parser.add_argument( + "--logs-dir", + type=Path, + help="Path to logs directory for `datatrove`.", + ) + parser.add_argument( + "--job-name", + type=str, + default="upload_droid", + help="Job name used in slurm, and name of the directory created inside the provided logs directory.", + ) + parser.add_argument( + "--slurm", + type=int, + default=1, + help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).", + ) + parser.add_argument( + "--workers", + type=int, + default=50, + help="Number of slurm workers. It should be less than the maximum number of shards.", + ) + parser.add_argument( + "--partition", + type=str, + help="Slurm partition. Ideally a CPU partition. 
No need for GPU partition.", + ) + parser.add_argument( + "--cpus-per-task", + type=int, + default=8, + help="Number of cpus that each slurm worker will use.", + ) + parser.add_argument( + "--mem-per-cpu", + type=str, + default="1950M", + help="Memory per cpu that each worker will use.", + ) + + init_logging() + + args = parser.parse_args() + kwargs = vars(args) + kwargs["slurm"] = kwargs.pop("slurm") == 1 + upload_executor = make_upload_executor(**kwargs) + upload_executor.run() + + +if __name__ == "__main__": + main() diff --git a/lerobot/common/datasets/aggregate.py b/lerobot/common/datasets/aggregate.py new file mode 100644 index 00000000..8761def9 --- /dev/null +++ b/lerobot/common/datasets/aggregate.py @@ -0,0 +1,175 @@ +import logging +import shutil + +import pandas as pd +import tqdm + +from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata +from lerobot.common.datasets.utils import write_episode, write_episode_stats, write_info, write_task +from lerobot.common.utils.utils import init_logging + + +def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]): + # validate same fps, robot_type, features + + fps = all_metadata[0].fps + robot_type = all_metadata[0].robot_type + features = all_metadata[0].features + + for meta in tqdm.tqdm(all_metadata, desc="Validate all meta data"): + if fps != meta.fps: + raise ValueError(f"Same fps is expected, but got fps={meta.fps} instead of {fps}.") + if robot_type != meta.robot_type: + raise ValueError( + f"Same robot_type is expected, but got robot_type={meta.robot_type} instead of {robot_type}." + ) + if features != meta.features: + raise ValueError( + f"Same features is expected, but got features={meta.features} instead of {features}." + ) + + return fps, robot_type, features + + +def get_update_episode_and_task_func(episode_index_to_add, task_index_to_global_task_index): + def _update(row): + row["episode_index"] = row["episode_index"] + episode_index_to_add + row["task_index"] = task_index_to_global_task_index[row["task_index"]] + return row + + return _update + + +def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, aggr_root=None): + logging.info("Start aggregate_datasets") + + all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids] + + fps, robot_type, features = validate_all_metadata(all_metadata) + + # Create resulting dataset folder + aggr_meta = LeRobotDatasetMetadata.create( + repo_id=aggr_repo_id, + fps=fps, + robot_type=robot_type, + features=features, + root=aggr_root, + ) + + logging.info("Find all tasks") + # find all tasks, deduplicate them, create new task indices for each dataset + # indexed by dataset index + datasets_task_index_to_aggr_task_index = {} + aggr_task_index = 0 + for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Find all tasks")): + task_index_to_aggr_task_index = {} + + for task_index, task in meta.tasks.items(): + if task not in aggr_meta.task_to_task_index: + # add the task to aggr tasks mappings + aggr_meta.tasks[aggr_task_index] = task + aggr_meta.task_to_task_index[task] = aggr_task_index + aggr_task_index += 1 + + # add task_index anyway + task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task] + + datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index + + logging.info("Copy data and videos") + aggr_episode_index_shift = 0 + for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Copy data and videos")): + # cp data + for episode_index in 
range(meta.total_episodes): + aggr_episode_index = episode_index + aggr_episode_index_shift + data_path = meta.root / meta.get_data_file_path(episode_index) + aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index) + + # update episode_index and task_index + df = pd.read_parquet(data_path) + update_row_func = get_update_episode_and_task_func( + aggr_episode_index_shift, datasets_task_index_to_aggr_task_index[dataset_index] + ) + df = df.apply(update_row_func, axis=1) + + aggr_data_path.parent.mkdir(parents=True, exist_ok=True) + df.to_parquet(aggr_data_path) + + # cp videos + for episode_index in range(meta.total_episodes): + aggr_episode_index = episode_index + aggr_episode_index_shift + for vid_key in meta.video_keys: + video_path = meta.root / meta.get_video_file_path(episode_index, vid_key) + aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key) + aggr_video_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(video_path, aggr_video_path) + + # copy_command = f"cp {video_path} {aggr_video_path} &" + # subprocess.Popen(copy_command, shell=True) + + # populate episodes + for episode_index, episode_dict in meta.episodes.items(): + aggr_episode_index = episode_index + aggr_episode_index_shift + episode_dict["episode_index"] = aggr_episode_index + aggr_meta.episodes[aggr_episode_index] = episode_dict + + # populate episodes_stats + for episode_index, episode_stats in meta.episodes_stats.items(): + aggr_episode_index = episode_index + aggr_episode_index_shift + aggr_meta.episodes_stats[aggr_episode_index] = episode_stats + + # populate info + aggr_meta.info["total_episodes"] += meta.total_episodes + aggr_meta.info["total_frames"] += meta.total_frames + aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes + + aggr_episode_index_shift += meta.total_episodes + + logging.info("write meta data") + + aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1) + aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"} + + # create a new episodes jsonl with updated episode_index using write_episode + for episode_dict in aggr_meta.episodes.values(): + write_episode(episode_dict, aggr_meta.root) + + # create a new episode_stats jsonl with updated episode_index using write_episode_stats + for episode_index, episode_stats in aggr_meta.episodes_stats.items(): + write_episode_stats(episode_index, episode_stats, aggr_meta.root) + + # create a new task jsonl with updated episode_index using write_task + for task_index, task in aggr_meta.tasks.items(): + write_task(task_index, task, aggr_meta.root) + + write_info(aggr_meta.info, aggr_meta.root) + + +if __name__ == "__main__": + init_logging() + repo_id = "cadene/droid" + aggr_repo_id = "cadene/droid" + datetime = "2025-02-22_11-23-54" + + # root = Path(f"/tmp/{repo_id}") + # if root.exists(): + # shutil.rmtree(root) + root = None + + # all_metadata = [LeRobotDatasetMetadata(f"{repo_id}_{datetime}_world_2048_rank_{rank}") for rank in range(2048)] + + # aggregate_datasets( + # all_metadata, + # aggr_repo_id, + # root=root, + # ) + + aggr_dataset = LeRobotDataset( + repo_id=aggr_repo_id, + root=root, + ) + aggr_dataset.push_to_hub(tags=["openx"]) + + # for meta in all_metadata: + # dataset = LeRobotDataset(repo_id=meta.repo_id, root=meta.root) + # dataset.push_to_hub(tags=["openx"]) diff --git a/lerobot/common/datasets/lerobot_dataset.py b/lerobot/common/datasets/lerobot_dataset.py index d8da85d6..9e5434cf 
100644 --- a/lerobot/common/datasets/lerobot_dataset.py +++ b/lerobot/common/datasets/lerobot_dataset.py @@ -74,7 +74,7 @@ from lerobot.common.datasets.video_utils import ( ) from lerobot.common.robot_devices.robots.utils import Robot -CODEBASE_VERSION = "v2.1" +CODEBASE_VERSION = "v3.0" class LeRobotDatasetMetadata: @@ -617,6 +617,8 @@ class LeRobotDataset(torch.utils.data.Dataset): """hf_dataset contains all the observations, states, actions, rewards, etc.""" if self.episodes is None: path = str(self.root / "data") + # TODO(rcadene): load_dataset convert parquet to arrow. + # set num_proc to accelerate this conversion hf_dataset = load_dataset("parquet", data_dir=path, split="train") else: files = [str(self.root / self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes] diff --git a/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py b/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py new file mode 100644 index 00000000..f1efae35 --- /dev/null +++ b/lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py @@ -0,0 +1,137 @@ +""" +This script will help you convert any LeRobot dataset already pushed to the hub from codebase version 2.1 to +3.0. It will: + +- Generate per-episodes stats and writes them in `episodes_stats.jsonl` +- Check consistency between these new stats and the old ones. +- Remove the deprecated `stats.json`. +- Update codebase_version in `info.json`. +- Push this new version to the hub on the 'main' branch and tags it with "v2.1". + +Usage: + +```bash +python lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py \ + --repo-id=lerobot/pusht +``` + +""" + +import argparse +import logging + +from datasets import Dataset +from huggingface_hub import snapshot_download + +from lerobot.common.constants import HF_LEROBOT_HOME +from lerobot.common.datasets.utils import ( + load_episodes_stats, +) + +V21 = "v2.1" + + +class SuppressWarnings: + def __enter__(self): + self.previous_level = logging.getLogger().getEffectiveLevel() + logging.getLogger().setLevel(logging.ERROR) + + def __exit__(self, exc_type, exc_val, exc_tb): + logging.getLogger().setLevel(self.previous_level) + + +def convert_dataset( + repo_id: str, + branch: str | None = None, + num_workers: int = 4, +): + root = HF_LEROBOT_HOME / repo_id + snapshot_download( + repo_id, + repo_type="dataset", + revision=V21, + local_dir=root, + ) + + # Concatenate videos + + # Create + + """ + ------------------------- + OLD + data/chunk-000/episode_000000.parquet + + NEW + data/chunk-000/file_000.parquet + ------------------------- + OLD + videos/chunk-000/CAMERA/episode_000000.mp4 + + NEW + videos/chunk-000/file_000.mp4 + ------------------------- + OLD + episodes.jsonl + {"episode_index": 1, "tasks": ["Put the blue block in the green bowl"], "length": 266} + + NEW + meta/episodes/chunk-000/episodes_000.parquet + episode_index | video_chunk_index | video_file_index | data_chunk_index | data_file_index | tasks | length + ------------------------- + OLD + tasks.jsonl + {"task_index": 1, "task": "Put the blue block in the green bowl"} + + NEW + meta/tasks/chunk-000/file_000.parquet + task_index | task + ------------------------- + OLD + episodes_stats.jsonl + + NEW + meta/episodes_stats/chunk-000/file_000.parquet + episode_index | mean | std | min | max + ------------------------- + UPDATE + meta/info.json + ------------------------- + """ + + new_root = HF_LEROBOT_HOME / f"{repo_id}_v30" + new_root.mkdir(parents=True, exist_ok=True) + + episodes_stats = load_episodes_stats(root) + hf_dataset = 
Dataset.from_dict(episodes_stats) # noqa: F841 + + meta_ep_st_ch = new_root / "meta/episodes_stats/chunk-000" + meta_ep_st_ch.mkdir(parents=True, exist_ok=True) + + # hf_dataset.to_parquet(meta_ep_st_ch / 'file_000.parquet') + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--repo-id", + type=str, + required=True, + help="Repository identifier on Hugging Face: a community or a user name `/` the name of the dataset " + "(e.g. `lerobot/pusht`, `cadene/aloha_sim_insertion_human`).", + ) + parser.add_argument( + "--branch", + type=str, + default=None, + help="Repo branch to push your dataset. Defaults to the main branch.", + ) + parser.add_argument( + "--num-workers", + type=int, + default=4, + help="Number of workers for parallelizing stats compute. Defaults to 4.", + ) + + args = parser.parse_args() + convert_dataset(**vars(args)) diff --git a/lerobot/common/datasets/video_utils.py b/lerobot/common/datasets/video_utils.py index c38d570d..82beb876 100644 --- a/lerobot/common/datasets/video_utils.py +++ b/lerobot/common/datasets/video_utils.py @@ -252,7 +252,7 @@ def encode_video_frames( g: int | None = 2, crf: int | None = 30, fast_decode: int = 0, - log_level: str | None = "error", + log_level: str | None = "quiet", overwrite: bool = False, ) -> None: """More info on ffmpeg arguments tuning on `benchmark/video/README.md`""" diff --git a/lerobot/common/utils/utils.py b/lerobot/common/utils/utils.py index 563a7b81..bcb4d6bc 100644 --- a/lerobot/common/utils/utils.py +++ b/lerobot/common/utils/utils.py @@ -228,3 +228,13 @@ def is_valid_numpy_dtype_string(dtype_str: str) -> bool: except TypeError: # If a TypeError is raised, the string is not a valid dtype return False + + +def get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time_s: float): + days = int(elapsed_time_s // (24 * 3600)) + elapsed_time_s %= 24 * 3600 + hours = int(elapsed_time_s // 3600) + elapsed_time_s %= 3600 + minutes = int(elapsed_time_s // 60) + seconds = elapsed_time_s % 60 + return days, hours, minutes, seconds diff --git a/tests/test_aggregate_datasets.py b/tests/test_aggregate_datasets.py new file mode 100644 index 00000000..ad5c2022 --- /dev/null +++ b/tests/test_aggregate_datasets.py @@ -0,0 +1,19 @@ +from lerobot.common.datasets.aggregate import aggregate_datasets +from tests.fixtures.constants import DUMMY_REPO_ID + + +def test_aggregate_datasets(tmp_path, lerobot_dataset_factory): + dataset_0 = lerobot_dataset_factory( + root=tmp_path / "test_0", + repo_id=DUMMY_REPO_ID + "_0", + total_episodes=10, + total_frames=400, + ) + dataset_1 = lerobot_dataset_factory( + root=tmp_path / "test_1", + repo_id=DUMMY_REPO_ID + "_1", + total_episodes=10, + total_frames=400, + ) + + dataset_2 = aggregate_datasets([dataset_0, dataset_1])