Compare commits

...

42 Commits

Author SHA1 Message Date
Remi b8ad0cd514
Merge 53ecec5fb2 into b43ece8934 2025-04-17 16:18:27 +02:00
k1000dai b43ece8934
Add pythno3-dev in Dockerfile to build and modify Readme.md , python-dev to python3-dev (#987)
Co-authored-by: makolon <smakolon385@gmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2025-04-17 16:17:07 +02:00
Alex Thiele c10c5a0e64
Fix --width --height type parsing on opencv and intelrealsense scripts (#556)
Co-authored-by: Remi <remi.cadene@huggingface.co>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2025-04-17 15:19:23 +02:00
Junshan Huang a8db91c40e
Fix Windows HTML visualization to make videos could be seen (#647)
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2025-04-17 15:07:28 +02:00
HUANG TZU-CHUN 0f5f7ac780
Fix broken links in `examples/4_train_policy_with_script.md` (#697) 2025-04-17 14:59:43 +02:00
pre-commit-ci[bot] 768e36660d
[pre-commit.ci] pre-commit autoupdate (#980)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2025-04-14 21:55:06 +02:00
Caroline Pascal 790d6740ba
fix(installation): adding note on `ffmpeg` version during installation (#976)
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
2025-04-14 15:36:31 +02:00
Remi Cadene 53ecec5fb2 WIP v21 to v30 2025-03-31 07:38:01 +00:00
Remi Cadene 65738f0a80 Improve slurm droid 2025-03-20 14:12:46 +00:00
Remi Cadene 5d184a7811 NIT 2025-03-18 16:55:08 +00:00
Remi Cadene 1a5c1ef9c7 Rename openx to droid + Improve all (not tested) 2025-03-18 16:28:09 +00:00
Remi Cadene 7866c1f7d1 Merge remote-tracking branch 'origin/main' into user/rcadene/2025_02_19_port_openx 2025-03-01 19:17:18 +00:00
Remi Cadene 3666ac9346 WIP UploadDataset 2025-03-01 19:07:22 +00:00
Remi Cadene 3daab2acbb Add upload_large_folder 2025-02-23 18:19:12 +00:00
Remi Cadene c36d2253d0 Aggregate works 2025-02-23 18:18:46 +00:00
Remi Cadene e2e6f6e666 Add auto_downsample_height_width 2025-02-23 18:15:39 +00:00
Remi Cadene ff0029f84b aggregate works 2025-02-22 15:33:47 +00:00
Remi Cadene 39ad2d16d4 let's go 2025-02-22 11:12:39 +00:00
Remi Cadene 689c5efc72 optimize shard 2025-02-22 10:13:09 +00:00
Remi Cadene eda0b996cd new dir 2025-02-21 23:56:44 +00:00
Remi Cadene 15e7a9d541 before new launch from scratch 2025-02-21 23:14:22 +00:00
Remi Cadene 52fb4143b5 workers 2025-02-21 13:08:21 +00:00
Remi Cadene 93c80b2cb1 rm brake 2025-02-20 23:24:03 +00:00
Remi Cadene 5fbbaa1bc0 fix No such file or directory error 2025-02-20 23:04:58 +00:00
Remi Cadene 71d1f5e2c9 WIP 2025-02-20 23:04:31 +00:00
Remi Cadene b520941cd9 Merge remote-tracking branch 'origin/user/aliberts/2025_02_10_dataset_v2.1' into user/rcadene/2025_02_19_port_openx 2025-02-20 17:34:13 +00:00
Simon Alibert 64ed5258e6 Fix batch convert 2025-02-20 09:00:14 +01:00
Simon Alibert 392a8c32a7 Improve doc 2025-02-20 08:24:41 +01:00
Simon Alibert 969ef745a2
Remove dataset `consolidate` (#752) 2025-02-19 16:02:54 +01:00
Simon Alibert 6fe42a72db Add tag 2025-02-19 15:01:44 +01:00
Simon Alibert 2487228ea7
Use `HF_HOME` env variable (#753) 2025-02-19 14:49:46 +01:00
Remi Cadene 76436ca1de Merge remote-tracking branch 'tavish9_lerobot_openx/main' into user/rcadene/2025_02_19_port_openx 2025-02-19 12:58:18 +00:00
Simon Alibert fbf2f2222a
Remove `local_files_only` and use `codebase_version` instead of branches (#734) 2025-02-19 08:36:32 +01:00
Tavish 02bc4e03e0 support openx/rlds to lerobot 2025-02-18 22:25:58 +08:00
Simon Alibert 624eaf1175 Merge remote-tracking branch 'origin/main' into user/aliberts/2025_02_10_dataset_v2.1 2025-02-17 12:06:05 +01:00
Simon Alibert aed3eb4a94 Merge remote-tracking branch 'origin/main' into user/aliberts/2025_02_10_dataset_v2.1 2025-02-15 15:56:24 +01:00
Simon Alibert 8426c64f42
Per-episode stats (#521)
Co-authored-by: Remi Cadene <re.cadene@gmail.com>
Co-authored-by: Remi <remi.cadene@huggingface.co>
2025-02-15 15:47:16 +01:00
Remi 7c2bbee613
Validate features during `add_frame` + Add 2D-to-5D + Add string (#720) 2025-02-14 19:59:48 +01:00
Remi 9d6886dd08
Add frame level task (#693)
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
2025-02-14 14:22:22 +01:00
Simon Alibert d67ca342e9 Merge remote-tracking branch 'origin/main' into user/aliberts/2025_02_10_dataset_v2.1 2025-02-11 17:17:39 +01:00
Simon Alibert 57c9c21c39 Merge remote-tracking branch 'origin/main' into user/aliberts/2025_02_10_dataset_v2.1 2025-02-10 17:22:57 +01:00
Simon Alibert 38c14571cc Bump CODEBASE_VERSION 2025-02-10 16:39:34 +01:00
19 changed files with 1639 additions and 24 deletions

View File

@ -48,7 +48,7 @@ repos:
- id: pyupgrade - id: pyupgrade
- repo: https://github.com/astral-sh/ruff-pre-commit - repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.4 rev: v0.11.5
hooks: hooks:
- id: ruff - id: ruff
args: [--fix] args: [--fix]
@ -57,7 +57,7 @@ repos:
##### Security ##### ##### Security #####
- repo: https://github.com/gitleaks/gitleaks - repo: https://github.com/gitleaks/gitleaks
rev: v8.24.2 rev: v8.24.3
hooks: hooks:
- id: gitleaks - id: gitleaks

View File

@ -103,13 +103,20 @@ When using `miniconda`, install `ffmpeg` in your environment:
conda install ffmpeg -c conda-forge conda install ffmpeg -c conda-forge
``` ```
> **NOTE:** This usually installs `ffmpeg 7.X` for your platform compiled with the `libsvtav1` encoder. If `libsvtav1` is not supported (check supported encoders with `ffmpeg -encoders`), you can:
> - _[On any platform]_ Explicitly install `ffmpeg 7.X` using:
> ```bash
> conda install ffmpeg=7.1.1 -c conda-forge
> ```
> - _[On Linux only]_ Install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1), and make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
Install 🤗 LeRobot: Install 🤗 LeRobot:
```bash ```bash
pip install -e . pip install -e .
``` ```
> **NOTE:** If you encounter build errors, you may need to install additional dependencies (`cmake`, `build-essential`, and `ffmpeg libs`). On Linux, run: > **NOTE:** If you encounter build errors, you may need to install additional dependencies (`cmake`, `build-essential`, and `ffmpeg libs`). On Linux, run:
`sudo apt-get install cmake build-essential python-dev pkg-config libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libswscale-dev libswresample-dev libavfilter-dev pkg-config`. For other systems, see: [Compiling PyAV](https://pyav.org/docs/develop/overview/installation.html#bring-your-own-ffmpeg) `sudo apt-get install cmake build-essential python3-dev pkg-config libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libswscale-dev libswresample-dev libavfilter-dev pkg-config`. For other systems, see: [Compiling PyAV](https://pyav.org/docs/develop/overview/installation.html#bring-your-own-ffmpeg)
For simulations, 🤗 LeRobot comes with gymnasium environments that can be installed as extras: For simulations, 🤗 LeRobot comes with gymnasium environments that can be installed as extras:
- [aloha](https://github.com/huggingface/gym-aloha) - [aloha](https://github.com/huggingface/gym-aloha)

View File

@ -14,7 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
tcpdump sysstat screen tmux \ tcpdump sysstat screen tmux \
libglib2.0-0 libgl1-mesa-glx libegl1-mesa \ libglib2.0-0 libgl1-mesa-glx libegl1-mesa \
speech-dispatcher portaudio19-dev libgeos-dev \ speech-dispatcher portaudio19-dev libgeos-dev \
python${PYTHON_VERSION} python${PYTHON_VERSION}-venv \ python${PYTHON_VERSION} python${PYTHON_VERSION}-venv python${PYTHON_VERSION}-dev \
&& apt-get clean && rm -rf /var/lib/apt/lists/* && apt-get clean && rm -rf /var/lib/apt/lists/*
# Install ffmpeg build dependencies. See: # Install ffmpeg build dependencies. See:

View File

@ -4,7 +4,7 @@ This tutorial will explain the training script, how to use it, and particularly
## The training script ## The training script
LeRobot offers a training script at [`lerobot/scripts/train.py`](../../lerobot/scripts/train.py). At a high level it does the following: LeRobot offers a training script at [`lerobot/scripts/train.py`](../lerobot/scripts/train.py). At a high level it does the following:
- Initialize/load a configuration for the following steps using. - Initialize/load a configuration for the following steps using.
- Instantiates a dataset. - Instantiates a dataset.
@ -21,7 +21,7 @@ In the training script, the main function `train` expects a `TrainPipelineConfig
def train(cfg: TrainPipelineConfig): def train(cfg: TrainPipelineConfig):
``` ```
You can inspect the `TrainPipelineConfig` defined in [`lerobot/configs/train.py`](../../lerobot/configs/train.py) (which is heavily commented and meant to be a reference to understand any option) You can inspect the `TrainPipelineConfig` defined in [`lerobot/configs/train.py`](../lerobot/configs/train.py) (which is heavily commented and meant to be a reference to understand any option)
When running the script, inputs for the command line are parsed thanks to the `@parser.wrap()` decorator and an instance of this class is automatically generated. Under the hood, this is done with [Draccus](https://github.com/dlwh/draccus) which is a tool dedicated for this purpose. If you're familiar with Hydra, Draccus can similarly load configurations from config files (.json, .yaml) and also override their values through command line inputs. Unlike Hydra, these configurations are pre-defined in the code through dataclasses rather than being defined entirely in config files. This allows for more rigorous serialization/deserialization, typing, and to manipulate configuration as objects directly in the code and not as dictionaries or namespaces (which enables nice features in an IDE such as autocomplete, jump-to-def, etc.) When running the script, inputs for the command line are parsed thanks to the `@parser.wrap()` decorator and an instance of this class is automatically generated. Under the hood, this is done with [Draccus](https://github.com/dlwh/draccus) which is a tool dedicated for this purpose. If you're familiar with Hydra, Draccus can similarly load configurations from config files (.json, .yaml) and also override their values through command line inputs. Unlike Hydra, these configurations are pre-defined in the code through dataclasses rather than being defined entirely in config files. This allows for more rigorous serialization/deserialization, typing, and to manipulate configuration as objects directly in the code and not as dictionaries or namespaces (which enables nice features in an IDE such as autocomplete, jump-to-def, etc.)
@ -50,7 +50,7 @@ By default, every field takes its default value specified in the dataclass. If a
## Specifying values from the CLI ## Specifying values from the CLI
Let's say that we want to train [Diffusion Policy](../../lerobot/common/policies/diffusion) on the [pusht](https://huggingface.co/datasets/lerobot/pusht) dataset, using the [gym_pusht](https://github.com/huggingface/gym-pusht) environment for evaluation. The command to do so would look like this: Let's say that we want to train [Diffusion Policy](../lerobot/common/policies/diffusion) on the [pusht](https://huggingface.co/datasets/lerobot/pusht) dataset, using the [gym_pusht](https://github.com/huggingface/gym-pusht) environment for evaluation. The command to do so would look like this:
```bash ```bash
python lerobot/scripts/train.py \ python lerobot/scripts/train.py \
--dataset.repo_id=lerobot/pusht \ --dataset.repo_id=lerobot/pusht \
@ -60,10 +60,10 @@ python lerobot/scripts/train.py \
Let's break this down: Let's break this down:
- To specify the dataset, we just need to specify its `repo_id` on the hub which is the only required argument in the `DatasetConfig`. The rest of the fields have default values and in this case we are fine with those so we can just add the option `--dataset.repo_id=lerobot/pusht`. - To specify the dataset, we just need to specify its `repo_id` on the hub which is the only required argument in the `DatasetConfig`. The rest of the fields have default values and in this case we are fine with those so we can just add the option `--dataset.repo_id=lerobot/pusht`.
- To specify the policy, we can just select diffusion policy using `--policy` appended with `.type`. Here, `.type` is a special argument which allows us to select config classes inheriting from `draccus.ChoiceRegistry` and that have been decorated with the `register_subclass()` method. To have a better explanation of this feature, have a look at this [Draccus demo](https://github.com/dlwh/draccus?tab=readme-ov-file#more-flexible-configuration-with-choice-types). In our code, we use this mechanism mainly to select policies, environments, robots, and some other components like optimizers. The policies available to select are located in [lerobot/common/policies](../../lerobot/common/policies) - To specify the policy, we can just select diffusion policy using `--policy` appended with `.type`. Here, `.type` is a special argument which allows us to select config classes inheriting from `draccus.ChoiceRegistry` and that have been decorated with the `register_subclass()` method. To have a better explanation of this feature, have a look at this [Draccus demo](https://github.com/dlwh/draccus?tab=readme-ov-file#more-flexible-configuration-with-choice-types). In our code, we use this mechanism mainly to select policies, environments, robots, and some other components like optimizers. The policies available to select are located in [lerobot/common/policies](../lerobot/common/policies)
- Similarly, we select the environment with `--env.type=pusht`. The different environment configs are available in [`lerobot/common/envs/configs.py`](../../lerobot/common/envs/configs.py) - Similarly, we select the environment with `--env.type=pusht`. The different environment configs are available in [`lerobot/common/envs/configs.py`](../lerobot/common/envs/configs.py)
Let's see another example. Let's say you've been training [ACT](../../lerobot/common/policies/act) on [lerobot/aloha_sim_insertion_human](https://huggingface.co/datasets/lerobot/aloha_sim_insertion_human) using the [gym-aloha](https://github.com/huggingface/gym-aloha) environment for evaluation with: Let's see another example. Let's say you've been training [ACT](../lerobot/common/policies/act) on [lerobot/aloha_sim_insertion_human](https://huggingface.co/datasets/lerobot/aloha_sim_insertion_human) using the [gym-aloha](https://github.com/huggingface/gym-aloha) environment for evaluation with:
```bash ```bash
python lerobot/scripts/train.py \ python lerobot/scripts/train.py \
--policy.type=act \ --policy.type=act \
@ -74,7 +74,7 @@ python lerobot/scripts/train.py \
> Notice we added `--output_dir` to explicitly tell where to write outputs from this run (checkpoints, training state, configs etc.). This is not mandatory and if you don't specify it, a default directory will be created from the current date and time, env.type and policy.type. This will typically look like `outputs/train/2025-01-24/16-10-05_aloha_act`. > Notice we added `--output_dir` to explicitly tell where to write outputs from this run (checkpoints, training state, configs etc.). This is not mandatory and if you don't specify it, a default directory will be created from the current date and time, env.type and policy.type. This will typically look like `outputs/train/2025-01-24/16-10-05_aloha_act`.
We now want to train a different policy for aloha on another task. We'll change the dataset and use [lerobot/aloha_sim_transfer_cube_human](https://huggingface.co/datasets/lerobot/aloha_sim_transfer_cube_human) instead. Of course, we also need to change the task of the environment as well to match this other task. We now want to train a different policy for aloha on another task. We'll change the dataset and use [lerobot/aloha_sim_transfer_cube_human](https://huggingface.co/datasets/lerobot/aloha_sim_transfer_cube_human) instead. Of course, we also need to change the task of the environment as well to match this other task.
Looking at the [`AlohaEnv`](../../lerobot/common/envs/configs.py) config, the task is `"AlohaInsertion-v0"` by default, which corresponds to the task we trained on in the command above. The [gym-aloha](https://github.com/huggingface/gym-aloha?tab=readme-ov-file#description) environment also has the `AlohaTransferCube-v0` task which corresponds to this other task we want to train on. Putting this together, we can train this new policy on this different task using: Looking at the [`AlohaEnv`](../lerobot/common/envs/configs.py) config, the task is `"AlohaInsertion-v0"` by default, which corresponds to the task we trained on in the command above. The [gym-aloha](https://github.com/huggingface/gym-aloha?tab=readme-ov-file#description) environment also has the `AlohaTransferCube-v0` task which corresponds to this other task we want to train on. Putting this together, we can train this new policy on this different task using:
```bash ```bash
python lerobot/scripts/train.py \ python lerobot/scripts/train.py \
--policy.type=act \ --policy.type=act \

View File

@ -830,11 +830,6 @@ It contains:
- `dtRphone:33.84 (29.5hz)` which is the delta time of capturing an image from the phone camera in the thread running asynchronously. - `dtRphone:33.84 (29.5hz)` which is the delta time of capturing an image from the phone camera in the thread running asynchronously.
Troubleshooting: Troubleshooting:
- On Linux, if you encounter any issue during video encoding with `ffmpeg: unknown encoder libsvtav1`, you can:
- install with conda-forge by running `conda install -c conda-forge ffmpeg` (it should be compiled with `libsvtav1`),
> **NOTE:** This usually installs `ffmpeg 7.X` for your platform (check the version installed with `ffmpeg -encoders | grep libsvtav1`). If it isn't `ffmpeg 7.X` or lacks `libsvtav1` support, you can explicitly install `ffmpeg 7.X` using: `conda install ffmpeg=7.1.1 -c conda-forge`
- or, install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1),
- and, make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
- On Linux, if the left and right arrow keys and escape key don't have any effect during data recording, make sure you've set the `$DISPLAY` environment variable. See [pynput limitations](https://pynput.readthedocs.io/en/latest/limitations.html#linux). - On Linux, if the left and right arrow keys and escape key don't have any effect during data recording, make sure you've set the `$DISPLAY` environment variable. See [pynput limitations](https://pynput.readthedocs.io/en/latest/limitations.html#linux).
At the end of data recording, your dataset will be uploaded on your Hugging Face page (e.g. https://huggingface.co/datasets/cadene/koch_test) that you can obtain by running: At the end of data recording, your dataset will be uploaded on your Hugging Face page (e.g. https://huggingface.co/datasets/cadene/koch_test) that you can obtain by running:

View File

@ -0,0 +1,144 @@
# Port DROID 1.0.1 dataset to LeRobotDataset
## Download
TODO
It will take 2 TB in your local disk.
## Port on a single computer
First, install tensorflow dataset utilities to read from raw files:
```bash
pip install tensorflow
pip install tensorflow_datasets
```
Then run this script to start porting the dataset:
```bash
python examples/port_datasets/droid_rlds/port_droid.py \
--raw-dir /your/data/droid/1.0.1 \
--repo-id your_id/droid_1.0.1 \
--push-to-hub
```
It will take 400GB in your local disk.
As usual, your LeRobotDataset will be stored in your huggingface/lerobot cache folder.
WARNING: it will take 7 days for porting the dataset locally and 3 days to upload, so we will need to parallelize over multiple nodes on a slurm cluster.
NOTE: For development, run this script to start porting a shard:
```bash
python examples/port_datasets/droid_rlds/port.py \
--raw-dir /your/data/droid/1.0.1 \
--repo-id your_id/droid_1.0.1 \
--num-shards 2048 \
--shard-index 0
```
## Port over SLURM
Install slurm utilities from Hugging Face:
```bash
pip install datatrove
```
### 1. Port one shard per job
Run this script to start porting shards of the dataset:
```bash
python examples/port_datasets/droid_rlds/slurm_port_shards.py \
--raw-dir /your/data/droid/1.0.1 \
--repo-id your_id/droid_1.0.1 \
--logs-dir /your/logs \
--job-name port_droid \
--partition your_partition \
--workers 2048 \
--cpus-per-task 8 \
--mem-per-cpu 1950M
```
**Note on how to set your command line arguments**
Regarding `--partition`, find yours by running:
```bash
info --format="%R"`
```
and select the CPU partition if you have one. No GPU needed.
Regarding `--workers`, it is the number of slurm jobs you will launch in parallel. 2048 is the maximum number, since there is 2048 shards in Droid. This big number will certainly max-out your cluster.
Regarding `--cpus-per-task` and `--mem-per-cpu`, by default it will use ~16GB of RAM (8*1950M) which is recommended to load the raw frames and 8 CPUs which can be useful to parallelize the encoding of the frames.
Find the number of CPUs and Memory of the nodes of your partition by running:
```bash
sinfo -N -p your_partition -h -o "%N cpus=%c mem=%m"
```
**Useful commands to check progress and debug**
Check if your jobs are running:
```bash
squeue -u $USER`
```
You should see a list with job indices like `15125385_155` where `15125385` is the index of the run and `155` is the worker index. The output/print of this worker is written in real time in `/your/logs/job_name/slurm_jobs/15125385_155.out`. For instance, you can inspect the content of this file by running `less /your/logs/job_name/slurm_jobs/15125385_155.out`.
Check the progression of your jobs by running:
```bash
jobs_status /your/logs
```
If it's not 100% and no more slurm job is running, it means that some of them failed. Inspect the logs by running:
```bash
failed_logs /your/logs/job_name
```
If there is an issue in the code, you can fix it in debug mode with `--slurm 0` which allows to set breakpoint:
```bash
python examples/port_datasets/droid_rlds/slurm_port_shards.py --slurm 0 ...
```
And you can relaunch the same command, which will skip the completed jobs:
```bash
python examples/port_datasets/droid_rlds/slurm_port_shards.py --slurm 1 ...
```
Once all jobs are completed, you will have one dataset per shard (e.g. `droid_1.0.1_world_2048_rank_1594`) saved on disk in your `/lerobot/home/dir/your_id` directory. You can find your `/lerobot/home/dir` by running:
```bash
python -c "from lerobot.common.constants import HF_LEROBOT_HOME;print(HF_LEROBOT_HOME)"
```
### 2. Aggregate all shards
Run this script to start aggregation:
```bash
python examples/port_datasets/droid_rlds/slurm_aggregate_shards.py \
--repo-id your_id/droid_1.0.1 \
--logs-dir /your/logs \
--job-name aggr_droid \
--partition your_partition \
--workers 2048 \
--cpus-per-task 8 \
--mem-per-cpu 1950M
```
Once all jobs are completed, you will have one dataset your `/lerobot/home/dir/your_id/droid_1.0.1` directory.
### 3. Upload dataset
Run this script to start uploading:
```bash
python examples/port_datasets/droid_rlds/slurm_upload.py \
--repo-id your_id/droid_1.0.1 \
--logs-dir /your/logs \
--job-name upload_droid \
--partition your_partition \
--workers 50 \
--cpus-per-task 4 \
--mem-per-cpu 1950M
```

View File

@ -0,0 +1,411 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import time
from pathlib import Path
import numpy as np
import tensorflow_datasets as tfds
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
from lerobot.common.utils.utils import get_elapsed_time_in_days_hours_minutes_seconds
DROID_SHARDS = 2048
DROID_FPS = 15
DROID_ROBOT_TYPE = "Franka"
# Dataset schema slightly adapted from: https://droid-dataset.github.io/droid/the-droid-dataset.html#-dataset-schema
DROID_FEATURES = {
# true on first step of the episode
"is_first": {
"dtype": "bool",
"shape": (1,),
"names": None,
},
# true on last step of the episode
"is_last": {
"dtype": "bool",
"shape": (1,),
"names": None,
},
# true on last step of the episode if it is a terminal step, True for demos
"is_terminal": {
"dtype": "bool",
"shape": (1,),
"names": None,
},
# language_instruction is also stored as "task" to follow LeRobot standard
"language_instruction": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"language_instruction_2": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"language_instruction_3": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"observation.state.gripper_position": {
"dtype": "float32",
"shape": (1,),
"names": {
"axes": ["gripper"],
},
},
"observation.state.cartesian_position": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"observation.state.joint_position": {
"dtype": "float32",
"shape": (7,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
# Add this new feature to follow LeRobot standard of using joint position + gripper
"observation.state": {
"dtype": "float32",
"shape": (8,),
"names": {
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "gripper"],
},
},
# Initially called wrist_image_left
"observation.images.wrist_left": {
"dtype": "video",
"shape": (180, 320, 3),
"names": [
"height",
"width",
"channels",
],
},
# Initially called exterior_image_1_left
"observation.images.exterior_1_left": {
"dtype": "video",
"shape": (180, 320, 3),
"names": [
"height",
"width",
"channels",
],
},
# Initially called exterior_image_2_left
"observation.images.exterior_2_left": {
"dtype": "video",
"shape": (180, 320, 3),
"names": [
"height",
"width",
"channels",
],
},
"action.gripper_position": {
"dtype": "float32",
"shape": (1,),
"names": {
"axes": ["gripper"],
},
},
"action.gripper_velocity": {
"dtype": "float32",
"shape": (1,),
"names": {
"axes": ["gripper"],
},
},
"action.cartesian_position": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"action.cartesian_velocity": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"action.joint_position": {
"dtype": "float32",
"shape": (7,),
"names": {
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"],
},
},
"action.joint_velocity": {
"dtype": "float32",
"shape": (7,),
"names": {
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"],
},
},
# This feature was called "action" in RLDS dataset and consists of [6x joint velocities, 1x gripper position]
"action.original": {
"dtype": "float32",
"shape": (7,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw", "gripper"],
},
},
# Add this new feature to follow LeRobot standard of using joint position + gripper
"action": {
"dtype": "float32",
"shape": (8,),
"names": {
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "gripper"],
},
},
"discount": {
"dtype": "float32",
"shape": (1,),
"names": None,
},
"reward": {
"dtype": "float32",
"shape": (1,),
"names": None,
},
# Meta data that are the same for all frames in the episode
"task_category": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"building": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"collector_id": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"date": {
"dtype": "string",
"shape": (1,),
"names": None,
},
"camera_extrinsics.wrist_left": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"camera_extrinsics.exterior_1_left": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"camera_extrinsics.exterior_2_left": {
"dtype": "float32",
"shape": (6,),
"names": {
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
},
},
"is_episode_successful": {
"dtype": "bool",
"shape": (1,),
"names": None,
},
}
def is_episode_successful(tf_episode_metadata):
# Adapted from: https://github.com/droid-dataset/droid_policy_learning/blob/dd1020eb20d981f90b5ff07dc80d80d5c0cb108b/robomimic/utils/rlds_utils.py#L8
return "/success/" in tf_episode_metadata["file_path"].numpy().decode()
def generate_lerobot_frames(tf_episode):
m = tf_episode["episode_metadata"]
frame_meta = {
"task_category": m["building"].numpy().decode(),
"building": m["building"].numpy().decode(),
"collector_id": m["collector_id"].numpy().decode(),
"date": m["date"].numpy().decode(),
"camera_extrinsics.wrist_left": m["extrinsics_wrist_cam"].numpy(),
"camera_extrinsics.exterior_1_left": m["extrinsics_exterior_cam_1"].numpy(),
"camera_extrinsics.exterior_2_left": m["extrinsics_exterior_cam_2"].numpy(),
"is_episode_successful": np.array([is_episode_successful(m)]),
}
for f in tf_episode["steps"]:
# Dataset schema slightly adapted from: https://droid-dataset.github.io/droid/the-droid-dataset.html#-dataset-schema
frame = {
"is_first": np.array([f["is_first"].numpy()]),
"is_last": np.array([f["is_last"].numpy()]),
"is_terminal": np.array([f["is_terminal"].numpy()]),
"language_instruction": f["language_instruction"].numpy().decode(),
"language_instruction_2": f["language_instruction_2"].numpy().decode(),
"language_instruction_3": f["language_instruction_3"].numpy().decode(),
"observation.state.gripper_position": f["observation"]["gripper_position"].numpy(),
"observation.state.cartesian_position": f["observation"]["cartesian_position"].numpy(),
"observation.state.joint_position": f["observation"]["joint_position"].numpy(),
"observation.images.wrist_left": f["observation"]["wrist_image_left"].numpy(),
"observation.images.exterior_1_left": f["observation"]["exterior_image_1_left"].numpy(),
"observation.images.exterior_2_left": f["observation"]["exterior_image_2_left"].numpy(),
"action.gripper_position": f["action_dict"]["gripper_position"].numpy(),
"action.gripper_velocity": f["action_dict"]["gripper_velocity"].numpy(),
"action.cartesian_position": f["action_dict"]["cartesian_position"].numpy(),
"action.cartesian_velocity": f["action_dict"]["cartesian_velocity"].numpy(),
"action.joint_position": f["action_dict"]["joint_position"].numpy(),
"action.joint_velocity": f["action_dict"]["joint_velocity"].numpy(),
"discount": np.array([f["discount"].numpy()]),
"reward": np.array([f["reward"].numpy()]),
"action.original": f["action"].numpy(),
}
# language_instruction is also stored as "task" to follow LeRobot standard
frame["task"] = frame["language_instruction"]
# Add this new feature to follow LeRobot standard of using joint position + gripper
frame["observation.state"] = np.concatenate(
[frame["observation.state.joint_position"], frame["observation.state.gripper_position"]]
)
frame["action"] = np.concatenate([frame["action.joint_position"], frame["action.gripper_position"]])
# Meta data that are the same for all frames in the episode
frame.update(frame_meta)
# Cast fp64 to fp32
for key in frame:
if isinstance(frame[key], np.ndarray) and frame[key].dtype == np.float64:
frame[key] = frame[key].astype(np.float32)
yield frame
def port_droid(
raw_dir: Path,
repo_id: str,
push_to_hub: bool = False,
num_shards: int | None = None,
shard_index: int | None = None,
):
dataset_name = raw_dir.parent.name
version = raw_dir.name
data_dir = raw_dir.parent.parent
builder = tfds.builder(f"{dataset_name}/{version}", data_dir=data_dir, version="")
if num_shards is not None:
tfds_num_shards = builder.info.splits["train"].num_shards
if tfds_num_shards != DROID_SHARDS:
raise ValueError(
f"Number of shards of Droid dataset is expected to be {DROID_SHARDS} but is {tfds_num_shards}."
)
if num_shards != tfds_num_shards:
raise ValueError(
f"We only shard over the fixed number of shards provided by tensorflow dataset ({tfds_num_shards}), but {num_shards} shards provided instead."
)
if shard_index >= tfds_num_shards:
raise ValueError(
f"Shard index is greater than the num of shards ({shard_index} >= {num_shards})."
)
raw_dataset = builder.as_dataset(split=f"train[{shard_index}shard]")
else:
raw_dataset = builder.as_dataset(split="train")
lerobot_dataset = LeRobotDataset.create(
repo_id=repo_id,
robot_type=DROID_ROBOT_TYPE,
fps=DROID_FPS,
features=DROID_FEATURES,
)
start_time = time.time()
num_episodes = raw_dataset.cardinality().numpy().item()
logging.info(f"Number of episodes {num_episodes}")
for episode_index, episode in enumerate(raw_dataset):
elapsed_time = time.time() - start_time
d, h, m, s = get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time)
logging.info(
f"{episode_index} / {num_episodes} episodes processed (after {d} days, {h} hours, {m} minutes, {s:.3f} seconds)"
)
for frame in generate_lerobot_frames(episode):
lerobot_dataset.add_frame(frame)
lerobot_dataset.save_episode()
logging.info("Save_episode")
if push_to_hub:
lerobot_dataset.push_to_hub(
# Add openx tag, since it belongs to the openx collection of datasets
tags=["openx"],
private=False,
)
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--raw-dir",
type=Path,
required=True,
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
)
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True",
)
parser.add_argument(
"--push-to-hub",
action="store_true",
help="Upload to hub.",
)
parser.add_argument(
"--num-shards",
type=int,
default=None,
help="Number of shards. Can be either None to load the full dataset, or 2048 to load one of the 2048 tensorflow dataset files.",
)
parser.add_argument(
"--shard-index",
type=int,
default=None,
help="Index of the shard. Can be either None to load the full dataset, or in [0,2047] to load one of the 2048 tensorflow dataset files.",
)
args = parser.parse_args()
port_droid(**vars(args))
if __name__ == "__main__":
main()

View File

@ -0,0 +1,288 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
from pathlib import Path
import tqdm
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
from lerobot.common.datasets.aggregate import validate_all_metadata
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.common.datasets.utils import write_episode, write_episode_stats, write_info, write_task
from lerobot.common.utils.utils import init_logging
class AggregateDatasets(PipelineStep):
def __init__(
self,
repo_ids: list[str],
aggregated_repo_id: str,
):
super().__init__()
self.repo_ids = repo_ids
self.aggr_repo_id = aggregated_repo_id
self.create_aggr_dataset()
def create_aggr_dataset(self):
init_logging()
logging.info("Start aggregate_datasets")
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in self.repo_ids]
fps, robot_type, features = validate_all_metadata(all_metadata)
# Create resulting dataset folder
aggr_meta = LeRobotDatasetMetadata.create(
repo_id=self.aggr_repo_id,
fps=fps,
robot_type=robot_type,
features=features,
)
logging.info("Find all tasks")
# find all tasks, deduplicate them, create new task indices for each dataset
# indexed by dataset index
datasets_task_index_to_aggr_task_index = {}
aggr_task_index = 0
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Find all tasks")):
task_index_to_aggr_task_index = {}
for task_index, task in meta.tasks.items():
if task not in aggr_meta.task_to_task_index:
# add the task to aggr tasks mappings
aggr_meta.tasks[aggr_task_index] = task
aggr_meta.task_to_task_index[task] = aggr_task_index
aggr_task_index += 1
# add task_index anyway
task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task]
datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index
logging.info("Prepare copy data and videos")
datasets_ep_idx_to_aggr_ep_idx = {}
datasets_aggr_episode_index_shift = {}
aggr_episode_index_shift = 0
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Prepare copy data and videos")):
ep_idx_to_aggr_ep_idx = {}
for episode_index in range(meta.total_episodes):
aggr_episode_index = episode_index + aggr_episode_index_shift
ep_idx_to_aggr_ep_idx[episode_index] = aggr_episode_index
datasets_ep_idx_to_aggr_ep_idx[dataset_index] = ep_idx_to_aggr_ep_idx
datasets_aggr_episode_index_shift[dataset_index] = aggr_episode_index_shift
# populate episodes
for episode_index, episode_dict in meta.episodes.items():
aggr_episode_index = episode_index + aggr_episode_index_shift
episode_dict["episode_index"] = aggr_episode_index
aggr_meta.episodes[aggr_episode_index] = episode_dict
# populate episodes_stats
for episode_index, episode_stats in meta.episodes_stats.items():
aggr_episode_index = episode_index + aggr_episode_index_shift
aggr_meta.episodes_stats[aggr_episode_index] = episode_stats
# populate info
aggr_meta.info["total_episodes"] += meta.total_episodes
aggr_meta.info["total_frames"] += meta.total_frames
aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes
aggr_episode_index_shift += meta.total_episodes
logging.info("Write meta data")
aggr_meta.info["total_tasks"] = len(aggr_meta.tasks)
aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1)
aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"}
# create a new episodes jsonl with updated episode_index using write_episode
for episode_dict in tqdm.tqdm(aggr_meta.episodes.values(), desc="Write episodes"):
write_episode(episode_dict, aggr_meta.root)
# create a new episode_stats jsonl with updated episode_index using write_episode_stats
for episode_index, episode_stats in tqdm.tqdm(
aggr_meta.episodes_stats.items(), desc="Write episodes stats"
):
write_episode_stats(episode_index, episode_stats, aggr_meta.root)
# create a new task jsonl with updated episode_index using write_task
for task_index, task in tqdm.tqdm(aggr_meta.tasks.items(), desc="Write tasks"):
write_task(task_index, task, aggr_meta.root)
write_info(aggr_meta.info, aggr_meta.root)
self.datasets_task_index_to_aggr_task_index = datasets_task_index_to_aggr_task_index
self.datasets_ep_idx_to_aggr_ep_idx = datasets_ep_idx_to_aggr_ep_idx
self.datasets_aggr_episode_index_shift = datasets_aggr_episode_index_shift
logging.info("Meta data done writing!")
def run(self, data=None, rank: int = 0, world_size: int = 1):
import logging
import shutil
import pandas as pd
from lerobot.common.datasets.aggregate import get_update_episode_and_task_func
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.common.utils.utils import init_logging
init_logging()
aggr_meta = LeRobotDatasetMetadata(self.aggr_repo_id)
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in self.repo_ids]
if world_size != len(all_metadata):
raise ValueError()
dataset_index = rank
meta = all_metadata[dataset_index]
aggr_episode_index_shift = self.datasets_aggr_episode_index_shift[dataset_index]
logging.info("Copy data")
for episode_index in range(meta.total_episodes):
aggr_episode_index = self.datasets_ep_idx_to_aggr_ep_idx[dataset_index][episode_index]
data_path = meta.root / meta.get_data_file_path(episode_index)
aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index)
# update episode_index and task_index
df = pd.read_parquet(data_path)
update_row_func = get_update_episode_and_task_func(
aggr_episode_index_shift, self.datasets_task_index_to_aggr_task_index[dataset_index]
)
df = df.apply(update_row_func, axis=1)
aggr_data_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(aggr_data_path)
logging.info("Copy videos")
for episode_index in range(meta.total_episodes):
aggr_episode_index = episode_index + aggr_episode_index_shift
for vid_key in meta.video_keys:
video_path = meta.root / meta.get_video_file_path(episode_index, vid_key)
aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key)
aggr_video_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(video_path, aggr_video_path)
# copy_command = f"cp {video_path} {aggr_video_path} &"
# subprocess.Popen(copy_command, shell=True)
logging.info("Done!")
def make_aggregate_executor(
repo_ids, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
):
kwargs = {
"pipeline": [
AggregateDatasets(repo_ids, repo_id),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": DROID_SHARDS,
"workers": workers,
"time": "08:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": DROID_SHARDS,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
)
parser.add_argument(
"--logs-dir",
type=Path,
help="Path to logs directory for `datatrove`.",
)
parser.add_argument(
"--job-name",
type=str,
default="aggr_droid",
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
)
parser.add_argument(
"--workers",
type=int,
default=2048,
help="Number of slurm workers. It should be less than the maximum number of shards.",
)
parser.add_argument(
"--partition",
type=str,
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of cpus that each slurm worker will use.",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="1950M",
help="Memory per cpu that each worker will use.",
)
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
repo_ids = [f"{args.repo_id}_world_{DROID_SHARDS}_rank_{rank}" for rank in range(DROID_SHARDS)]
aggregate_executor = make_aggregate_executor(repo_ids, **kwargs)
aggregate_executor.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,161 @@
import argparse
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
def validate_shard(repo_id):
"""Sanity check that ensure meta data can be loaded and all files are present."""
meta = LeRobotDatasetMetadata(repo_id)
if meta.total_episodes == 0:
raise ValueError("Number of episodes is 0.")
for ep_idx in range(meta.total_episodes):
data_path = meta.root / meta.get_data_file_path(ep_idx)
if not data_path.exists():
raise ValueError(f"Parquet file is missing in: {data_path}")
for vid_key in meta.video_keys:
vid_path = meta.root / meta.get_video_file_path(ep_idx, vid_key)
if not vid_path.exists():
raise ValueError(f"Video file is missing in: {vid_path}")
class PortDroidShards(PipelineStep):
def __init__(
self,
raw_dir: Path | str,
repo_id: str = None,
):
super().__init__()
self.raw_dir = Path(raw_dir)
self.repo_id = repo_id
def run(self, data=None, rank: int = 0, world_size: int = 1):
from datasets.utils.tqdm import disable_progress_bars
from examples.port_datasets.droid_rlds.port_droid import port_droid
from lerobot.common.utils.utils import init_logging
init_logging()
disable_progress_bars()
shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}"
port_droid(
self.raw_dir,
shard_repo_id,
push_to_hub=False,
num_shards=world_size,
shard_index=rank,
)
validate_shard(shard_repo_id)
def make_port_executor(
raw_dir, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
):
kwargs = {
"pipeline": [
PortDroidShards(raw_dir, repo_id),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": DROID_SHARDS,
"workers": workers,
"time": "08:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": 1,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--raw-dir",
type=Path,
required=True,
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
)
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
)
parser.add_argument(
"--logs-dir",
type=Path,
help="Path to logs directory for `datatrove`.",
)
parser.add_argument(
"--job-name",
type=str,
default="port_droid",
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
)
parser.add_argument(
"--workers",
type=int,
default=2048,
help="Number of slurm workers. It should be less than the maximum number of shards.",
)
parser.add_argument(
"--partition",
type=str,
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of cpus that each slurm worker will use.",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="1950M",
help="Memory per cpu that each worker will use.",
)
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
port_executor = make_port_executor(**kwargs)
port_executor.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,263 @@
import argparse
import logging
import os
from pathlib import Path
from datatrove.executor import LocalPipelineExecutor
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.base import PipelineStep
from huggingface_hub import HfApi
from huggingface_hub.constants import REPOCARD_NAME
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDatasetMetadata
from lerobot.common.datasets.utils import create_lerobot_dataset_card
from lerobot.common.utils.utils import init_logging
class UploadDataset(PipelineStep):
def __init__(
self,
repo_id: str,
branch: str | None = None,
revision: str | None = None,
tags: list | None = None,
license: str | None = "apache-2.0",
private: bool = False,
distant_repo_id: str | None = None,
**card_kwargs,
):
super().__init__()
self.repo_id = repo_id
self.distant_repo_id = self.repo_id if distant_repo_id is None else distant_repo_id
self.branch = branch
self.tags = tags
self.license = license
self.private = private
self.card_kwargs = card_kwargs
self.revision = revision if revision else CODEBASE_VERSION
if os.environ.get("HF_HUB_ENABLE_HF_TRANSFER", "0") != "1":
logging.warning(
'HF_HUB_ENABLE_HF_TRANSFER is not set to "1". Install hf_transfer and set the env '
"variable for faster uploads:\npip install hf-transfer\nexport HF_HUB_ENABLE_HF_TRANSFER=1"
)
self.create_repo()
def create_repo(self):
logging.info(f"Loading meta data from {self.repo_id}...")
meta = LeRobotDatasetMetadata(self.repo_id)
logging.info(f"Creating repo {self.distant_repo_id}...")
hub_api = HfApi()
hub_api.create_repo(
repo_id=self.distant_repo_id,
private=self.private,
repo_type="dataset",
exist_ok=True,
)
if self.branch:
hub_api.create_branch(
repo_id=self.distant_repo_id,
branch=self.branch,
revision=self.revision,
repo_type="dataset",
exist_ok=True,
)
if not hub_api.file_exists(
self.distant_repo_id, REPOCARD_NAME, repo_type="dataset", revision=self.branch
):
card = create_lerobot_dataset_card(
tags=self.tags, dataset_info=meta.info, license=self.license, **self.card_kwargs
)
card.push_to_hub(repo_id=self.distant_repo_id, repo_type="dataset", revision=self.branch)
def list_files_recursively(directory):
base_path = Path(directory)
return [str(file.relative_to(base_path)) for file in base_path.rglob("*") if file.is_file()]
logging.info(f"Listing all local files from {self.repo_id}...")
self.file_paths = list_files_recursively(meta.root)
self.file_paths = sorted(self.file_paths)
def create_chunks(self, lst, n):
from itertools import islice
it = iter(lst)
return [list(islice(it, size)) for size in [len(lst) // n + (i < len(lst) % n) for i in range(n)]]
def create_commits(self, additions):
import logging
import math
import random
import time
from huggingface_hub import create_commit
from huggingface_hub.utils import HfHubHTTPError
FILES_BETWEEN_COMMITS = 10 # noqa: N806
BASE_DELAY = 0.1 # noqa: N806
MAX_RETRIES = 12 # noqa: N806
# Split the files into smaller chunks for faster commit
# and avoiding "A commit has happened since" error
num_chunks = math.ceil(len(additions) / FILES_BETWEEN_COMMITS)
chunks = self.create_chunks(additions, num_chunks)
for chunk in chunks:
retries = 0
while True:
try:
create_commit(
self.distant_repo_id,
repo_type="dataset",
operations=chunk,
commit_message=f"DataTrove upload ({len(chunk)} files)",
revision=self.branch,
)
# TODO: every 100 chunks super_squach_commits()
logging.info("create_commit completed!")
break
except HfHubHTTPError as e:
if "A commit has happened since" in e.server_message:
if retries >= MAX_RETRIES:
logging.error(f"Failed to create commit after {MAX_RETRIES=}. Giving up.")
raise e
logging.info("Commit creation race condition issue. Waiting...")
time.sleep(BASE_DELAY * 2**retries + random.uniform(0, 2))
retries += 1
else:
raise e
def run(self, data=None, rank: int = 0, world_size: int = 1):
import logging
from datasets.utils.tqdm import disable_progress_bars
from huggingface_hub import CommitOperationAdd, preupload_lfs_files
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
from lerobot.common.utils.utils import init_logging
init_logging()
disable_progress_bars()
chunks = self.create_chunks(self.file_paths, world_size)
file_paths = chunks[rank]
if len(file_paths) == 0:
raise ValueError(file_paths)
logging.info("Pre-uploading LFS files...")
for i, path in enumerate(file_paths):
logging.info(f"{i}: {path}")
meta = LeRobotDatasetMetadata(self.repo_id)
additions = [
CommitOperationAdd(path_in_repo=path, path_or_fileobj=meta.root / path) for path in file_paths
]
preupload_lfs_files(
repo_id=self.distant_repo_id, repo_type="dataset", additions=additions, revision=self.branch
)
logging.info("Creating commits...")
self.create_commits(additions)
logging.info("Done!")
def make_upload_executor(
repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
):
kwargs = {
"pipeline": [
UploadDataset(repo_id),
],
"logging_dir": str(logs_dir / job_name),
}
if slurm:
kwargs.update(
{
"job_name": job_name,
"tasks": DROID_SHARDS,
"workers": workers,
"time": "08:00:00",
"partition": partition,
"cpus_per_task": cpus_per_task,
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
}
)
executor = SlurmPipelineExecutor(**kwargs)
else:
kwargs.update(
{
"tasks": DROID_SHARDS,
"workers": 1,
}
)
executor = LocalPipelineExecutor(**kwargs)
return executor
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--repo-id",
type=str,
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
)
parser.add_argument(
"--logs-dir",
type=Path,
help="Path to logs directory for `datatrove`.",
)
parser.add_argument(
"--job-name",
type=str,
default="upload_droid",
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
)
parser.add_argument(
"--slurm",
type=int,
default=1,
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
)
parser.add_argument(
"--workers",
type=int,
default=50,
help="Number of slurm workers. It should be less than the maximum number of shards.",
)
parser.add_argument(
"--partition",
type=str,
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
)
parser.add_argument(
"--cpus-per-task",
type=int,
default=8,
help="Number of cpus that each slurm worker will use.",
)
parser.add_argument(
"--mem-per-cpu",
type=str,
default="1950M",
help="Memory per cpu that each worker will use.",
)
init_logging()
args = parser.parse_args()
kwargs = vars(args)
kwargs["slurm"] = kwargs.pop("slurm") == 1
upload_executor = make_upload_executor(**kwargs)
upload_executor.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,175 @@
import logging
import shutil
import pandas as pd
import tqdm
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.common.datasets.utils import write_episode, write_episode_stats, write_info, write_task
from lerobot.common.utils.utils import init_logging
def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
# validate same fps, robot_type, features
fps = all_metadata[0].fps
robot_type = all_metadata[0].robot_type
features = all_metadata[0].features
for meta in tqdm.tqdm(all_metadata, desc="Validate all meta data"):
if fps != meta.fps:
raise ValueError(f"Same fps is expected, but got fps={meta.fps} instead of {fps}.")
if robot_type != meta.robot_type:
raise ValueError(
f"Same robot_type is expected, but got robot_type={meta.robot_type} instead of {robot_type}."
)
if features != meta.features:
raise ValueError(
f"Same features is expected, but got features={meta.features} instead of {features}."
)
return fps, robot_type, features
def get_update_episode_and_task_func(episode_index_to_add, task_index_to_global_task_index):
def _update(row):
row["episode_index"] = row["episode_index"] + episode_index_to_add
row["task_index"] = task_index_to_global_task_index[row["task_index"]]
return row
return _update
def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, aggr_root=None):
logging.info("Start aggregate_datasets")
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids]
fps, robot_type, features = validate_all_metadata(all_metadata)
# Create resulting dataset folder
aggr_meta = LeRobotDatasetMetadata.create(
repo_id=aggr_repo_id,
fps=fps,
robot_type=robot_type,
features=features,
root=aggr_root,
)
logging.info("Find all tasks")
# find all tasks, deduplicate them, create new task indices for each dataset
# indexed by dataset index
datasets_task_index_to_aggr_task_index = {}
aggr_task_index = 0
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Find all tasks")):
task_index_to_aggr_task_index = {}
for task_index, task in meta.tasks.items():
if task not in aggr_meta.task_to_task_index:
# add the task to aggr tasks mappings
aggr_meta.tasks[aggr_task_index] = task
aggr_meta.task_to_task_index[task] = aggr_task_index
aggr_task_index += 1
# add task_index anyway
task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task]
datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index
logging.info("Copy data and videos")
aggr_episode_index_shift = 0
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Copy data and videos")):
# cp data
for episode_index in range(meta.total_episodes):
aggr_episode_index = episode_index + aggr_episode_index_shift
data_path = meta.root / meta.get_data_file_path(episode_index)
aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index)
# update episode_index and task_index
df = pd.read_parquet(data_path)
update_row_func = get_update_episode_and_task_func(
aggr_episode_index_shift, datasets_task_index_to_aggr_task_index[dataset_index]
)
df = df.apply(update_row_func, axis=1)
aggr_data_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(aggr_data_path)
# cp videos
for episode_index in range(meta.total_episodes):
aggr_episode_index = episode_index + aggr_episode_index_shift
for vid_key in meta.video_keys:
video_path = meta.root / meta.get_video_file_path(episode_index, vid_key)
aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key)
aggr_video_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(video_path, aggr_video_path)
# copy_command = f"cp {video_path} {aggr_video_path} &"
# subprocess.Popen(copy_command, shell=True)
# populate episodes
for episode_index, episode_dict in meta.episodes.items():
aggr_episode_index = episode_index + aggr_episode_index_shift
episode_dict["episode_index"] = aggr_episode_index
aggr_meta.episodes[aggr_episode_index] = episode_dict
# populate episodes_stats
for episode_index, episode_stats in meta.episodes_stats.items():
aggr_episode_index = episode_index + aggr_episode_index_shift
aggr_meta.episodes_stats[aggr_episode_index] = episode_stats
# populate info
aggr_meta.info["total_episodes"] += meta.total_episodes
aggr_meta.info["total_frames"] += meta.total_frames
aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes
aggr_episode_index_shift += meta.total_episodes
logging.info("write meta data")
aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1)
aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"}
# create a new episodes jsonl with updated episode_index using write_episode
for episode_dict in aggr_meta.episodes.values():
write_episode(episode_dict, aggr_meta.root)
# create a new episode_stats jsonl with updated episode_index using write_episode_stats
for episode_index, episode_stats in aggr_meta.episodes_stats.items():
write_episode_stats(episode_index, episode_stats, aggr_meta.root)
# create a new task jsonl with updated episode_index using write_task
for task_index, task in aggr_meta.tasks.items():
write_task(task_index, task, aggr_meta.root)
write_info(aggr_meta.info, aggr_meta.root)
if __name__ == "__main__":
init_logging()
repo_id = "cadene/droid"
aggr_repo_id = "cadene/droid"
datetime = "2025-02-22_11-23-54"
# root = Path(f"/tmp/{repo_id}")
# if root.exists():
# shutil.rmtree(root)
root = None
# all_metadata = [LeRobotDatasetMetadata(f"{repo_id}_{datetime}_world_2048_rank_{rank}") for rank in range(2048)]
# aggregate_datasets(
# all_metadata,
# aggr_repo_id,
# root=root,
# )
aggr_dataset = LeRobotDataset(
repo_id=aggr_repo_id,
root=root,
)
aggr_dataset.push_to_hub(tags=["openx"])
# for meta in all_metadata:
# dataset = LeRobotDataset(repo_id=meta.repo_id, root=meta.root)
# dataset.push_to_hub(tags=["openx"])

View File

@ -74,7 +74,7 @@ from lerobot.common.datasets.video_utils import (
) )
from lerobot.common.robot_devices.robots.utils import Robot from lerobot.common.robot_devices.robots.utils import Robot
CODEBASE_VERSION = "v2.1" CODEBASE_VERSION = "v3.0"
class LeRobotDatasetMetadata: class LeRobotDatasetMetadata:
@ -617,6 +617,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
"""hf_dataset contains all the observations, states, actions, rewards, etc.""" """hf_dataset contains all the observations, states, actions, rewards, etc."""
if self.episodes is None: if self.episodes is None:
path = str(self.root / "data") path = str(self.root / "data")
# TODO(rcadene): load_dataset convert parquet to arrow.
# set num_proc to accelerate this conversion
hf_dataset = load_dataset("parquet", data_dir=path, split="train") hf_dataset = load_dataset("parquet", data_dir=path, split="train")
else: else:
files = [str(self.root / self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes] files = [str(self.root / self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes]

View File

@ -0,0 +1,137 @@
"""
This script will help you convert any LeRobot dataset already pushed to the hub from codebase version 2.1 to
3.0. It will:
- Generate per-episodes stats and writes them in `episodes_stats.jsonl`
- Check consistency between these new stats and the old ones.
- Remove the deprecated `stats.json`.
- Update codebase_version in `info.json`.
- Push this new version to the hub on the 'main' branch and tags it with "v2.1".
Usage:
```bash
python lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py \
--repo-id=lerobot/pusht
```
"""
import argparse
import logging
from datasets import Dataset
from huggingface_hub import snapshot_download
from lerobot.common.constants import HF_LEROBOT_HOME
from lerobot.common.datasets.utils import (
load_episodes_stats,
)
V21 = "v2.1"
class SuppressWarnings:
def __enter__(self):
self.previous_level = logging.getLogger().getEffectiveLevel()
logging.getLogger().setLevel(logging.ERROR)
def __exit__(self, exc_type, exc_val, exc_tb):
logging.getLogger().setLevel(self.previous_level)
def convert_dataset(
repo_id: str,
branch: str | None = None,
num_workers: int = 4,
):
root = HF_LEROBOT_HOME / repo_id
snapshot_download(
repo_id,
repo_type="dataset",
revision=V21,
local_dir=root,
)
# Concatenate videos
# Create
"""
-------------------------
OLD
data/chunk-000/episode_000000.parquet
NEW
data/chunk-000/file_000.parquet
-------------------------
OLD
videos/chunk-000/CAMERA/episode_000000.mp4
NEW
videos/chunk-000/file_000.mp4
-------------------------
OLD
episodes.jsonl
{"episode_index": 1, "tasks": ["Put the blue block in the green bowl"], "length": 266}
NEW
meta/episodes/chunk-000/episodes_000.parquet
episode_index | video_chunk_index | video_file_index | data_chunk_index | data_file_index | tasks | length
-------------------------
OLD
tasks.jsonl
{"task_index": 1, "task": "Put the blue block in the green bowl"}
NEW
meta/tasks/chunk-000/file_000.parquet
task_index | task
-------------------------
OLD
episodes_stats.jsonl
NEW
meta/episodes_stats/chunk-000/file_000.parquet
episode_index | mean | std | min | max
-------------------------
UPDATE
meta/info.json
-------------------------
"""
new_root = HF_LEROBOT_HOME / f"{repo_id}_v30"
new_root.mkdir(parents=True, exist_ok=True)
episodes_stats = load_episodes_stats(root)
hf_dataset = Dataset.from_dict(episodes_stats) # noqa: F841
meta_ep_st_ch = new_root / "meta/episodes_stats/chunk-000"
meta_ep_st_ch.mkdir(parents=True, exist_ok=True)
# hf_dataset.to_parquet(meta_ep_st_ch / 'file_000.parquet')
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--repo-id",
type=str,
required=True,
help="Repository identifier on Hugging Face: a community or a user name `/` the name of the dataset "
"(e.g. `lerobot/pusht`, `cadene/aloha_sim_insertion_human`).",
)
parser.add_argument(
"--branch",
type=str,
default=None,
help="Repo branch to push your dataset. Defaults to the main branch.",
)
parser.add_argument(
"--num-workers",
type=int,
default=4,
help="Number of workers for parallelizing stats compute. Defaults to 4.",
)
args = parser.parse_args()
convert_dataset(**vars(args))

View File

@ -252,7 +252,7 @@ def encode_video_frames(
g: int | None = 2, g: int | None = 2,
crf: int | None = 30, crf: int | None = 30,
fast_decode: int = 0, fast_decode: int = 0,
log_level: str | None = "error", log_level: str | None = "quiet",
overwrite: bool = False, overwrite: bool = False,
) -> None: ) -> None:
"""More info on ffmpeg arguments tuning on `benchmark/video/README.md`""" """More info on ffmpeg arguments tuning on `benchmark/video/README.md`"""

View File

@ -512,13 +512,13 @@ if __name__ == "__main__":
) )
parser.add_argument( parser.add_argument(
"--width", "--width",
type=str, type=int,
default=640, default=640,
help="Set the width for all cameras. If not provided, use the default width of each camera.", help="Set the width for all cameras. If not provided, use the default width of each camera.",
) )
parser.add_argument( parser.add_argument(
"--height", "--height",
type=str, type=int,
default=480, default=480,
help="Set the height for all cameras. If not provided, use the default height of each camera.", help="Set the height for all cameras. If not provided, use the default height of each camera.",
) )

View File

@ -492,13 +492,13 @@ if __name__ == "__main__":
) )
parser.add_argument( parser.add_argument(
"--width", "--width",
type=str, type=int,
default=None, default=None,
help="Set the width for all cameras. If not provided, use the default width of each camera.", help="Set the width for all cameras. If not provided, use the default width of each camera.",
) )
parser.add_argument( parser.add_argument(
"--height", "--height",
type=str, type=int,
default=None, default=None,
help="Set the height for all cameras. If not provided, use the default height of each camera.", help="Set the height for all cameras. If not provided, use the default height of each camera.",
) )

View File

@ -228,3 +228,13 @@ def is_valid_numpy_dtype_string(dtype_str: str) -> bool:
except TypeError: except TypeError:
# If a TypeError is raised, the string is not a valid dtype # If a TypeError is raised, the string is not a valid dtype
return False return False
def get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time_s: float):
days = int(elapsed_time_s // (24 * 3600))
elapsed_time_s %= 24 * 3600
hours = int(elapsed_time_s // 3600)
elapsed_time_s %= 3600
minutes = int(elapsed_time_s // 60)
seconds = elapsed_time_s % 60
return days, hours, minutes, seconds

View File

@ -174,7 +174,10 @@ def run_server(
dataset.meta.get_video_file_path(episode_id, key) for key in dataset.meta.video_keys dataset.meta.get_video_file_path(episode_id, key) for key in dataset.meta.video_keys
] ]
videos_info = [ videos_info = [
{"url": url_for("static", filename=video_path), "filename": video_path.parent.name} {
"url": url_for("static", filename=str(video_path).replace("\\", "/")),
"filename": video_path.parent.name,
}
for video_path in video_paths for video_path in video_paths
] ]
tasks = dataset.meta.episodes[episode_id]["tasks"] tasks = dataset.meta.episodes[episode_id]["tasks"]
@ -381,7 +384,7 @@ def visualize_dataset_html(
if isinstance(dataset, LeRobotDataset): if isinstance(dataset, LeRobotDataset):
ln_videos_dir = static_dir / "videos" ln_videos_dir = static_dir / "videos"
if not ln_videos_dir.exists(): if not ln_videos_dir.exists():
ln_videos_dir.symlink_to((dataset.root / "videos").resolve()) ln_videos_dir.symlink_to((dataset.root / "videos").resolve().as_posix())
if serve: if serve:
run_server(dataset, episodes, host, port, static_dir, template_dir) run_server(dataset, episodes, host, port, static_dir, template_dir)

View File

@ -0,0 +1,19 @@
from lerobot.common.datasets.aggregate import aggregate_datasets
from tests.fixtures.constants import DUMMY_REPO_ID
def test_aggregate_datasets(tmp_path, lerobot_dataset_factory):
dataset_0 = lerobot_dataset_factory(
root=tmp_path / "test_0",
repo_id=DUMMY_REPO_ID + "_0",
total_episodes=10,
total_frames=400,
)
dataset_1 = lerobot_dataset_factory(
root=tmp_path / "test_1",
repo_id=DUMMY_REPO_ID + "_1",
total_episodes=10,
total_frames=400,
)
dataset_2 = aggregate_datasets([dataset_0, dataset_1])