""" Utilities to control a robot. Useful to record a dataset, replay a recorded episode, run the policy on your robot and record an evaluation dataset, and to recalibrate your robot if needed. Examples of usage: - Recalibrate your robot: ```bash python lerobot/scripts/control_robot.py calibrate ``` - Unlimited teleoperation at highest frequency (~200 Hz is expected), to exit with CTRL+C: ```bash python lerobot/scripts/control_robot.py teleoperate # Remove the cameras from the robot definition. They are not used in 'teleoperate' anyway. python lerobot/scripts/control_robot.py teleoperate --robot-overrides '~cameras' ``` - Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency: ```bash python lerobot/scripts/control_robot.py teleoperate \ --fps 30 ``` - Record one episode in order to test replay: ```bash python lerobot/scripts/control_robot.py record \ --fps 30 \ --repo-id $USER/koch_test \ --num-episodes 1 \ --run-compute-stats 0 ``` - Visualize dataset: ```bash python lerobot/scripts/visualize_dataset.py \ --repo-id $USER/koch_test \ --episode-index 0 ``` - Replay this test episode: ```bash python lerobot/scripts/control_robot.py replay \ --fps 30 \ --repo-id $USER/koch_test \ --episode 0 ``` - Record a full dataset in order to train a policy, with 2 seconds of warmup, 30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes: ```bash python lerobot/scripts/control_robot.py record \ --fps 30 \ --repo-id $USER/koch_pick_place_lego \ --num-episodes 50 \ --warmup-time-s 2 \ --episode-time-s 30 \ --reset-time-s 10 ``` **NOTE**: You can use your keyboard to control data recording flow. - Tap right arrow key '->' to early exit while recording an episode and go to resseting the environment. - Tap right arrow key '->' to early exit while resetting the environment and got to recording the next episode. - Tap left arrow key '<-' to early exit and re-record the current episode. - Tap escape key 'esc' to stop the data recording. This might require a sudo permission to allow your terminal to monitor keyboard events. **NOTE**: You can resume/continue data recording by running the same data recording command and adding `--resume 1`. If the dataset you want to extend is not on the hub, you also need to add `--local-files-only 1`. - Train on this dataset with the ACT policy: ```bash python lerobot/scripts/train.py \ policy=act_koch_real \ env=koch_real \ dataset_repo_id=$USER/koch_pick_place_lego \ hydra.run.dir=outputs/train/act_koch_real ``` - Run the pretrained policy on the robot: ```bash python lerobot/scripts/control_robot.py record \ --fps 30 \ --repo-id $USER/eval_act_koch_real \ --num-episodes 10 \ --warmup-time-s 2 \ --episode-time-s 30 \ --reset-time-s 10 -p outputs/train/act_koch_real/checkpoints/080000/pretrained_model ``` """ import argparse import logging import time from pathlib import Path from typing import List # from safetensors.torch import load_file, save_file from lerobot.common.datasets.lerobot_dataset import LeRobotDataset from lerobot.common.robot_devices.control_utils import ( control_loop, has_method, init_keyboard_listener, init_policy, log_control_info, record_episode, reset_environment, sanity_check_dataset_name, sanity_check_dataset_robot_compatibility, stop_recording, warmup_record, ) from lerobot.common.robot_devices.robots.factory import make_robot from lerobot.common.robot_devices.robots.utils import Robot from lerobot.common.robot_devices.utils import busy_wait, safe_disconnect from lerobot.common.utils.utils import init_hydra_config, init_logging, log_say, none_or_int ######################################################################################## # Control modes ######################################################################################## @safe_disconnect def calibrate(robot: Robot, arms: list[str] | None): # TODO(aliberts): move this code in robots' classes if robot.robot_type.startswith("stretch"): if not robot.is_connected: robot.connect() if not robot.is_homed(): robot.home() return if arms is None: arms = robot.available_arms unknown_arms = [arm_id for arm_id in arms if arm_id not in robot.available_arms] available_arms_str = " ".join(robot.available_arms) unknown_arms_str = " ".join(unknown_arms) if arms is None or len(arms) == 0: raise ValueError( "No arm provided. Use `--arms` as argument with one or more available arms.\n" f"For instance, to recalibrate all arms add: `--arms {available_arms_str}`" ) if len(unknown_arms) > 0: raise ValueError( f"Unknown arms provided ('{unknown_arms_str}'). Available arms are `{available_arms_str}`." ) for arm_id in arms: arm_calib_path = robot.calibration_dir / f"{arm_id}.json" if arm_calib_path.exists(): print(f"Removing '{arm_calib_path}'") arm_calib_path.unlink() else: print(f"Calibration file not found '{arm_calib_path}'") if robot.is_connected: robot.disconnect() # Calling `connect` automatically runs calibration # when the calibration file is missing robot.connect() robot.disconnect() print("Calibration is done! You can now teleoperate and record datasets!") @safe_disconnect def teleoperate( robot: Robot, fps: int | None = None, teleop_time_s: float | None = None, display_cameras: bool = False ): control_loop( robot, control_time_s=teleop_time_s, fps=fps, teleoperate=True, display_cameras=display_cameras, ) @safe_disconnect def record( robot: Robot, root: Path, repo_id: str, single_task: str, pretrained_policy_name_or_path: str | None = None, policy_overrides: List[str] | None = None, fps: int | None = None, warmup_time_s: int | float = 2, episode_time_s: int | float = 10, reset_time_s: int | float = 5, num_episodes: int = 50, video: bool = True, run_compute_stats: bool = True, push_to_hub: bool = True, tags: list[str] | None = None, num_image_writer_processes: int = 0, num_image_writer_threads_per_camera: int = 4, display_cameras: bool = True, play_sounds: bool = True, resume: bool = False, # TODO(rcadene, aliberts): remove local_files_only when refactor with dataset as argument local_files_only: bool = False, ) -> LeRobotDataset: # TODO(rcadene): Add option to record logs listener = None events = None policy = None device = None use_amp = None if single_task: task = single_task else: raise NotImplementedError("Only single-task recording is supported for now") # Load pretrained policy if pretrained_policy_name_or_path is not None: policy, policy_fps, device, use_amp = init_policy(pretrained_policy_name_or_path, policy_overrides) if fps is None: fps = policy_fps logging.warning(f"No fps provided, so using the fps from policy config ({policy_fps}).") elif fps != policy_fps: logging.warning( f"There is a mismatch between the provided fps ({fps}) and the one from policy config ({policy_fps})." ) if resume: dataset = LeRobotDataset( repo_id, root=root, local_files_only=local_files_only, ) dataset.start_image_writer( num_processes=num_image_writer_processes, num_threads=num_image_writer_threads_per_camera * len(robot.cameras), ) sanity_check_dataset_robot_compatibility(dataset, robot, fps, video) else: # Create empty dataset or load existing saved episodes sanity_check_dataset_name(repo_id, policy) dataset = LeRobotDataset.create( repo_id, fps, root=root, robot=robot, use_videos=video, image_writer_processes=num_image_writer_processes, image_writer_threads=num_image_writer_threads_per_camera * len(robot.cameras), ) if not robot.is_connected: robot.connect() listener, events = init_keyboard_listener() # Execute a few seconds without recording to: # 1. teleoperate the robot to move it in starting position if no policy provided, # 2. give times to the robot devices to connect and start synchronizing, # 3. place the cameras windows on screen enable_teleoperation = policy is None log_say("Warmup record", play_sounds) warmup_record(robot, events, enable_teleoperation, warmup_time_s, display_cameras, fps) if has_method(robot, "teleop_safety_stop"): robot.teleop_safety_stop() recorded_episodes = 0 while True: if recorded_episodes >= num_episodes: break # TODO(aliberts): add task prompt for multitask here. Might need to temporarily disable event if # input() messes with them. # if multi_task: # task = input("Enter your task description: ") log_say(f"Recording episode {dataset.num_episodes}", play_sounds) record_episode( dataset=dataset, robot=robot, events=events, episode_time_s=episode_time_s, display_cameras=display_cameras, policy=policy, device=device, use_amp=use_amp, fps=fps, ) # Execute a few seconds without recording to give time to manually reset the environment # Current code logic doesn't allow to teleoperate during this time. # TODO(rcadene): add an option to enable teleoperation during reset # Skip reset for the last episode to be recorded if not events["stop_recording"] and ( (recorded_episodes < num_episodes - 1) or events["rerecord_episode"] ): log_say("Reset the environment", play_sounds) reset_environment(robot, events, reset_time_s) if events["rerecord_episode"]: log_say("Re-record episode", play_sounds) events["rerecord_episode"] = False events["exit_early"] = False dataset.clear_episode_buffer() continue dataset.save_episode(task) recorded_episodes += 1 if events["stop_recording"]: break log_say("Stop recording", play_sounds, blocking=True) stop_recording(robot, listener, display_cameras) if run_compute_stats: logging.info("Computing dataset statistics") dataset.consolidate(run_compute_stats) if push_to_hub: dataset.push_to_hub(tags=tags) log_say("Exiting", play_sounds) return dataset @safe_disconnect def replay( robot: Robot, root: Path, repo_id: str, episode: int, fps: int | None = None, play_sounds: bool = True, local_files_only: bool = False, ): # TODO(rcadene, aliberts): refactor with control_loop, once `dataset` is an instance of LeRobotDataset # TODO(rcadene): Add option to record logs dataset = LeRobotDataset(repo_id, root=root, episodes=[episode], local_files_only=local_files_only) actions = dataset.hf_dataset.select_columns("action") if not robot.is_connected: robot.connect() log_say("Replaying episode", play_sounds, blocking=True) for idx in range(dataset.num_frames): start_episode_t = time.perf_counter() action = actions[idx]["action"] robot.send_action(action) dt_s = time.perf_counter() - start_episode_t busy_wait(1 / fps - dt_s) dt_s = time.perf_counter() - start_episode_t log_control_info(robot, dt_s, fps=fps) if __name__ == "__main__": parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest="mode", required=True) # Set common options for all the subparsers base_parser = argparse.ArgumentParser(add_help=False) base_parser.add_argument( "--robot-path", type=str, default="lerobot/configs/robot/koch.yaml", help="Path to robot yaml file used to instantiate the robot using `make_robot` factory function.", ) base_parser.add_argument( "--robot-overrides", type=str, nargs="*", help="Any key=value arguments to override config values (use dots for.nested=overrides)", ) parser_calib = subparsers.add_parser("calibrate", parents=[base_parser]) parser_calib.add_argument( "--arms", type=str, nargs="*", help="List of arms to calibrate (e.g. `--arms left_follower right_follower left_leader`)", ) parser_teleop = subparsers.add_parser("teleoperate", parents=[base_parser]) parser_teleop.add_argument( "--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)" ) parser_teleop.add_argument( "--display-cameras", type=int, default=1, help="Display all cameras on screen (set to 1 to display or 0).", ) parser_record = subparsers.add_parser("record", parents=[base_parser]) task_args = parser_record.add_mutually_exclusive_group(required=True) parser_record.add_argument( "--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)" ) task_args.add_argument( "--single-task", type=str, help="A short but accurate description of the task performed during the recording.", ) # TODO(aliberts): add multi-task support # task_args.add_argument( # "--multi-task", # type=int, # help="You will need to enter the task performed at the start of each episode.", # ) parser_record.add_argument( "--root", type=Path, default=None, help="Root directory where the dataset will be stored (e.g. 'dataset/path').", ) parser_record.add_argument( "--repo-id", type=str, default="lerobot/test", help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).", ) parser_record.add_argument( "--local-files-only", type=int, default=0, help="Use local files only. By default, this script will try to fetch the dataset from the hub if it exists.", ) parser_record.add_argument( "--warmup-time-s", type=int, default=10, help="Number of seconds before starting data collection. It allows the robot devices to warmup and synchronize.", ) parser_record.add_argument( "--episode-time-s", type=int, default=60, help="Number of seconds for data recording for each episode.", ) parser_record.add_argument( "--reset-time-s", type=int, default=60, help="Number of seconds for resetting the environment after each episode.", ) parser_record.add_argument("--num-episodes", type=int, default=50, help="Number of episodes to record.") parser_record.add_argument( "--run-compute-stats", type=int, default=1, help="By default, run the computation of the data statistics at the end of data collection. Compute intensive and not required to just replay an episode.", ) parser_record.add_argument( "--push-to-hub", type=int, default=1, help="Upload dataset to Hugging Face hub.", ) parser_record.add_argument( "--tags", type=str, nargs="*", help="Add tags to your dataset on the hub.", ) parser_record.add_argument( "--num-image-writer-processes", type=int, default=0, help=( "Number of subprocesses handling the saving of frames as PNGs. Set to 0 to use threads only; " "set to ≥1 to use subprocesses, each using threads to write images. The best number of processes " "and threads depends on your system. We recommend 4 threads per camera with 0 processes. " "If fps is unstable, adjust the thread count. If still unstable, try using 1 or more subprocesses." ), ) parser_record.add_argument( "--num-image-writer-threads-per-camera", type=int, default=4, help=( "Number of threads writing the frames as png images on disk, per camera. " "Too many threads might cause unstable teleoperation fps due to main thread being blocked. " "Not enough threads might cause low camera fps." ), ) parser_record.add_argument( "--resume", type=int, default=0, help="Resume recording on an existing dataset.", ) parser_record.add_argument( "-p", "--pretrained-policy-name-or-path", type=str, help=( "Either the repo ID of a model hosted on the Hub or a path to a directory containing weights " "saved using `Policy.save_pretrained`." ), ) parser_record.add_argument( "--policy-overrides", type=str, nargs="*", help="Any key=value arguments to override config values (use dots for.nested=overrides)", ) parser_replay = subparsers.add_parser("replay", parents=[base_parser]) parser_replay.add_argument( "--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)" ) parser_replay.add_argument( "--root", type=Path, default=None, help="Root directory where the dataset will be stored (e.g. 'dataset/path').", ) parser_replay.add_argument( "--repo-id", type=str, default="lerobot/test", help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).", ) parser_replay.add_argument( "--local-files-only", type=int, default=0, help="Use local files only. By default, this script will try to fetch the dataset from the hub if it exists.", ) parser_replay.add_argument("--episode", type=int, default=0, help="Index of the episode to replay.") args = parser.parse_args() init_logging() control_mode = args.mode robot_path = args.robot_path robot_overrides = args.robot_overrides kwargs = vars(args) del kwargs["mode"] del kwargs["robot_path"] del kwargs["robot_overrides"] robot_cfg = init_hydra_config(robot_path, robot_overrides) robot = make_robot(robot_cfg) if control_mode == "calibrate": calibrate(robot, **kwargs) elif control_mode == "teleoperate": teleoperate(robot, **kwargs) elif control_mode == "record": record(robot, **kwargs) elif control_mode == "replay": replay(robot, **kwargs) if robot.is_connected: # Disconnect manually to avoid a "Core dump" during process # termination due to camera threads not properly exiting. robot.disconnect()