diff --git a/lerobot/scripts/server/actor_server.py b/lerobot/scripts/server/actor_server.py
index 5ff0bab0..eda6d314 100644
--- a/lerobot/scripts/server/actor_server.py
+++ b/lerobot/scripts/server/actor_server.py
@@ -14,11 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import os
 import time
 from functools import lru_cache
 from queue import Empty
 from statistics import mean, quantiles
-import os

 # from lerobot.scripts.eval import eval_policy
 import grpc
@@ -57,245 +57,132 @@ from lerobot.scripts.server.utils import get_last_item_from_queue, setup_process

 ACTOR_SHUTDOWN_TIMEOUT = 30


-def receive_policy(
-    cfg: TrainPipelineConfig,
-    parameters_queue: Queue,
-    shutdown_event: any,  # Event,
-    learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
-    grpc_channel: grpc.Channel | None = None,
-):
-    logging.info("[ACTOR] Start receiving parameters from the Learner")
+#################################################
+# Main entry point #
+#################################################
+
+@parser.wrap()
+def actor_cli(cfg: TrainPipelineConfig):
+    cfg.validate()

     if not use_threads(cfg):
-        # Create a process-specific log file
-        log_dir = os.path.join(cfg.output_dir, "logs")
-        os.makedirs(log_dir, exist_ok=True)
-        log_file = os.path.join(log_dir, f"actor_receive_policy_{os.getpid()}.log")
-
-        # Initialize logging with explicit log file
-        init_logging(log_file=log_file)
-        logging.info(f"Actor receive policy process logging initialized")
-
-        # Setup process handlers to handle shutdown signal
-        # But use shutdown event from the main process
-        setup_process_handlers(use_threads=False)
+        import torch.multiprocessing as mp

-    if grpc_channel is None or learner_client is None:
-        learner_client, grpc_channel = learner_service_client(
-            host=cfg.policy.actor_learner_config.learner_host,
-            port=cfg.policy.actor_learner_config.learner_port,
-        )
+        mp.set_start_method("spawn")

-    try:
-        iterator = learner_client.StreamParameters(hilserl_pb2.Empty())
-        receive_bytes_in_chunks(
-            iterator,
-            parameters_queue,
-            shutdown_event,
-            log_prefix="[ACTOR] parameters",
-        )
+    # Create logs directory to ensure it exists
+    log_dir = os.path.join(cfg.output_dir, "logs")
+    os.makedirs(log_dir, exist_ok=True)
+    log_file = os.path.join(log_dir, f"actor_{cfg.job_name}.log")

-    except grpc.RpcError as e:
-        logging.error(f"[ACTOR] gRPC error: {e}")
+    # Initialize logging with explicit log file
+    init_logging(log_file=log_file)
+    logging.info(f"Actor logging initialized, writing to {log_file}")

-    if not use_threads(cfg):
-        grpc_channel.close()
-    logging.info("[ACTOR] Received policy loop stopped")
+    shutdown_event = setup_process_handlers(use_threads(cfg))

-
-def transitions_stream(shutdown_event: Event, transitions_queue: Queue) -> hilserl_pb2.Empty:
-    while not shutdown_event.is_set():
-        try:
-            message = transitions_queue.get(block=True, timeout=5)
-        except Empty:
-            logging.debug("[ACTOR] Transition queue is empty")
-            continue
-
-        yield from send_bytes_in_chunks(
-            message, hilserl_pb2.Transition, log_prefix="[ACTOR] Send transitions"
-        )
-
-    return hilserl_pb2.Empty()
-
-
-def interactions_stream(
-    shutdown_event: any,  # Event,
-    interactions_queue: Queue,
-) -> hilserl_pb2.Empty:
-    while not shutdown_event.is_set():
-        try:
-            message = interactions_queue.get(block=True, timeout=5)
-        except Empty:
-            logging.debug("[ACTOR] Interaction queue is empty")
-            continue
-
-        yield from send_bytes_in_chunks(
-            message,
-            hilserl_pb2.InteractionMessage,
-            log_prefix="[ACTOR] Send interactions",
-        )
-
-    return hilserl_pb2.Empty()
-
-
-def send_transitions(
-    cfg: TrainPipelineConfig,
-    transitions_queue: Queue,
-    shutdown_event: any,  # Event,
-    learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
-    grpc_channel: grpc.Channel | None = None,
-) -> hilserl_pb2.Empty:
-    """
-    Sends transitions to the learner.
-
-    This function continuously retrieves messages from the queue and processes:
-
-    - **Transition Data:**
-      - A batch of transitions (observation, action, reward, next observation) is collected.
-      - Transitions are moved to the CPU and serialized using PyTorch.
-      - The serialized data is wrapped in a `hilserl_pb2.Transition` message and sent to the learner.
-    """
-
-    if not use_threads(cfg):
-        # Create a process-specific log file
-        log_dir = os.path.join(cfg.output_dir, "logs")
-        os.makedirs(log_dir, exist_ok=True)
-        log_file = os.path.join(log_dir, f"actor_transitions_{os.getpid()}.log")
-
-        # Initialize logging with explicit log file
-        init_logging(log_file=log_file)
-        logging.info(f"Actor transitions process logging initialized")
-
-        # Setup process handlers to handle shutdown signal
-        # But use shutdown event from the main process
-        setup_process_handlers(False)
-
-    if grpc_channel is None or learner_client is None:
-        learner_client, grpc_channel = learner_service_client(
-            host=cfg.policy.actor_learner_config.learner_host,
-            port=cfg.policy.actor_learner_config.learner_port,
-        )
-
-    try:
-        learner_client.SendTransitions(transitions_stream(shutdown_event, transitions_queue))
-    except grpc.RpcError as e:
-        logging.error(f"[ACTOR] gRPC error: {e}")
-
-    logging.info("[ACTOR] Finished streaming transitions")
-
-    if not use_threads(cfg):
-        grpc_channel.close()
-    logging.info("[ACTOR] Transitions process stopped")
-
-
-def send_interactions(
-    cfg: TrainPipelineConfig,
-    interactions_queue: Queue,
-    shutdown_event: any,  # Event,
-    learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
-    grpc_channel: grpc.Channel | None = None,
-) -> hilserl_pb2.Empty:
-    """
-    Sends interactions to the learner.
-
-    This function continuously retrieves messages from the queue and processes:
-
-    - **Interaction Messages:**
-      - Contains useful statistics about episodic rewards and policy timings.
-      - The message is serialized using `pickle` and sent to the learner.
- """ - - if not use_threads(cfg): - # Create a process-specific log file - log_dir = os.path.join(cfg.output_dir, "logs") - os.makedirs(log_dir, exist_ok=True) - log_file = os.path.join(log_dir, f"actor_interactions_{os.getpid()}.log") - - # Initialize logging with explicit log file - init_logging(log_file=log_file) - logging.info(f"Actor interactions process logging initialized") - - # Setup process handlers to handle shutdown signal - # But use shutdown event from the main process - setup_process_handlers(False) - - if grpc_channel is None or learner_client is None: - learner_client, grpc_channel = learner_service_client( - host=cfg.policy.actor_learner_config.learner_host, - port=cfg.policy.actor_learner_config.learner_port, - ) - - try: - learner_client.SendInteractions(interactions_stream(shutdown_event, interactions_queue)) - except grpc.RpcError as e: - logging.error(f"[ACTOR] gRPC error: {e}") - - logging.info("[ACTOR] Finished streaming interactions") - - if not use_threads(cfg): - grpc_channel.close() - logging.info("[ACTOR] Interactions process stopped") - - -@lru_cache(maxsize=1) -def learner_service_client( - host="127.0.0.1", port=50051 -) -> tuple[hilserl_pb2_grpc.LearnerServiceStub, grpc.Channel]: - import json - - """ - Returns a client for the learner service. - - GRPC uses HTTP/2, which is a binary protocol and multiplexes requests over a single connection. - So we need to create only one client and reuse it. - """ - - service_config = { - "methodConfig": [ - { - "name": [{}], # Applies to ALL methods in ALL services - "retryPolicy": { - "maxAttempts": 5, # Max retries (total attempts = 5) - "initialBackoff": "0.1s", # First retry after 0.1s - "maxBackoff": "2s", # Max wait time between retries - "backoffMultiplier": 2, # Exponential backoff factor - "retryableStatusCodes": [ - "UNAVAILABLE", - "DEADLINE_EXCEEDED", - ], # Retries on network failures - }, - } - ] - } - - service_config_json = json.dumps(service_config) - - channel = grpc.insecure_channel( - f"{host}:{port}", - options=[ - ("grpc.max_receive_message_length", learner_service.MAX_MESSAGE_SIZE), - ("grpc.max_send_message_length", learner_service.MAX_MESSAGE_SIZE), - ("grpc.enable_retries", 1), - ("grpc.service_config", service_config_json), - ], + learner_client, grpc_channel = learner_service_client( + host=cfg.policy.actor_learner_config.learner_host, + port=cfg.policy.actor_learner_config.learner_port, ) - stub = hilserl_pb2_grpc.LearnerServiceStub(channel) - logging.info("[ACTOR] Learner service client created") - return stub, channel + + logging.info("[ACTOR] Establishing connection with Learner") + if not establish_learner_connection(learner_client, shutdown_event): + logging.error("[ACTOR] Failed to establish connection with Learner") + return + + if not use_threads(cfg): + # If we use multithreading, we can reuse the channel + grpc_channel.close() + grpc_channel = None + + logging.info("[ACTOR] Connection with Learner established") + + parameters_queue = Queue() + transitions_queue = Queue() + interactions_queue = Queue() + + concurrency_entity = None + if use_threads(cfg): + from threading import Thread + + concurrency_entity = Thread + else: + from multiprocessing import Process + + concurrency_entity = Process + + receive_policy_process = concurrency_entity( + target=receive_policy, + args=(cfg, parameters_queue, shutdown_event, grpc_channel), + daemon=True, + ) + + transitions_process = concurrency_entity( + target=send_transitions, + args=(cfg, transitions_queue, shutdown_event, grpc_channel), + 
+        daemon=True,
+    )
+
+    interactions_process = concurrency_entity(
+        target=send_interactions,
+        args=(cfg, interactions_queue, shutdown_event, grpc_channel),
+        daemon=True,
+    )
+
+    transitions_process.start()
+    interactions_process.start()
+    receive_policy_process.start()
+
+    # HACK: FOR MANISKILL we do not have a reward classifier
+    # TODO: Remove this once we merge into main
+    reward_classifier = None
+    # if (
+    #     cfg.env.reward_classifier["pretrained_path"] is not None
+    #     and cfg.env.reward_classifier["config_path"] is not None
+    # ):
+    #     reward_classifier = get_classifier(
+    #         pretrained_path=cfg.env.reward_classifier["pretrained_path"],
+    #         config_path=cfg.env.reward_classifier["config_path"],
+    #     )
+
+    act_with_policy(
+        cfg=cfg,
+        reward_classifier=reward_classifier,
+        shutdown_event=shutdown_event,
+        parameters_queue=parameters_queue,
+        transitions_queue=transitions_queue,
+        interactions_queue=interactions_queue,
+    )
+    logging.info("[ACTOR] Policy process joined")
+
+    logging.info("[ACTOR] Closing queues")
+    transitions_queue.close()
+    interactions_queue.close()
+    parameters_queue.close()
+
+    transitions_process.join()
+    logging.info("[ACTOR] Transitions process joined")
+    interactions_process.join()
+    logging.info("[ACTOR] Interactions process joined")
+    receive_policy_process.join()
+    logging.info("[ACTOR] Receive policy process joined")
+
+    logging.info("[ACTOR] Cancelling queue join threads")
+    transitions_queue.cancel_join_thread()
+    interactions_queue.cancel_join_thread()
+    parameters_queue.cancel_join_thread()
+
+    logging.info("[ACTOR] Queues cleaned up")


-def update_policy_parameters(policy: SACPolicy, parameters_queue: Queue, device):
-    if not parameters_queue.empty():
-        logging.info("[ACTOR] Load new parameters from Learner.")
-        bytes_state_dict = get_last_item_from_queue(parameters_queue)
-        state_dict = bytes_to_state_dict(bytes_state_dict)
-        state_dict = move_state_dict_to_device(state_dict, device=device)
-        policy.load_state_dict(state_dict)
+#################################################
+# Core algorithm functions #
+#################################################


 def act_with_policy(
     cfg: TrainPipelineConfig,
-    # robot: Robot,
     reward_classifier: nn.Module,
     shutdown_event: any,  # Event,
     parameters_queue: Queue,
@@ -309,7 +196,12 @@ def act_with_policy(
     Once an episode is completed, updated network parameters received from the learner are
     retrieved from a queue and loaded into the network.
     Args:
-        cfg (DictConfig): Configuration settings for the interaction process.
+        cfg: Configuration settings for the interaction process.
+        reward_classifier: Reward classifier to use for the interaction process.
+        shutdown_event: Event to check if the process should shut down.
+        parameters_queue: Queue to receive updated network parameters from the learner.
+        transitions_queue: Queue to send transitions to the learner.
+        interactions_queue: Queue to send interactions to the learner.
""" # Initialize logging for multiprocessing if not use_threads(cfg): @@ -317,7 +209,7 @@ def act_with_policy( os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, f"actor_policy_{os.getpid()}.log") init_logging(log_file=log_file) - logging.info(f"Actor policy process logging initialized") + logging.info("Actor policy process logging initialized") logging.info("make_env online") @@ -462,6 +354,278 @@ def act_with_policy( busy_wait(1 / cfg.env.fps - dt_time) +################################################# +# Communication Functions - Group all gRPC/messaging functions # +################################################# + + +def establish_learner_connection( + stub, + shutdown_event: any, # Event, + attempts=30, +): + for _ in range(attempts): + if shutdown_event.is_set(): + logging.info("[ACTOR] Shutting down establish_learner_connection") + return False + + # Force a connection attempt and check state + try: + logging.info("[ACTOR] Send ready message to Learner") + if stub.Ready(hilserl_pb2.Empty()) == hilserl_pb2.Empty(): + return True + except grpc.RpcError as e: + logging.error(f"[ACTOR] Waiting for Learner to be ready... {e}") + time.sleep(2) + return False + + +@lru_cache(maxsize=1) +def learner_service_client( + host="127.0.0.1", port=50051 +) -> tuple[hilserl_pb2_grpc.LearnerServiceStub, grpc.Channel]: + import json + + """ + Returns a client for the learner service. + + GRPC uses HTTP/2, which is a binary protocol and multiplexes requests over a single connection. + So we need to create only one client and reuse it. + """ + + service_config = { + "methodConfig": [ + { + "name": [{}], # Applies to ALL methods in ALL services + "retryPolicy": { + "maxAttempts": 5, # Max retries (total attempts = 5) + "initialBackoff": "0.1s", # First retry after 0.1s + "maxBackoff": "2s", # Max wait time between retries + "backoffMultiplier": 2, # Exponential backoff factor + "retryableStatusCodes": [ + "UNAVAILABLE", + "DEADLINE_EXCEEDED", + ], # Retries on network failures + }, + } + ] + } + + service_config_json = json.dumps(service_config) + + channel = grpc.insecure_channel( + f"{host}:{port}", + options=[ + ("grpc.max_receive_message_length", learner_service.MAX_MESSAGE_SIZE), + ("grpc.max_send_message_length", learner_service.MAX_MESSAGE_SIZE), + ("grpc.enable_retries", 1), + ("grpc.service_config", service_config_json), + ], + ) + stub = hilserl_pb2_grpc.LearnerServiceStub(channel) + logging.info("[ACTOR] Learner service client created") + return stub, channel + + +def receive_policy( + cfg: TrainPipelineConfig, + parameters_queue: Queue, + shutdown_event: any, # Event, + learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None, + grpc_channel: grpc.Channel | None = None, +): + logging.info("[ACTOR] Start receiving parameters from the Learner") + + if not use_threads(cfg): + # Create a process-specific log file + log_dir = os.path.join(cfg.output_dir, "logs") + os.makedirs(log_dir, exist_ok=True) + log_file = os.path.join(log_dir, f"actor_receive_policy_{os.getpid()}.log") + + # Initialize logging with explicit log file + init_logging(log_file=log_file) + logging.info(f"Actor receive policy process logging initialized") + + # Setup process handlers to handle shutdown signal + # But use shutdown event from the main process + setup_process_handlers(use_threads=False) + + if grpc_channel is None or learner_client is None: + learner_client, grpc_channel = learner_service_client( + host=cfg.policy.actor_learner_config.learner_host, + 
+            port=cfg.policy.actor_learner_config.learner_port,
+        )
+
+    try:
+        iterator = learner_client.StreamParameters(hilserl_pb2.Empty())
+        receive_bytes_in_chunks(
+            iterator,
+            parameters_queue,
+            shutdown_event,
+            log_prefix="[ACTOR] parameters",
+        )
+
+    except grpc.RpcError as e:
+        logging.error(f"[ACTOR] gRPC error: {e}")
+
+    if not use_threads(cfg):
+        grpc_channel.close()
+    logging.info("[ACTOR] Received policy loop stopped")
+
+
+def send_transitions(
+    cfg: TrainPipelineConfig,
+    transitions_queue: Queue,
+    shutdown_event: any,  # Event,
+    learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
+    grpc_channel: grpc.Channel | None = None,
+) -> hilserl_pb2.Empty:
+    """
+    Sends transitions to the learner.
+
+    This function continuously retrieves messages from the queue and processes:
+
+    - **Transition Data:**
+      - A batch of transitions (observation, action, reward, next observation) is collected.
+      - Transitions are moved to the CPU and serialized using PyTorch.
+      - The serialized data is wrapped in a `hilserl_pb2.Transition` message and sent to the learner.
+    """
+
+    if not use_threads(cfg):
+        # Create a process-specific log file
+        log_dir = os.path.join(cfg.output_dir, "logs")
+        os.makedirs(log_dir, exist_ok=True)
+        log_file = os.path.join(log_dir, f"actor_transitions_{os.getpid()}.log")
+
+        # Initialize logging with explicit log file
+        init_logging(log_file=log_file)
+        logging.info("Actor transitions process logging initialized")
+
+        # Setup process handlers to handle shutdown signal
+        # But use shutdown event from the main process
+        setup_process_handlers(False)
+
+    if grpc_channel is None or learner_client is None:
+        learner_client, grpc_channel = learner_service_client(
+            host=cfg.policy.actor_learner_config.learner_host,
+            port=cfg.policy.actor_learner_config.learner_port,
+        )
+
+    try:
+        learner_client.SendTransitions(transitions_stream(shutdown_event, transitions_queue))
+    except grpc.RpcError as e:
+        logging.error(f"[ACTOR] gRPC error: {e}")
+
+    logging.info("[ACTOR] Finished streaming transitions")
+
+    if not use_threads(cfg):
+        grpc_channel.close()
+    logging.info("[ACTOR] Transitions process stopped")
+
+
+def send_interactions(
+    cfg: TrainPipelineConfig,
+    interactions_queue: Queue,
+    shutdown_event: any,  # Event,
+    learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
+    grpc_channel: grpc.Channel | None = None,
+) -> hilserl_pb2.Empty:
+    """
+    Sends interactions to the learner.
+
+    This function continuously retrieves messages from the queue and processes:
+
+    - **Interaction Messages:**
+      - Contains useful statistics about episodic rewards and policy timings.
+      - The message is serialized using `pickle` and sent to the learner.
+ """ + + if not use_threads(cfg): + # Create a process-specific log file + log_dir = os.path.join(cfg.output_dir, "logs") + os.makedirs(log_dir, exist_ok=True) + log_file = os.path.join(log_dir, f"actor_interactions_{os.getpid()}.log") + + # Initialize logging with explicit log file + init_logging(log_file=log_file) + logging.info("Actor interactions process logging initialized") + + # Setup process handlers to handle shutdown signal + # But use shutdown event from the main process + setup_process_handlers(False) + + if grpc_channel is None or learner_client is None: + learner_client, grpc_channel = learner_service_client( + host=cfg.policy.actor_learner_config.learner_host, + port=cfg.policy.actor_learner_config.learner_port, + ) + + try: + learner_client.SendInteractions(interactions_stream(shutdown_event, interactions_queue)) + except grpc.RpcError as e: + logging.error(f"[ACTOR] gRPC error: {e}") + + logging.info("[ACTOR] Finished streaming interactions") + + if not use_threads(cfg): + grpc_channel.close() + logging.info("[ACTOR] Interactions process stopped") + + +def transitions_stream(shutdown_event: Event, transitions_queue: Queue) -> hilserl_pb2.Empty: + while not shutdown_event.is_set(): + try: + message = transitions_queue.get(block=True, timeout=5) + except Empty: + logging.debug("[ACTOR] Transition queue is empty") + continue + + yield from send_bytes_in_chunks( + message, hilserl_pb2.Transition, log_prefix="[ACTOR] Send transitions" + ) + + return hilserl_pb2.Empty() + + +def interactions_stream( + shutdown_event: any, # Event, + interactions_queue: Queue, +) -> hilserl_pb2.Empty: + while not shutdown_event.is_set(): + try: + message = interactions_queue.get(block=True, timeout=5) + except Empty: + logging.debug("[ACTOR] Interaction queue is empty") + continue + + yield from send_bytes_in_chunks( + message, + hilserl_pb2.InteractionMessage, + log_prefix="[ACTOR] Send interactions", + ) + + return hilserl_pb2.Empty() + + +################################################# +# Policy functions # +################################################# + + +def update_policy_parameters(policy: SACPolicy, parameters_queue: Queue, device): + if not parameters_queue.empty(): + logging.info("[ACTOR] Load new parameters from Learner.") + bytes_state_dict = get_last_item_from_queue(parameters_queue) + state_dict = bytes_to_state_dict(bytes_state_dict) + state_dict = move_state_dict_to_device(state_dict, device=device) + policy.load_state_dict(state_dict) + + +################################################# +# Utilities functions # +################################################# + + def push_transitions_to_transport_queue(transitions: list, transitions_queue): """Send transitions to learner in smaller chunks to avoid network issues. @@ -504,144 +668,9 @@ def log_policy_frequency_issue(policy_fps: float, cfg: TrainPipelineConfig, inte ) -def establish_learner_connection( - stub, - shutdown_event: any, # Event, - attempts=30, -): - for _ in range(attempts): - if shutdown_event.is_set(): - logging.info("[ACTOR] Shutting down establish_learner_connection") - return False - - # Force a connection attempt and check state - try: - logging.info("[ACTOR] Send ready message to Learner") - if stub.Ready(hilserl_pb2.Empty()) == hilserl_pb2.Empty(): - return True - except grpc.RpcError as e: - logging.error(f"[ACTOR] Waiting for Learner to be ready... 
{e}") - time.sleep(2) - return False - - def use_threads(cfg: TrainPipelineConfig) -> bool: return cfg.policy.concurrency.actor == "threads" -@parser.wrap() -def actor_cli(cfg: TrainPipelineConfig): - cfg.validate() - if not use_threads(cfg): - import torch.multiprocessing as mp - - mp.set_start_method("spawn") - - # Create logs directory to ensure it exists - log_dir = os.path.join(cfg.output_dir, "logs") - os.makedirs(log_dir, exist_ok=True) - log_file = os.path.join(log_dir, f"actor_{cfg.job_name}.log") - - # Initialize logging with explicit log file - init_logging(log_file=log_file) - logging.info(f"Actor logging initialized, writing to {log_file}") - - shutdown_event = setup_process_handlers(use_threads(cfg)) - - learner_client, grpc_channel = learner_service_client( - host=cfg.policy.actor_learner_config.learner_host, - port=cfg.policy.actor_learner_config.learner_port, - ) - - logging.info("[ACTOR] Establishing connection with Learner") - if not establish_learner_connection(learner_client, shutdown_event): - logging.error("[ACTOR] Failed to establish connection with Learner") - return - - if not use_threads(cfg): - # If we use multithreading, we can reuse the channel - grpc_channel.close() - grpc_channel = None - - logging.info("[ACTOR] Connection with Learner established") - - parameters_queue = Queue() - transitions_queue = Queue() - interactions_queue = Queue() - - concurrency_entity = None - if use_threads(cfg): - from threading import Thread - - concurrency_entity = Thread - else: - from multiprocessing import Process - - concurrency_entity = Process - - receive_policy_process = concurrency_entity( - target=receive_policy, - args=(cfg, parameters_queue, shutdown_event, grpc_channel), - daemon=True, - ) - - transitions_process = concurrency_entity( - target=send_transitions, - args=(cfg, transitions_queue, shutdown_event, grpc_channel), - daemon=True, - ) - - interactions_process = concurrency_entity( - target=send_interactions, - args=(cfg, interactions_queue, shutdown_event, grpc_channel), - daemon=True, - ) - - transitions_process.start() - interactions_process.start() - receive_policy_process.start() - - # HACK: FOR MANISKILL we do not have a reward classifier - # TODO: Remove this once we merge into main - reward_classifier = None - # if ( - # cfg.env.reward_classifier["pretrained_path"] is not None - # and cfg.env.reward_classifier["config_path"] is not None - # ): - # reward_classifier = get_classifier( - # pretrained_path=cfg.env.reward_classifier["pretrained_path"], - # config_path=cfg.env.reward_classifier["config_path"], - # ) - - act_with_policy( - cfg=cfg, - reward_classifier=reward_classifier, - shutdown_event=shutdown_event, - parameters_queue=parameters_queue, - transitions_queue=transitions_queue, - interactions_queue=interactions_queue, - ) - logging.info("[ACTOR] Policy process joined") - - logging.info("[ACTOR] Closing queues") - transitions_queue.close() - interactions_queue.close() - parameters_queue.close() - - transitions_process.join() - logging.info("[ACTOR] Transitions process joined") - interactions_process.join() - logging.info("[ACTOR] Interactions process joined") - receive_policy_process.join() - logging.info("[ACTOR] Receive policy process joined") - - logging.info("[ACTOR] join queues") - transitions_queue.cancel_join_thread() - interactions_queue.cancel_join_thread() - parameters_queue.cancel_join_thread() - - logging.info("[ACTOR] queues closed") - - if __name__ == "__main__": actor_cli()