lerobot/lerobot/scripts/control_sim_robot.py

562 lines
19 KiB
Python

# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utilities to control a robot in simulation.
Useful to record a dataset, replay a recorded episode and record an evaluation dataset.
Examples of usage:
- Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency.
You can modify this value depending on how fast your simulation can run:
```bash
python lerobot/scripts/control_sim_robot.py teleoperate \
--fps 30 \
--robot-path lerobot/configs/robot/your_robot_config.yaml \
--sim-config lerobot/configs/env/your_sim_config.yaml
```
- Record one episode in order to test replay:
```bash
python lerobot/scripts/control_sim_robot.py record \
--robot-path lerobot/configs/robot/your_robot_config.yaml \
--sim-config lerobot/configs/env/your_sim_config.yaml \
--fps 30 \
--repo-id $USER/robot_sim_test \
--num-episodes 1 \
--run-compute-stats 0
```
Pass `--push-to-hub 1` to push the recorded dataset to the Hugging Face hub.
- Visualize dataset:
```bash
python lerobot/scripts/visualize_dataset.py \
--repo-id $USER/robot_sim_test \
--episode-index 0
```
- Replay a sequence of test episodes:
```bash
python lerobot/scripts/control_sim_robot.py replay \
--robot-path lerobot/configs/robot/your_robot_config.yaml \
--sim-config lerobot/configs/env/your_sim_config.yaml \
--fps 30 \
--repo-id $USER/robot_sim_test \
--episode 0
```
Note: The seed is saved, therefore, during replay we can load the same environment state as the one during collection.
- Record a full dataset in order to train a policy,
30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes:
```bash
python lerobot/scripts/control_sim_robot.py record \
--robot-path lerobot/configs/robot/your_robot_config.yaml \
--sim-config lerobot/configs/env/your_sim_config.yaml \
--fps 30 \
--repo-id $USER/robot_sim_test \
--num-episodes 50 \
--episode-time-s 30
```
**NOTE**: You can use your keyboard to control data recording flow.
- Tap right arrow key '->' to early exit while recording an episode and go to resetting the environment.
- Tap right arrow key '->' to early exit while resetting the environment and go to recording the next episode.
- Tap left arrow key '<-' to early exit and re-record the current episode.
- Tap escape key 'esc' to stop the data recording.
This might require a sudo permission to allow your terminal to monitor keyboard events.
**NOTE**: You can resume/continue data recording by running the same data recording command twice.
"""
import argparse
import importlib
import logging
import time
from pathlib import Path
import cv2
import gymnasium as gym
import numpy as np
import torch
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
from lerobot.common.robot_devices.control_utils import (
init_keyboard_listener,
init_policy,
is_headless,
log_control_info,
predict_action,
sanity_check_dataset_name,
sanity_check_dataset_robot_compatibility,
stop_recording,
)
from lerobot.common.robot_devices.robots.utils import Robot, make_robot
from lerobot.common.robot_devices.utils import busy_wait
from lerobot.common.utils.utils import init_hydra_config, init_logging, log_say
# Per its message, the script is currently deactivated: this raise runs as soon as the module is
# executed or imported, so none of the statements below it are reached.
raise NotImplementedError("This script is currently deactivated")
# Features that `record` writes for every frame, on top of the image, state and action
# features it derives from the environment. Each entry gives the dtype and shape of one value.
DEFAULT_FEATURES = {
    # Reward returned by `env.step`.
    "next.reward": {
        "dtype": "float32",
        "shape": (1,),
        "names": None,
    },
    # Value of `info["is_success"]` after the step (False when the env does not report it).
    "next.success": {
        "dtype": "bool",
        "shape": (1,),
        "names": None,
    },
    # Seed passed to `env.reset` for the episode; `replay` reads it back to reset the env.
    "seed": {
        "dtype": "int64",
        "shape": (1,),
        "names": None,
    },
    # Timestamp of the frame within its episode.
    "timestamp": {
        "dtype": "float32",
        "shape": (1,),
        "names": None,
    },
}
########################################################################################
# Utilities
########################################################################################
def none_or_int(value):
    """Convert a command-line value to an int, treating the literal string "None" as ``None``.

    Used as an argparse ``type=`` callable so that an option such as ``--fps`` can be
    disabled explicitly by passing ``None`` on the command line.

    Raises:
        ValueError: if `value` is neither "None" nor convertible by ``int``.
    """
    return None if value == "None" else int(value)
def init_sim_calibration(robot, cfg):
    """Gather the constants that map real leader-arm joint positions into the simulation.

    The values depend on the robot description used by the simulation.

    Args:
        robot: object exposing ``leader_arms.main.calibration["start_pos"]``.
        cfg: mapping with optional ``axis_directions`` and ``offsets`` entries; they default
            to ``[1]`` and ``[0]`` respectively.

    Returns:
        A dict with numpy arrays under ``start_pos``, ``axis_directions`` and ``offsets``,
        matching the keyword arguments of `real_positions_to_sim`.
    """
    leader_calibration = robot.leader_arms.main.calibration
    calibration = {
        "start_pos": np.array(leader_calibration["start_pos"]),
        "axis_directions": np.array(cfg.get("axis_directions", [1])),
        # The configured offsets are multiples of pi; scale them here.
        "offsets": np.array(cfg.get("offsets", [0])) * np.pi,
    }
    return calibration
def real_positions_to_sim(real_positions, axis_directions, start_pos, offsets, counts_per_rev=4096):
    """Convert raw joint readings of the real robot into simulation joint angles.

    Pipeline: counts - starting position -> radians -> align axes -> offset.

    Args:
        real_positions: joint positions read from the real robot, in encoder counts.
        axis_directions: per-joint multiplier used to align the real axes with the sim axes.
        start_pos: encoder counts subtracted from `real_positions` before conversion.
        offsets: value added to the result after conversion to radians.
        counts_per_rev: number of encoder counts in one full revolution. Defaults to 4096,
            the resolution that was previously hard-coded.

    Returns:
        The joint positions expressed in radians for the simulation.
    """
    counts_from_start = real_positions - start_pos
    return axis_directions * counts_from_start * 2.0 * np.pi / counts_per_rev + offsets
########################################################################################
# Control modes
########################################################################################
def teleoperate(env, robot: "Robot", process_action_fn, teleop_time_s=None):
    """Drive a simulated environment from the leader arm of a real robot.

    Args:
        env: zero-argument callable that builds the simulation environment.
        robot: robot whose ``leader_arms.main`` is read for ``"Present_Position"``.
        process_action_fn: callable turning the leader reading into a simulation action.
        teleop_time_s: optional duration in seconds after which teleoperation stops.
            When None the loop runs until the process is interrupted.

    The environment is created here, so it is also closed here, including when the loop is
    interrupted or an error is raised.
    """
    env = env()
    try:
        env.reset()
        start_teleop_t = time.perf_counter()
        while True:
            leader_pos = robot.leader_arms.main.read("Present_Position")
            action = process_action_fn(leader_pos)
            # The action is passed to the env with an added leading axis.
            env.step(np.expand_dims(action, 0))
            if teleop_time_s is not None and time.perf_counter() - start_teleop_t > teleop_time_s:
                print("Teleoperation processes finished.")
                break
    finally:
        env.close()
def _dataset_image_key(key: str) -> str:
    """Map an environment image observation key to its dataset feature name.

    Keys already under the ``observation.image`` namespace are kept as they are; any other
    key is prefixed with ``observation.image.``. The same mapping is used both when the
    features are declared and when frames are filled, so the two always agree.
    """
    if key.startswith("observation.image"):
        return key
    return "observation.image." + key


def record(
    env,
    robot: Robot,
    process_action_from_leader,
    root: Path,
    repo_id: str,
    task: str,
    fps: int | None = None,
    tags: list[str] | None = None,
    pretrained_policy_name_or_path: str | None = None,
    policy_overrides: bool | None = None,
    episode_time_s: int = 30,
    num_episodes: int = 50,
    video: bool = True,
    push_to_hub: bool = True,
    num_image_writer_processes: int = 0,
    num_image_writer_threads_per_camera: int = 4,
    display_cameras: bool = False,
    play_sounds: bool = True,
    resume: bool = False,
    local_files_only: bool = False,
    run_compute_stats: bool = True,
) -> LeRobotDataset:
    """Record episodes in a simulated environment into a `LeRobotDataset`.

    Actions come from a pretrained policy when `pretrained_policy_name_or_path` is given,
    otherwise from the robot leader arm through `process_action_from_leader`.

    Args:
        env: zero-argument callable that builds the simulation environment.
        robot: robot whose ``leader_arms.main`` is read when no policy is used.
        process_action_from_leader: callable turning the leader reading into a sim action.
        root: local root directory forwarded to the dataset.
        repo_id: dataset identifier.
        task: task description saved with every episode.
        fps: recording frequency. When None and a policy is loaded, the policy fps is used.
        tags: tags forwarded to `push_to_hub`.
        pretrained_policy_name_or_path: policy to load with `init_policy`, if any.
        policy_overrides: overrides forwarded to `init_policy`.
        episode_time_s: maximum duration of one episode; None means no time limit.
        num_episodes: number of episodes to save before stopping.
        video: forwarded to the dataset as `use_videos`.
        push_to_hub: whether to push the dataset to the hub at the end.
        num_image_writer_processes: number of image writer processes.
        num_image_writer_threads_per_camera: image writer threads per camera.
        display_cameras: show image observations with OpenCV when not headless.
        play_sounds: forwarded to `log_say`.
        resume: load the existing dataset instead of creating a new one.
        local_files_only: forwarded to `LeRobotDataset` when resuming.
        run_compute_stats: forwarded to `dataset.consolidate`.

    Returns:
        The dataset containing the recorded episodes.

    Raises:
        ValueError: if neither a policy nor `process_action_from_leader` is available.
    """
    # Load pretrained policy
    policy = None
    if pretrained_policy_name_or_path is not None:
        policy, policy_fps, device, use_amp = init_policy(pretrained_policy_name_or_path, policy_overrides)

        if fps is None:
            fps = policy_fps
            logging.warning(f"No fps provided, so using the fps from policy config ({policy_fps}).")

    if policy is None and process_action_from_leader is None:
        raise ValueError("Either policy or process_action_fn has to be set to enable control in sim.")

    # initialize listener before sim env
    listener, events = init_keyboard_listener()
    if events is None:
        # Provide every flag read below so that a missing listener cannot raise KeyError.
        events = {"exit_early": False, "rerecord_episode": False, "stop_recording": False}

    if episode_time_s is None:
        episode_time_s = float("inf")

    # create sim env
    env = env()

    # get image keys
    image_keys = [key for key in env.observation_space if "image" in key]
    num_cameras = len(image_keys)
    # NOTE(review): `env_cfg` is a module-level name that is only assigned in the
    # `__main__` section of this script; calling `record` from elsewhere needs it defined.
    state_keys_dict = env_cfg.state_keys

    # Create empty dataset or load existing saved episodes
    if resume:
        dataset = LeRobotDataset(
            repo_id,
            root=root,
            local_files_only=local_files_only,
        )
        dataset.start_image_writer(
            num_processes=num_image_writer_processes,
            num_threads=num_image_writer_threads_per_camera * num_cameras,
        )
        sanity_check_dataset_robot_compatibility(dataset, robot, fps, video)
    else:
        # Work on a copy so that the module-level defaults are not modified between calls.
        features = dict(DEFAULT_FEATURES)

        # add image keys to features
        for key in image_keys:
            features[_dataset_image_key(key)] = {
                "dtype": "video",
                "names": ["channels", "height", "width"],
                "shape": env.observation_space[key].shape,
            }

        for key, obs_key in state_keys_dict.items():
            features[key] = {
                "dtype": "float32",
                "names": None,
                "shape": env.observation_space[obs_key].shape,
            }

        features["action"] = {"dtype": "float32", "shape": env.action_space.shape, "names": None}

        sanity_check_dataset_name(repo_id, policy)
        dataset = LeRobotDataset.create(
            repo_id,
            fps,
            root=root,
            features=features,
            use_videos=video,
            image_writer_processes=num_image_writer_processes,
            image_writer_threads=num_image_writer_threads_per_camera * num_cameras,
        )

    recorded_episodes = 0
    while True:
        log_say(f"Recording episode {dataset.num_episodes}", play_sounds)

        timestamp = 0
        start_episode_t = time.perf_counter()

        # The seed is stored with every frame so that the episode can be replayed later.
        seed = np.random.randint(0, 100_000)
        observation, info = env.reset(seed=seed)

        while timestamp < episode_time_s:
            start_loop_t = time.perf_counter()

            if policy is not None:
                action = predict_action(observation, policy, device, use_amp)
            else:
                leader_pos = robot.leader_arms.main.read("Present_Position")
                action = process_action_from_leader(leader_pos)

            # NOTE(review): the truncation flag returned by the env is ignored here;
            # confirm that ending episodes only on termination is intended.
            observation, reward, terminated, _, info = env.step(action)

            success = info.get("is_success", False)
            # Only fall back to the frame-count estimate when the env gives no timestamp,
            # so that a missing fps does not fail when the fallback is not needed.
            if "timestamp" in info:
                env_timestamp = info["timestamp"]
            else:
                env_timestamp = dataset.episode_buffer["size"] / fps

            frame = {
                "action": torch.from_numpy(action),
                "next.reward": reward,
                "next.success": success,
                "seed": seed,
                "timestamp": env_timestamp,
            }

            for key in image_keys:
                frame[_dataset_image_key(key)] = observation[key]

            for key, obs_key in state_keys_dict.items():
                frame[key] = torch.from_numpy(observation[obs_key])

            dataset.add_frame(frame)

            if display_cameras and not is_headless():
                for key in image_keys:
                    cv2.imshow(key, cv2.cvtColor(observation[key], cv2.COLOR_RGB2BGR))
                cv2.waitKey(1)

            if fps is not None:
                dt_s = time.perf_counter() - start_loop_t
                busy_wait(1 / fps - dt_s)

            dt_s = time.perf_counter() - start_loop_t
            log_control_info(robot, dt_s, fps=fps)

            timestamp = time.perf_counter() - start_episode_t
            if events["exit_early"] or terminated:
                events["exit_early"] = False
                break

        if events["rerecord_episode"]:
            log_say("Re-record episode", play_sounds)
            events["rerecord_episode"] = False
            events["exit_early"] = False
            dataset.clear_episode_buffer()
            continue

        dataset.save_episode(task=task)
        recorded_episodes += 1

        if events["stop_recording"] or recorded_episodes >= num_episodes:
            break

        logging.info("Waiting for a few seconds before starting next episode recording...")
        busy_wait(3)

    log_say("Stop recording", play_sounds, blocking=True)
    stop_recording(robot, listener, display_cameras)
    # The environment was created by this function, so release it here.
    env.close()

    if run_compute_stats:
        logging.info("Computing dataset statistics")
    # The flag is passed through: consolidation always runs, statistics only when requested.
    dataset.consolidate(run_compute_stats)

    if push_to_hub:
        dataset.push_to_hub(tags=tags)

    log_say("Exiting", play_sounds)
    return dataset
def replay(
    env, root: Path, repo_id: str, episode: int, fps: int | None = None, local_files_only: bool = True
):
    """Replay the recorded actions of one dataset episode in the simulation.

    The environment is reset with the seed stored with the episode, then every recorded
    action is stepped in order.

    Args:
        env: zero-argument callable that builds the simulation environment.
        root: local root directory of the dataset, or None to let the dataset pick its
            own location.
        repo_id: dataset identifier.
        episode: index of the episode to replay.
        fps: replay frequency. When None the actions are replayed without waiting.
        local_files_only: forwarded to `LeRobotDataset`.

    Raises:
        ValueError: if `root` is given and ``root / repo_id`` does not exist.
    """
    # The directory can only be checked for an explicit root; `root` is None by default
    # on the command line.
    if root is not None:
        local_dir = Path(root) / repo_id
        if not local_dir.exists():
            raise ValueError(f"Dataset directory does not exist: {local_dir}")

    env = env()
    try:
        dataset = LeRobotDataset(repo_id, root=root, local_files_only=local_files_only)
        items = dataset.hf_dataset.select_columns("action")
        seeds = dataset.hf_dataset.select_columns("seed")["seed"]

        from_idx = dataset.episode_data_index["from"][episode].item()
        to_idx = dataset.episode_data_index["to"][episode].item()

        # Reset with the seed saved for the first frame of the episode.
        env.reset(seed=seeds[from_idx].item())

        logging.info("Replaying episode")
        log_say("Replaying episode", play_sounds=True)
        for idx in range(from_idx, to_idx):
            start_step_t = time.perf_counter()
            action = items[idx]["action"]
            env.step(action.unsqueeze(0).numpy())

            # Pace the replay only when a frequency was requested.
            if fps is not None:
                dt_s = time.perf_counter() - start_step_t
                busy_wait(1 / fps - dt_s)
    finally:
        env.close()
# Command-line entry point: parse the control mode and its options, build the simulation
# environment constructor (and the robot when needed), then run the selected control mode.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="mode", required=True)

    # Set common options for all the subparsers
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        "--robot-path",
        type=str,
        default="lerobot/configs/robot/koch.yaml",
        help="Path to robot yaml file used to instantiate the robot using `make_robot` factory function.",
    )
    # Every mode loads this config, so require it at parse time instead of failing later.
    base_parser.add_argument(
        "--sim-config",
        required=True,
        help="Path to a yaml config you want to use for initializing a sim environment based on gym ",
    )

    # Teleoperation only takes the common options.
    subparsers.add_parser("teleoperate", parents=[base_parser])

    parser_record = subparsers.add_parser("record", parents=[base_parser])
    parser_record.add_argument(
        "--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
    )
    parser_record.add_argument(
        "--root",
        type=Path,
        default=None,
        help="Root directory where the dataset will be stored locally at '{root}/{repo_id}' (e.g. 'data/hf_username/dataset_name').",
    )
    parser_record.add_argument(
        "--repo-id",
        type=str,
        default="lerobot/test",
        help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).",
    )
    parser_record.add_argument(
        "--episode-time-s",
        type=int,
        default=60,
        help="Number of seconds for data recording for each episode.",
    )
    parser_record.add_argument(
        "--task",
        type=str,
        required=True,
        help="A description of the task preformed during recording that can be used as a language instruction.",
    )
    parser_record.add_argument("--num-episodes", type=int, default=50, help="Number of episodes to record.")
    parser_record.add_argument(
        "--run-compute-stats",
        type=int,
        default=1,
        help="By default, run the computation of the data statistics at the end of data collection. Compute intensive and not required to just replay an episode.",
    )
    parser_record.add_argument(
        "--push-to-hub",
        type=int,
        default=1,
        help="Upload dataset to Hugging Face hub.",
    )
    parser_record.add_argument(
        "--tags",
        type=str,
        nargs="*",
        help="Add tags to your dataset on the hub.",
    )
    parser_record.add_argument(
        "--num-image-writer-processes",
        type=int,
        default=0,
        help=(
            "Number of subprocesses handling the saving of frames as PNG. Set to 0 to use threads only; "
            "set to ≥1 to use subprocesses, each using threads to write images. The best number of processes "
            "and threads depends on your system. We recommend 4 threads per camera with 0 processes. "
            "If fps is unstable, adjust the thread count. If still unstable, try using 1 or more subprocesses."
        ),
    )
    parser_record.add_argument(
        "--num-image-writer-threads-per-camera",
        type=int,
        default=4,
        help=(
            "Number of threads writing the frames as png images on disk, per camera. "
            "Too much threads might cause unstable teleoperation fps due to main thread being blocked. "
            "Not enough threads might cause low camera fps."
        ),
    )
    parser_record.add_argument(
        "--display-cameras",
        type=int,
        default=0,
        help="Visualize image observations with opencv.",
    )
    parser_record.add_argument(
        "--resume",
        type=int,
        default=0,
        help="Resume recording on an existing dataset.",
    )

    parser_replay = subparsers.add_parser("replay", parents=[base_parser])
    parser_replay.add_argument(
        "--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
    )
    parser_replay.add_argument(
        "--root",
        type=Path,
        default=None,
        help="Root directory where the dataset will be stored locally (e.g. 'data/hf_username/dataset_name'). By default, stored in cache folder.",
    )
    parser_replay.add_argument(
        "--repo-id",
        type=str,
        default="lerobot/test",
        help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).",
    )
    parser_replay.add_argument("--episode", type=int, default=0, help="Index of the episodes to replay.")

    args = parser.parse_args()

    init_logging()

    control_mode = args.mode
    robot_path = args.robot_path
    env_config_path = args.sim_config
    # Everything except the routing options is forwarded to the control function.
    kwargs = {
        name: value for name, value in vars(args).items() if name not in ("mode", "robot_path", "sim_config")
    }

    # make gym env
    # `env_cfg` stays at module level: `record` reads `env_cfg.state_keys` from it.
    env_cfg = init_hydra_config(env_config_path)
    importlib.import_module(f"gym_{env_cfg.env.type}")

    def env_constructor():
        return gym.make(env_cfg.env.handle, disable_env_checker=True, **env_cfg.env.gym)

    robot = None
    process_leader_actions_fn = None

    try:
        if control_mode in ["teleoperate", "record"]:
            # make robot
            robot_overrides = ["~cameras", "~follower_arms"]
            # TODO(rcadene): remove
            robot_cfg = init_hydra_config(robot_path, robot_overrides)
            robot = make_robot(robot_cfg)
            robot.connect()

            calib_kwgs = init_sim_calibration(robot, env_cfg.calibration)

            def process_leader_actions_fn(action):
                return real_positions_to_sim(action, **calib_kwgs)

            # The start position was captured in `calib_kwgs` above; the leader arm
            # calibration is then cleared.
            robot.leader_arms.main.calibration = None

        if control_mode == "teleoperate":
            teleoperate(env_constructor, robot, process_leader_actions_fn)

        elif control_mode == "record":
            record(env_constructor, robot, process_leader_actions_fn, **kwargs)

        elif control_mode == "replay":
            replay(env_constructor, **kwargs)

        else:
            raise ValueError(
                f"Invalid control mode: '{control_mode}', only valid modes are teleoperate, record and replay."
            )
    finally:
        if robot and robot.is_connected:
            # Disconnect manually to avoid a "Core dump" during process
            # termination due to camera threads not properly exiting.
            # Done in `finally` so it also happens when the control function raises.
            robot.disconnect()