Compare commits
42 Commits
1ab97f62b5
...
b8ad0cd514
Author | SHA1 | Date |
---|---|---|
|
b8ad0cd514 | |
|
b43ece8934 | |
|
c10c5a0e64 | |
|
a8db91c40e | |
|
0f5f7ac780 | |
|
768e36660d | |
|
790d6740ba | |
|
53ecec5fb2 | |
|
65738f0a80 | |
|
5d184a7811 | |
|
1a5c1ef9c7 | |
|
7866c1f7d1 | |
|
3666ac9346 | |
|
3daab2acbb | |
|
c36d2253d0 | |
|
e2e6f6e666 | |
|
ff0029f84b | |
|
39ad2d16d4 | |
|
689c5efc72 | |
|
eda0b996cd | |
|
15e7a9d541 | |
|
52fb4143b5 | |
|
93c80b2cb1 | |
|
5fbbaa1bc0 | |
|
71d1f5e2c9 | |
|
b520941cd9 | |
|
64ed5258e6 | |
|
392a8c32a7 | |
|
969ef745a2 | |
|
6fe42a72db | |
|
2487228ea7 | |
|
76436ca1de | |
|
fbf2f2222a | |
|
02bc4e03e0 | |
|
624eaf1175 | |
|
aed3eb4a94 | |
|
8426c64f42 | |
|
7c2bbee613 | |
|
9d6886dd08 | |
|
d67ca342e9 | |
|
57c9c21c39 | |
|
38c14571cc |
|
@ -48,7 +48,7 @@ repos:
|
|||
- id: pyupgrade
|
||||
|
||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||
rev: v0.11.4
|
||||
rev: v0.11.5
|
||||
hooks:
|
||||
- id: ruff
|
||||
args: [--fix]
|
||||
|
@ -57,7 +57,7 @@ repos:
|
|||
|
||||
##### Security #####
|
||||
- repo: https://github.com/gitleaks/gitleaks
|
||||
rev: v8.24.2
|
||||
rev: v8.24.3
|
||||
hooks:
|
||||
- id: gitleaks
|
||||
|
||||
|
|
|
@ -103,13 +103,20 @@ When using `miniconda`, install `ffmpeg` in your environment:
|
|||
conda install ffmpeg -c conda-forge
|
||||
```
|
||||
|
||||
> **NOTE:** This usually installs `ffmpeg 7.X` for your platform compiled with the `libsvtav1` encoder. If `libsvtav1` is not supported (check supported encoders with `ffmpeg -encoders`), you can:
|
||||
> - _[On any platform]_ Explicitly install `ffmpeg 7.X` using:
|
||||
> ```bash
|
||||
> conda install ffmpeg=7.1.1 -c conda-forge
|
||||
> ```
|
||||
> - _[On Linux only]_ Install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1), and make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
|
||||
|
||||
Install 🤗 LeRobot:
|
||||
```bash
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
> **NOTE:** If you encounter build errors, you may need to install additional dependencies (`cmake`, `build-essential`, and `ffmpeg libs`). On Linux, run:
|
||||
`sudo apt-get install cmake build-essential python-dev pkg-config libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libswscale-dev libswresample-dev libavfilter-dev pkg-config`. For other systems, see: [Compiling PyAV](https://pyav.org/docs/develop/overview/installation.html#bring-your-own-ffmpeg)
|
||||
`sudo apt-get install cmake build-essential python3-dev pkg-config libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libswscale-dev libswresample-dev libavfilter-dev pkg-config`. For other systems, see: [Compiling PyAV](https://pyav.org/docs/develop/overview/installation.html#bring-your-own-ffmpeg)
|
||||
|
||||
For simulations, 🤗 LeRobot comes with gymnasium environments that can be installed as extras:
|
||||
- [aloha](https://github.com/huggingface/gym-aloha)
|
||||
|
|
|
@ -14,7 +14,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
|||
tcpdump sysstat screen tmux \
|
||||
libglib2.0-0 libgl1-mesa-glx libegl1-mesa \
|
||||
speech-dispatcher portaudio19-dev libgeos-dev \
|
||||
python${PYTHON_VERSION} python${PYTHON_VERSION}-venv \
|
||||
python${PYTHON_VERSION} python${PYTHON_VERSION}-venv python${PYTHON_VERSION}-dev \
|
||||
&& apt-get clean && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install ffmpeg build dependencies. See:
|
||||
|
|
|
@ -4,7 +4,7 @@ This tutorial will explain the training script, how to use it, and particularly
|
|||
|
||||
## The training script
|
||||
|
||||
LeRobot offers a training script at [`lerobot/scripts/train.py`](../../lerobot/scripts/train.py). At a high level it does the following:
|
||||
LeRobot offers a training script at [`lerobot/scripts/train.py`](../lerobot/scripts/train.py). At a high level it does the following:
|
||||
|
||||
- Initialize/load a configuration for the following steps using.
|
||||
- Instantiates a dataset.
|
||||
|
@ -21,7 +21,7 @@ In the training script, the main function `train` expects a `TrainPipelineConfig
|
|||
def train(cfg: TrainPipelineConfig):
|
||||
```
|
||||
|
||||
You can inspect the `TrainPipelineConfig` defined in [`lerobot/configs/train.py`](../../lerobot/configs/train.py) (which is heavily commented and meant to be a reference to understand any option)
|
||||
You can inspect the `TrainPipelineConfig` defined in [`lerobot/configs/train.py`](../lerobot/configs/train.py) (which is heavily commented and meant to be a reference to understand any option)
|
||||
|
||||
When running the script, inputs for the command line are parsed thanks to the `@parser.wrap()` decorator and an instance of this class is automatically generated. Under the hood, this is done with [Draccus](https://github.com/dlwh/draccus) which is a tool dedicated for this purpose. If you're familiar with Hydra, Draccus can similarly load configurations from config files (.json, .yaml) and also override their values through command line inputs. Unlike Hydra, these configurations are pre-defined in the code through dataclasses rather than being defined entirely in config files. This allows for more rigorous serialization/deserialization, typing, and to manipulate configuration as objects directly in the code and not as dictionaries or namespaces (which enables nice features in an IDE such as autocomplete, jump-to-def, etc.)
|
||||
|
||||
|
@ -50,7 +50,7 @@ By default, every field takes its default value specified in the dataclass. If a
|
|||
|
||||
## Specifying values from the CLI
|
||||
|
||||
Let's say that we want to train [Diffusion Policy](../../lerobot/common/policies/diffusion) on the [pusht](https://huggingface.co/datasets/lerobot/pusht) dataset, using the [gym_pusht](https://github.com/huggingface/gym-pusht) environment for evaluation. The command to do so would look like this:
|
||||
Let's say that we want to train [Diffusion Policy](../lerobot/common/policies/diffusion) on the [pusht](https://huggingface.co/datasets/lerobot/pusht) dataset, using the [gym_pusht](https://github.com/huggingface/gym-pusht) environment for evaluation. The command to do so would look like this:
|
||||
```bash
|
||||
python lerobot/scripts/train.py \
|
||||
--dataset.repo_id=lerobot/pusht \
|
||||
|
@ -60,10 +60,10 @@ python lerobot/scripts/train.py \
|
|||
|
||||
Let's break this down:
|
||||
- To specify the dataset, we just need to specify its `repo_id` on the hub which is the only required argument in the `DatasetConfig`. The rest of the fields have default values and in this case we are fine with those so we can just add the option `--dataset.repo_id=lerobot/pusht`.
|
||||
- To specify the policy, we can just select diffusion policy using `--policy` appended with `.type`. Here, `.type` is a special argument which allows us to select config classes inheriting from `draccus.ChoiceRegistry` and that have been decorated with the `register_subclass()` method. To have a better explanation of this feature, have a look at this [Draccus demo](https://github.com/dlwh/draccus?tab=readme-ov-file#more-flexible-configuration-with-choice-types). In our code, we use this mechanism mainly to select policies, environments, robots, and some other components like optimizers. The policies available to select are located in [lerobot/common/policies](../../lerobot/common/policies)
|
||||
- Similarly, we select the environment with `--env.type=pusht`. The different environment configs are available in [`lerobot/common/envs/configs.py`](../../lerobot/common/envs/configs.py)
|
||||
- To specify the policy, we can just select diffusion policy using `--policy` appended with `.type`. Here, `.type` is a special argument which allows us to select config classes inheriting from `draccus.ChoiceRegistry` and that have been decorated with the `register_subclass()` method. To have a better explanation of this feature, have a look at this [Draccus demo](https://github.com/dlwh/draccus?tab=readme-ov-file#more-flexible-configuration-with-choice-types). In our code, we use this mechanism mainly to select policies, environments, robots, and some other components like optimizers. The policies available to select are located in [lerobot/common/policies](../lerobot/common/policies)
|
||||
- Similarly, we select the environment with `--env.type=pusht`. The different environment configs are available in [`lerobot/common/envs/configs.py`](../lerobot/common/envs/configs.py)
|
||||
|
||||
Let's see another example. Let's say you've been training [ACT](../../lerobot/common/policies/act) on [lerobot/aloha_sim_insertion_human](https://huggingface.co/datasets/lerobot/aloha_sim_insertion_human) using the [gym-aloha](https://github.com/huggingface/gym-aloha) environment for evaluation with:
|
||||
Let's see another example. Let's say you've been training [ACT](../lerobot/common/policies/act) on [lerobot/aloha_sim_insertion_human](https://huggingface.co/datasets/lerobot/aloha_sim_insertion_human) using the [gym-aloha](https://github.com/huggingface/gym-aloha) environment for evaluation with:
|
||||
```bash
|
||||
python lerobot/scripts/train.py \
|
||||
--policy.type=act \
|
||||
|
@ -74,7 +74,7 @@ python lerobot/scripts/train.py \
|
|||
> Notice we added `--output_dir` to explicitly tell where to write outputs from this run (checkpoints, training state, configs etc.). This is not mandatory and if you don't specify it, a default directory will be created from the current date and time, env.type and policy.type. This will typically look like `outputs/train/2025-01-24/16-10-05_aloha_act`.
|
||||
|
||||
We now want to train a different policy for aloha on another task. We'll change the dataset and use [lerobot/aloha_sim_transfer_cube_human](https://huggingface.co/datasets/lerobot/aloha_sim_transfer_cube_human) instead. Of course, we also need to change the task of the environment as well to match this other task.
|
||||
Looking at the [`AlohaEnv`](../../lerobot/common/envs/configs.py) config, the task is `"AlohaInsertion-v0"` by default, which corresponds to the task we trained on in the command above. The [gym-aloha](https://github.com/huggingface/gym-aloha?tab=readme-ov-file#description) environment also has the `AlohaTransferCube-v0` task which corresponds to this other task we want to train on. Putting this together, we can train this new policy on this different task using:
|
||||
Looking at the [`AlohaEnv`](../lerobot/common/envs/configs.py) config, the task is `"AlohaInsertion-v0"` by default, which corresponds to the task we trained on in the command above. The [gym-aloha](https://github.com/huggingface/gym-aloha?tab=readme-ov-file#description) environment also has the `AlohaTransferCube-v0` task which corresponds to this other task we want to train on. Putting this together, we can train this new policy on this different task using:
|
||||
```bash
|
||||
python lerobot/scripts/train.py \
|
||||
--policy.type=act \
|
||||
|
|
|
@ -830,11 +830,6 @@ It contains:
|
|||
- `dtRphone:33.84 (29.5hz)` which is the delta time of capturing an image from the phone camera in the thread running asynchronously.
|
||||
|
||||
Troubleshooting:
|
||||
- On Linux, if you encounter any issue during video encoding with `ffmpeg: unknown encoder libsvtav1`, you can:
|
||||
- install with conda-forge by running `conda install -c conda-forge ffmpeg` (it should be compiled with `libsvtav1`),
|
||||
> **NOTE:** This usually installs `ffmpeg 7.X` for your platform (check the version installed with `ffmpeg -encoders | grep libsvtav1`). If it isn't `ffmpeg 7.X` or lacks `libsvtav1` support, you can explicitly install `ffmpeg 7.X` using: `conda install ffmpeg=7.1.1 -c conda-forge`
|
||||
- or, install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1),
|
||||
- and, make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
|
||||
- On Linux, if the left and right arrow keys and escape key don't have any effect during data recording, make sure you've set the `$DISPLAY` environment variable. See [pynput limitations](https://pynput.readthedocs.io/en/latest/limitations.html#linux).
|
||||
|
||||
At the end of data recording, your dataset will be uploaded on your Hugging Face page (e.g. https://huggingface.co/datasets/cadene/koch_test) that you can obtain by running:
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
# Port DROID 1.0.1 dataset to LeRobotDataset
|
||||
|
||||
## Download
|
||||
|
||||
TODO
|
||||
|
||||
It will take 2 TB in your local disk.
|
||||
|
||||
## Port on a single computer
|
||||
|
||||
First, install tensorflow dataset utilities to read from raw files:
|
||||
```bash
|
||||
pip install tensorflow
|
||||
pip install tensorflow_datasets
|
||||
```
|
||||
|
||||
Then run this script to start porting the dataset:
|
||||
```bash
|
||||
python examples/port_datasets/droid_rlds/port_droid.py \
|
||||
--raw-dir /your/data/droid/1.0.1 \
|
||||
--repo-id your_id/droid_1.0.1 \
|
||||
--push-to-hub
|
||||
```
|
||||
|
||||
It will take 400GB in your local disk.
|
||||
|
||||
As usual, your LeRobotDataset will be stored in your huggingface/lerobot cache folder.
|
||||
|
||||
WARNING: it will take 7 days for porting the dataset locally and 3 days to upload, so we will need to parallelize over multiple nodes on a slurm cluster.
|
||||
|
||||
NOTE: For development, run this script to start porting a shard:
|
||||
```bash
|
||||
python examples/port_datasets/droid_rlds/port.py \
|
||||
--raw-dir /your/data/droid/1.0.1 \
|
||||
--repo-id your_id/droid_1.0.1 \
|
||||
--num-shards 2048 \
|
||||
--shard-index 0
|
||||
```
|
||||
|
||||
## Port over SLURM
|
||||
|
||||
Install slurm utilities from Hugging Face:
|
||||
```bash
|
||||
pip install datatrove
|
||||
```
|
||||
|
||||
|
||||
### 1. Port one shard per job
|
||||
|
||||
Run this script to start porting shards of the dataset:
|
||||
```bash
|
||||
python examples/port_datasets/droid_rlds/slurm_port_shards.py \
|
||||
--raw-dir /your/data/droid/1.0.1 \
|
||||
--repo-id your_id/droid_1.0.1 \
|
||||
--logs-dir /your/logs \
|
||||
--job-name port_droid \
|
||||
--partition your_partition \
|
||||
--workers 2048 \
|
||||
--cpus-per-task 8 \
|
||||
--mem-per-cpu 1950M
|
||||
```
|
||||
|
||||
**Note on how to set your command line arguments**
|
||||
|
||||
Regarding `--partition`, find yours by running:
|
||||
```bash
|
||||
info --format="%R"`
|
||||
```
|
||||
and select the CPU partition if you have one. No GPU needed.
|
||||
|
||||
Regarding `--workers`, it is the number of slurm jobs you will launch in parallel. 2048 is the maximum number, since there is 2048 shards in Droid. This big number will certainly max-out your cluster.
|
||||
|
||||
Regarding `--cpus-per-task` and `--mem-per-cpu`, by default it will use ~16GB of RAM (8*1950M) which is recommended to load the raw frames and 8 CPUs which can be useful to parallelize the encoding of the frames.
|
||||
|
||||
Find the number of CPUs and Memory of the nodes of your partition by running:
|
||||
```bash
|
||||
sinfo -N -p your_partition -h -o "%N cpus=%c mem=%m"
|
||||
```
|
||||
|
||||
**Useful commands to check progress and debug**
|
||||
|
||||
Check if your jobs are running:
|
||||
```bash
|
||||
squeue -u $USER`
|
||||
```
|
||||
|
||||
You should see a list with job indices like `15125385_155` where `15125385` is the index of the run and `155` is the worker index. The output/print of this worker is written in real time in `/your/logs/job_name/slurm_jobs/15125385_155.out`. For instance, you can inspect the content of this file by running `less /your/logs/job_name/slurm_jobs/15125385_155.out`.
|
||||
|
||||
Check the progression of your jobs by running:
|
||||
```bash
|
||||
jobs_status /your/logs
|
||||
```
|
||||
|
||||
If it's not 100% and no more slurm job is running, it means that some of them failed. Inspect the logs by running:
|
||||
```bash
|
||||
failed_logs /your/logs/job_name
|
||||
```
|
||||
|
||||
If there is an issue in the code, you can fix it in debug mode with `--slurm 0` which allows to set breakpoint:
|
||||
```bash
|
||||
python examples/port_datasets/droid_rlds/slurm_port_shards.py --slurm 0 ...
|
||||
```
|
||||
|
||||
And you can relaunch the same command, which will skip the completed jobs:
|
||||
```bash
|
||||
python examples/port_datasets/droid_rlds/slurm_port_shards.py --slurm 1 ...
|
||||
```
|
||||
|
||||
Once all jobs are completed, you will have one dataset per shard (e.g. `droid_1.0.1_world_2048_rank_1594`) saved on disk in your `/lerobot/home/dir/your_id` directory. You can find your `/lerobot/home/dir` by running:
|
||||
```bash
|
||||
python -c "from lerobot.common.constants import HF_LEROBOT_HOME;print(HF_LEROBOT_HOME)"
|
||||
```
|
||||
|
||||
|
||||
### 2. Aggregate all shards
|
||||
|
||||
Run this script to start aggregation:
|
||||
```bash
|
||||
python examples/port_datasets/droid_rlds/slurm_aggregate_shards.py \
|
||||
--repo-id your_id/droid_1.0.1 \
|
||||
--logs-dir /your/logs \
|
||||
--job-name aggr_droid \
|
||||
--partition your_partition \
|
||||
--workers 2048 \
|
||||
--cpus-per-task 8 \
|
||||
--mem-per-cpu 1950M
|
||||
```
|
||||
|
||||
Once all jobs are completed, you will have one dataset your `/lerobot/home/dir/your_id/droid_1.0.1` directory.
|
||||
|
||||
|
||||
### 3. Upload dataset
|
||||
|
||||
Run this script to start uploading:
|
||||
```bash
|
||||
python examples/port_datasets/droid_rlds/slurm_upload.py \
|
||||
--repo-id your_id/droid_1.0.1 \
|
||||
--logs-dir /your/logs \
|
||||
--job-name upload_droid \
|
||||
--partition your_partition \
|
||||
--workers 50 \
|
||||
--cpus-per-task 4 \
|
||||
--mem-per-cpu 1950M
|
||||
```
|
|
@ -0,0 +1,411 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import tensorflow_datasets as tfds
|
||||
|
||||
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.common.utils.utils import get_elapsed_time_in_days_hours_minutes_seconds
|
||||
|
||||
DROID_SHARDS = 2048
|
||||
DROID_FPS = 15
|
||||
DROID_ROBOT_TYPE = "Franka"
|
||||
|
||||
# Dataset schema slightly adapted from: https://droid-dataset.github.io/droid/the-droid-dataset.html#-dataset-schema
|
||||
DROID_FEATURES = {
|
||||
# true on first step of the episode
|
||||
"is_first": {
|
||||
"dtype": "bool",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
# true on last step of the episode
|
||||
"is_last": {
|
||||
"dtype": "bool",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
# true on last step of the episode if it is a terminal step, True for demos
|
||||
"is_terminal": {
|
||||
"dtype": "bool",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
# language_instruction is also stored as "task" to follow LeRobot standard
|
||||
"language_instruction": {
|
||||
"dtype": "string",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"language_instruction_2": {
|
||||
"dtype": "string",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"language_instruction_3": {
|
||||
"dtype": "string",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"observation.state.gripper_position": {
|
||||
"dtype": "float32",
|
||||
"shape": (1,),
|
||||
"names": {
|
||||
"axes": ["gripper"],
|
||||
},
|
||||
},
|
||||
"observation.state.cartesian_position": {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {
|
||||
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
|
||||
},
|
||||
},
|
||||
"observation.state.joint_position": {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {
|
||||
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
|
||||
},
|
||||
},
|
||||
# Add this new feature to follow LeRobot standard of using joint position + gripper
|
||||
"observation.state": {
|
||||
"dtype": "float32",
|
||||
"shape": (8,),
|
||||
"names": {
|
||||
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "gripper"],
|
||||
},
|
||||
},
|
||||
# Initially called wrist_image_left
|
||||
"observation.images.wrist_left": {
|
||||
"dtype": "video",
|
||||
"shape": (180, 320, 3),
|
||||
"names": [
|
||||
"height",
|
||||
"width",
|
||||
"channels",
|
||||
],
|
||||
},
|
||||
# Initially called exterior_image_1_left
|
||||
"observation.images.exterior_1_left": {
|
||||
"dtype": "video",
|
||||
"shape": (180, 320, 3),
|
||||
"names": [
|
||||
"height",
|
||||
"width",
|
||||
"channels",
|
||||
],
|
||||
},
|
||||
# Initially called exterior_image_2_left
|
||||
"observation.images.exterior_2_left": {
|
||||
"dtype": "video",
|
||||
"shape": (180, 320, 3),
|
||||
"names": [
|
||||
"height",
|
||||
"width",
|
||||
"channels",
|
||||
],
|
||||
},
|
||||
"action.gripper_position": {
|
||||
"dtype": "float32",
|
||||
"shape": (1,),
|
||||
"names": {
|
||||
"axes": ["gripper"],
|
||||
},
|
||||
},
|
||||
"action.gripper_velocity": {
|
||||
"dtype": "float32",
|
||||
"shape": (1,),
|
||||
"names": {
|
||||
"axes": ["gripper"],
|
||||
},
|
||||
},
|
||||
"action.cartesian_position": {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {
|
||||
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
|
||||
},
|
||||
},
|
||||
"action.cartesian_velocity": {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {
|
||||
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
|
||||
},
|
||||
},
|
||||
"action.joint_position": {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {
|
||||
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"],
|
||||
},
|
||||
},
|
||||
"action.joint_velocity": {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {
|
||||
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"],
|
||||
},
|
||||
},
|
||||
# This feature was called "action" in RLDS dataset and consists of [6x joint velocities, 1x gripper position]
|
||||
"action.original": {
|
||||
"dtype": "float32",
|
||||
"shape": (7,),
|
||||
"names": {
|
||||
"axes": ["x", "y", "z", "roll", "pitch", "yaw", "gripper"],
|
||||
},
|
||||
},
|
||||
# Add this new feature to follow LeRobot standard of using joint position + gripper
|
||||
"action": {
|
||||
"dtype": "float32",
|
||||
"shape": (8,),
|
||||
"names": {
|
||||
"axes": ["joint_0", "joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6", "gripper"],
|
||||
},
|
||||
},
|
||||
"discount": {
|
||||
"dtype": "float32",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"reward": {
|
||||
"dtype": "float32",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
# Meta data that are the same for all frames in the episode
|
||||
"task_category": {
|
||||
"dtype": "string",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"building": {
|
||||
"dtype": "string",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"collector_id": {
|
||||
"dtype": "string",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"date": {
|
||||
"dtype": "string",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
"camera_extrinsics.wrist_left": {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {
|
||||
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
|
||||
},
|
||||
},
|
||||
"camera_extrinsics.exterior_1_left": {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {
|
||||
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
|
||||
},
|
||||
},
|
||||
"camera_extrinsics.exterior_2_left": {
|
||||
"dtype": "float32",
|
||||
"shape": (6,),
|
||||
"names": {
|
||||
"axes": ["x", "y", "z", "roll", "pitch", "yaw"],
|
||||
},
|
||||
},
|
||||
"is_episode_successful": {
|
||||
"dtype": "bool",
|
||||
"shape": (1,),
|
||||
"names": None,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def is_episode_successful(tf_episode_metadata):
|
||||
# Adapted from: https://github.com/droid-dataset/droid_policy_learning/blob/dd1020eb20d981f90b5ff07dc80d80d5c0cb108b/robomimic/utils/rlds_utils.py#L8
|
||||
return "/success/" in tf_episode_metadata["file_path"].numpy().decode()
|
||||
|
||||
|
||||
def generate_lerobot_frames(tf_episode):
|
||||
m = tf_episode["episode_metadata"]
|
||||
frame_meta = {
|
||||
"task_category": m["building"].numpy().decode(),
|
||||
"building": m["building"].numpy().decode(),
|
||||
"collector_id": m["collector_id"].numpy().decode(),
|
||||
"date": m["date"].numpy().decode(),
|
||||
"camera_extrinsics.wrist_left": m["extrinsics_wrist_cam"].numpy(),
|
||||
"camera_extrinsics.exterior_1_left": m["extrinsics_exterior_cam_1"].numpy(),
|
||||
"camera_extrinsics.exterior_2_left": m["extrinsics_exterior_cam_2"].numpy(),
|
||||
"is_episode_successful": np.array([is_episode_successful(m)]),
|
||||
}
|
||||
for f in tf_episode["steps"]:
|
||||
# Dataset schema slightly adapted from: https://droid-dataset.github.io/droid/the-droid-dataset.html#-dataset-schema
|
||||
frame = {
|
||||
"is_first": np.array([f["is_first"].numpy()]),
|
||||
"is_last": np.array([f["is_last"].numpy()]),
|
||||
"is_terminal": np.array([f["is_terminal"].numpy()]),
|
||||
"language_instruction": f["language_instruction"].numpy().decode(),
|
||||
"language_instruction_2": f["language_instruction_2"].numpy().decode(),
|
||||
"language_instruction_3": f["language_instruction_3"].numpy().decode(),
|
||||
"observation.state.gripper_position": f["observation"]["gripper_position"].numpy(),
|
||||
"observation.state.cartesian_position": f["observation"]["cartesian_position"].numpy(),
|
||||
"observation.state.joint_position": f["observation"]["joint_position"].numpy(),
|
||||
"observation.images.wrist_left": f["observation"]["wrist_image_left"].numpy(),
|
||||
"observation.images.exterior_1_left": f["observation"]["exterior_image_1_left"].numpy(),
|
||||
"observation.images.exterior_2_left": f["observation"]["exterior_image_2_left"].numpy(),
|
||||
"action.gripper_position": f["action_dict"]["gripper_position"].numpy(),
|
||||
"action.gripper_velocity": f["action_dict"]["gripper_velocity"].numpy(),
|
||||
"action.cartesian_position": f["action_dict"]["cartesian_position"].numpy(),
|
||||
"action.cartesian_velocity": f["action_dict"]["cartesian_velocity"].numpy(),
|
||||
"action.joint_position": f["action_dict"]["joint_position"].numpy(),
|
||||
"action.joint_velocity": f["action_dict"]["joint_velocity"].numpy(),
|
||||
"discount": np.array([f["discount"].numpy()]),
|
||||
"reward": np.array([f["reward"].numpy()]),
|
||||
"action.original": f["action"].numpy(),
|
||||
}
|
||||
|
||||
# language_instruction is also stored as "task" to follow LeRobot standard
|
||||
frame["task"] = frame["language_instruction"]
|
||||
|
||||
# Add this new feature to follow LeRobot standard of using joint position + gripper
|
||||
frame["observation.state"] = np.concatenate(
|
||||
[frame["observation.state.joint_position"], frame["observation.state.gripper_position"]]
|
||||
)
|
||||
frame["action"] = np.concatenate([frame["action.joint_position"], frame["action.gripper_position"]])
|
||||
|
||||
# Meta data that are the same for all frames in the episode
|
||||
frame.update(frame_meta)
|
||||
|
||||
# Cast fp64 to fp32
|
||||
for key in frame:
|
||||
if isinstance(frame[key], np.ndarray) and frame[key].dtype == np.float64:
|
||||
frame[key] = frame[key].astype(np.float32)
|
||||
|
||||
yield frame
|
||||
|
||||
|
||||
def port_droid(
|
||||
raw_dir: Path,
|
||||
repo_id: str,
|
||||
push_to_hub: bool = False,
|
||||
num_shards: int | None = None,
|
||||
shard_index: int | None = None,
|
||||
):
|
||||
dataset_name = raw_dir.parent.name
|
||||
version = raw_dir.name
|
||||
data_dir = raw_dir.parent.parent
|
||||
|
||||
builder = tfds.builder(f"{dataset_name}/{version}", data_dir=data_dir, version="")
|
||||
|
||||
if num_shards is not None:
|
||||
tfds_num_shards = builder.info.splits["train"].num_shards
|
||||
if tfds_num_shards != DROID_SHARDS:
|
||||
raise ValueError(
|
||||
f"Number of shards of Droid dataset is expected to be {DROID_SHARDS} but is {tfds_num_shards}."
|
||||
)
|
||||
if num_shards != tfds_num_shards:
|
||||
raise ValueError(
|
||||
f"We only shard over the fixed number of shards provided by tensorflow dataset ({tfds_num_shards}), but {num_shards} shards provided instead."
|
||||
)
|
||||
if shard_index >= tfds_num_shards:
|
||||
raise ValueError(
|
||||
f"Shard index is greater than the num of shards ({shard_index} >= {num_shards})."
|
||||
)
|
||||
|
||||
raw_dataset = builder.as_dataset(split=f"train[{shard_index}shard]")
|
||||
else:
|
||||
raw_dataset = builder.as_dataset(split="train")
|
||||
|
||||
lerobot_dataset = LeRobotDataset.create(
|
||||
repo_id=repo_id,
|
||||
robot_type=DROID_ROBOT_TYPE,
|
||||
fps=DROID_FPS,
|
||||
features=DROID_FEATURES,
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
num_episodes = raw_dataset.cardinality().numpy().item()
|
||||
logging.info(f"Number of episodes {num_episodes}")
|
||||
|
||||
for episode_index, episode in enumerate(raw_dataset):
|
||||
elapsed_time = time.time() - start_time
|
||||
d, h, m, s = get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time)
|
||||
|
||||
logging.info(
|
||||
f"{episode_index} / {num_episodes} episodes processed (after {d} days, {h} hours, {m} minutes, {s:.3f} seconds)"
|
||||
)
|
||||
|
||||
for frame in generate_lerobot_frames(episode):
|
||||
lerobot_dataset.add_frame(frame)
|
||||
|
||||
lerobot_dataset.save_episode()
|
||||
logging.info("Save_episode")
|
||||
|
||||
if push_to_hub:
|
||||
lerobot_dataset.push_to_hub(
|
||||
# Add openx tag, since it belongs to the openx collection of datasets
|
||||
tags=["openx"],
|
||||
private=False,
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument(
|
||||
"--raw-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--push-to-hub",
|
||||
action="store_true",
|
||||
help="Upload to hub.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--num-shards",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Number of shards. Can be either None to load the full dataset, or 2048 to load one of the 2048 tensorflow dataset files.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--shard-index",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Index of the shard. Can be either None to load the full dataset, or in [0,2047] to load one of the 2048 tensorflow dataset files.",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
port_droid(**vars(args))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,288 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import tqdm
|
||||
from datatrove.executor import LocalPipelineExecutor
|
||||
from datatrove.executor.slurm import SlurmPipelineExecutor
|
||||
from datatrove.pipeline.base import PipelineStep
|
||||
|
||||
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
|
||||
from lerobot.common.datasets.aggregate import validate_all_metadata
|
||||
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
|
||||
from lerobot.common.datasets.utils import write_episode, write_episode_stats, write_info, write_task
|
||||
from lerobot.common.utils.utils import init_logging
|
||||
|
||||
|
||||
class AggregateDatasets(PipelineStep):
|
||||
def __init__(
|
||||
self,
|
||||
repo_ids: list[str],
|
||||
aggregated_repo_id: str,
|
||||
):
|
||||
super().__init__()
|
||||
self.repo_ids = repo_ids
|
||||
self.aggr_repo_id = aggregated_repo_id
|
||||
|
||||
self.create_aggr_dataset()
|
||||
|
||||
def create_aggr_dataset(self):
|
||||
init_logging()
|
||||
|
||||
logging.info("Start aggregate_datasets")
|
||||
|
||||
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in self.repo_ids]
|
||||
|
||||
fps, robot_type, features = validate_all_metadata(all_metadata)
|
||||
|
||||
# Create resulting dataset folder
|
||||
aggr_meta = LeRobotDatasetMetadata.create(
|
||||
repo_id=self.aggr_repo_id,
|
||||
fps=fps,
|
||||
robot_type=robot_type,
|
||||
features=features,
|
||||
)
|
||||
|
||||
logging.info("Find all tasks")
|
||||
# find all tasks, deduplicate them, create new task indices for each dataset
|
||||
# indexed by dataset index
|
||||
datasets_task_index_to_aggr_task_index = {}
|
||||
aggr_task_index = 0
|
||||
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Find all tasks")):
|
||||
task_index_to_aggr_task_index = {}
|
||||
|
||||
for task_index, task in meta.tasks.items():
|
||||
if task not in aggr_meta.task_to_task_index:
|
||||
# add the task to aggr tasks mappings
|
||||
aggr_meta.tasks[aggr_task_index] = task
|
||||
aggr_meta.task_to_task_index[task] = aggr_task_index
|
||||
aggr_task_index += 1
|
||||
|
||||
# add task_index anyway
|
||||
task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task]
|
||||
|
||||
datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index
|
||||
|
||||
logging.info("Prepare copy data and videos")
|
||||
datasets_ep_idx_to_aggr_ep_idx = {}
|
||||
datasets_aggr_episode_index_shift = {}
|
||||
aggr_episode_index_shift = 0
|
||||
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Prepare copy data and videos")):
|
||||
ep_idx_to_aggr_ep_idx = {}
|
||||
|
||||
for episode_index in range(meta.total_episodes):
|
||||
aggr_episode_index = episode_index + aggr_episode_index_shift
|
||||
ep_idx_to_aggr_ep_idx[episode_index] = aggr_episode_index
|
||||
|
||||
datasets_ep_idx_to_aggr_ep_idx[dataset_index] = ep_idx_to_aggr_ep_idx
|
||||
datasets_aggr_episode_index_shift[dataset_index] = aggr_episode_index_shift
|
||||
|
||||
# populate episodes
|
||||
for episode_index, episode_dict in meta.episodes.items():
|
||||
aggr_episode_index = episode_index + aggr_episode_index_shift
|
||||
episode_dict["episode_index"] = aggr_episode_index
|
||||
aggr_meta.episodes[aggr_episode_index] = episode_dict
|
||||
|
||||
# populate episodes_stats
|
||||
for episode_index, episode_stats in meta.episodes_stats.items():
|
||||
aggr_episode_index = episode_index + aggr_episode_index_shift
|
||||
aggr_meta.episodes_stats[aggr_episode_index] = episode_stats
|
||||
|
||||
# populate info
|
||||
aggr_meta.info["total_episodes"] += meta.total_episodes
|
||||
aggr_meta.info["total_frames"] += meta.total_frames
|
||||
aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes
|
||||
|
||||
aggr_episode_index_shift += meta.total_episodes
|
||||
|
||||
logging.info("Write meta data")
|
||||
aggr_meta.info["total_tasks"] = len(aggr_meta.tasks)
|
||||
aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1)
|
||||
aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"}
|
||||
|
||||
# create a new episodes jsonl with updated episode_index using write_episode
|
||||
for episode_dict in tqdm.tqdm(aggr_meta.episodes.values(), desc="Write episodes"):
|
||||
write_episode(episode_dict, aggr_meta.root)
|
||||
|
||||
# create a new episode_stats jsonl with updated episode_index using write_episode_stats
|
||||
for episode_index, episode_stats in tqdm.tqdm(
|
||||
aggr_meta.episodes_stats.items(), desc="Write episodes stats"
|
||||
):
|
||||
write_episode_stats(episode_index, episode_stats, aggr_meta.root)
|
||||
|
||||
# create a new task jsonl with updated episode_index using write_task
|
||||
for task_index, task in tqdm.tqdm(aggr_meta.tasks.items(), desc="Write tasks"):
|
||||
write_task(task_index, task, aggr_meta.root)
|
||||
|
||||
write_info(aggr_meta.info, aggr_meta.root)
|
||||
|
||||
self.datasets_task_index_to_aggr_task_index = datasets_task_index_to_aggr_task_index
|
||||
self.datasets_ep_idx_to_aggr_ep_idx = datasets_ep_idx_to_aggr_ep_idx
|
||||
self.datasets_aggr_episode_index_shift = datasets_aggr_episode_index_shift
|
||||
|
||||
logging.info("Meta data done writing!")
|
||||
|
||||
def run(self, data=None, rank: int = 0, world_size: int = 1):
|
||||
import logging
|
||||
import shutil
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from lerobot.common.datasets.aggregate import get_update_episode_and_task_func
|
||||
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
|
||||
from lerobot.common.utils.utils import init_logging
|
||||
|
||||
init_logging()
|
||||
|
||||
aggr_meta = LeRobotDatasetMetadata(self.aggr_repo_id)
|
||||
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in self.repo_ids]
|
||||
|
||||
if world_size != len(all_metadata):
|
||||
raise ValueError()
|
||||
|
||||
dataset_index = rank
|
||||
meta = all_metadata[dataset_index]
|
||||
aggr_episode_index_shift = self.datasets_aggr_episode_index_shift[dataset_index]
|
||||
|
||||
logging.info("Copy data")
|
||||
for episode_index in range(meta.total_episodes):
|
||||
aggr_episode_index = self.datasets_ep_idx_to_aggr_ep_idx[dataset_index][episode_index]
|
||||
data_path = meta.root / meta.get_data_file_path(episode_index)
|
||||
aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index)
|
||||
|
||||
# update episode_index and task_index
|
||||
df = pd.read_parquet(data_path)
|
||||
update_row_func = get_update_episode_and_task_func(
|
||||
aggr_episode_index_shift, self.datasets_task_index_to_aggr_task_index[dataset_index]
|
||||
)
|
||||
df = df.apply(update_row_func, axis=1)
|
||||
|
||||
aggr_data_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
df.to_parquet(aggr_data_path)
|
||||
|
||||
logging.info("Copy videos")
|
||||
for episode_index in range(meta.total_episodes):
|
||||
aggr_episode_index = episode_index + aggr_episode_index_shift
|
||||
for vid_key in meta.video_keys:
|
||||
video_path = meta.root / meta.get_video_file_path(episode_index, vid_key)
|
||||
aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key)
|
||||
aggr_video_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy(video_path, aggr_video_path)
|
||||
|
||||
# copy_command = f"cp {video_path} {aggr_video_path} &"
|
||||
# subprocess.Popen(copy_command, shell=True)
|
||||
|
||||
logging.info("Done!")
|
||||
|
||||
|
||||
def make_aggregate_executor(
|
||||
repo_ids, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
|
||||
):
|
||||
kwargs = {
|
||||
"pipeline": [
|
||||
AggregateDatasets(repo_ids, repo_id),
|
||||
],
|
||||
"logging_dir": str(logs_dir / job_name),
|
||||
}
|
||||
|
||||
if slurm:
|
||||
kwargs.update(
|
||||
{
|
||||
"job_name": job_name,
|
||||
"tasks": DROID_SHARDS,
|
||||
"workers": workers,
|
||||
"time": "08:00:00",
|
||||
"partition": partition,
|
||||
"cpus_per_task": cpus_per_task,
|
||||
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
|
||||
}
|
||||
)
|
||||
executor = SlurmPipelineExecutor(**kwargs)
|
||||
else:
|
||||
kwargs.update(
|
||||
{
|
||||
"tasks": DROID_SHARDS,
|
||||
"workers": 1,
|
||||
}
|
||||
)
|
||||
executor = LocalPipelineExecutor(**kwargs)
|
||||
|
||||
return executor
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--logs-dir",
|
||||
type=Path,
|
||||
help="Path to logs directory for `datatrove`.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--job-name",
|
||||
type=str,
|
||||
default="aggr_droid",
|
||||
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--slurm",
|
||||
type=int,
|
||||
default=1,
|
||||
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--workers",
|
||||
type=int,
|
||||
default=2048,
|
||||
help="Number of slurm workers. It should be less than the maximum number of shards.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--partition",
|
||||
type=str,
|
||||
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cpus-per-task",
|
||||
type=int,
|
||||
default=8,
|
||||
help="Number of cpus that each slurm worker will use.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mem-per-cpu",
|
||||
type=str,
|
||||
default="1950M",
|
||||
help="Memory per cpu that each worker will use.",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
kwargs = vars(args)
|
||||
kwargs["slurm"] = kwargs.pop("slurm") == 1
|
||||
|
||||
repo_ids = [f"{args.repo_id}_world_{DROID_SHARDS}_rank_{rank}" for rank in range(DROID_SHARDS)]
|
||||
aggregate_executor = make_aggregate_executor(repo_ids, **kwargs)
|
||||
aggregate_executor.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,161 @@
|
|||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from datatrove.executor import LocalPipelineExecutor
|
||||
from datatrove.executor.slurm import SlurmPipelineExecutor
|
||||
from datatrove.pipeline.base import PipelineStep
|
||||
|
||||
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
|
||||
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
|
||||
|
||||
|
||||
def validate_shard(repo_id):
|
||||
"""Sanity check that ensure meta data can be loaded and all files are present."""
|
||||
meta = LeRobotDatasetMetadata(repo_id)
|
||||
|
||||
if meta.total_episodes == 0:
|
||||
raise ValueError("Number of episodes is 0.")
|
||||
|
||||
for ep_idx in range(meta.total_episodes):
|
||||
data_path = meta.root / meta.get_data_file_path(ep_idx)
|
||||
|
||||
if not data_path.exists():
|
||||
raise ValueError(f"Parquet file is missing in: {data_path}")
|
||||
|
||||
for vid_key in meta.video_keys:
|
||||
vid_path = meta.root / meta.get_video_file_path(ep_idx, vid_key)
|
||||
if not vid_path.exists():
|
||||
raise ValueError(f"Video file is missing in: {vid_path}")
|
||||
|
||||
|
||||
class PortDroidShards(PipelineStep):
|
||||
def __init__(
|
||||
self,
|
||||
raw_dir: Path | str,
|
||||
repo_id: str = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.raw_dir = Path(raw_dir)
|
||||
self.repo_id = repo_id
|
||||
|
||||
def run(self, data=None, rank: int = 0, world_size: int = 1):
|
||||
from datasets.utils.tqdm import disable_progress_bars
|
||||
|
||||
from examples.port_datasets.droid_rlds.port_droid import port_droid
|
||||
from lerobot.common.utils.utils import init_logging
|
||||
|
||||
init_logging()
|
||||
disable_progress_bars()
|
||||
|
||||
shard_repo_id = f"{self.repo_id}_world_{world_size}_rank_{rank}"
|
||||
|
||||
port_droid(
|
||||
self.raw_dir,
|
||||
shard_repo_id,
|
||||
push_to_hub=False,
|
||||
num_shards=world_size,
|
||||
shard_index=rank,
|
||||
)
|
||||
|
||||
validate_shard(shard_repo_id)
|
||||
|
||||
|
||||
def make_port_executor(
|
||||
raw_dir, repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
|
||||
):
|
||||
kwargs = {
|
||||
"pipeline": [
|
||||
PortDroidShards(raw_dir, repo_id),
|
||||
],
|
||||
"logging_dir": str(logs_dir / job_name),
|
||||
}
|
||||
|
||||
if slurm:
|
||||
kwargs.update(
|
||||
{
|
||||
"job_name": job_name,
|
||||
"tasks": DROID_SHARDS,
|
||||
"workers": workers,
|
||||
"time": "08:00:00",
|
||||
"partition": partition,
|
||||
"cpus_per_task": cpus_per_task,
|
||||
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
|
||||
}
|
||||
)
|
||||
executor = SlurmPipelineExecutor(**kwargs)
|
||||
else:
|
||||
kwargs.update(
|
||||
{
|
||||
"tasks": 1,
|
||||
"workers": 1,
|
||||
}
|
||||
)
|
||||
executor = LocalPipelineExecutor(**kwargs)
|
||||
|
||||
return executor
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument(
|
||||
"--raw-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Directory containing input raw datasets (e.g. `path/to/dataset` or `path/to/dataset/version).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--logs-dir",
|
||||
type=Path,
|
||||
help="Path to logs directory for `datatrove`.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--job-name",
|
||||
type=str,
|
||||
default="port_droid",
|
||||
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--slurm",
|
||||
type=int,
|
||||
default=1,
|
||||
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--workers",
|
||||
type=int,
|
||||
default=2048,
|
||||
help="Number of slurm workers. It should be less than the maximum number of shards.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--partition",
|
||||
type=str,
|
||||
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cpus-per-task",
|
||||
type=int,
|
||||
default=8,
|
||||
help="Number of cpus that each slurm worker will use.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mem-per-cpu",
|
||||
type=str,
|
||||
default="1950M",
|
||||
help="Memory per cpu that each worker will use.",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
kwargs = vars(args)
|
||||
kwargs["slurm"] = kwargs.pop("slurm") == 1
|
||||
port_executor = make_port_executor(**kwargs)
|
||||
port_executor.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,263 @@
|
|||
import argparse
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from datatrove.executor import LocalPipelineExecutor
|
||||
from datatrove.executor.slurm import SlurmPipelineExecutor
|
||||
from datatrove.pipeline.base import PipelineStep
|
||||
from huggingface_hub import HfApi
|
||||
from huggingface_hub.constants import REPOCARD_NAME
|
||||
|
||||
from examples.port_datasets.droid_rlds.port_droid import DROID_SHARDS
|
||||
from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDatasetMetadata
|
||||
from lerobot.common.datasets.utils import create_lerobot_dataset_card
|
||||
from lerobot.common.utils.utils import init_logging
|
||||
|
||||
|
||||
class UploadDataset(PipelineStep):
|
||||
def __init__(
|
||||
self,
|
||||
repo_id: str,
|
||||
branch: str | None = None,
|
||||
revision: str | None = None,
|
||||
tags: list | None = None,
|
||||
license: str | None = "apache-2.0",
|
||||
private: bool = False,
|
||||
distant_repo_id: str | None = None,
|
||||
**card_kwargs,
|
||||
):
|
||||
super().__init__()
|
||||
self.repo_id = repo_id
|
||||
self.distant_repo_id = self.repo_id if distant_repo_id is None else distant_repo_id
|
||||
self.branch = branch
|
||||
self.tags = tags
|
||||
self.license = license
|
||||
self.private = private
|
||||
self.card_kwargs = card_kwargs
|
||||
self.revision = revision if revision else CODEBASE_VERSION
|
||||
|
||||
if os.environ.get("HF_HUB_ENABLE_HF_TRANSFER", "0") != "1":
|
||||
logging.warning(
|
||||
'HF_HUB_ENABLE_HF_TRANSFER is not set to "1". Install hf_transfer and set the env '
|
||||
"variable for faster uploads:\npip install hf-transfer\nexport HF_HUB_ENABLE_HF_TRANSFER=1"
|
||||
)
|
||||
|
||||
self.create_repo()
|
||||
|
||||
def create_repo(self):
|
||||
logging.info(f"Loading meta data from {self.repo_id}...")
|
||||
meta = LeRobotDatasetMetadata(self.repo_id)
|
||||
|
||||
logging.info(f"Creating repo {self.distant_repo_id}...")
|
||||
hub_api = HfApi()
|
||||
hub_api.create_repo(
|
||||
repo_id=self.distant_repo_id,
|
||||
private=self.private,
|
||||
repo_type="dataset",
|
||||
exist_ok=True,
|
||||
)
|
||||
if self.branch:
|
||||
hub_api.create_branch(
|
||||
repo_id=self.distant_repo_id,
|
||||
branch=self.branch,
|
||||
revision=self.revision,
|
||||
repo_type="dataset",
|
||||
exist_ok=True,
|
||||
)
|
||||
|
||||
if not hub_api.file_exists(
|
||||
self.distant_repo_id, REPOCARD_NAME, repo_type="dataset", revision=self.branch
|
||||
):
|
||||
card = create_lerobot_dataset_card(
|
||||
tags=self.tags, dataset_info=meta.info, license=self.license, **self.card_kwargs
|
||||
)
|
||||
card.push_to_hub(repo_id=self.distant_repo_id, repo_type="dataset", revision=self.branch)
|
||||
|
||||
def list_files_recursively(directory):
|
||||
base_path = Path(directory)
|
||||
return [str(file.relative_to(base_path)) for file in base_path.rglob("*") if file.is_file()]
|
||||
|
||||
logging.info(f"Listing all local files from {self.repo_id}...")
|
||||
self.file_paths = list_files_recursively(meta.root)
|
||||
self.file_paths = sorted(self.file_paths)
|
||||
|
||||
def create_chunks(self, lst, n):
|
||||
from itertools import islice
|
||||
|
||||
it = iter(lst)
|
||||
return [list(islice(it, size)) for size in [len(lst) // n + (i < len(lst) % n) for i in range(n)]]
|
||||
|
||||
def create_commits(self, additions):
|
||||
import logging
|
||||
import math
|
||||
import random
|
||||
import time
|
||||
|
||||
from huggingface_hub import create_commit
|
||||
from huggingface_hub.utils import HfHubHTTPError
|
||||
|
||||
FILES_BETWEEN_COMMITS = 10 # noqa: N806
|
||||
BASE_DELAY = 0.1 # noqa: N806
|
||||
MAX_RETRIES = 12 # noqa: N806
|
||||
|
||||
# Split the files into smaller chunks for faster commit
|
||||
# and avoiding "A commit has happened since" error
|
||||
num_chunks = math.ceil(len(additions) / FILES_BETWEEN_COMMITS)
|
||||
chunks = self.create_chunks(additions, num_chunks)
|
||||
|
||||
for chunk in chunks:
|
||||
retries = 0
|
||||
while True:
|
||||
try:
|
||||
create_commit(
|
||||
self.distant_repo_id,
|
||||
repo_type="dataset",
|
||||
operations=chunk,
|
||||
commit_message=f"DataTrove upload ({len(chunk)} files)",
|
||||
revision=self.branch,
|
||||
)
|
||||
# TODO: every 100 chunks super_squach_commits()
|
||||
logging.info("create_commit completed!")
|
||||
break
|
||||
except HfHubHTTPError as e:
|
||||
if "A commit has happened since" in e.server_message:
|
||||
if retries >= MAX_RETRIES:
|
||||
logging.error(f"Failed to create commit after {MAX_RETRIES=}. Giving up.")
|
||||
raise e
|
||||
logging.info("Commit creation race condition issue. Waiting...")
|
||||
time.sleep(BASE_DELAY * 2**retries + random.uniform(0, 2))
|
||||
retries += 1
|
||||
else:
|
||||
raise e
|
||||
|
||||
def run(self, data=None, rank: int = 0, world_size: int = 1):
|
||||
import logging
|
||||
|
||||
from datasets.utils.tqdm import disable_progress_bars
|
||||
from huggingface_hub import CommitOperationAdd, preupload_lfs_files
|
||||
|
||||
from lerobot.common.datasets.lerobot_dataset import LeRobotDatasetMetadata
|
||||
from lerobot.common.utils.utils import init_logging
|
||||
|
||||
init_logging()
|
||||
disable_progress_bars()
|
||||
|
||||
chunks = self.create_chunks(self.file_paths, world_size)
|
||||
file_paths = chunks[rank]
|
||||
|
||||
if len(file_paths) == 0:
|
||||
raise ValueError(file_paths)
|
||||
|
||||
logging.info("Pre-uploading LFS files...")
|
||||
for i, path in enumerate(file_paths):
|
||||
logging.info(f"{i}: {path}")
|
||||
|
||||
meta = LeRobotDatasetMetadata(self.repo_id)
|
||||
additions = [
|
||||
CommitOperationAdd(path_in_repo=path, path_or_fileobj=meta.root / path) for path in file_paths
|
||||
]
|
||||
preupload_lfs_files(
|
||||
repo_id=self.distant_repo_id, repo_type="dataset", additions=additions, revision=self.branch
|
||||
)
|
||||
|
||||
logging.info("Creating commits...")
|
||||
self.create_commits(additions)
|
||||
logging.info("Done!")
|
||||
|
||||
|
||||
def make_upload_executor(
|
||||
repo_id, job_name, logs_dir, workers, partition, cpus_per_task, mem_per_cpu, slurm=True
|
||||
):
|
||||
kwargs = {
|
||||
"pipeline": [
|
||||
UploadDataset(repo_id),
|
||||
],
|
||||
"logging_dir": str(logs_dir / job_name),
|
||||
}
|
||||
|
||||
if slurm:
|
||||
kwargs.update(
|
||||
{
|
||||
"job_name": job_name,
|
||||
"tasks": DROID_SHARDS,
|
||||
"workers": workers,
|
||||
"time": "08:00:00",
|
||||
"partition": partition,
|
||||
"cpus_per_task": cpus_per_task,
|
||||
"sbatch_args": {"mem-per-cpu": mem_per_cpu},
|
||||
}
|
||||
)
|
||||
executor = SlurmPipelineExecutor(**kwargs)
|
||||
else:
|
||||
kwargs.update(
|
||||
{
|
||||
"tasks": DROID_SHARDS,
|
||||
"workers": 1,
|
||||
}
|
||||
)
|
||||
executor = LocalPipelineExecutor(**kwargs)
|
||||
|
||||
return executor
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
help="Repositery identifier on Hugging Face: a community or a user name `/` the name of the dataset, required when push-to-hub is True.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--logs-dir",
|
||||
type=Path,
|
||||
help="Path to logs directory for `datatrove`.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--job-name",
|
||||
type=str,
|
||||
default="upload_droid",
|
||||
help="Job name used in slurm, and name of the directory created inside the provided logs directory.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--slurm",
|
||||
type=int,
|
||||
default=1,
|
||||
help="Launch over slurm. Use `--slurm 0` to launch sequentially (useful to debug).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--workers",
|
||||
type=int,
|
||||
default=50,
|
||||
help="Number of slurm workers. It should be less than the maximum number of shards.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--partition",
|
||||
type=str,
|
||||
help="Slurm partition. Ideally a CPU partition. No need for GPU partition.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cpus-per-task",
|
||||
type=int,
|
||||
default=8,
|
||||
help="Number of cpus that each slurm worker will use.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mem-per-cpu",
|
||||
type=str,
|
||||
default="1950M",
|
||||
help="Memory per cpu that each worker will use.",
|
||||
)
|
||||
|
||||
init_logging()
|
||||
|
||||
args = parser.parse_args()
|
||||
kwargs = vars(args)
|
||||
kwargs["slurm"] = kwargs.pop("slurm") == 1
|
||||
upload_executor = make_upload_executor(**kwargs)
|
||||
upload_executor.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,175 @@
|
|||
import logging
|
||||
import shutil
|
||||
|
||||
import pandas as pd
|
||||
import tqdm
|
||||
|
||||
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
|
||||
from lerobot.common.datasets.utils import write_episode, write_episode_stats, write_info, write_task
|
||||
from lerobot.common.utils.utils import init_logging
|
||||
|
||||
|
||||
def validate_all_metadata(all_metadata: list[LeRobotDatasetMetadata]):
|
||||
# validate same fps, robot_type, features
|
||||
|
||||
fps = all_metadata[0].fps
|
||||
robot_type = all_metadata[0].robot_type
|
||||
features = all_metadata[0].features
|
||||
|
||||
for meta in tqdm.tqdm(all_metadata, desc="Validate all meta data"):
|
||||
if fps != meta.fps:
|
||||
raise ValueError(f"Same fps is expected, but got fps={meta.fps} instead of {fps}.")
|
||||
if robot_type != meta.robot_type:
|
||||
raise ValueError(
|
||||
f"Same robot_type is expected, but got robot_type={meta.robot_type} instead of {robot_type}."
|
||||
)
|
||||
if features != meta.features:
|
||||
raise ValueError(
|
||||
f"Same features is expected, but got features={meta.features} instead of {features}."
|
||||
)
|
||||
|
||||
return fps, robot_type, features
|
||||
|
||||
|
||||
def get_update_episode_and_task_func(episode_index_to_add, task_index_to_global_task_index):
|
||||
def _update(row):
|
||||
row["episode_index"] = row["episode_index"] + episode_index_to_add
|
||||
row["task_index"] = task_index_to_global_task_index[row["task_index"]]
|
||||
return row
|
||||
|
||||
return _update
|
||||
|
||||
|
||||
def aggregate_datasets(repo_ids: list[str], aggr_repo_id: str, aggr_root=None):
|
||||
logging.info("Start aggregate_datasets")
|
||||
|
||||
all_metadata = [LeRobotDatasetMetadata(repo_id) for repo_id in repo_ids]
|
||||
|
||||
fps, robot_type, features = validate_all_metadata(all_metadata)
|
||||
|
||||
# Create resulting dataset folder
|
||||
aggr_meta = LeRobotDatasetMetadata.create(
|
||||
repo_id=aggr_repo_id,
|
||||
fps=fps,
|
||||
robot_type=robot_type,
|
||||
features=features,
|
||||
root=aggr_root,
|
||||
)
|
||||
|
||||
logging.info("Find all tasks")
|
||||
# find all tasks, deduplicate them, create new task indices for each dataset
|
||||
# indexed by dataset index
|
||||
datasets_task_index_to_aggr_task_index = {}
|
||||
aggr_task_index = 0
|
||||
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Find all tasks")):
|
||||
task_index_to_aggr_task_index = {}
|
||||
|
||||
for task_index, task in meta.tasks.items():
|
||||
if task not in aggr_meta.task_to_task_index:
|
||||
# add the task to aggr tasks mappings
|
||||
aggr_meta.tasks[aggr_task_index] = task
|
||||
aggr_meta.task_to_task_index[task] = aggr_task_index
|
||||
aggr_task_index += 1
|
||||
|
||||
# add task_index anyway
|
||||
task_index_to_aggr_task_index[task_index] = aggr_meta.task_to_task_index[task]
|
||||
|
||||
datasets_task_index_to_aggr_task_index[dataset_index] = task_index_to_aggr_task_index
|
||||
|
||||
logging.info("Copy data and videos")
|
||||
aggr_episode_index_shift = 0
|
||||
for dataset_index, meta in enumerate(tqdm.tqdm(all_metadata, desc="Copy data and videos")):
|
||||
# cp data
|
||||
for episode_index in range(meta.total_episodes):
|
||||
aggr_episode_index = episode_index + aggr_episode_index_shift
|
||||
data_path = meta.root / meta.get_data_file_path(episode_index)
|
||||
aggr_data_path = aggr_meta.root / aggr_meta.get_data_file_path(aggr_episode_index)
|
||||
|
||||
# update episode_index and task_index
|
||||
df = pd.read_parquet(data_path)
|
||||
update_row_func = get_update_episode_and_task_func(
|
||||
aggr_episode_index_shift, datasets_task_index_to_aggr_task_index[dataset_index]
|
||||
)
|
||||
df = df.apply(update_row_func, axis=1)
|
||||
|
||||
aggr_data_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
df.to_parquet(aggr_data_path)
|
||||
|
||||
# cp videos
|
||||
for episode_index in range(meta.total_episodes):
|
||||
aggr_episode_index = episode_index + aggr_episode_index_shift
|
||||
for vid_key in meta.video_keys:
|
||||
video_path = meta.root / meta.get_video_file_path(episode_index, vid_key)
|
||||
aggr_video_path = aggr_meta.root / aggr_meta.get_video_file_path(aggr_episode_index, vid_key)
|
||||
aggr_video_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy(video_path, aggr_video_path)
|
||||
|
||||
# copy_command = f"cp {video_path} {aggr_video_path} &"
|
||||
# subprocess.Popen(copy_command, shell=True)
|
||||
|
||||
# populate episodes
|
||||
for episode_index, episode_dict in meta.episodes.items():
|
||||
aggr_episode_index = episode_index + aggr_episode_index_shift
|
||||
episode_dict["episode_index"] = aggr_episode_index
|
||||
aggr_meta.episodes[aggr_episode_index] = episode_dict
|
||||
|
||||
# populate episodes_stats
|
||||
for episode_index, episode_stats in meta.episodes_stats.items():
|
||||
aggr_episode_index = episode_index + aggr_episode_index_shift
|
||||
aggr_meta.episodes_stats[aggr_episode_index] = episode_stats
|
||||
|
||||
# populate info
|
||||
aggr_meta.info["total_episodes"] += meta.total_episodes
|
||||
aggr_meta.info["total_frames"] += meta.total_frames
|
||||
aggr_meta.info["total_videos"] += len(aggr_meta.video_keys) * meta.total_episodes
|
||||
|
||||
aggr_episode_index_shift += meta.total_episodes
|
||||
|
||||
logging.info("write meta data")
|
||||
|
||||
aggr_meta.info["total_chunks"] = aggr_meta.get_episode_chunk(aggr_episode_index_shift - 1)
|
||||
aggr_meta.info["splits"] = {"train": f"0:{aggr_meta.info['total_episodes']}"}
|
||||
|
||||
# create a new episodes jsonl with updated episode_index using write_episode
|
||||
for episode_dict in aggr_meta.episodes.values():
|
||||
write_episode(episode_dict, aggr_meta.root)
|
||||
|
||||
# create a new episode_stats jsonl with updated episode_index using write_episode_stats
|
||||
for episode_index, episode_stats in aggr_meta.episodes_stats.items():
|
||||
write_episode_stats(episode_index, episode_stats, aggr_meta.root)
|
||||
|
||||
# create a new task jsonl with updated episode_index using write_task
|
||||
for task_index, task in aggr_meta.tasks.items():
|
||||
write_task(task_index, task, aggr_meta.root)
|
||||
|
||||
write_info(aggr_meta.info, aggr_meta.root)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
init_logging()
|
||||
repo_id = "cadene/droid"
|
||||
aggr_repo_id = "cadene/droid"
|
||||
datetime = "2025-02-22_11-23-54"
|
||||
|
||||
# root = Path(f"/tmp/{repo_id}")
|
||||
# if root.exists():
|
||||
# shutil.rmtree(root)
|
||||
root = None
|
||||
|
||||
# all_metadata = [LeRobotDatasetMetadata(f"{repo_id}_{datetime}_world_2048_rank_{rank}") for rank in range(2048)]
|
||||
|
||||
# aggregate_datasets(
|
||||
# all_metadata,
|
||||
# aggr_repo_id,
|
||||
# root=root,
|
||||
# )
|
||||
|
||||
aggr_dataset = LeRobotDataset(
|
||||
repo_id=aggr_repo_id,
|
||||
root=root,
|
||||
)
|
||||
aggr_dataset.push_to_hub(tags=["openx"])
|
||||
|
||||
# for meta in all_metadata:
|
||||
# dataset = LeRobotDataset(repo_id=meta.repo_id, root=meta.root)
|
||||
# dataset.push_to_hub(tags=["openx"])
|
|
@ -74,7 +74,7 @@ from lerobot.common.datasets.video_utils import (
|
|||
)
|
||||
from lerobot.common.robot_devices.robots.utils import Robot
|
||||
|
||||
CODEBASE_VERSION = "v2.1"
|
||||
CODEBASE_VERSION = "v3.0"
|
||||
|
||||
|
||||
class LeRobotDatasetMetadata:
|
||||
|
@ -617,6 +617,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
|
|||
"""hf_dataset contains all the observations, states, actions, rewards, etc."""
|
||||
if self.episodes is None:
|
||||
path = str(self.root / "data")
|
||||
# TODO(rcadene): load_dataset convert parquet to arrow.
|
||||
# set num_proc to accelerate this conversion
|
||||
hf_dataset = load_dataset("parquet", data_dir=path, split="train")
|
||||
else:
|
||||
files = [str(self.root / self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes]
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
"""
|
||||
This script will help you convert any LeRobot dataset already pushed to the hub from codebase version 2.1 to
|
||||
3.0. It will:
|
||||
|
||||
- Generate per-episodes stats and writes them in `episodes_stats.jsonl`
|
||||
- Check consistency between these new stats and the old ones.
|
||||
- Remove the deprecated `stats.json`.
|
||||
- Update codebase_version in `info.json`.
|
||||
- Push this new version to the hub on the 'main' branch and tags it with "v2.1".
|
||||
|
||||
Usage:
|
||||
|
||||
```bash
|
||||
python lerobot/common/datasets/v30/convert_dataset_v21_to_v30.py \
|
||||
--repo-id=lerobot/pusht
|
||||
```
|
||||
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
from datasets import Dataset
|
||||
from huggingface_hub import snapshot_download
|
||||
|
||||
from lerobot.common.constants import HF_LEROBOT_HOME
|
||||
from lerobot.common.datasets.utils import (
|
||||
load_episodes_stats,
|
||||
)
|
||||
|
||||
V21 = "v2.1"
|
||||
|
||||
|
||||
class SuppressWarnings:
|
||||
def __enter__(self):
|
||||
self.previous_level = logging.getLogger().getEffectiveLevel()
|
||||
logging.getLogger().setLevel(logging.ERROR)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
logging.getLogger().setLevel(self.previous_level)
|
||||
|
||||
|
||||
def convert_dataset(
|
||||
repo_id: str,
|
||||
branch: str | None = None,
|
||||
num_workers: int = 4,
|
||||
):
|
||||
root = HF_LEROBOT_HOME / repo_id
|
||||
snapshot_download(
|
||||
repo_id,
|
||||
repo_type="dataset",
|
||||
revision=V21,
|
||||
local_dir=root,
|
||||
)
|
||||
|
||||
# Concatenate videos
|
||||
|
||||
# Create
|
||||
|
||||
"""
|
||||
-------------------------
|
||||
OLD
|
||||
data/chunk-000/episode_000000.parquet
|
||||
|
||||
NEW
|
||||
data/chunk-000/file_000.parquet
|
||||
-------------------------
|
||||
OLD
|
||||
videos/chunk-000/CAMERA/episode_000000.mp4
|
||||
|
||||
NEW
|
||||
videos/chunk-000/file_000.mp4
|
||||
-------------------------
|
||||
OLD
|
||||
episodes.jsonl
|
||||
{"episode_index": 1, "tasks": ["Put the blue block in the green bowl"], "length": 266}
|
||||
|
||||
NEW
|
||||
meta/episodes/chunk-000/episodes_000.parquet
|
||||
episode_index | video_chunk_index | video_file_index | data_chunk_index | data_file_index | tasks | length
|
||||
-------------------------
|
||||
OLD
|
||||
tasks.jsonl
|
||||
{"task_index": 1, "task": "Put the blue block in the green bowl"}
|
||||
|
||||
NEW
|
||||
meta/tasks/chunk-000/file_000.parquet
|
||||
task_index | task
|
||||
-------------------------
|
||||
OLD
|
||||
episodes_stats.jsonl
|
||||
|
||||
NEW
|
||||
meta/episodes_stats/chunk-000/file_000.parquet
|
||||
episode_index | mean | std | min | max
|
||||
-------------------------
|
||||
UPDATE
|
||||
meta/info.json
|
||||
-------------------------
|
||||
"""
|
||||
|
||||
new_root = HF_LEROBOT_HOME / f"{repo_id}_v30"
|
||||
new_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
episodes_stats = load_episodes_stats(root)
|
||||
hf_dataset = Dataset.from_dict(episodes_stats) # noqa: F841
|
||||
|
||||
meta_ep_st_ch = new_root / "meta/episodes_stats/chunk-000"
|
||||
meta_ep_st_ch.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# hf_dataset.to_parquet(meta_ep_st_ch / 'file_000.parquet')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--repo-id",
|
||||
type=str,
|
||||
required=True,
|
||||
help="Repository identifier on Hugging Face: a community or a user name `/` the name of the dataset "
|
||||
"(e.g. `lerobot/pusht`, `cadene/aloha_sim_insertion_human`).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--branch",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Repo branch to push your dataset. Defaults to the main branch.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--num-workers",
|
||||
type=int,
|
||||
default=4,
|
||||
help="Number of workers for parallelizing stats compute. Defaults to 4.",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
convert_dataset(**vars(args))
|
|
@ -252,7 +252,7 @@ def encode_video_frames(
|
|||
g: int | None = 2,
|
||||
crf: int | None = 30,
|
||||
fast_decode: int = 0,
|
||||
log_level: str | None = "error",
|
||||
log_level: str | None = "quiet",
|
||||
overwrite: bool = False,
|
||||
) -> None:
|
||||
"""More info on ffmpeg arguments tuning on `benchmark/video/README.md`"""
|
||||
|
|
|
@ -512,13 +512,13 @@ if __name__ == "__main__":
|
|||
)
|
||||
parser.add_argument(
|
||||
"--width",
|
||||
type=str,
|
||||
type=int,
|
||||
default=640,
|
||||
help="Set the width for all cameras. If not provided, use the default width of each camera.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--height",
|
||||
type=str,
|
||||
type=int,
|
||||
default=480,
|
||||
help="Set the height for all cameras. If not provided, use the default height of each camera.",
|
||||
)
|
||||
|
|
|
@ -492,13 +492,13 @@ if __name__ == "__main__":
|
|||
)
|
||||
parser.add_argument(
|
||||
"--width",
|
||||
type=str,
|
||||
type=int,
|
||||
default=None,
|
||||
help="Set the width for all cameras. If not provided, use the default width of each camera.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--height",
|
||||
type=str,
|
||||
type=int,
|
||||
default=None,
|
||||
help="Set the height for all cameras. If not provided, use the default height of each camera.",
|
||||
)
|
||||
|
|
|
@ -228,3 +228,13 @@ def is_valid_numpy_dtype_string(dtype_str: str) -> bool:
|
|||
except TypeError:
|
||||
# If a TypeError is raised, the string is not a valid dtype
|
||||
return False
|
||||
|
||||
|
||||
def get_elapsed_time_in_days_hours_minutes_seconds(elapsed_time_s: float):
|
||||
days = int(elapsed_time_s // (24 * 3600))
|
||||
elapsed_time_s %= 24 * 3600
|
||||
hours = int(elapsed_time_s // 3600)
|
||||
elapsed_time_s %= 3600
|
||||
minutes = int(elapsed_time_s // 60)
|
||||
seconds = elapsed_time_s % 60
|
||||
return days, hours, minutes, seconds
|
||||
|
|
|
@ -174,7 +174,10 @@ def run_server(
|
|||
dataset.meta.get_video_file_path(episode_id, key) for key in dataset.meta.video_keys
|
||||
]
|
||||
videos_info = [
|
||||
{"url": url_for("static", filename=video_path), "filename": video_path.parent.name}
|
||||
{
|
||||
"url": url_for("static", filename=str(video_path).replace("\\", "/")),
|
||||
"filename": video_path.parent.name,
|
||||
}
|
||||
for video_path in video_paths
|
||||
]
|
||||
tasks = dataset.meta.episodes[episode_id]["tasks"]
|
||||
|
@ -381,7 +384,7 @@ def visualize_dataset_html(
|
|||
if isinstance(dataset, LeRobotDataset):
|
||||
ln_videos_dir = static_dir / "videos"
|
||||
if not ln_videos_dir.exists():
|
||||
ln_videos_dir.symlink_to((dataset.root / "videos").resolve())
|
||||
ln_videos_dir.symlink_to((dataset.root / "videos").resolve().as_posix())
|
||||
|
||||
if serve:
|
||||
run_server(dataset, episodes, host, port, static_dir, template_dir)
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
from lerobot.common.datasets.aggregate import aggregate_datasets
|
||||
from tests.fixtures.constants import DUMMY_REPO_ID
|
||||
|
||||
|
||||
def test_aggregate_datasets(tmp_path, lerobot_dataset_factory):
|
||||
dataset_0 = lerobot_dataset_factory(
|
||||
root=tmp_path / "test_0",
|
||||
repo_id=DUMMY_REPO_ID + "_0",
|
||||
total_episodes=10,
|
||||
total_frames=400,
|
||||
)
|
||||
dataset_1 = lerobot_dataset_factory(
|
||||
root=tmp_path / "test_1",
|
||||
repo_id=DUMMY_REPO_ID + "_1",
|
||||
total_episodes=10,
|
||||
total_frames=400,
|
||||
)
|
||||
|
||||
dataset_2 = aggregate_datasets([dataset_0, dataset_1])
|
Loading…
Reference in New Issue