Apply suggestions from code review (lekiwi_client.py)

Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
This commit is contained in:
Steven Palma 2025-04-17 23:41:56 +02:00 committed by GitHub
parent d94ed4bd9a
commit 338efe80d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 2 additions and 47 deletions

View File

@ -31,15 +31,6 @@ from .config_lekiwi import LeKiwiClientConfig, RobotMode
# TODO(Steven): This doesn't need to inherit from Robot
# But we do it for now to offer a familiar API
# TODO(Steven): This doesn't need to take care of the
# mapping from teleop to motor commands, but given that
# we already have a middle-man (this class) we add it here
# Other options include:
# 1. Adding it to the Telop implementation for lekiwi
# (meaning each robot will need a teleop imple) or
# 2. Adding it into the robot implementation
# (meaning the policy might be needed to be train
# over the teleop action space)
# TODO(Steven): Check if we can move everything to 32 instead
class LeKiwiClient(Robot):
config_class = LeKiwiClientConfig
@ -125,8 +116,6 @@ class LeKiwiClient(Robot):
@property
def is_connected(self) -> bool:
# TODO(Steven): Ideally we could check instead the status of the sockets
# I didn't find any API that allows us to do that easily
return self._is_connected
@property
@ -136,7 +125,6 @@ class LeKiwiClient(Robot):
def connect(self) -> None:
"""Establishes ZMQ sockets with the remote mobile robot"""
# TODO(Steven): Consider instead returning a bool + warn
if self._is_connected:
raise DeviceAlreadyConnectedError(
"LeKiwi Daemon is already connected. Do not run `robot.connect()` twice."
@ -156,11 +144,8 @@ class LeKiwiClient(Robot):
self._is_connected = True
def calibrate(self) -> None:
logging.warning("LeKiwiClient has nothing to calibrate.")
return
pass
# Consider moving these static functions out of the class
# Copied from robot_lekiwi MobileManipulator class* (before the refactor)
@staticmethod
def _degps_to_raw(degps: float) -> int:
steps_per_deg = 4096.0 / 360.0
@ -173,7 +158,6 @@ class LeKiwiClient(Robot):
speed_int = -0x8000 # -32768 -> minimum negative value
return speed_int
# Copied from robot_lekiwi MobileManipulator class* (before the refactor)
@staticmethod
def _raw_to_degps(raw_speed: int) -> float:
steps_per_deg = 4096.0 / 360.0
@ -181,7 +165,6 @@ class LeKiwiClient(Robot):
degps = magnitude / steps_per_deg
return degps
# Copied from robot_lekiwi MobileManipulator class* (before the refactor)
def _body_to_wheel_raw(
self,
x_cmd: float,
@ -241,10 +224,8 @@ class LeKiwiClient(Robot):
# Convert each wheels angular speed (deg/s) to a raw integer.
wheel_raw = [LeKiwiClient._degps_to_raw(deg) for deg in wheel_degps]
# TODO(Steven): remove hard-coded names
return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]}
# Copied from robot_lekiwi MobileManipulator class
def _wheel_raw_to_body(
self, wheel_raw: dict[str, Any], wheel_radius: float = 0.05, base_radius: float = 0.125
) -> dict[str, Any]:
@ -263,7 +244,6 @@ class LeKiwiClient(Robot):
theta_cmd : Rotational velocity in deg/s.
"""
# TODO(Steven): No check is done for dict keys
# Convert each raw command back to an angular speed in deg/s.
wheel_degps = np.array([LeKiwiClient._raw_to_degps(int(v)) for _, v in wheel_raw.items()])
# Convert from deg/s to rad/s.
@ -284,7 +264,6 @@ class LeKiwiClient(Robot):
# TODO(Steven): This is flaky, for example, if we received a state but failed decoding the image, we will not update any value
# TODO(Steven): All this function needs to be refactored
# Copied from robot_lekiwi MobileManipulator class* (before the refactor)
def _get_data(self):
# Copied from robot_lekiwi.py
"""Polls the video socket for up to 15 ms. If data arrives, decode only
@ -341,7 +320,6 @@ class LeKiwiClient(Robot):
if state_observation is not None and frames is not None:
self.last_frames = frames
# TODO(Steven): hard-coded name of expected keys, not good
remote_arm_state_tensor = {k: v for k, v in state_observation.items() if k.startswith("arm")}
self.last_remote_arm_state = remote_arm_state_tensor
@ -358,8 +336,6 @@ class LeKiwiClient(Robot):
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return frames, present_speed, remote_arm_state_tensor
# TODO(Steven): The returned space is different from the get_observation of LeKiwi
# This returns body-frames velocities instead of wheel pos/speeds
def get_observation(self) -> dict[str, Any]:
"""
Capture observations from the remote robot: current follower arm positions,
@ -423,18 +399,6 @@ class LeKiwiClient(Robot):
def configure(self):
pass
# TODO(Steven): This assumes this call is always called with a keyboard as a teleop device. It breaks if we teleop with other device
# TODO(Steven): Doing this mapping in here adds latecy between send_action and movement from the user perspective.
# t0: get teleop_cmd
# t1: send_action(teleop_cmd)
# t2: mapping teleop_cmd -> motor_cmd
# t3: execute motor_md
# This mapping for other robots/teleop devices might be slower. Doing this in the teleop will make this explicit
# t0': get teleop_cmd
# t1': mapping teleop_cmd -> motor_cmd
# t2': send_action(motor_cmd)
# t3': execute motor_cmd
# t3'-t2' << t3-t1
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
"""Command lekiwi to move to a target joint configuration. Translates to motor space + sends over ZMQ
@ -453,11 +417,9 @@ class LeKiwiClient(Robot):
)
if self.robot_mode is RobotMode.AUTO:
# TODO(Steven): Not yet implemented. The policy outputs might need a different conversion
raise InvalidActionError("Policy output as action input is not yet well defined")
raise NotImplementedError
goal_pos = {}
# TODO(Steven): This assumes teleop mode is always used with keyboard. Tomorrow we could teleop with another device ... ?
if self.robot_mode is RobotMode.TELEOP:
motors_name = self.state_feature.get("names").get("motors")
@ -484,10 +446,6 @@ class LeKiwiClient(Robot):
return goal_pos
def print_logs(self):
# TODO(Steven): Refactor logger
pass
def disconnect(self):
"""Cleans ZMQ comms"""
@ -500,6 +458,3 @@ class LeKiwiClient(Robot):
self.zmq_context.term()
self._is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()