fix(robots): fix send_action in lekiwi_client

This commit is contained in:
Steven Palma 2025-04-04 17:52:57 +02:00
parent 8e62c4ed02
commit 1eeeab5f4c
No known key found for this signature in database
2 changed files with 28 additions and 17 deletions

View File

@ -131,6 +131,7 @@ class LeKiwi(Robot):
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.bus.set_half_turn_homings(motors)
# TODO(Steven): Might be worth to do this also in other robots
full_turn_motor = [
motor for motor in motors if any(keyword in motor for keyword in ["wheel", "gripper"])
]

View File

@ -76,7 +76,7 @@ class LeKiwiClient(Robot):
]
self.speed_index = 0 # Start at slow
self.is_connected = False
self._is_connected = False
self.logs = {}
@property
@ -128,7 +128,7 @@ class LeKiwiClient(Robot):
@property
def is_connected(self) -> bool:
pass
return self._is_connected
@property
def is_calibrated(self) -> bool:
@ -137,7 +137,7 @@ class LeKiwiClient(Robot):
def connect(self) -> None:
"""Establishes ZMQ sockets with the remote mobile robot"""
if self.is_connected:
if self._is_connected:
raise DeviceAlreadyConnectedError(
"LeKiwi Daemon is already connected. Do not run `robot.connect()` twice."
)
@ -153,7 +153,7 @@ class LeKiwiClient(Robot):
self.zmq_observation_socket.connect(zmq_observations_locator)
self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1)
self.is_connected = True
self._is_connected = True
def calibrate(self) -> None:
# TODO(Steven): Nothing to calibrate.
@ -369,7 +369,7 @@ class LeKiwiClient(Robot):
present wheel speeds (converted to body-frame velocities: x, y, theta),
and a camera frame. Receives over ZMQ, translate to body-frame vel
"""
if not self.is_connected:
if not self._is_connected:
raise DeviceNotConnectedError("LeKiwiClient is not connected. You need to run `robot.connect()`.")
obs_dict = {}
@ -454,28 +454,38 @@ class LeKiwiClient(Robot):
Returns:
np.ndarray: the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
if not self._is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
goal_pos: np.array = np.zeros(9)
if self.robot_mode is RobotMode.AUTO:
# TODO(Steven): Not yet implemented. The policy outputs might need a different conversion
raise Exception
goal_pos = {}
# TODO(Steven): This assumes teleop mode is always used with keyboard. Tomorrow we could teleop with another device ... ?
if self.robot_mode is RobotMode.TELEOP:
if action.size < 6:
logging.error("Action should include at least the 6 states of the leader arm")
# TODO(Steven): I don't like this
motors_name = self.state_feature.get("names").get("motors")
common_keys = [key for key in action if any(key in motor for motor in motors_name)]
if len(common_keys) < 6:
logging.error("Action should include at least the states of the leader arm")
raise InvalidActionError
goal_pos[:6] = action[:6]
if action.size > 6:
# TODO(Steven): Assumes size and order is respected
wheel_actions = [v for _, v in self._from_keyboard_to_wheel_action(action[6:]).items()]
goal_pos[6:] = wheel_actions
self.zmq_cmd_socket.send_string(json.dumps(goal_pos.tolist())) # action is in motor space
arm_actions = {"arm_" + arm_motor: action[arm_motor] for arm_motor in common_keys}
goal_pos = arm_actions
if len(action) > 6:
keyboard_keys = set(action.keys()) - set(common_keys)
wheel_actions = {
"base_" + k: v for k, v in self._from_keyboard_to_wheel_action(keyboard_keys).items()
}
goal_pos = {**arm_actions, **wheel_actions}
self.zmq_cmd_socket.send_string(json.dumps(goal_pos)) # action is in motor space
return goal_pos
@ -486,7 +496,7 @@ class LeKiwiClient(Robot):
def disconnect(self):
"""Cleans ZMQ comms"""
if not self.is_connected:
if not self._is_connected:
raise DeviceNotConnectedError(
"LeKiwi is not connected. You need to run `robot.connect()` before disconnecting."
)
@ -494,7 +504,7 @@ class LeKiwiClient(Robot):
self.zmq_observation_socket.close()
self.zmq_cmd_socket.close()
self.zmq_context.term()
self.is_connected = False
self._is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):