diff --git a/lerobot/common/datasets/pusht.py b/lerobot/common/datasets/pusht.py index 3602856e..85aa5e7c 100644 --- a/lerobot/common/datasets/pusht.py +++ b/lerobot/common/datasets/pusht.py @@ -177,6 +177,14 @@ class PushtExperienceReplay(TensorDictReplayBuffer): transform=transform, ) + @property + def num_samples(self): + return len(self) + + @property + def num_episodes(self): + return len(self._storage._storage["episode"].unique()) + @property def data_path_root(self): if self.streaming: diff --git a/lerobot/common/datasets/simxarm.py b/lerobot/common/datasets/simxarm.py index b0e17d52..a6108140 100644 --- a/lerobot/common/datasets/simxarm.py +++ b/lerobot/common/datasets/simxarm.py @@ -109,6 +109,14 @@ class SimxarmExperienceReplay(TensorDictReplayBuffer): transform=transform, ) + @property + def num_samples(self): + return len(self) + + @property + def num_episodes(self): + return len(self._storage._storage["episode"].unique()) + @property def data_path_root(self): if self.streaming: diff --git a/lerobot/common/logger.py b/lerobot/common/logger.py index ddf2ef04..7ce7371b 100644 --- a/lerobot/common/logger.py +++ b/lerobot/common/logger.py @@ -8,25 +8,6 @@ import pandas as pd from omegaconf import OmegaConf from termcolor import colored -CONSOLE_FORMAT = [ - ("episode", "E", "int"), - ("step", "S", "int"), - ("avg_sum_reward", "RS", "float"), - ("avg_max_reward", "RM", "float"), - ("pc_success", "SR", "float"), - ("total_time", "T", "time"), -] -AGENT_METRICS = [ - "consistency_loss", - "reward_loss", - "value_loss", - "total_loss", - "weighted_loss", - "pi_loss", - "grad_norm", -] - - def make_dir(dir_path): """Create directory if it does not already exist.""" with contextlib.suppress(OSError): @@ -80,10 +61,11 @@ class Logger: """Primary logger object. 
Logs either locally or using wandb."""
 
     def __init__(self, log_dir, job_name, cfg):
-        self._log_dir = make_dir(Path(log_dir))
+        self._log_dir = Path(log_dir)
+        self._log_dir.mkdir(parents=True, exist_ok=True)
         self._job_name = job_name
-        self._model_dir = make_dir(self._log_dir / "models")
-        self._buffer_dir = make_dir(self._log_dir / "buffers")
+        self._model_dir = self._log_dir / "models"
+        self._buffer_dir = self._log_dir / "buffers"
         self._save_model = cfg.save_model
         self._save_buffer = cfg.save_buffer
         self._group = cfg_to_group(cfg)
@@ -121,10 +103,11 @@ class Logger:
             print(colored("Logs will be synced with wandb.", "blue", attrs=["bold"]))
             self._wandb = wandb
 
-    def save_model(self, agent, identifier):
+    def save_model(self, policy, identifier):
         if self._save_model:
+            self._model_dir.mkdir(parents=True, exist_ok=True)
             fp = self._model_dir / f"{str(identifier)}.pt"
-            agent.save(fp)
+            policy.save(fp)
             if self._wandb:
                 artifact = self._wandb.Artifact(
                     self._group + "-" + str(self._seed) + "-" + str(identifier),
@@ -134,6 +117,7 @@ class Logger:
                 self._wandb.log_artifact(artifact)
 
     def save_buffer(self, buffer, identifier):
+        self._buffer_dir.mkdir(parents=True, exist_ok=True)
         fp = self._buffer_dir / f"{str(identifier)}.pkl"
         buffer.save(fp)
         if self._wandb:
@@ -153,31 +137,13 @@ class Logger:
             self._wandb.finish()
             print_run(self._cfg, self._eval[-1][-1])
 
-    def _format(self, key, value, ty):
-        if ty == "int":
-            return f'{colored(key + ":", "yellow")} {int(value):,}'
-        elif ty == "float":
-            return f'{colored(key + ":", "yellow")} {value:.01f}'
-        elif ty == "time":
-            value = str(datetime.timedelta(seconds=int(value)))
-            return f'{colored(key + ":", "yellow")} {value}'
-        else:
-            raise f"invalid log format type: {ty}"
-
-    def _print(self, d, category):
-        category = colored(category, "blue" if category == "train" else "green")
-        pieces = [f" {category:<14}"]
-        for k, disp_k, ty in CONSOLE_FORMAT:
-            pieces.append(f"{self._format(disp_k, d.get(k, 0), ty):<26}")
-        print(" ".join(pieces))
-
-    def log(self, d, category="train"):
-        assert category in {"train", "eval"}
+    def log_dict(self, d, step, mode="train"):
+        assert mode in {"train", "eval"}
         if self._wandb is not None:
             for k, v in d.items():
-                self._wandb.log({category + "/" + k: v}, step=d["step"])
-        if category == "eval":
-            keys = ["step", "avg_sum_reward", "avg_max_reward", "pc_success"]
-            self._eval.append(np.array([d[key] for key in keys]))
-            pd.DataFrame(np.array(self._eval)).to_csv(self._log_dir / "eval.log", header=keys, index=None)
-        self._print(d, category)
+                self._wandb.log({f"{mode}/{k}": v}, step=step)
+
+    def log_video(self, video, step, mode="train"):
+        assert mode in {"train", "eval"}
+        wandb_video = self._wandb.Video(video, fps=self._cfg.fps, format="mp4")
+        self._wandb.log({f"{mode}/video": wandb_video}, step=step)
diff --git a/lerobot/common/policies/diffusion.py b/lerobot/common/policies/diffusion.py
index 92187290..4546f020 100644
--- a/lerobot/common/policies/diffusion.py
+++ b/lerobot/common/policies/diffusion.py
@@ -1,4 +1,5 @@
 import copy
+import time
 
 import hydra
 import torch
@@ -110,6 +111,8 @@ class DiffusionPolicy(nn.Module):
         return action
 
     def update(self, replay_buffer, step):
+        start_time = time.time()
+
         self.diffusion.train()
 
         num_slices = self.cfg.batch_size
@@ -125,19 +128,31 @@ class DiffusionPolicy(nn.Module):
             out = {
                 "obs": {
-                    "image": batch["observation", "image"].to(self.device),
-                    "agent_pos": batch["observation", "state"].to(self.device),
+                    "image": batch["observation", "image"].to(
+                        self.device, non_blocking=True
+ ), + "agent_pos": batch["observation", "state"].to( + self.device, non_blocking=True + ), }, - "action": batch["action"].to(self.device), + "action": batch["action"].to(self.device, non_blocking=True), } return out batch = replay_buffer.sample(batch_size) if self.cfg.balanced_sampling else replay_buffer.sample() batch = process_batch(batch, self.cfg.horizon, num_slices) + data_s = time.time() - start_time + loss = self.diffusion.compute_loss(batch) loss.backward() + grad_norm = torch.nn.utils.clip_grad_norm_( + self.diffusion.parameters(), + self.cfg.grad_clip_norm, + error_if_nonfinite=False, + ) + self.optimizer.step() self.optimizer.zero_grad() self.lr_scheduler.step() @@ -145,9 +160,12 @@ class DiffusionPolicy(nn.Module): if self.ema is not None: self.ema.step(self.diffusion) - metrics = { - "total_loss": loss.item(), + info = { + "loss": loss.item(), + "grad_norm": float(grad_norm), "lr": self.lr_scheduler.get_last_lr()[0], + "data_s": data_s, + "update_s": time.time() - start_time, } # TODO(rcadene): remove hardcoding @@ -155,7 +173,7 @@ class DiffusionPolicy(nn.Module): if step % 168 == 0: self.global_step += 1 - return metrics + return info def save(self, fp): torch.save(self.state_dict(), fp) diff --git a/lerobot/common/policies/tdmpc.py b/lerobot/common/policies/tdmpc.py index 374c4089..19f00b5f 100644 --- a/lerobot/common/policies/tdmpc.py +++ b/lerobot/common/policies/tdmpc.py @@ -1,5 +1,6 @@ # ruff: noqa: N806 +import time from copy import deepcopy import einops @@ -285,6 +286,7 @@ class TDMPC(nn.Module): def update(self, replay_buffer, step, demo_buffer=None): """Main update function. Corresponds to one iteration of the model learning.""" + start_time = time.time() num_slices = self.cfg.batch_size batch_size = self.cfg.horizon * num_slices @@ -326,6 +328,14 @@ class TDMPC(nn.Module): } reward = batch["next", "reward"] + # TODO(rcadene): add non_blocking=True + # for key in obs: + # obs[key] = obs[key].to(self.device, non_blocking=True) + # next_obses[key] = next_obses[key].to(self.device, non_blocking=True) + + # action = action.to(self.device, non_blocking=True) + # reward = reward.to(self.device, non_blocking=True) + # TODO(rcadene): rearrange directly in offline dataset if reward.ndim == 2: reward = einops.rearrange(reward, "h t -> h t 1") @@ -399,6 +409,8 @@ class TDMPC(nn.Module): self.std = h.linear_schedule(self.cfg.std_schedule, step) self.model.train() + data_s = time.time() - start_time + # Compute targets with torch.no_grad(): next_z = self.model.encode(next_obses) @@ -482,21 +494,23 @@ class TDMPC(nn.Module): h.ema(self.model._Qs, self.model_target._Qs, self.cfg.tau) self.model.eval() - metrics = { + + info = { "consistency_loss": float(consistency_loss.mean().item()), "reward_loss": float(reward_loss.mean().item()), "Q_value_loss": float(q_value_loss.mean().item()), "V_value_loss": float(v_value_loss.mean().item()), - "total_loss": float(total_loss.mean().item()), - "weighted_loss": float(weighted_loss.mean().item()), + "sum_loss": float(total_loss.mean().item()), + "loss": float(weighted_loss.mean().item()), "grad_norm": float(grad_norm), + "lr": self.cfg.lr, + "data_s": data_s, + "update_s": time.time() - start_time, } - # for key in ["demo_batch_size", "expectile"]: - # if hasattr(self, key): - metrics["demo_batch_size"] = demo_batch_size - metrics["expectile"] = expectile - metrics.update(value_info) - metrics.update(pi_update_info) + info["demo_batch_size"] = demo_batch_size + info["expectile"] = expectile + info.update(value_info) + 
info.update(pi_update_info) self.step[0] = step - return metrics + return info diff --git a/lerobot/common/utils.py b/lerobot/common/utils.py index a95adbc1..1e7f4daf 100644 --- a/lerobot/common/utils.py +++ b/lerobot/common/utils.py @@ -1,4 +1,6 @@ +import logging import random +from datetime import datetime import numpy as np import torch @@ -10,3 +12,34 @@ def set_seed(seed): np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) + + +def init_logging(): + def custom_format(record): + dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + fnameline = f"{record.pathname}:{record.lineno}" + message = f"{record.levelname} {dt} {fnameline[-15:]:>15} {record.msg}" + return message + + logging.basicConfig(level=logging.INFO) + + for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + + formatter = logging.Formatter() + formatter.format = custom_format + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + logging.getLogger().addHandler(console_handler) + + +def format_number_KMB(num): + suffixes = ["", "K", "M", "B", "T", "Q"] + divisor = 1000.0 + + for suffix in suffixes: + if abs(num) < divisor: + return f"{num:.0f}{suffix}" + num /= divisor + + return num diff --git a/lerobot/configs/policy/diffusion.yaml b/lerobot/configs/policy/diffusion.yaml index 8e88468d..d15a7c4e 100644 --- a/lerobot/configs/policy/diffusion.yaml +++ b/lerobot/configs/policy/diffusion.yaml @@ -59,6 +59,7 @@ policy: use_ema: true lr_scheduler: cosine lr_warmup_steps: 500 + grad_clip_norm: 0 noise_scheduler: # _target_: diffusers.schedulers.scheduling_ddpm.DDPMScheduler diff --git a/lerobot/scripts/eval.py b/lerobot/scripts/eval.py index 8240f654..3c5b8cf1 100644 --- a/lerobot/scripts/eval.py +++ b/lerobot/scripts/eval.py @@ -1,4 +1,5 @@ import threading +import time from pathlib import Path import hydra @@ -29,6 +30,7 @@ def eval_policy( fps: int = 15, return_first_video: bool = False, ): + start = time.time() sum_rewards = [] max_rewards = [] successes = [] @@ -84,14 +86,16 @@ def eval_policy( for thread in threads: thread.join() - metrics = { + info = { "avg_sum_reward": np.nanmean(sum_rewards), "avg_max_reward": np.nanmean(max_rewards), "pc_success": np.nanmean(successes) * 100, + "eval_s": time.time() - start, + "eval_ep_s": (time.time() - start) / num_episodes, } if return_first_video: - return metrics, first_video - return metrics + return info, first_video + return info @hydra.main(version_base=None, config_name="default", config_path="../configs") diff --git a/lerobot/scripts/train.py b/lerobot/scripts/train.py index c5268aa8..e8c3b0eb 100644 --- a/lerobot/scripts/train.py +++ b/lerobot/scripts/train.py @@ -1,3 +1,4 @@ +import logging import time import hydra @@ -12,7 +13,7 @@ from lerobot.common.datasets.factory import make_offline_buffer from lerobot.common.envs.factory import make_env from lerobot.common.logger import Logger from lerobot.common.policies.factory import make_policy -from lerobot.common.utils import set_seed +from lerobot.common.utils import format_number_KMB, init_logging, set_seed from lerobot.scripts.eval import eval_policy @@ -34,36 +35,77 @@ def train_notebook(out_dir=None, job_name=None, config_name="default", config_pa train(cfg, out_dir=out_dir, job_name=job_name) -def log_training_metrics(logger, metrics, step, online_episode_idx, start_time, is_offline): - common_metrics = { - "episode": online_episode_idx, - "step": step, - "total_time": time.time() - start_time, - "is_offline": float(is_offline), - } - 
-    metrics.update(common_metrics)
-    logger.log(metrics, category="train")
+def log_train_info(logger, info, step, cfg, offline_buffer, is_offline):
+    loss = info["loss"]
+    grad_norm = info["grad_norm"]
+    lr = info["lr"]
+    data_s = info["data_s"]
+    update_s = info["update_s"]
+
+    # A sample is an (observation, action) pair, where the observation and action
+    # can span multiple timestamps. In a batch, we have `batch_size` samples.
+    num_samples = (step + 1) * cfg.policy.batch_size
+    avg_samples_per_ep = offline_buffer.num_samples / offline_buffer.num_episodes
+    num_episodes = num_samples / avg_samples_per_ep
+    num_epochs = num_samples / offline_buffer.num_samples
+    log_items = [
+        f"step:{format_number_KMB(step)}",
+        # number of samples seen during training
+        f"smpl:{format_number_KMB(num_samples)}",
+        # number of episodes seen during training
+        f"ep:{format_number_KMB(num_episodes)}",
+        # number of times all unique samples have been seen
+        f"epch:{num_epochs:.2f}",
+        f"loss:{loss:.3f}",
+        f"grdn:{grad_norm:.3f}",
+        f"lr:{lr:0.1e}",
+        # in seconds
+        f"data_s:{data_s:.3f}",
+        f"updt_s:{update_s:.3f}",
+    ]
+    logging.info(" ".join(log_items))
+
+    info["step"] = step
+    info["num_samples"] = num_samples
+    info["num_episodes"] = num_episodes
+    info["num_epochs"] = num_epochs
+    info["is_offline"] = is_offline
+
+    logger.log_dict(info, step, mode="train")
 
 
-def eval_policy_and_log(env, td_policy, step, online_episode_idx, start_time, cfg, logger, is_offline):
-    common_metrics = {
-        "episode": online_episode_idx,
-        "step": step,
-        "total_time": time.time() - start_time,
-        "is_offline": float(is_offline),
-    }
-    metrics, first_video = eval_policy(
-        env,
-        td_policy,
-        num_episodes=cfg.eval_episodes,
-        return_first_video=True,
-    )
-    metrics.update(common_metrics)
-    logger.log(metrics, category="eval")
+def log_eval_info(logger, info, step, cfg, offline_buffer, is_offline):
+    eval_s = info["eval_s"]
+    avg_sum_reward = info["avg_sum_reward"]
+    pc_success = info["pc_success"]
 
-    if cfg.wandb.enable:
-        eval_video = logger._wandb.Video(first_video, fps=cfg.fps, format="mp4")
-        logger._wandb.log({"eval_video": eval_video}, step=step)
+    # A sample is an (observation, action) pair, where the observation and action
+    # can span multiple timestamps. In a batch, we have `batch_size` samples.
+    num_samples = (step + 1) * cfg.policy.batch_size
+    avg_samples_per_ep = offline_buffer.num_samples / offline_buffer.num_episodes
+    num_episodes = num_samples / avg_samples_per_ep
+    num_epochs = num_samples / offline_buffer.num_samples
+    log_items = [
+        f"step:{format_number_KMB(step)}",
+        # number of samples seen during training
+        f"smpl:{format_number_KMB(num_samples)}",
+        # number of episodes seen during training
+        f"ep:{format_number_KMB(num_episodes)}",
+        # number of times all unique samples have been seen
+        f"epch:{num_epochs:.2f}",
+        f"∑rwrd:{avg_sum_reward:.3f}",
+        f"success:{pc_success:.1f}%",
+        f"eval_s:{eval_s:.3f}",
+    ]
+    logging.info(" ".join(log_items))
+
+    info["step"] = step
+    info["num_samples"] = num_samples
+    info["num_episodes"] = num_episodes
+    info["num_epochs"] = num_epochs
+    info["is_offline"] = is_offline
+
+    logger.log_dict(info, step, mode="eval")
 
 
 def train(cfg: dict, out_dir=None, job_name=None):
@@ -72,15 +114,17 @@ def train(cfg: dict, out_dir=None, job_name=None):
     if job_name is None:
         raise NotImplementedError()
 
+    init_logging()
+
     assert torch.cuda.is_available()
     torch.backends.cudnn.benchmark = True
     set_seed(cfg.seed)
-    print(colored("Work dir:", "yellow", attrs=["bold"]), out_dir)
+    logging.info(colored("Work dir:", "yellow", attrs=["bold"]) + f" {out_dir}")
 
-    print("make_env")
+    logging.info("make_env")
     env = make_env(cfg)
 
-    print("make_policy")
+    logging.info("make_policy")
     policy = make_policy(cfg)
 
     td_policy = TensorDictModule(
@@ -89,12 +133,12 @@ def train(cfg: dict, out_dir=None, job_name=None):
         out_keys=["action"],
     )
 
-    print("make_offline_buffer")
+    logging.info("make_offline_buffer")
     offline_buffer = make_offline_buffer(cfg)
 
     # TODO(rcadene): move balanced_sampling, per_alpha, per_beta outside policy
     if cfg.policy.balanced_sampling:
-        print("make online_buffer")
+        logging.info("make online_buffer")
         num_traj_per_batch = cfg.policy.batch_size
 
         online_sampler = PrioritizedSliceSampler(
@@ -112,41 +156,41 @@ def train(cfg: dict, out_dir=None, job_name=None):
 
     logger = Logger(out_dir, job_name, cfg)
 
-    online_episode_idx = 0
-    start_time = time.time()
+    online_ep_idx = 0
     step = 0  # number of policy update
+    is_offline = True
 
     for offline_step in range(cfg.offline_steps):
         if offline_step == 0:
-            print("Start offline training on a fixed dataset")
+            logging.info("Start offline training on a fixed dataset")
 
         # TODO(rcadene): is it ok if step_t=0 = 0 and not 1 as previously done?
-        metrics = policy.update(offline_buffer, step)
-
+        train_info = policy.update(offline_buffer, step)
         if step % cfg.log_freq == 0:
-            log_training_metrics(logger, metrics, step, online_episode_idx, start_time, is_offline=False)
+            log_train_info(logger, train_info, step, cfg, offline_buffer, is_offline)
 
         if step > 0 and step % cfg.eval_freq == 0:
-            eval_policy_and_log(
+            eval_info, first_video = eval_policy(
                 env,
                 td_policy,
-                step,
-                online_episode_idx,
-                start_time,
-                cfg,
-                logger,
-                is_offline=True,
+                num_episodes=cfg.eval_episodes,
+                return_first_video=True,
             )
+            log_eval_info(logger, eval_info, step, cfg, offline_buffer, is_offline)
+            if cfg.wandb.enable:
+                logger.log_video(first_video, step, mode="eval")
 
         if step > 0 and cfg.save_model and step % cfg.save_freq == 0:
-            print(f"Checkpoint model at step {step}")
+            logging.info(f"Checkpoint model at step {step}")
             logger.save_model(policy, identifier=step)
 
         step += 1
 
     demo_buffer = offline_buffer if cfg.policy.balanced_sampling else None
+    online_step = 0
+    is_offline = False
     for env_step in range(cfg.online_steps):
         if env_step == 0:
-            print("Start online training by interacting with environment")
+            logging.info("Start online training by interacting with environment")
         # TODO: use SyncDataCollector for that?
         # TODO: add configurable number of rollout? (default=1)
         with torch.no_grad():
@@ -156,47 +200,49 @@ def train(cfg: dict, out_dir=None, job_name=None):
                 auto_cast_to_device=True,
             )
         assert len(rollout) <= cfg.env.episode_length
-        rollout["episode"] = torch.tensor([online_episode_idx] * len(rollout), dtype=torch.int)
+        rollout["episode"] = torch.tensor([online_ep_idx] * len(rollout), dtype=torch.int)
         online_buffer.extend(rollout)
 
         ep_sum_reward = rollout["next", "reward"].sum()
         ep_max_reward = rollout["next", "reward"].max()
         ep_success = rollout["next", "success"].any()
-        metrics = {
+        rollout_info = {
             "avg_sum_reward": np.nanmean(ep_sum_reward),
             "avg_max_reward": np.nanmean(ep_max_reward),
             "pc_success": np.nanmean(ep_success) * 100,
+            "online_ep_idx": online_ep_idx,
+            "ep_length": len(rollout),
         }
 
-        online_episode_idx += 1
+        online_ep_idx += 1
 
         for _ in range(cfg.policy.utd):
-            train_metrics = policy.update(
+            train_info = policy.update(
                 online_buffer,
                 step,
                 demo_buffer=demo_buffer,
            )
-            metrics.update(train_metrics)
 
             if step % cfg.log_freq == 0:
-                log_training_metrics(logger, metrics, step, online_episode_idx, start_time, is_offline=False)
+                train_info.update(rollout_info)
+                log_train_info(logger, train_info, step, cfg, offline_buffer, is_offline)
 
             if step > 0 and step % cfg.eval_freq == 0:
-                eval_policy_and_log(
+                eval_info, first_video = eval_policy(
                     env,
                     td_policy,
-                    step,
-                    online_episode_idx,
-                    start_time,
-                    cfg,
-                    logger,
-                    is_offline=False,
+                    num_episodes=cfg.eval_episodes,
+                    return_first_video=True,
                 )
+                log_eval_info(logger, eval_info, step, cfg, offline_buffer, is_offline)
+                if cfg.wandb.enable:
+                    logger.log_video(first_video, step, mode="eval")
 
             if step > 0 and cfg.save_model and step % cfg.save_freq == 0:
-                print(f"Checkpoint model at step {step}")
+                logging.info(f"Checkpoint model at step {step}")
                 logger.save_model(policy, identifier=step)
 
             step += 1
+        online_step += 1
 
 
 if __name__ == "__main__":