Enhance logging for actor and learner servers

- Implemented process-specific logging for actor and learner servers to improve traceability.
- Created a dedicated logs directory and ensured it exists before logging.
- Initialized logging with an explicit, PID-tagged log file for each subprocess: the actor's receive-policy, transitions, interactions, and policy processes, and the learner's server and training processes (see the sketch below).
- Updated the actor CLI to validate configuration and set up logging accordingly.
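
For context, every subprocess entry point in this commit repeats the same pattern: build a PID-tagged log file path under the run's logs directory, create that directory, then initialize logging against the file. A minimal self-contained sketch of the pattern follows; setup_process_logging is a hypothetical helper, and logging.basicConfig stands in for lerobot's init_logging (whose log_file keyword the diff itself uses) so the sketch runs on its own:

import logging
import os


def setup_process_logging(output_dir: str, prefix: str) -> str:
    # Hypothetical helper: build a PID-tagged log path under <output_dir>/logs.
    log_dir = os.path.join(output_dir, "logs")
    os.makedirs(log_dir, exist_ok=True)  # ensure the directory exists before logging
    # Embedding the PID keeps concurrent processes from writing to the same file.
    return os.path.join(log_dir, f"{prefix}_{os.getpid()}.log")


if __name__ == "__main__":
    log_file = setup_process_logging("outputs/train", "actor_transitions")
    # logging.basicConfig is the stdlib stand-in for init_logging(log_file=...).
    logging.basicConfig(filename=log_file, level=logging.INFO)
    logging.info("Actor transitions process logging initialized")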
AdilZouitine 2025-03-28 10:43:03 +00:00
parent 02b9ea9446
commit 49baa1ff49
2 changed files with 68 additions and 5 deletions

Actor server:

@@ -18,6 +18,7 @@ import time
from functools import lru_cache
from queue import Empty
from statistics import mean, quantiles
import os
# from lerobot.scripts.eval import eval_policy
import grpc
@@ -66,6 +67,15 @@ def receive_policy(
    logging.info("[ACTOR] Start receiving parameters from the Learner")

    if not use_threads(cfg):
        # Create a process-specific log file
        log_dir = os.path.join(cfg.output_dir, "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"actor_receive_policy_{os.getpid()}.log")

        # Initialize logging with explicit log file
        init_logging(log_file=log_file)
        logging.info("Actor receive policy process logging initialized")

        # Setup process handlers to handle shutdown signal
        # But use shutdown event from the main process
        setup_process_handlers(use_threads=False)
@@ -147,6 +157,15 @@ def send_transitions(
    """
    if not use_threads(cfg):
        # Create a process-specific log file
        log_dir = os.path.join(cfg.output_dir, "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"actor_transitions_{os.getpid()}.log")

        # Initialize logging with explicit log file
        init_logging(log_file=log_file)
        logging.info("Actor transitions process logging initialized")

        # Setup process handlers to handle shutdown signal
        # But use shutdown event from the main process
        setup_process_handlers(False)
@@ -187,6 +206,15 @@ def send_interactions(
    """
    if not use_threads(cfg):
        # Create a process-specific log file
        log_dir = os.path.join(cfg.output_dir, "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"actor_interactions_{os.getpid()}.log")

        # Initialize logging with explicit log file
        init_logging(log_file=log_file)
        logging.info("Actor interactions process logging initialized")

        # Setup process handlers to handle shutdown signal
        # But use shutdown event from the main process
        setup_process_handlers(False)
@@ -283,6 +311,13 @@ def act_with_policy(
    Args:
        cfg (DictConfig): Configuration settings for the interaction process.
    """
    # Initialize logging for multiprocessing
    if not use_threads(cfg):
        log_dir = os.path.join(cfg.output_dir, "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"actor_policy_{os.getpid()}.log")
        init_logging(log_file=log_file)
        logging.info("Actor policy process logging initialized")

    logging.info("make_env online")
@@ -496,12 +531,20 @@ def use_threads(cfg: TrainPipelineConfig) -> bool:
@parser.wrap()
def actor_cli(cfg: TrainPipelineConfig):
    cfg.validate()

    if not use_threads(cfg):
        import torch.multiprocessing as mp

        mp.set_start_method("spawn")

    init_logging(log_file="actor.log")

    # Create logs directory to ensure it exists
    log_dir = os.path.join(cfg.output_dir, "logs")
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f"actor_{cfg.job_name}.log")

    # Initialize logging with explicit log file
    init_logging(log_file=log_file)
    logging.info(f"Actor logging initialized, writing to {log_file}")

    shutdown_event = setup_process_handlers(use_threads(cfg))
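
The per-process re-initialization above matters because actor_cli sets the spawn start method: spawned children begin in a fresh interpreter and inherit none of the parent's logging handlers, so each target function must configure its own. A minimal stdlib sketch of that behavior (worker and the logs path are illustrative, not from the commit):

import logging
import multiprocessing as mp
import os


def worker(log_dir: str) -> None:
    # Under "spawn" this child starts with an unconfigured root logger,
    # so it sets up its own PID-tagged log file.
    log_file = os.path.join(log_dir, f"worker_{os.getpid()}.log")
    logging.basicConfig(filename=log_file, level=logging.INFO)
    logging.info("worker logging initialized")


if __name__ == "__main__":
    mp.set_start_method("spawn")
    os.makedirs("logs", exist_ok=True)
    proc = mp.Process(target=worker, args=("logs",))
    proc.start()
    proc.join()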

Learner server:

@@ -376,8 +376,14 @@ def start_learner_server(
    cfg: TrainPipelineConfig,
):
    if not use_threads(cfg):
        # We need init logging for MP separately
        init_logging()

        # Create a process-specific log file
        log_dir = os.path.join(cfg.output_dir, "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"learner_server_process_{os.getpid()}.log")

        # Initialize logging with explicit log file
        init_logging(log_file=log_file)
        logging.info("Learner server process logging initialized")

        # Setup process handlers to handle shutdown signal
        # But use shutdown event from the main process
@@ -499,6 +505,13 @@ def add_actor_information_and_train(
        interaction_message_queue (Queue): Queue for receiving interaction messages from the actor.
        parameters_queue (Queue): Queue for sending policy parameters to the actor.
    """
    # Initialize logging for multiprocessing
    if not use_threads(cfg):
        log_dir = os.path.join(cfg.output_dir, "logs")
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(log_dir, f"learner_train_process_{os.getpid()}.log")
        init_logging(log_file=log_file)
        logging.info("Initialized logging for actor information and training process")

    device = get_safe_torch_device(try_device=cfg.policy.device, log=True)
    storage_device = get_safe_torch_device(try_device=cfg.policy.storage_device)
@@ -940,7 +953,14 @@ def train(cfg: TrainPipelineConfig, job_name: str | None = None):
    if job_name is None:
        raise ValueError("Job name must be specified either in config or as a parameter")

    init_logging()

    # Create logs directory to ensure it exists
    log_dir = os.path.join(cfg.output_dir, "logs")
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f"learner_{job_name}.log")

    # Initialize logging with explicit log file
    init_logging(log_file=log_file)
    logging.info(f"Learner logging initialized, writing to {log_file}")

    logging.info(pformat(cfg.to_dict()))

    # Setup WandB logging if enabled