refactor(robots): lewiki v0.3

This commit is contained in:
Steven Palma 2025-03-13 23:59:39 +01:00 committed by Steven Palma
parent baff7bb116
commit 7efea7d21a
No known key found for this signature in database
4 changed files with 66 additions and 26 deletions

View File

@ -15,3 +15,5 @@ class DeviceAlreadyConnectedError(ConnectionError):
):
self.message = message
super().__init__(self.message)
# TODO(Steven): Consider adding an InvalidActionError

View File

@ -370,19 +370,39 @@ class DaemonLeKiwiRobot(Robot):
return obs_dict
def from_teleop_action_to_motor_action(self, action):
# # Speed control
# elif key.char == self.teleop_keys["speed_up"]:
# self.speed_index = min(self.speed_index + 1, 2)
# print(f"Speed index increased to {self.speed_index}")
# elif key.char == self.teleop_keys["speed_down"]:
# self.speed_index = max(self.speed_index - 1, 0)
# print(f"Speed index decreased to {self.speed_index}")
pass
def from_keyboard_to_wheel_action(self, pressed_keys: np.ndarray):
# Speed control
# TODO(Steven): Handle the right action
if self.teleop_keys["speed_up"] in pressed_keys:
self.speed_index = min(self.speed_index + 1, 2)
if self.teleop_keys["speed_down"] in pressed_keys:
self.speed_index = max(self.speed_index - 1, 0)
speed_setting = self.speed_levels[self.speed_index]
xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4
theta_speed = speed_setting["theta"] # e.g. 30, 60, or 90
# (The rest of your code for generating wheel commands remains unchanged)
x_cmd = 0.0 # m/s forward/backward
y_cmd = 0.0 # m/s lateral
theta_cmd = 0.0 # deg/s rotation
# TODO(Steven): Handle action properly
if self.teleop_keys["forward"] in pressed_keys:
x_cmd += xy_speed
if self.teleop_keys["backward"] in pressed_keys:
x_cmd -= xy_speed
if self.teleop_keys["left"] in pressed_keys:
y_cmd += xy_speed
if self.teleop_keys["right"] in pressed_keys:
y_cmd -= xy_speed
if self.teleop_keys["rotate_left"] in pressed_keys:
theta_cmd += theta_speed
if self.teleop_keys["rotate_right"] in pressed_keys:
theta_cmd -= theta_speed
return self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd)
# TODO(Steven)
def send_action(self, action: np.ndarray) -> np.ndarray:
# Copied from S100 robot
"""Command lekiwi to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
@ -402,18 +422,24 @@ class DaemonLeKiwiRobot(Robot):
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
if self.mode is TELEOP:
# do conversion keys to motor
else:
# convert policy output
goal_pos = action
# TODO(Steven): This won't work if this is called by a policy with body vels outputs
goal_pos: np.array = np.empty(9)
if action.size <6:
# TODO(Steven): Handle this properly
raise Exception
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.actuators.read("Present_Position")
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
# TODO(Steven): Assumes size and order is respected
# TODO(Steven): This assumes this call is always called from a keyboard teleop command
wheel_actions = [v for _,v in self.from_keyboard_to_wheel_action(action[6:])]
goal_pos[:6]=action[:6]
goal_pos[6:]=wheel_actions
# Send goal position to the actuators
# TODO(Steven): Base motors should set a vel instead
self.actuators.write("Goal_Position", goal_pos.astype(np.int32))
self.zmq_cmd_socket.send_string(json.dumps(goal_pos))
return goal_pos

View File

@ -30,14 +30,25 @@ def main():
duration = 0
while duration < 20:
arm_action = leader_arm.get_action()
base_action = keyboard.get_action()
arm_action = leader_arm.get_action() # 6 motors
base_action = keyboard.get_action() # n keys pressed
action = {
**arm_action,
**base_action
}
robot.send_action(action) # Translates to motor space + sends over ZMQ
robot.get_observation() # Receives over ZMQ, translate to body-frame vel
robot.set_mode(TELEOP)
action_sent = robot.send_action(action) # Translates to motor space + sends over ZMQ
obs = robot.get_observation() # Receives over ZMQ, translate to body-frame vel
dataset.save(action_sent, obs)
# TODO(Steven)
robot.set_mode(AUTO)
policy_action = policy.get_action() # This might be in body frame or in key space
robot.send_action(policy_action) # This has no way to know
teleop_step() # teleop
send_action() #policy
duration = time.perf_counter() - start

View File

@ -22,4 +22,5 @@ from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("keyboard")
@dataclass
class KeyboardTeleopConfig(TeleoperatorConfig):
# TODO(Steven): Maybe set in here the keys that we want to capture
mock: bool = False