""" Utilities to control a robot. Useful to record a dataset, replay a recorded episode, run the policy on your robot and record an evaluation dataset, and to recalibrate your robot if needed. Examples of usage: - Recalibrate your robot: ```bash python lerobot/scripts/control_robot.py calibrate ``` - Unlimited teleoperation at highest frequency (~200 Hz is expected), to exit with CTRL+C: ```bash python lerobot/scripts/control_robot.py teleoperate # Remove the cameras from the robot definition. They are not used in 'teleoperate' anyway. python lerobot/scripts/control_robot.py teleoperate --robot-overrides '~cameras' ``` - Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency: ```bash python lerobot/scripts/control_robot.py teleoperate \ --fps 30 ``` - Record one episode in order to test replay: ```bash python lerobot/scripts/control_robot.py record \ --fps 30 \ --root tmp/data \ --repo-id $USER/koch_test \ --num-episodes 1 \ --run-compute-stats 0 ``` - Visualize dataset: ```bash python lerobot/scripts/visualize_dataset.py \ --root tmp/data \ --repo-id $USER/koch_test \ --episode-index 0 ``` - Replay this test episode: ```bash python lerobot/scripts/control_robot.py replay \ --fps 30 \ --root tmp/data \ --repo-id $USER/koch_test \ --episode 0 ``` - Record a full dataset in order to train a policy, with 2 seconds of warmup, 30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes: ```bash python lerobot/scripts/control_robot.py record \ --fps 30 \ --root data \ --repo-id $USER/koch_pick_place_lego \ --num-episodes 50 \ --warmup-time-s 2 \ --episode-time-s 30 \ --reset-time-s 10 ``` **NOTE**: You can use your keyboard to control data recording flow. - Tap right arrow key '->' to early exit while recording an episode and go to resseting the environment. - Tap right arrow key '->' to early exit while resetting the environment and got to recording the next episode. - Tap left arrow key '<-' to early exit and re-record the current episode. - Tap escape key 'esc' to stop the data recording. This might require a sudo permission to allow your terminal to monitor keyboard events. **NOTE**: You can resume/continue data recording by running the same data recording command twice. To avoid resuming by deleting the dataset, use `--force-override 1`. 
- Train on this dataset with the ACT policy:
```bash
DATA_DIR=data python lerobot/scripts/train.py \
    policy=act_koch_real \
    env=koch_real \
    dataset_repo_id=$USER/koch_pick_place_lego \
    hydra.run.dir=outputs/train/act_koch_real
```

- Run the pretrained policy on the robot:
```bash
python lerobot/scripts/control_robot.py record \
    --fps 30 \
    --root data \
    --repo-id $USER/eval_act_koch_real \
    --num-episodes 10 \
    --warmup-time-s 2 \
    --episode-time-s 30 \
    --reset-time-s 10 \
    -p outputs/train/act_koch_real/checkpoints/080000/pretrained_model
```
"""

import argparse
import concurrent.futures
import json
import logging
import multiprocessing
import os
import platform
import shutil
import time
import traceback
from contextlib import nullcontext
from functools import cache
from pathlib import Path

import cv2
import torch
import tqdm
from omegaconf import DictConfig
from PIL import Image
from termcolor import colored

# from safetensors.torch import load_file, save_file
from lerobot.common.datasets.compute_stats import compute_stats
from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset
from lerobot.common.datasets.push_dataset_to_hub.aloha_hdf5_format import to_hf_dataset
from lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes, get_default_encoding
from lerobot.common.datasets.utils import calculate_episode_data_index, create_branch
from lerobot.common.datasets.video_utils import encode_video_frames
from lerobot.common.policies.factory import make_policy
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.robot_devices.robots.utils import Robot, get_arm_id
from lerobot.common.robot_devices.utils import busy_wait, safe_disconnect
from lerobot.common.utils.utils import get_safe_torch_device, init_hydra_config, init_logging, set_global_seed
from lerobot.scripts.eval import get_pretrained_policy_path
from lerobot.scripts.push_dataset_to_hub import (
    push_dataset_card_to_hub,
    push_meta_data_to_hub,
    push_videos_to_hub,
    save_meta_data,
)

########################################################################################
# Utilities
########################################################################################


def say(text, blocking=False):
    # Check if mac, linux, or windows.
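    # Note: this shells out to each OS's text-to-speech tool: `say` ships with macOS,
    # `spd-say` is provided by the speech-dispatcher package on Linux, and Windows uses
    # PowerShell's System.Speech assembly.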
if platform.system() == "Darwin": cmd = f'say "{text}"' elif platform.system() == "Linux": cmd = f'spd-say "{text}"' elif platform.system() == "Windows": cmd = ( 'PowerShell -Command "Add-Type -AssemblyName System.Speech; ' f"(New-Object System.Speech.Synthesis.SpeechSynthesizer).Speak('{text}')\"" ) if not blocking and platform.system() in ["Darwin", "Linux"]: # TODO(rcadene): Make it work for Windows # Use the ampersand to run command in the background cmd += " &" os.system(cmd) def save_image(img_tensor, key, frame_index, episode_index, videos_dir: str): img = Image.fromarray(img_tensor.numpy()) path = Path(videos_dir) / f"{key}_episode_{episode_index:06d}" / f"frame_{frame_index:06d}.png" path.parent.mkdir(parents=True, exist_ok=True) img.save(str(path), quality=100) def none_or_int(value): if value == "None": return None return int(value) def log_control_info(robot: Robot, dt_s, episode_index=None, frame_index=None, fps=None): log_items = [] if episode_index is not None: log_items.append(f"ep:{episode_index}") if frame_index is not None: log_items.append(f"frame:{frame_index}") def log_dt(shortname, dt_val_s): nonlocal log_items, fps info_str = f"{shortname}:{dt_val_s * 1000:5.2f} ({1/ dt_val_s:3.1f}hz)" if fps is not None: actual_fps = 1 / dt_val_s if actual_fps < fps - 1: info_str = colored(info_str, "yellow") log_items.append(info_str) # total step time displayed in milliseconds and its frequency log_dt("dt", dt_s) # TODO(aliberts): move robot-specific logs logic in robot.print_logs() if not robot.robot_type.startswith("stretch"): for name in robot.leader_arms: key = f"read_leader_{name}_pos_dt_s" if key in robot.logs: log_dt("dtRlead", robot.logs[key]) for name in robot.follower_arms: key = f"write_follower_{name}_goal_pos_dt_s" if key in robot.logs: log_dt("dtWfoll", robot.logs[key]) key = f"read_follower_{name}_pos_dt_s" if key in robot.logs: log_dt("dtRfoll", robot.logs[key]) for name in robot.cameras: key = f"read_camera_{name}_dt_s" if key in robot.logs: log_dt(f"dtR{name}", robot.logs[key]) info_str = " ".join(log_items) logging.info(info_str) @cache def is_headless(): """Detects if python is running without a monitor.""" try: import pynput # noqa return False except Exception: print( "Error trying to import pynput. Switching to headless mode. " "As a result, the video stream from the cameras won't be shown, " "and you won't be able to change the control flow with keyboards. " "For more info, see traceback below.\n" ) traceback.print_exc() print() return True def has_method(_object: object, method_name: str): return hasattr(_object, method_name) and callable(getattr(_object, method_name)) def get_available_arms(robot): # TODO(rcadene): moves this function in manipulator class? 
    available_arms = []
    for name in robot.follower_arms:
        arm_id = get_arm_id(name, "follower")
        available_arms.append(arm_id)
    for name in robot.leader_arms:
        arm_id = get_arm_id(name, "leader")
        available_arms.append(arm_id)
    return available_arms


########################################################################################
# Asynchronous saving of images on disk
########################################################################################


def loop_to_save_images_in_threads(image_queue, num_threads):
    if num_threads < 1:
        raise NotImplementedError(f"Only `num_threads>=1` is supported for now, but {num_threads=} given.")

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        while True:
            # Blocks until a frame is available
            frame_data = image_queue.get()

            # As usually done, exit loop when receiving None to stop the worker
            if frame_data is None:
                break

            image, key, frame_index, episode_index, videos_dir = frame_data
            futures.append(executor.submit(save_image, image, key, frame_index, episode_index, videos_dir))

        # Before exiting function, wait for all threads to complete
        with tqdm.tqdm(total=len(futures), desc="Writing images") as progress_bar:
            concurrent.futures.wait(futures)
            progress_bar.update(len(futures))


def start_image_writer_processes(image_queue, num_processes, num_threads_per_process):
    if num_processes < 1:
        raise ValueError(f"Only `num_processes>=1` is supported, but {num_processes=} given.")

    if num_threads_per_process < 1:
        raise NotImplementedError(
            f"Only `num_threads_per_process>=1` is supported for now, but {num_threads_per_process=} given."
        )

    processes = []
    for _ in range(num_processes):
        process = multiprocessing.Process(
            target=loop_to_save_images_in_threads,
            args=(image_queue, num_threads_per_process),
        )
        process.start()
        processes.append(process)
    return processes


def stop_processes(processes, queue, timeout):
    # Send None to each process to signal them to stop
    for _ in processes:
        queue.put(None)

    # Close the queue, no more items can be put in the queue
    queue.close()

    # Wait at most `timeout` seconds for all processes to terminate
    for process in processes:
        process.join(timeout=timeout)

        # If a process is still alive after the timeout, force termination
        if process.is_alive():
            process.terminate()

    # Ensure all background queue threads have finished
    queue.join_thread()


def start_image_writer(num_processes, num_threads):
    """This function abstracts away the initialisation of the processes and/or threads that save
    images on disk asynchronously, which is critical to control a robot and record data at a high
    frame rate.

    When `num_processes=0`, it returns a dictionary containing a threads pool of size `num_threads`.
    When `num_processes>0`, it returns a dictionary containing a processes pool of size `num_processes`,
    where each subprocess starts its own threads pool of size `num_threads`.

    The optimal number of processes and threads depends on your computer capabilities.
    We advise using 4 threads per camera with 0 processes. If the fps is not stable, try to increase
    or lower the number of threads. If it is still not stable, try to use 1 subprocess, or more.
    """
    image_writer = {}

    if num_processes == 0:
        futures = []
        threads_pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
        image_writer["threads_pool"], image_writer["futures"] = threads_pool, futures
    else:
        # TODO(rcadene): When using num_processes>1, `multiprocessing.Manager().Queue()`
        # might be better than `multiprocessing.Queue()`.
        # Source: https://www.geeksforgeeks.org/python-multiprocessing-queue-vs-multiprocessing-manager-queue
        image_queue = multiprocessing.Queue()
        processes_pool = start_image_writer_processes(
            image_queue, num_processes=num_processes, num_threads_per_process=num_threads
        )
        image_writer["processes_pool"], image_writer["image_queue"] = processes_pool, image_queue

    return image_writer


def async_save_image(image_writer, image, key, frame_index, episode_index, videos_dir):
    """This function abstracts away saving an image on disk asynchronously. It uses a dictionary
    called image writer which contains either a pool of processes or a pool of threads.
    """
    if "threads_pool" in image_writer:
        threads_pool, futures = image_writer["threads_pool"], image_writer["futures"]
        futures.append(threads_pool.submit(save_image, image, key, frame_index, episode_index, videos_dir))
    else:
        image_queue = image_writer["image_queue"]
        image_queue.put((image, key, frame_index, episode_index, videos_dir))


def stop_image_writer(image_writer, timeout):
    if "threads_pool" in image_writer:
        futures = image_writer["futures"]
        # Before exiting function, wait for all threads to complete
        with tqdm.tqdm(total=len(futures), desc="Writing images") as progress_bar:
            concurrent.futures.wait(futures, timeout=timeout)
            progress_bar.update(len(futures))
    else:
        processes_pool, image_queue = image_writer["processes_pool"], image_writer["image_queue"]
        stop_processes(processes_pool, image_queue, timeout=timeout)


########################################################################################
# Control modes
########################################################################################


@safe_disconnect
def calibrate(robot: Robot, arms: list[str] | None):
    # TODO(aliberts): move this code in robots' classes
    if robot.robot_type.startswith("stretch"):
        if not robot.is_connected:
            robot.connect()
        if not robot.is_homed():
            robot.home()
        return

    available_arms = get_available_arms(robot)
    available_arms_str = " ".join(available_arms)

    # Check `arms` before iterating over it, so that a missing `--arms` argument raises
    # the helpful error below instead of a TypeError.
    if arms is None or len(arms) == 0:
        raise ValueError(
            "No arm provided. Use `--arms` as argument with one or more available arms.\n"
            f"For instance, to recalibrate all arms add: `--arms {available_arms_str}`"
        )

    unknown_arms = [arm_id for arm_id in arms if arm_id not in available_arms]
    unknown_arms_str = " ".join(unknown_arms)

    if len(unknown_arms) > 0:
        raise ValueError(
            f"Unknown arms provided ('{unknown_arms_str}'). Available arms are `{available_arms_str}`."
        )

    for arm_id in arms:
        arm_calib_path = robot.calibration_dir / f"{arm_id}.json"
        if arm_calib_path.exists():
            print(f"Removing '{arm_calib_path}'")
            arm_calib_path.unlink()
        else:
            print(f"Calibration file not found '{arm_calib_path}'")

    if robot.is_connected:
        robot.disconnect()

    # Calling `connect` automatically runs calibration
    # when the calibration file is missing
    robot.connect()
    robot.disconnect()
    print("Calibration is done! You can now teleoperate and record datasets!")


@safe_disconnect
def teleoperate(robot: Robot, fps: int | None = None, teleop_time_s: float | None = None):
    # TODO(rcadene): Add option to record logs
    if not robot.is_connected:
        robot.connect()

    start_teleop_t = time.perf_counter()
    while True:
        start_loop_t = time.perf_counter()
        robot.teleop_step()

        if fps is not None:
            dt_s = time.perf_counter() - start_loop_t
            busy_wait(1 / fps - dt_s)

        dt_s = time.perf_counter() - start_loop_t
        log_control_info(robot, dt_s, fps=fps)

        if teleop_time_s is not None and time.perf_counter() - start_teleop_t > teleop_time_s:
            break


@safe_disconnect
def record(
    robot: Robot,
    policy: torch.nn.Module | None = None,
    hydra_cfg: DictConfig | None = None,
    fps: int | None = None,
    root="data",
    repo_id="lerobot/debug",
    warmup_time_s=2,
    episode_time_s=10,
    reset_time_s=5,
    num_episodes=50,
    video=True,
    run_compute_stats=True,
    push_to_hub=True,
    tags=None,
    num_image_writer_processes=0,
    num_image_writer_threads_per_camera=4,
    force_override=False,
    display_cameras=True,
    play_sounds=True,
):
    # TODO(rcadene): Add option to record logs
    # TODO(rcadene): Clean this function via decomposition in higher level functions

    _, dataset_name = repo_id.split("/")
    if dataset_name.startswith("eval_") and policy is None:
        raise ValueError(
            f"Your dataset name begins with 'eval_' ({dataset_name}) but no policy is provided ({policy})."
        )

    if not robot.is_connected:
        robot.connect()

    local_dir = Path(root) / repo_id
    if local_dir.exists() and force_override:
        shutil.rmtree(local_dir)

    episodes_dir = local_dir / "episodes"
    episodes_dir.mkdir(parents=True, exist_ok=True)

    videos_dir = local_dir / "videos"
    videos_dir.mkdir(parents=True, exist_ok=True)

    # Logic to resume data recording
    rec_info_path = episodes_dir / "data_recording_info.json"
    if rec_info_path.exists():
        with open(rec_info_path) as f:
            rec_info = json.load(f)
        episode_index = rec_info["last_episode_index"] + 1
    else:
        episode_index = 0

    if is_headless():
        logging.warning(
            "Headless environment detected. On-screen cameras display and keyboard inputs will not be available."
        )

    # Allow to exit early while recording an episode or resetting the environment,
    # by tapping the right arrow key '->'. This might require a sudo permission
    # to allow your terminal to monitor keyboard events.
    exit_early = False
    rerecord_episode = False
    stop_recording = False

    # Only import pynput if not in a headless environment
    if not is_headless():
        from pynput import keyboard

        def on_press(key):
            nonlocal exit_early, rerecord_episode, stop_recording
            try:
                if key == keyboard.Key.right:
                    print("Right arrow key pressed. Exiting loop...")
                    exit_early = True
                elif key == keyboard.Key.left:
                    print("Left arrow key pressed. Exiting loop and rerecord the last episode...")
                    rerecord_episode = True
                    exit_early = True
                elif key == keyboard.Key.esc:
                    print("Escape key pressed. Stopping data recording...")
                    stop_recording = True
                    exit_early = True
            except Exception as e:
                print(f"Error handling key press: {e}")

        listener = keyboard.Listener(on_press=on_press)
        listener.start()

    # Load policy if any
    if policy is not None:
        # Check device is available
        device = get_safe_torch_device(hydra_cfg.device, log=True)

        policy.eval()
        policy.to(device)

        torch.backends.cudnn.benchmark = True
        torch.backends.cuda.matmul.allow_tf32 = True
        set_global_seed(hydra_cfg.seed)

        # override fps using policy fps
        fps = hydra_cfg.env.fps

    # Execute a few seconds without recording data, to give time
    # to the robot devices to connect and start synchronizing.
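    # Rate control below follows the same pattern as `teleoperate`: measure how long the loop body
    # took (dt_s), then `busy_wait` for the remainder of the 1/fps budget, so that warmup, recording
    # and policy inference all run at (roughly) the requested fps.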
timestamp = 0 start_warmup_t = time.perf_counter() is_warmup_print = False while timestamp < warmup_time_s: if not is_warmup_print: logging.info("Warming up (no data recording)") if play_sounds: say("Warming up") is_warmup_print = True start_loop_t = time.perf_counter() if policy is None: observation, action = robot.teleop_step(record_data=True) else: observation = robot.capture_observation() if display_cameras and not is_headless(): image_keys = [key for key in observation if "image" in key] for key in image_keys: cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR)) cv2.waitKey(1) dt_s = time.perf_counter() - start_loop_t busy_wait(1 / fps - dt_s) dt_s = time.perf_counter() - start_loop_t log_control_info(robot, dt_s, fps=fps) timestamp = time.perf_counter() - start_warmup_t if has_method(robot, "teleop_safety_stop"): robot.teleop_safety_stop() has_camera = len(robot.cameras) > 0 if has_camera: # Initialize processes or/and threads dedicated to save images on disk asynchronously, # which is critical to control a robot and record data at a high frame rate. image_writer = start_image_writer( num_processes=num_image_writer_processes, num_threads=num_image_writer_threads_per_camera * len(robot.cameras), ) # Using `try` to exist smoothly if an exception is raised try: # Start recording all episodes while episode_index < num_episodes: logging.info(f"Recording episode {episode_index}") if play_sounds: say(f"Recording episode {episode_index}") ep_dict = {} frame_index = 0 timestamp = 0 start_episode_t = time.perf_counter() while timestamp < episode_time_s: start_loop_t = time.perf_counter() if policy is None: observation, action = robot.teleop_step(record_data=True) else: observation = robot.capture_observation() image_keys = [key for key in observation if "image" in key] not_image_keys = [key for key in observation if "image" not in key] if has_camera > 0: for key in image_keys: async_save_image( image_writer, image=observation[key], key=key, frame_index=frame_index, episode_index=episode_index, videos_dir=str(videos_dir), ) if display_cameras and not is_headless(): image_keys = [key for key in observation if "image" in key] for key in image_keys: cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR)) cv2.waitKey(1) for key in not_image_keys: if key not in ep_dict: ep_dict[key] = [] ep_dict[key].append(observation[key]) if policy is not None: with ( torch.inference_mode(), torch.autocast(device_type=device.type) if device.type == "cuda" and hydra_cfg.use_amp else nullcontext(), ): # Convert to pytorch format: channel first and float32 in [0,1] with batch dimension for name in observation: if "image" in name: observation[name] = observation[name].type(torch.float32) / 255 observation[name] = observation[name].permute(2, 0, 1).contiguous() observation[name] = observation[name].unsqueeze(0) observation[name] = observation[name].to(device) # Compute the next action with the policy # based on the current observation action = policy.select_action(observation) # Remove batch dimension action = action.squeeze(0) # Move to cpu, if not already the case action = action.to("cpu") # Order the robot to move action_sent = robot.send_action(action) # Action can eventually be clipped using `max_relative_target`, # so action actually sent is saved in the dataset. 
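                # Wrapping the (possibly clipped) tensor in a dict mirrors the action dict returned by
                # `teleop_step(record_data=True)`, so the per-key appending into `ep_dict` below works
                # the same for both the teleop and policy branches.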
action = {"action": action_sent} for key in action: if key not in ep_dict: ep_dict[key] = [] ep_dict[key].append(action[key]) frame_index += 1 dt_s = time.perf_counter() - start_loop_t busy_wait(1 / fps - dt_s) dt_s = time.perf_counter() - start_loop_t log_control_info(robot, dt_s, fps=fps) timestamp = time.perf_counter() - start_episode_t if exit_early: exit_early = False break # TODO(alibets): allow for teleop during reset if has_method(robot, "teleop_safety_stop"): robot.teleop_safety_stop() if not stop_recording: # Start resetting env while the executor are finishing logging.info("Reset the environment") if play_sounds: say("Reset the environment") timestamp = 0 start_vencod_t = time.perf_counter() # During env reset we save the data and encode the videos num_frames = frame_index for key in image_keys: if video: tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}" fname = f"{key}_episode_{episode_index:06d}.mp4" video_path = local_dir / "videos" / fname if video_path.exists(): video_path.unlink() # Store the reference to the video frame, even tho the videos are not yet encoded ep_dict[key] = [] for i in range(num_frames): ep_dict[key].append({"path": f"videos/{fname}", "timestamp": i / fps}) else: imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}" ep_dict[key] = [] for i in range(num_frames): img_path = imgs_dir / f"frame_{i:06d}.png" ep_dict[key].append({"path": str(img_path)}) for key in not_image_keys: ep_dict[key] = torch.stack(ep_dict[key]) for key in action: ep_dict[key] = torch.stack(ep_dict[key]) ep_dict["episode_index"] = torch.tensor([episode_index] * num_frames) ep_dict["frame_index"] = torch.arange(0, num_frames, 1) ep_dict["timestamp"] = torch.arange(0, num_frames, 1) / fps done = torch.zeros(num_frames, dtype=torch.bool) done[-1] = True ep_dict["next.done"] = done ep_path = episodes_dir / f"episode_{episode_index}.pth" print("Saving episode dictionary...") torch.save(ep_dict, ep_path) rec_info = { "last_episode_index": episode_index, } with open(rec_info_path, "w") as f: json.dump(rec_info, f) is_last_episode = stop_recording or (episode_index == (num_episodes - 1)) # Wait if necessary with tqdm.tqdm(total=reset_time_s, desc="Waiting") as pbar: while timestamp < reset_time_s and not is_last_episode: time.sleep(1) timestamp = time.perf_counter() - start_vencod_t pbar.update(1) if exit_early: exit_early = False break # Skip updating episode index which forces re-recording episode if rerecord_episode: rerecord_episode = False continue episode_index += 1 if is_last_episode: logging.info("Done recording") if play_sounds: say("Done recording", blocking=True) if not is_headless(): listener.stop() if has_camera > 0: logging.info("Waiting for image writer to terminate...") stop_image_writer(image_writer, timeout=20) except Exception as e: if has_camera > 0: logging.info("Waiting for image writer to terminate...") stop_image_writer(image_writer, timeout=20) raise e robot.disconnect() if display_cameras and not is_headless(): cv2.destroyAllWindows() num_episodes = episode_index if video: logging.info("Encoding videos") if play_sounds: say("Encoding videos") # Use ffmpeg to convert frames stored as png into mp4 videos for episode_index in tqdm.tqdm(range(num_episodes)): for key in image_keys: tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}" fname = f"{key}_episode_{episode_index:06d}.mp4" video_path = local_dir / "videos" / fname if video_path.exists(): # Skip if video is already encoded. Could be the case when resuming data recording. 
continue # note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding, # since video encoding with ffmpeg is already using multithreading. encode_video_frames(tmp_imgs_dir, video_path, fps, overwrite=True) shutil.rmtree(tmp_imgs_dir) logging.info("Concatenating episodes") ep_dicts = [] for episode_index in tqdm.tqdm(range(num_episodes)): ep_path = episodes_dir / f"episode_{episode_index}.pth" ep_dict = torch.load(ep_path) ep_dicts.append(ep_dict) data_dict = concatenate_episodes(ep_dicts) total_frames = data_dict["frame_index"].shape[0] data_dict["index"] = torch.arange(0, total_frames, 1) hf_dataset = to_hf_dataset(data_dict, video) episode_data_index = calculate_episode_data_index(hf_dataset) info = { "codebase_version": CODEBASE_VERSION, "fps": fps, "video": video, } if video: info["encoding"] = get_default_encoding() lerobot_dataset = LeRobotDataset.from_preloaded( repo_id=repo_id, hf_dataset=hf_dataset, episode_data_index=episode_data_index, info=info, videos_dir=videos_dir, ) if run_compute_stats: logging.info("Computing dataset statistics") if play_sounds: say("Computing dataset statistics") stats = compute_stats(lerobot_dataset) lerobot_dataset.stats = stats else: stats = {} logging.info("Skipping computation of the dataset statistics") hf_dataset = hf_dataset.with_format(None) # to remove transforms that cant be saved hf_dataset.save_to_disk(str(local_dir / "train")) meta_data_dir = local_dir / "meta_data" save_meta_data(info, stats, episode_data_index, meta_data_dir) if push_to_hub: hf_dataset.push_to_hub(repo_id, revision="main") push_meta_data_to_hub(repo_id, meta_data_dir, revision="main") push_dataset_card_to_hub(repo_id, revision="main", tags=tags) if video: push_videos_to_hub(repo_id, videos_dir, revision="main") create_branch(repo_id, repo_type="dataset", branch=CODEBASE_VERSION) logging.info("Exiting") if play_sounds: say("Exiting") return lerobot_dataset def replay( robot: Robot, episode: int, fps: int | None = None, root="data", repo_id="lerobot/debug", play_sounds=True ): # TODO(rcadene): Add option to record logs local_dir = Path(root) / repo_id if not local_dir.exists(): raise ValueError(local_dir) dataset = LeRobotDataset(repo_id, root=root) items = dataset.hf_dataset.select_columns("action") from_idx = dataset.episode_data_index["from"][episode].item() to_idx = dataset.episode_data_index["to"][episode].item() if not robot.is_connected: robot.connect() logging.info("Replaying episode") if play_sounds: say("Replaying episode", blocking=True) for idx in range(from_idx, to_idx): start_episode_t = time.perf_counter() action = items[idx]["action"] robot.send_action(action) dt_s = time.perf_counter() - start_episode_t busy_wait(1 / fps - dt_s) dt_s = time.perf_counter() - start_episode_t log_control_info(robot, dt_s, fps=fps) if __name__ == "__main__": parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest="mode", required=True) # Set common options for all the subparsers base_parser = argparse.ArgumentParser(add_help=False) base_parser.add_argument( "--robot-path", type=str, default="lerobot/configs/robot/koch.yaml", help="Path to robot yaml file used to instantiate the robot using `make_robot` factory function.", ) base_parser.add_argument( "--robot-overrides", type=str, nargs="*", help="Any key=value arguments to override config values (use dots for.nested=overrides)", ) parser_calib = subparsers.add_parser("calibrate", parents=[base_parser]) parser_calib.add_argument( "--arms", type=str, nargs="*", help="List 
of arms to calibrate (e.g. `--arms left_follower right_follower left_leader`)", ) parser_teleop = subparsers.add_parser("teleoperate", parents=[base_parser]) parser_teleop.add_argument( "--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)" ) parser_record = subparsers.add_parser("record", parents=[base_parser]) parser_record.add_argument( "--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)" ) parser_record.add_argument( "--root", type=Path, default="data", help="Root directory where the dataset will be stored locally at '{root}/{repo_id}' (e.g. 'data/hf_username/dataset_name').", ) parser_record.add_argument( "--repo-id", type=str, default="lerobot/test", help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).", ) parser_record.add_argument( "--warmup-time-s", type=int, default=10, help="Number of seconds before starting data collection. It allows the robot devices to warmup and synchronize.", ) parser_record.add_argument( "--episode-time-s", type=int, default=60, help="Number of seconds for data recording for each episode.", ) parser_record.add_argument( "--reset-time-s", type=int, default=60, help="Number of seconds for resetting the environment after each episode.", ) parser_record.add_argument("--num-episodes", type=int, default=50, help="Number of episodes to record.") parser_record.add_argument( "--run-compute-stats", type=int, default=1, help="By default, run the computation of the data statistics at the end of data collection. Compute intensive and not required to just replay an episode.", ) parser_record.add_argument( "--push-to-hub", type=int, default=1, help="Upload dataset to Hugging Face hub.", ) parser_record.add_argument( "--tags", type=str, nargs="*", help="Add tags to your dataset on the hub.", ) parser_record.add_argument( "--num-image-writer-processes", type=int, default=0, help=( "Number of subprocesses handling the saving of frames as PNGs. Set to 0 to use threads only; " "set to ≥1 to use subprocesses, each using threads to write images. The best number of processes " "and threads depends on your system. We recommend 4 threads per camera with 0 processes. " "If fps is unstable, adjust the thread count. If still unstable, try using 1 or more subprocesses." ), ) parser_record.add_argument( "--num-image-writer-threads-per-camera", type=int, default=4, help=( "Number of threads writing the frames as png images on disk, per camera. " "Too many threads might cause unstable teleoperation fps due to main thread being blocked. " "Not enough threads might cause low camera fps." ), ) parser_record.add_argument( "--force-override", type=int, default=0, help="By default, data recording is resumed. When set to 1, delete the local directory and start data recording from scratch.", ) parser_record.add_argument( "-p", "--pretrained-policy-name-or-path", type=str, help=( "Either the repo ID of a model hosted on the Hub or a path to a directory containing weights " "saved using `Policy.save_pretrained`." 
), ) parser_record.add_argument( "--policy-overrides", type=str, nargs="*", help="Any key=value arguments to override config values (use dots for.nested=overrides)", ) parser_replay = subparsers.add_parser("replay", parents=[base_parser]) parser_replay.add_argument( "--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)" ) parser_replay.add_argument( "--root", type=Path, default="data", help="Root directory where the dataset will be stored locally at '{root}/{repo_id}' (e.g. 'data/hf_username/dataset_name').", ) parser_replay.add_argument( "--repo-id", type=str, default="lerobot/test", help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).", ) parser_replay.add_argument("--episode", type=int, default=0, help="Index of the episode to replay.") args = parser.parse_args() init_logging() control_mode = args.mode robot_path = args.robot_path robot_overrides = args.robot_overrides kwargs = vars(args) del kwargs["mode"] del kwargs["robot_path"] del kwargs["robot_overrides"] robot_cfg = init_hydra_config(robot_path, robot_overrides) robot = make_robot(robot_cfg) if control_mode == "calibrate": calibrate(robot, **kwargs) elif control_mode == "teleoperate": teleoperate(robot, **kwargs) elif control_mode == "record": pretrained_policy_name_or_path = args.pretrained_policy_name_or_path policy_overrides = args.policy_overrides del kwargs["pretrained_policy_name_or_path"] del kwargs["policy_overrides"] policy_cfg = None if pretrained_policy_name_or_path is not None: pretrained_policy_path = get_pretrained_policy_path(pretrained_policy_name_or_path) policy_cfg = init_hydra_config(pretrained_policy_path / "config.yaml", policy_overrides) policy = make_policy(hydra_cfg=policy_cfg, pretrained_policy_name_or_path=pretrained_policy_path) record(robot, policy, policy_cfg, **kwargs) else: record(robot, **kwargs) elif control_mode == "replay": replay(robot, **kwargs) if robot.is_connected: # Disconnect manually to avoid a "Core dump" during process # termination due to camera threads not properly exiting. robot.disconnect()
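
# Programmatic use (sketch): the control modes above can also be driven from Python instead of the
# CLI. This minimal, hypothetical example is kept as a comment so it does not run on import; the
# config path and the fps/duration values are assumptions to adapt to your own setup.
#
#   from lerobot.common.robot_devices.robots.factory import make_robot
#   from lerobot.common.utils.utils import init_hydra_config
#
#   robot_cfg = init_hydra_config("lerobot/configs/robot/koch.yaml", None)
#   robot = make_robot(robot_cfg)
#   teleoperate(robot, fps=30, teleop_time_s=10)
#   if robot.is_connected:
#       robot.disconnect()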