Refactor actor_server.py for improved structure and logging
- Consolidated logging initialization and enhanced logging for actor processes. - Streamlined the handling of gRPC connections and process management. - Improved readability by organizing core algorithm functions and communication functions. - Added detailed comments and documentation for clarity. - Ensured proper queue management and shutdown handling for actor processes.
This commit is contained in:
parent
176557d770
commit
eb710647bf
|
@ -14,11 +14,11 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from queue import Empty
|
from queue import Empty
|
||||||
from statistics import mean, quantiles
|
from statistics import mean, quantiles
|
||||||
import os
|
|
||||||
|
|
||||||
# from lerobot.scripts.eval import eval_policy
|
# from lerobot.scripts.eval import eval_policy
|
||||||
import grpc
|
import grpc
|
||||||
|
@ -57,245 +57,132 @@ from lerobot.scripts.server.utils import get_last_item_from_queue, setup_process
|
||||||
ACTOR_SHUTDOWN_TIMEOUT = 30
|
ACTOR_SHUTDOWN_TIMEOUT = 30
|
||||||
|
|
||||||
|
|
||||||
def receive_policy(
|
#################################################
|
||||||
cfg: TrainPipelineConfig,
|
# Main entry point #
|
||||||
parameters_queue: Queue,
|
#################################################
|
||||||
shutdown_event: any, # Event,
|
|
||||||
learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
|
|
||||||
grpc_channel: grpc.Channel | None = None,
|
|
||||||
):
|
|
||||||
logging.info("[ACTOR] Start receiving parameters from the Learner")
|
|
||||||
|
|
||||||
|
|
||||||
|
@parser.wrap()
|
||||||
|
def actor_cli(cfg: TrainPipelineConfig):
|
||||||
|
cfg.validate()
|
||||||
if not use_threads(cfg):
|
if not use_threads(cfg):
|
||||||
# Create a process-specific log file
|
import torch.multiprocessing as mp
|
||||||
|
|
||||||
|
mp.set_start_method("spawn")
|
||||||
|
|
||||||
|
# Create logs directory to ensure it exists
|
||||||
log_dir = os.path.join(cfg.output_dir, "logs")
|
log_dir = os.path.join(cfg.output_dir, "logs")
|
||||||
os.makedirs(log_dir, exist_ok=True)
|
os.makedirs(log_dir, exist_ok=True)
|
||||||
log_file = os.path.join(log_dir, f"actor_receive_policy_{os.getpid()}.log")
|
log_file = os.path.join(log_dir, f"actor_{cfg.job_name}.log")
|
||||||
|
|
||||||
# Initialize logging with explicit log file
|
# Initialize logging with explicit log file
|
||||||
init_logging(log_file=log_file)
|
init_logging(log_file=log_file)
|
||||||
logging.info(f"Actor receive policy process logging initialized")
|
logging.info(f"Actor logging initialized, writing to {log_file}")
|
||||||
|
|
||||||
# Setup process handlers to handle shutdown signal
|
shutdown_event = setup_process_handlers(use_threads(cfg))
|
||||||
# But use shutdown event from the main process
|
|
||||||
setup_process_handlers(use_threads=False)
|
|
||||||
|
|
||||||
if grpc_channel is None or learner_client is None:
|
|
||||||
learner_client, grpc_channel = learner_service_client(
|
learner_client, grpc_channel = learner_service_client(
|
||||||
host=cfg.policy.actor_learner_config.learner_host,
|
host=cfg.policy.actor_learner_config.learner_host,
|
||||||
port=cfg.policy.actor_learner_config.learner_port,
|
port=cfg.policy.actor_learner_config.learner_port,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
logging.info("[ACTOR] Establishing connection with Learner")
|
||||||
iterator = learner_client.StreamParameters(hilserl_pb2.Empty())
|
if not establish_learner_connection(learner_client, shutdown_event):
|
||||||
receive_bytes_in_chunks(
|
logging.error("[ACTOR] Failed to establish connection with Learner")
|
||||||
iterator,
|
return
|
||||||
parameters_queue,
|
|
||||||
shutdown_event,
|
|
||||||
log_prefix="[ACTOR] parameters",
|
|
||||||
)
|
|
||||||
|
|
||||||
except grpc.RpcError as e:
|
|
||||||
logging.error(f"[ACTOR] gRPC error: {e}")
|
|
||||||
|
|
||||||
if not use_threads(cfg):
|
if not use_threads(cfg):
|
||||||
|
# If we use multithreading, we can reuse the channel
|
||||||
grpc_channel.close()
|
grpc_channel.close()
|
||||||
logging.info("[ACTOR] Received policy loop stopped")
|
grpc_channel = None
|
||||||
|
|
||||||
|
logging.info("[ACTOR] Connection with Learner established")
|
||||||
|
|
||||||
def transitions_stream(shutdown_event: Event, transitions_queue: Queue) -> hilserl_pb2.Empty:
|
parameters_queue = Queue()
|
||||||
while not shutdown_event.is_set():
|
transitions_queue = Queue()
|
||||||
try:
|
interactions_queue = Queue()
|
||||||
message = transitions_queue.get(block=True, timeout=5)
|
|
||||||
except Empty:
|
|
||||||
logging.debug("[ACTOR] Transition queue is empty")
|
|
||||||
continue
|
|
||||||
|
|
||||||
yield from send_bytes_in_chunks(
|
concurrency_entity = None
|
||||||
message, hilserl_pb2.Transition, log_prefix="[ACTOR] Send transitions"
|
if use_threads(cfg):
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
concurrency_entity = Thread
|
||||||
|
else:
|
||||||
|
from multiprocessing import Process
|
||||||
|
|
||||||
|
concurrency_entity = Process
|
||||||
|
|
||||||
|
receive_policy_process = concurrency_entity(
|
||||||
|
target=receive_policy,
|
||||||
|
args=(cfg, parameters_queue, shutdown_event, grpc_channel),
|
||||||
|
daemon=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
return hilserl_pb2.Empty()
|
transitions_process = concurrency_entity(
|
||||||
|
target=send_transitions,
|
||||||
|
args=(cfg, transitions_queue, shutdown_event, grpc_channel),
|
||||||
def interactions_stream(
|
daemon=True,
|
||||||
shutdown_event: any, # Event,
|
|
||||||
interactions_queue: Queue,
|
|
||||||
) -> hilserl_pb2.Empty:
|
|
||||||
while not shutdown_event.is_set():
|
|
||||||
try:
|
|
||||||
message = interactions_queue.get(block=True, timeout=5)
|
|
||||||
except Empty:
|
|
||||||
logging.debug("[ACTOR] Interaction queue is empty")
|
|
||||||
continue
|
|
||||||
|
|
||||||
yield from send_bytes_in_chunks(
|
|
||||||
message,
|
|
||||||
hilserl_pb2.InteractionMessage,
|
|
||||||
log_prefix="[ACTOR] Send interactions",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return hilserl_pb2.Empty()
|
interactions_process = concurrency_entity(
|
||||||
|
target=send_interactions,
|
||||||
|
args=(cfg, interactions_queue, shutdown_event, grpc_channel),
|
||||||
def send_transitions(
|
daemon=True,
|
||||||
cfg: TrainPipelineConfig,
|
|
||||||
transitions_queue: Queue,
|
|
||||||
shutdown_event: any, # Event,
|
|
||||||
learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
|
|
||||||
grpc_channel: grpc.Channel | None = None,
|
|
||||||
) -> hilserl_pb2.Empty:
|
|
||||||
"""
|
|
||||||
Sends transitions to the learner.
|
|
||||||
|
|
||||||
This function continuously retrieves messages from the queue and processes:
|
|
||||||
|
|
||||||
- **Transition Data:**
|
|
||||||
- A batch of transitions (observation, action, reward, next observation) is collected.
|
|
||||||
- Transitions are moved to the CPU and serialized using PyTorch.
|
|
||||||
- The serialized data is wrapped in a `hilserl_pb2.Transition` message and sent to the learner.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not use_threads(cfg):
|
|
||||||
# Create a process-specific log file
|
|
||||||
log_dir = os.path.join(cfg.output_dir, "logs")
|
|
||||||
os.makedirs(log_dir, exist_ok=True)
|
|
||||||
log_file = os.path.join(log_dir, f"actor_transitions_{os.getpid()}.log")
|
|
||||||
|
|
||||||
# Initialize logging with explicit log file
|
|
||||||
init_logging(log_file=log_file)
|
|
||||||
logging.info(f"Actor transitions process logging initialized")
|
|
||||||
|
|
||||||
# Setup process handlers to handle shutdown signal
|
|
||||||
# But use shutdown event from the main process
|
|
||||||
setup_process_handlers(False)
|
|
||||||
|
|
||||||
if grpc_channel is None or learner_client is None:
|
|
||||||
learner_client, grpc_channel = learner_service_client(
|
|
||||||
host=cfg.policy.actor_learner_config.learner_host,
|
|
||||||
port=cfg.policy.actor_learner_config.learner_port,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
transitions_process.start()
|
||||||
learner_client.SendTransitions(transitions_stream(shutdown_event, transitions_queue))
|
interactions_process.start()
|
||||||
except grpc.RpcError as e:
|
receive_policy_process.start()
|
||||||
logging.error(f"[ACTOR] gRPC error: {e}")
|
|
||||||
|
|
||||||
logging.info("[ACTOR] Finished streaming transitions")
|
# HACK: FOR MANISKILL we do not have a reward classifier
|
||||||
|
# TODO: Remove this once we merge into main
|
||||||
|
reward_classifier = None
|
||||||
|
# if (
|
||||||
|
# cfg.env.reward_classifier["pretrained_path"] is not None
|
||||||
|
# and cfg.env.reward_classifier["config_path"] is not None
|
||||||
|
# ):
|
||||||
|
# reward_classifier = get_classifier(
|
||||||
|
# pretrained_path=cfg.env.reward_classifier["pretrained_path"],
|
||||||
|
# config_path=cfg.env.reward_classifier["config_path"],
|
||||||
|
# )
|
||||||
|
|
||||||
if not use_threads(cfg):
|
act_with_policy(
|
||||||
grpc_channel.close()
|
cfg=cfg,
|
||||||
logging.info("[ACTOR] Transitions process stopped")
|
reward_classifier=reward_classifier,
|
||||||
|
shutdown_event=shutdown_event,
|
||||||
|
parameters_queue=parameters_queue,
|
||||||
def send_interactions(
|
transitions_queue=transitions_queue,
|
||||||
cfg: TrainPipelineConfig,
|
interactions_queue=interactions_queue,
|
||||||
interactions_queue: Queue,
|
|
||||||
shutdown_event: any, # Event,
|
|
||||||
learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
|
|
||||||
grpc_channel: grpc.Channel | None = None,
|
|
||||||
) -> hilserl_pb2.Empty:
|
|
||||||
"""
|
|
||||||
Sends interactions to the learner.
|
|
||||||
|
|
||||||
This function continuously retrieves messages from the queue and processes:
|
|
||||||
|
|
||||||
- **Interaction Messages:**
|
|
||||||
- Contains useful statistics about episodic rewards and policy timings.
|
|
||||||
- The message is serialized using `pickle` and sent to the learner.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not use_threads(cfg):
|
|
||||||
# Create a process-specific log file
|
|
||||||
log_dir = os.path.join(cfg.output_dir, "logs")
|
|
||||||
os.makedirs(log_dir, exist_ok=True)
|
|
||||||
log_file = os.path.join(log_dir, f"actor_interactions_{os.getpid()}.log")
|
|
||||||
|
|
||||||
# Initialize logging with explicit log file
|
|
||||||
init_logging(log_file=log_file)
|
|
||||||
logging.info(f"Actor interactions process logging initialized")
|
|
||||||
|
|
||||||
# Setup process handlers to handle shutdown signal
|
|
||||||
# But use shutdown event from the main process
|
|
||||||
setup_process_handlers(False)
|
|
||||||
|
|
||||||
if grpc_channel is None or learner_client is None:
|
|
||||||
learner_client, grpc_channel = learner_service_client(
|
|
||||||
host=cfg.policy.actor_learner_config.learner_host,
|
|
||||||
port=cfg.policy.actor_learner_config.learner_port,
|
|
||||||
)
|
)
|
||||||
|
logging.info("[ACTOR] Policy process joined")
|
||||||
|
|
||||||
try:
|
logging.info("[ACTOR] Closing queues")
|
||||||
learner_client.SendInteractions(interactions_stream(shutdown_event, interactions_queue))
|
transitions_queue.close()
|
||||||
except grpc.RpcError as e:
|
interactions_queue.close()
|
||||||
logging.error(f"[ACTOR] gRPC error: {e}")
|
parameters_queue.close()
|
||||||
|
|
||||||
logging.info("[ACTOR] Finished streaming interactions")
|
transitions_process.join()
|
||||||
|
logging.info("[ACTOR] Transitions process joined")
|
||||||
|
interactions_process.join()
|
||||||
|
logging.info("[ACTOR] Interactions process joined")
|
||||||
|
receive_policy_process.join()
|
||||||
|
logging.info("[ACTOR] Receive policy process joined")
|
||||||
|
|
||||||
if not use_threads(cfg):
|
logging.info("[ACTOR] join queues")
|
||||||
grpc_channel.close()
|
transitions_queue.cancel_join_thread()
|
||||||
logging.info("[ACTOR] Interactions process stopped")
|
interactions_queue.cancel_join_thread()
|
||||||
|
parameters_queue.cancel_join_thread()
|
||||||
|
|
||||||
|
logging.info("[ACTOR] queues closed")
|
||||||
|
|
||||||
|
|
||||||
@lru_cache(maxsize=1)
|
#################################################
|
||||||
def learner_service_client(
|
# Core algorithm functions #
|
||||||
host="127.0.0.1", port=50051
|
#################################################
|
||||||
) -> tuple[hilserl_pb2_grpc.LearnerServiceStub, grpc.Channel]:
|
|
||||||
import json
|
|
||||||
|
|
||||||
"""
|
|
||||||
Returns a client for the learner service.
|
|
||||||
|
|
||||||
GRPC uses HTTP/2, which is a binary protocol and multiplexes requests over a single connection.
|
|
||||||
So we need to create only one client and reuse it.
|
|
||||||
"""
|
|
||||||
|
|
||||||
service_config = {
|
|
||||||
"methodConfig": [
|
|
||||||
{
|
|
||||||
"name": [{}], # Applies to ALL methods in ALL services
|
|
||||||
"retryPolicy": {
|
|
||||||
"maxAttempts": 5, # Max retries (total attempts = 5)
|
|
||||||
"initialBackoff": "0.1s", # First retry after 0.1s
|
|
||||||
"maxBackoff": "2s", # Max wait time between retries
|
|
||||||
"backoffMultiplier": 2, # Exponential backoff factor
|
|
||||||
"retryableStatusCodes": [
|
|
||||||
"UNAVAILABLE",
|
|
||||||
"DEADLINE_EXCEEDED",
|
|
||||||
], # Retries on network failures
|
|
||||||
},
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
service_config_json = json.dumps(service_config)
|
|
||||||
|
|
||||||
channel = grpc.insecure_channel(
|
|
||||||
f"{host}:{port}",
|
|
||||||
options=[
|
|
||||||
("grpc.max_receive_message_length", learner_service.MAX_MESSAGE_SIZE),
|
|
||||||
("grpc.max_send_message_length", learner_service.MAX_MESSAGE_SIZE),
|
|
||||||
("grpc.enable_retries", 1),
|
|
||||||
("grpc.service_config", service_config_json),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
stub = hilserl_pb2_grpc.LearnerServiceStub(channel)
|
|
||||||
logging.info("[ACTOR] Learner service client created")
|
|
||||||
return stub, channel
|
|
||||||
|
|
||||||
|
|
||||||
def update_policy_parameters(policy: SACPolicy, parameters_queue: Queue, device):
|
|
||||||
if not parameters_queue.empty():
|
|
||||||
logging.info("[ACTOR] Load new parameters from Learner.")
|
|
||||||
bytes_state_dict = get_last_item_from_queue(parameters_queue)
|
|
||||||
state_dict = bytes_to_state_dict(bytes_state_dict)
|
|
||||||
state_dict = move_state_dict_to_device(state_dict, device=device)
|
|
||||||
policy.load_state_dict(state_dict)
|
|
||||||
|
|
||||||
|
|
||||||
def act_with_policy(
|
def act_with_policy(
|
||||||
cfg: TrainPipelineConfig,
|
cfg: TrainPipelineConfig,
|
||||||
# robot: Robot,
|
|
||||||
reward_classifier: nn.Module,
|
reward_classifier: nn.Module,
|
||||||
shutdown_event: any, # Event,
|
shutdown_event: any, # Event,
|
||||||
parameters_queue: Queue,
|
parameters_queue: Queue,
|
||||||
|
@ -309,7 +196,12 @@ def act_with_policy(
|
||||||
Once an episode is completed, updated network parameters received from the learner are retrieved from a queue and loaded into the network.
|
Once an episode is completed, updated network parameters received from the learner are retrieved from a queue and loaded into the network.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
cfg (DictConfig): Configuration settings for the interaction process.
|
cfg: Configuration settings for the interaction process.
|
||||||
|
reward_classifier: Reward classifier to use for the interaction process.
|
||||||
|
shutdown_event: Event to check if the process should shutdown.
|
||||||
|
parameters_queue: Queue to receive updated network parameters from the learner.
|
||||||
|
transitions_queue: Queue to send transitions to the learner.
|
||||||
|
interactions_queue: Queue to send interactions to the learner.
|
||||||
"""
|
"""
|
||||||
# Initialize logging for multiprocessing
|
# Initialize logging for multiprocessing
|
||||||
if not use_threads(cfg):
|
if not use_threads(cfg):
|
||||||
|
@ -317,7 +209,7 @@ def act_with_policy(
|
||||||
os.makedirs(log_dir, exist_ok=True)
|
os.makedirs(log_dir, exist_ok=True)
|
||||||
log_file = os.path.join(log_dir, f"actor_policy_{os.getpid()}.log")
|
log_file = os.path.join(log_dir, f"actor_policy_{os.getpid()}.log")
|
||||||
init_logging(log_file=log_file)
|
init_logging(log_file=log_file)
|
||||||
logging.info(f"Actor policy process logging initialized")
|
logging.info("Actor policy process logging initialized")
|
||||||
|
|
||||||
logging.info("make_env online")
|
logging.info("make_env online")
|
||||||
|
|
||||||
|
@ -462,6 +354,278 @@ def act_with_policy(
|
||||||
busy_wait(1 / cfg.env.fps - dt_time)
|
busy_wait(1 / cfg.env.fps - dt_time)
|
||||||
|
|
||||||
|
|
||||||
|
#################################################
|
||||||
|
# Communication Functions - Group all gRPC/messaging functions #
|
||||||
|
#################################################
|
||||||
|
|
||||||
|
|
||||||
|
def establish_learner_connection(
|
||||||
|
stub,
|
||||||
|
shutdown_event: any, # Event,
|
||||||
|
attempts=30,
|
||||||
|
):
|
||||||
|
for _ in range(attempts):
|
||||||
|
if shutdown_event.is_set():
|
||||||
|
logging.info("[ACTOR] Shutting down establish_learner_connection")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Force a connection attempt and check state
|
||||||
|
try:
|
||||||
|
logging.info("[ACTOR] Send ready message to Learner")
|
||||||
|
if stub.Ready(hilserl_pb2.Empty()) == hilserl_pb2.Empty():
|
||||||
|
return True
|
||||||
|
except grpc.RpcError as e:
|
||||||
|
logging.error(f"[ACTOR] Waiting for Learner to be ready... {e}")
|
||||||
|
time.sleep(2)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(maxsize=1)
|
||||||
|
def learner_service_client(
|
||||||
|
host="127.0.0.1", port=50051
|
||||||
|
) -> tuple[hilserl_pb2_grpc.LearnerServiceStub, grpc.Channel]:
|
||||||
|
import json
|
||||||
|
|
||||||
|
"""
|
||||||
|
Returns a client for the learner service.
|
||||||
|
|
||||||
|
GRPC uses HTTP/2, which is a binary protocol and multiplexes requests over a single connection.
|
||||||
|
So we need to create only one client and reuse it.
|
||||||
|
"""
|
||||||
|
|
||||||
|
service_config = {
|
||||||
|
"methodConfig": [
|
||||||
|
{
|
||||||
|
"name": [{}], # Applies to ALL methods in ALL services
|
||||||
|
"retryPolicy": {
|
||||||
|
"maxAttempts": 5, # Max retries (total attempts = 5)
|
||||||
|
"initialBackoff": "0.1s", # First retry after 0.1s
|
||||||
|
"maxBackoff": "2s", # Max wait time between retries
|
||||||
|
"backoffMultiplier": 2, # Exponential backoff factor
|
||||||
|
"retryableStatusCodes": [
|
||||||
|
"UNAVAILABLE",
|
||||||
|
"DEADLINE_EXCEEDED",
|
||||||
|
], # Retries on network failures
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
service_config_json = json.dumps(service_config)
|
||||||
|
|
||||||
|
channel = grpc.insecure_channel(
|
||||||
|
f"{host}:{port}",
|
||||||
|
options=[
|
||||||
|
("grpc.max_receive_message_length", learner_service.MAX_MESSAGE_SIZE),
|
||||||
|
("grpc.max_send_message_length", learner_service.MAX_MESSAGE_SIZE),
|
||||||
|
("grpc.enable_retries", 1),
|
||||||
|
("grpc.service_config", service_config_json),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
stub = hilserl_pb2_grpc.LearnerServiceStub(channel)
|
||||||
|
logging.info("[ACTOR] Learner service client created")
|
||||||
|
return stub, channel
|
||||||
|
|
||||||
|
|
||||||
|
def receive_policy(
|
||||||
|
cfg: TrainPipelineConfig,
|
||||||
|
parameters_queue: Queue,
|
||||||
|
shutdown_event: any, # Event,
|
||||||
|
learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
|
||||||
|
grpc_channel: grpc.Channel | None = None,
|
||||||
|
):
|
||||||
|
logging.info("[ACTOR] Start receiving parameters from the Learner")
|
||||||
|
|
||||||
|
if not use_threads(cfg):
|
||||||
|
# Create a process-specific log file
|
||||||
|
log_dir = os.path.join(cfg.output_dir, "logs")
|
||||||
|
os.makedirs(log_dir, exist_ok=True)
|
||||||
|
log_file = os.path.join(log_dir, f"actor_receive_policy_{os.getpid()}.log")
|
||||||
|
|
||||||
|
# Initialize logging with explicit log file
|
||||||
|
init_logging(log_file=log_file)
|
||||||
|
logging.info(f"Actor receive policy process logging initialized")
|
||||||
|
|
||||||
|
# Setup process handlers to handle shutdown signal
|
||||||
|
# But use shutdown event from the main process
|
||||||
|
setup_process_handlers(use_threads=False)
|
||||||
|
|
||||||
|
if grpc_channel is None or learner_client is None:
|
||||||
|
learner_client, grpc_channel = learner_service_client(
|
||||||
|
host=cfg.policy.actor_learner_config.learner_host,
|
||||||
|
port=cfg.policy.actor_learner_config.learner_port,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
iterator = learner_client.StreamParameters(hilserl_pb2.Empty())
|
||||||
|
receive_bytes_in_chunks(
|
||||||
|
iterator,
|
||||||
|
parameters_queue,
|
||||||
|
shutdown_event,
|
||||||
|
log_prefix="[ACTOR] parameters",
|
||||||
|
)
|
||||||
|
|
||||||
|
except grpc.RpcError as e:
|
||||||
|
logging.error(f"[ACTOR] gRPC error: {e}")
|
||||||
|
|
||||||
|
if not use_threads(cfg):
|
||||||
|
grpc_channel.close()
|
||||||
|
logging.info("[ACTOR] Received policy loop stopped")
|
||||||
|
|
||||||
|
|
||||||
|
def send_transitions(
|
||||||
|
cfg: TrainPipelineConfig,
|
||||||
|
transitions_queue: Queue,
|
||||||
|
shutdown_event: any, # Event,
|
||||||
|
learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
|
||||||
|
grpc_channel: grpc.Channel | None = None,
|
||||||
|
) -> hilserl_pb2.Empty:
|
||||||
|
"""
|
||||||
|
Sends transitions to the learner.
|
||||||
|
|
||||||
|
This function continuously retrieves messages from the queue and processes:
|
||||||
|
|
||||||
|
- **Transition Data:**
|
||||||
|
- A batch of transitions (observation, action, reward, next observation) is collected.
|
||||||
|
- Transitions are moved to the CPU and serialized using PyTorch.
|
||||||
|
- The serialized data is wrapped in a `hilserl_pb2.Transition` message and sent to the learner.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not use_threads(cfg):
|
||||||
|
# Create a process-specific log file
|
||||||
|
log_dir = os.path.join(cfg.output_dir, "logs")
|
||||||
|
os.makedirs(log_dir, exist_ok=True)
|
||||||
|
log_file = os.path.join(log_dir, f"actor_transitions_{os.getpid()}.log")
|
||||||
|
|
||||||
|
# Initialize logging with explicit log file
|
||||||
|
init_logging(log_file=log_file)
|
||||||
|
logging.info("Actor transitions process logging initialized")
|
||||||
|
|
||||||
|
# Setup process handlers to handle shutdown signal
|
||||||
|
# But use shutdown event from the main process
|
||||||
|
setup_process_handlers(False)
|
||||||
|
|
||||||
|
if grpc_channel is None or learner_client is None:
|
||||||
|
learner_client, grpc_channel = learner_service_client(
|
||||||
|
host=cfg.policy.actor_learner_config.learner_host,
|
||||||
|
port=cfg.policy.actor_learner_config.learner_port,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
learner_client.SendTransitions(transitions_stream(shutdown_event, transitions_queue))
|
||||||
|
except grpc.RpcError as e:
|
||||||
|
logging.error(f"[ACTOR] gRPC error: {e}")
|
||||||
|
|
||||||
|
logging.info("[ACTOR] Finished streaming transitions")
|
||||||
|
|
||||||
|
if not use_threads(cfg):
|
||||||
|
grpc_channel.close()
|
||||||
|
logging.info("[ACTOR] Transitions process stopped")
|
||||||
|
|
||||||
|
|
||||||
|
def send_interactions(
|
||||||
|
cfg: TrainPipelineConfig,
|
||||||
|
interactions_queue: Queue,
|
||||||
|
shutdown_event: any, # Event,
|
||||||
|
learner_client: hilserl_pb2_grpc.LearnerServiceStub | None = None,
|
||||||
|
grpc_channel: grpc.Channel | None = None,
|
||||||
|
) -> hilserl_pb2.Empty:
|
||||||
|
"""
|
||||||
|
Sends interactions to the learner.
|
||||||
|
|
||||||
|
This function continuously retrieves messages from the queue and processes:
|
||||||
|
|
||||||
|
- **Interaction Messages:**
|
||||||
|
- Contains useful statistics about episodic rewards and policy timings.
|
||||||
|
- The message is serialized using `pickle` and sent to the learner.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not use_threads(cfg):
|
||||||
|
# Create a process-specific log file
|
||||||
|
log_dir = os.path.join(cfg.output_dir, "logs")
|
||||||
|
os.makedirs(log_dir, exist_ok=True)
|
||||||
|
log_file = os.path.join(log_dir, f"actor_interactions_{os.getpid()}.log")
|
||||||
|
|
||||||
|
# Initialize logging with explicit log file
|
||||||
|
init_logging(log_file=log_file)
|
||||||
|
logging.info("Actor interactions process logging initialized")
|
||||||
|
|
||||||
|
# Setup process handlers to handle shutdown signal
|
||||||
|
# But use shutdown event from the main process
|
||||||
|
setup_process_handlers(False)
|
||||||
|
|
||||||
|
if grpc_channel is None or learner_client is None:
|
||||||
|
learner_client, grpc_channel = learner_service_client(
|
||||||
|
host=cfg.policy.actor_learner_config.learner_host,
|
||||||
|
port=cfg.policy.actor_learner_config.learner_port,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
learner_client.SendInteractions(interactions_stream(shutdown_event, interactions_queue))
|
||||||
|
except grpc.RpcError as e:
|
||||||
|
logging.error(f"[ACTOR] gRPC error: {e}")
|
||||||
|
|
||||||
|
logging.info("[ACTOR] Finished streaming interactions")
|
||||||
|
|
||||||
|
if not use_threads(cfg):
|
||||||
|
grpc_channel.close()
|
||||||
|
logging.info("[ACTOR] Interactions process stopped")
|
||||||
|
|
||||||
|
|
||||||
|
def transitions_stream(shutdown_event: Event, transitions_queue: Queue) -> hilserl_pb2.Empty:
|
||||||
|
while not shutdown_event.is_set():
|
||||||
|
try:
|
||||||
|
message = transitions_queue.get(block=True, timeout=5)
|
||||||
|
except Empty:
|
||||||
|
logging.debug("[ACTOR] Transition queue is empty")
|
||||||
|
continue
|
||||||
|
|
||||||
|
yield from send_bytes_in_chunks(
|
||||||
|
message, hilserl_pb2.Transition, log_prefix="[ACTOR] Send transitions"
|
||||||
|
)
|
||||||
|
|
||||||
|
return hilserl_pb2.Empty()
|
||||||
|
|
||||||
|
|
||||||
|
def interactions_stream(
|
||||||
|
shutdown_event: any, # Event,
|
||||||
|
interactions_queue: Queue,
|
||||||
|
) -> hilserl_pb2.Empty:
|
||||||
|
while not shutdown_event.is_set():
|
||||||
|
try:
|
||||||
|
message = interactions_queue.get(block=True, timeout=5)
|
||||||
|
except Empty:
|
||||||
|
logging.debug("[ACTOR] Interaction queue is empty")
|
||||||
|
continue
|
||||||
|
|
||||||
|
yield from send_bytes_in_chunks(
|
||||||
|
message,
|
||||||
|
hilserl_pb2.InteractionMessage,
|
||||||
|
log_prefix="[ACTOR] Send interactions",
|
||||||
|
)
|
||||||
|
|
||||||
|
return hilserl_pb2.Empty()
|
||||||
|
|
||||||
|
|
||||||
|
#################################################
|
||||||
|
# Policy functions #
|
||||||
|
#################################################
|
||||||
|
|
||||||
|
|
||||||
|
def update_policy_parameters(policy: SACPolicy, parameters_queue: Queue, device):
|
||||||
|
if not parameters_queue.empty():
|
||||||
|
logging.info("[ACTOR] Load new parameters from Learner.")
|
||||||
|
bytes_state_dict = get_last_item_from_queue(parameters_queue)
|
||||||
|
state_dict = bytes_to_state_dict(bytes_state_dict)
|
||||||
|
state_dict = move_state_dict_to_device(state_dict, device=device)
|
||||||
|
policy.load_state_dict(state_dict)
|
||||||
|
|
||||||
|
|
||||||
|
#################################################
|
||||||
|
# Utilities functions #
|
||||||
|
#################################################
|
||||||
|
|
||||||
|
|
||||||
def push_transitions_to_transport_queue(transitions: list, transitions_queue):
|
def push_transitions_to_transport_queue(transitions: list, transitions_queue):
|
||||||
"""Send transitions to learner in smaller chunks to avoid network issues.
|
"""Send transitions to learner in smaller chunks to avoid network issues.
|
||||||
|
|
||||||
|
@ -504,144 +668,9 @@ def log_policy_frequency_issue(policy_fps: float, cfg: TrainPipelineConfig, inte
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def establish_learner_connection(
|
|
||||||
stub,
|
|
||||||
shutdown_event: any, # Event,
|
|
||||||
attempts=30,
|
|
||||||
):
|
|
||||||
for _ in range(attempts):
|
|
||||||
if shutdown_event.is_set():
|
|
||||||
logging.info("[ACTOR] Shutting down establish_learner_connection")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Force a connection attempt and check state
|
|
||||||
try:
|
|
||||||
logging.info("[ACTOR] Send ready message to Learner")
|
|
||||||
if stub.Ready(hilserl_pb2.Empty()) == hilserl_pb2.Empty():
|
|
||||||
return True
|
|
||||||
except grpc.RpcError as e:
|
|
||||||
logging.error(f"[ACTOR] Waiting for Learner to be ready... {e}")
|
|
||||||
time.sleep(2)
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def use_threads(cfg: TrainPipelineConfig) -> bool:
|
def use_threads(cfg: TrainPipelineConfig) -> bool:
|
||||||
return cfg.policy.concurrency.actor == "threads"
|
return cfg.policy.concurrency.actor == "threads"
|
||||||
|
|
||||||
|
|
||||||
@parser.wrap()
|
|
||||||
def actor_cli(cfg: TrainPipelineConfig):
|
|
||||||
cfg.validate()
|
|
||||||
if not use_threads(cfg):
|
|
||||||
import torch.multiprocessing as mp
|
|
||||||
|
|
||||||
mp.set_start_method("spawn")
|
|
||||||
|
|
||||||
# Create logs directory to ensure it exists
|
|
||||||
log_dir = os.path.join(cfg.output_dir, "logs")
|
|
||||||
os.makedirs(log_dir, exist_ok=True)
|
|
||||||
log_file = os.path.join(log_dir, f"actor_{cfg.job_name}.log")
|
|
||||||
|
|
||||||
# Initialize logging with explicit log file
|
|
||||||
init_logging(log_file=log_file)
|
|
||||||
logging.info(f"Actor logging initialized, writing to {log_file}")
|
|
||||||
|
|
||||||
shutdown_event = setup_process_handlers(use_threads(cfg))
|
|
||||||
|
|
||||||
learner_client, grpc_channel = learner_service_client(
|
|
||||||
host=cfg.policy.actor_learner_config.learner_host,
|
|
||||||
port=cfg.policy.actor_learner_config.learner_port,
|
|
||||||
)
|
|
||||||
|
|
||||||
logging.info("[ACTOR] Establishing connection with Learner")
|
|
||||||
if not establish_learner_connection(learner_client, shutdown_event):
|
|
||||||
logging.error("[ACTOR] Failed to establish connection with Learner")
|
|
||||||
return
|
|
||||||
|
|
||||||
if not use_threads(cfg):
|
|
||||||
# If we use multithreading, we can reuse the channel
|
|
||||||
grpc_channel.close()
|
|
||||||
grpc_channel = None
|
|
||||||
|
|
||||||
logging.info("[ACTOR] Connection with Learner established")
|
|
||||||
|
|
||||||
parameters_queue = Queue()
|
|
||||||
transitions_queue = Queue()
|
|
||||||
interactions_queue = Queue()
|
|
||||||
|
|
||||||
concurrency_entity = None
|
|
||||||
if use_threads(cfg):
|
|
||||||
from threading import Thread
|
|
||||||
|
|
||||||
concurrency_entity = Thread
|
|
||||||
else:
|
|
||||||
from multiprocessing import Process
|
|
||||||
|
|
||||||
concurrency_entity = Process
|
|
||||||
|
|
||||||
receive_policy_process = concurrency_entity(
|
|
||||||
target=receive_policy,
|
|
||||||
args=(cfg, parameters_queue, shutdown_event, grpc_channel),
|
|
||||||
daemon=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
transitions_process = concurrency_entity(
|
|
||||||
target=send_transitions,
|
|
||||||
args=(cfg, transitions_queue, shutdown_event, grpc_channel),
|
|
||||||
daemon=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
interactions_process = concurrency_entity(
|
|
||||||
target=send_interactions,
|
|
||||||
args=(cfg, interactions_queue, shutdown_event, grpc_channel),
|
|
||||||
daemon=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
transitions_process.start()
|
|
||||||
interactions_process.start()
|
|
||||||
receive_policy_process.start()
|
|
||||||
|
|
||||||
# HACK: FOR MANISKILL we do not have a reward classifier
|
|
||||||
# TODO: Remove this once we merge into main
|
|
||||||
reward_classifier = None
|
|
||||||
# if (
|
|
||||||
# cfg.env.reward_classifier["pretrained_path"] is not None
|
|
||||||
# and cfg.env.reward_classifier["config_path"] is not None
|
|
||||||
# ):
|
|
||||||
# reward_classifier = get_classifier(
|
|
||||||
# pretrained_path=cfg.env.reward_classifier["pretrained_path"],
|
|
||||||
# config_path=cfg.env.reward_classifier["config_path"],
|
|
||||||
# )
|
|
||||||
|
|
||||||
act_with_policy(
|
|
||||||
cfg=cfg,
|
|
||||||
reward_classifier=reward_classifier,
|
|
||||||
shutdown_event=shutdown_event,
|
|
||||||
parameters_queue=parameters_queue,
|
|
||||||
transitions_queue=transitions_queue,
|
|
||||||
interactions_queue=interactions_queue,
|
|
||||||
)
|
|
||||||
logging.info("[ACTOR] Policy process joined")
|
|
||||||
|
|
||||||
logging.info("[ACTOR] Closing queues")
|
|
||||||
transitions_queue.close()
|
|
||||||
interactions_queue.close()
|
|
||||||
parameters_queue.close()
|
|
||||||
|
|
||||||
transitions_process.join()
|
|
||||||
logging.info("[ACTOR] Transitions process joined")
|
|
||||||
interactions_process.join()
|
|
||||||
logging.info("[ACTOR] Interactions process joined")
|
|
||||||
receive_policy_process.join()
|
|
||||||
logging.info("[ACTOR] Receive policy process joined")
|
|
||||||
|
|
||||||
logging.info("[ACTOR] join queues")
|
|
||||||
transitions_queue.cancel_join_thread()
|
|
||||||
interactions_queue.cancel_join_thread()
|
|
||||||
parameters_queue.cancel_join_thread()
|
|
||||||
|
|
||||||
logging.info("[ACTOR] queues closed")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
actor_cli()
|
actor_cli()
|
||||||
|
|
Loading…
Reference in New Issue