From 49baa1ff4991cfee1d4f7de0be1d82597ecdb826 Mon Sep 17 00:00:00 2001
From: AdilZouitine
Date: Fri, 28 Mar 2025 10:43:03 +0000
Subject: [PATCH] Enhance logging for actor and learner servers

- Implemented process-specific logging for actor and learner servers to improve traceability.
- Created a dedicated logs directory and ensured it exists before logging.
- Initialized logging with explicit log files for each process, including actor transitions, interactions, and policy.
- Updated the actor CLI to validate configuration and set up logging accordingly.
---
 lerobot/scripts/server/actor_server.py   | 47 +++++++++++++++++++++++-
 lerobot/scripts/server/learner_server.py | 26 +++++++++++--
 2 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/lerobot/scripts/server/actor_server.py b/lerobot/scripts/server/actor_server.py
index e28cfea1..5ff0bab0 100644
--- a/lerobot/scripts/server/actor_server.py
+++ b/lerobot/scripts/server/actor_server.py
@@ -18,6 +18,7 @@ import time
 from functools import lru_cache
 from queue import Empty
 from statistics import mean, quantiles
+import os
 
 # from lerobot.scripts.eval import eval_policy
 import grpc
@@ -66,6 +67,15 @@ def receive_policy(
     logging.info("[ACTOR] Start receiving parameters from the Learner")
 
     if not use_threads(cfg):
+        # Create a process-specific log file
+        log_dir = os.path.join(cfg.output_dir, "logs")
+        os.makedirs(log_dir, exist_ok=True)
+        log_file = os.path.join(log_dir, f"actor_receive_policy_{os.getpid()}.log")
+
+        # Initialize logging with explicit log file
+        init_logging(log_file=log_file)
+        logging.info(f"Actor receive policy process logging initialized")
+
         # Setup process handlers to handle shutdown signal
         # But use shutdown event from the main process
         setup_process_handlers(use_threads=False)
@@ -147,6 +157,15 @@ def send_transitions(
     """
 
     if not use_threads(cfg):
+        # Create a process-specific log file
+        log_dir = os.path.join(cfg.output_dir, "logs")
+        os.makedirs(log_dir, exist_ok=True)
+        log_file = os.path.join(log_dir, f"actor_transitions_{os.getpid()}.log")
+
+        # Initialize logging with explicit log file
+        init_logging(log_file=log_file)
+        logging.info(f"Actor transitions process logging initialized")
+
         # Setup process handlers to handle shutdown signal
         # But use shutdown event from the main process
         setup_process_handlers(False)
@@ -187,6 +206,15 @@ def send_interactions(
     """
 
     if not use_threads(cfg):
+        # Create a process-specific log file
+        log_dir = os.path.join(cfg.output_dir, "logs")
+        os.makedirs(log_dir, exist_ok=True)
+        log_file = os.path.join(log_dir, f"actor_interactions_{os.getpid()}.log")
+
+        # Initialize logging with explicit log file
+        init_logging(log_file=log_file)
+        logging.info(f"Actor interactions process logging initialized")
+
         # Setup process handlers to handle shutdown signal
         # But use shutdown event from the main process
         setup_process_handlers(False)
@@ -283,6 +311,13 @@ def act_with_policy(
     Args:
         cfg (DictConfig): Configuration settings for the interaction process.
     """
+    # Initialize logging for multiprocessing
+    if not use_threads(cfg):
+        log_dir = os.path.join(cfg.output_dir, "logs")
+        os.makedirs(log_dir, exist_ok=True)
+        log_file = os.path.join(log_dir, f"actor_policy_{os.getpid()}.log")
+        init_logging(log_file=log_file)
+        logging.info(f"Actor policy process logging initialized")
 
     logging.info("make_env online")
 
@@ -496,12 +531,20 @@ def use_threads(cfg: TrainPipelineConfig) -> bool:
 
 @parser.wrap()
 def actor_cli(cfg: TrainPipelineConfig):
+    cfg.validate()
     if not use_threads(cfg):
         import torch.multiprocessing as mp
 
         mp.set_start_method("spawn")
-
-    init_logging(log_file="actor.log")
+
+    # Create logs directory to ensure it exists
+    log_dir = os.path.join(cfg.output_dir, "logs")
+    os.makedirs(log_dir, exist_ok=True)
+    log_file = os.path.join(log_dir, f"actor_{cfg.job_name}.log")
+
+    # Initialize logging with explicit log file
+    init_logging(log_file=log_file)
+    logging.info(f"Actor logging initialized, writing to {log_file}")
 
     shutdown_event = setup_process_handlers(use_threads(cfg))
 
diff --git a/lerobot/scripts/server/learner_server.py b/lerobot/scripts/server/learner_server.py
index da3e6606..cee65377 100644
--- a/lerobot/scripts/server/learner_server.py
+++ b/lerobot/scripts/server/learner_server.py
@@ -376,8 +376,14 @@ def start_learner_server(
     cfg: TrainPipelineConfig,
 ):
     if not use_threads(cfg):
-        # We need init logging for MP separataly
-        init_logging()
+        # Create a process-specific log file
+        log_dir = os.path.join(cfg.output_dir, "logs")
+        os.makedirs(log_dir, exist_ok=True)
+        log_file = os.path.join(log_dir, f"learner_server_process_{os.getpid()}.log")
+
+        # Initialize logging with explicit log file
+        init_logging(log_file=log_file)
+        logging.info(f"Learner server process logging initialized")
 
         # Setup process handlers to handle shutdown signal
         # But use shutdown event from the main process
@@ -499,6 +505,13 @@ def add_actor_information_and_train(
         interaction_message_queue (Queue): Queue for receiving interaction messages from the actor.
         parameters_queue (Queue): Queue for sending policy parameters to the actor.
     """
+    # Initialize logging for multiprocessing
+    if not use_threads(cfg):
+        log_dir = os.path.join(cfg.output_dir, "logs")
+        os.makedirs(log_dir, exist_ok=True)
+        log_file = os.path.join(log_dir, f"learner_train_process_{os.getpid()}.log")
+        init_logging(log_file=log_file)
+        logging.info(f"Initialized logging for actor information and training process")
 
     device = get_safe_torch_device(try_device=cfg.policy.device, log=True)
     storage_device = get_safe_torch_device(try_device=cfg.policy.storage_device)
@@ -940,7 +953,14 @@ def train(cfg: TrainPipelineConfig, job_name: str | None = None):
     if job_name is None:
         raise ValueError("Job name must be specified either in config or as a parameter")
 
-    init_logging()
+    # Create logs directory to ensure it exists
+    log_dir = os.path.join(cfg.output_dir, "logs")
+    os.makedirs(log_dir, exist_ok=True)
+    log_file = os.path.join(log_dir, f"learner_{job_name}.log")
+
+    # Initialize logging with explicit log file
+    init_logging(log_file=log_file)
+    logging.info(f"Learner logging initialized, writing to {log_file}")
 
     logging.info(pformat(cfg.to_dict()))
 
     # Setup WandB logging if enabled