diff --git a/deploy/deploy_real/config.py b/deploy/deploy_real/config.py index d6c8dfd..212dcfc 100644 --- a/deploy/deploy_real/config.py +++ b/deploy/deploy_real/config.py @@ -40,6 +40,11 @@ class Config: self.arm_waist_kps = [] self.arm_waist_kds = [] self.arm_waist_target = [] + + if 'motor_joint' in config: + self.motor_joint = config['motor_joint'] + else: + self.motor_joint=[] self.ang_vel_scale = config["ang_vel_scale"] self.dof_pos_scale = config["dof_pos_scale"] diff --git a/deploy/deploy_real/configs/ik.yaml b/deploy/deploy_real/configs/ik.yaml new file mode 100644 index 0000000..b284faa --- /dev/null +++ b/deploy/deploy_real/configs/ik.yaml @@ -0,0 +1,102 @@ +# +control_dt: 0.02 + +msg_type: "hg" # "hg" or "go" +imu_type: "pelvis" # "torso" or "pelvis" + +lowcmd_topic: "rt/lowcmd" +lowstate_topic: "rt/lowstate" + +policy_path: "{LEGGED_GYM_ROOT_DIR}/deploy/pre_train/g1/policy.pt" + +motor_joint: [ +'left_hip_pitch_joint' +'left_hip_roll_joint' +'left_hip_yaw_joint' +'left_knee_joint' +'left_ankle_pitch_joint' +'left_ankle_roll_joint' +'right_hip_pitch_joint' +'right_hip_roll_joint' +'right_hip_yaw_joint' +'right_knee_joint' +'right_ankle_pitch_joint' +'right_ankle_roll_joint' +'waist_yaw_joint' +'waist_roll_joint' +'waist_pitch_joint' +'left_shoulder_pitch_joint' +'left_shoulder_roll_joint' +'left_shoulder_yaw_joint' +'left_elbow_joint' +'left_wrist_roll_joint' +'left_wrist_pitch_joint' +'left_wrist_yaw_joint' +'right_shoulder_pitch_joint' +'right_shoulder_roll_joint' +'right_shoulder_yaw_joint' +'right_elbow_joint' +'right_wrist_roll_joint' +'right_wrist_pitch_joint' +'right_wrist_yaw_joint' +] +left_arm_joint2motor_idx: [15,16,17,18,19,20,21] +all_joint2motor_idx: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, + 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, + 22, 23, 24, 25, 26, 27, 28] +# kps: [100, 100, 100, 150, 40, 40, 100, 100, 100, 150, 40, 40] +# kds: [2, 2, 2, 4, 2, 2, 2, 2, 2, 4, 2, 2] +# default_angles: [-0.1, 0.0, 0.0, 0.3, -0.2, 0.0, +# -0.1, 0.0, 0.0, 0.3, -0.2, 0.0] +kps: [ + 100, 100, 100, 150, 40, 40, 100, 100, 100, 150, 40, 40, + 150, 150, 150, + 100, 100, 50, 50, 20, 20, 20, + 100, 100, 50, 50, 20, 20, 20 + ] +kds: [ + 2, 2, 2, 4, 2, 2, 2, 2, 2, 4, 2, 2, + 3, 3, 3, + 2, 2, 2, 2, 1, 1, 1, + 2, 2, 2, 2, 1, 1, 1 + ] +default_angles: [ + -0.2, 0.0, 0.0, 0.42, -0.23, 0.0, + -0.2, 0.0, 0.0, 0.42, -0.23, 0.0, + 0., 0., 0., + 0.35, 0.16, 0., 0.87, 0., 0., 0., + 0.35, -0.16, 0., 0.87, 0., 0., 0., + ] + +# arm_waist_joint2motor_idx: [12, 13, 14, +# 15, 16, 17, 18, 19, 20, 21, +# 22, 23, 24, 25, 26, 27, 28] + +# arm_waist_kps: [300, 300, 300, +# 100, 100, 50, 50, 20, 20, 20, +# 100, 100, 50, 50, 20, 20, 20] + +# arm_waist_kds: [3, 3, 3, +# 2, 2, 2, 2, 1, 1, 1, +# 2, 2, 2, 2, 1, 1, 1] + +# arm_waist_target: [ 0, 0, 0, +# 0, 0, 0, 0, 0, 0, 0, +# 0, 0, 0, 0, 0, 0, 0] + +# ang_vel_scale: 0.25 +ang_vel_scale: 1.0 +dof_pos_scale: 1.0 +# dof_vel_scale: 0.05 +dof_vel_scale: 1.0 +# action_scale: 0.25 +action_scale: 0.5 +# cmd_scale: [2.0, 2.0, 0.25] +cmd_scale: [0.0, 0.0, 0.0] +# num_actions: 12 +num_actions: 29 +# num_obs: 47 +num_obs: 96 + +# max_cmd: [0.8, 0.5, 1.57] +max_cmd: [1.0, 1.0, 1.0] diff --git a/deploy/deploy_real/deploy_real_ros_ikctrl.py b/deploy/deploy_real/deploy_real_ros_ikctrl.py new file mode 100644 index 0000000..4795f38 --- /dev/null +++ b/deploy/deploy_real/deploy_real_ros_ikctrl.py @@ -0,0 +1,298 @@ +from legged_gym import LEGGED_GYM_ROOT_DIR +from typing import Union +import numpy as np +import time +import torch + +import rclpy as rp +from unitree_hg.msg import LowCmd as LowCmdHG, LowState as LowStateHG +from unitree_go.msg import LowCmd as LowCmdGo, LowState as LowStateGo +from common.command_helper_ros import create_damping_cmd, create_zero_cmd, init_cmd_hg, init_cmd_go, MotorMode +from common.rotation_helper import get_gravity_orientation, transform_imu_data +from common.remote_controller import RemoteController, KeyMap +from config import Config +from common.crc import CRC + +from enum import Enum +from ikctrl import IKCtrl + +class Mode(Enum): + wait = 0 + zero_torque = 1 + default_pos = 2 + damping = 3 + policy = 4 + null = 5 + +class Controller: + def __init__(self, config: Config) -> None: + self.config = config + self.remote_controller = RemoteController() + act_joint = ["left_shoulder_pitch_joint", + "left_shoulder_roll_joint", + "left_shoulder_yaw_joint", + "left_elbow_joint", + "left_wrist_roll_joint", + "left_wrist_pitch_joint", + "left_wrist_yaw_joint"] + self.ikctrl = IKCtrl('/input/unitree_ros/robots/g1_description/g1_29dof_with_hand_rev_1_0.urdf', + act_joint) + self.lim_lo_pin = self.ikctrl.robot.model.lowerPositionLimit + self.lim_hi_pin = self.ikctrl.robot.model.upperPositionLimit + + self.pin_from_mot = np.zeros(29) # FIXME(ycho): hardcoded + self.mot_from_pin = np.zeros(43) # FIXME(ycho): hardcoded + self.mot_from_pin_act = np.zeros(7) # FIXME(ycho): hardcoded + for i_mot, j in enumerate( self.config.motor_joint ): + i_pin = (self.ikctrl.robot.index(j) - 1) + self.pin_from_mot[i_mot] = i_pin + self.mot_from_pin[i_pin] = i_mot + if j in act_joint: + i_act = act_joint.index(j) + self.mot_from_pin_act[i_act] = i_mot + + + # Initialize the policy network + self.policy = torch.jit.load(config.policy_path) + # Initializing process variables + self.qj = np.zeros(43, dtype=np.float32) + self.dqj = np.zeros(43, dtype=np.float32) + self.action = np.zeros(7, dtype=np.float32) + self.target_dof_pos = config.default_angles.copy() + self.obs = np.zeros(config.num_obs, dtype=np.float32) + self.cmd = np.array([0.0, 0, 0]) + self.counter = 0 + + rp.init() + self._node = rp.create_node("low_level_cmd_sender") + + if config.msg_type == "hg": + # g1 and h1_2 use the hg msg type + + self.low_cmd = LowCmdHG() + self.low_state = LowStateHG() + + self.lowcmd_publisher_ = self._node.create_publisher(LowCmdHG, + 'lowcmd', 10) + self.lowstate_subscriber = self._node.create_subscription(LowStateHG, + 'lowstate', self.LowStateHgHandler, 10) + self.mode_pr_ = MotorMode.PR + self.mode_machine_ = 0 + + # self.lowcmd_publisher_ = ChannelPublisher(config.lowcmd_topic, LowCmdHG) + # self.lowcmd_publisher_.Init() + + # self.lowstate_subscriber = ChannelSubscriber(config.lowstate_topic, LowStateHG) + # self.lowstate_subscriber.Init(self.LowStateHgHandler, 10) + + elif config.msg_type == "go": + raise ValueError(f"{config.msg_type} is not implemented yet.") + + else: + raise ValueError("Invalid msg_type") + + # wait for the subscriber to receive data + # self.wait_for_low_state() + + # Initialize the command msg + if config.msg_type == "hg": + init_cmd_hg(self.low_cmd, self.mode_machine_, self.mode_pr_) + elif config.msg_type == "go": + init_cmd_go(self.low_cmd, weak_motor=self.config.weak_motor) + + self.mode = Mode.wait + self._mode_change = True + self._timer = self._node.create_timer(self.config.control_dt, self.run_wrapper) + self._terminate = False + try: + rp.spin(self._node) + except KeyboardInterrupt: + print("KeyboardInterrupt") + finally: + self._node.destroy_timer(self._timer) + create_damping_cmd(self.low_cmd) + self.send_cmd(self.low_cmd) + self._node.destroy_node() + rp.shutdown() + print("Exit") + + def LowStateHgHandler(self, msg: LowStateHG): + self.low_state = msg + self.mode_machine_ = self.low_state.mode_machine + self.remote_controller.set(self.low_state.wireless_remote) + + def LowStateGoHandler(self, msg: LowStateGo): + self.low_state = msg + self.remote_controller.set(self.low_state.wireless_remote) + + def send_cmd(self, cmd: Union[LowCmdGo, LowCmdHG]): + cmd.mode_machine = self.mode_machine_ + cmd.crc = CRC().Crc(cmd) + size = len(cmd.motor_cmd) + # print(cmd.mode_machine) + # for i in range(size): + # print(i, cmd.motor_cmd[i].q, + # cmd.motor_cmd[i].dq, + # cmd.motor_cmd[i].kp, + # cmd.motor_cmd[i].kd, + # cmd.motor_cmd[i].tau) + self.lowcmd_publisher_.publish(cmd) + + def wait_for_low_state(self): + while self.low_state.crc == 0: + print(self.low_state) + time.sleep(self.config.control_dt) + print("Successfully connected to the robot.") + + def zero_torque_state(self): + if self.remote_controller.button[KeyMap.start] == 1: + self._mode_change = True + self.mode = Mode.default_pos + else: + create_zero_cmd(self.low_cmd) + self.send_cmd(self.low_cmd) + + def prepare_default_pos(self): + # move time 2s + total_time = 2 + self.counter = 0 + self._num_step = int(total_time / self.config.control_dt) + + dof_idx = self.config.leg_joint2motor_idx + self.config.arm_waist_joint2motor_idx + kps = self.config.kps + self.config.arm_waist_kps + kds = self.config.kds + self.config.arm_waist_kds + self._kps = [float(kp) for kp in kps] + self._kds = [float(kd) for kd in kds] + self._default_pos = np.concatenate((self.config.default_angles, self.config.arm_waist_target), axis=0) + self._dof_size = len(dof_idx) + self._dof_idx = dof_idx + + # record the current pos + self._init_dof_pos = np.zeros(self._dof_size, + dtype=np.float32) + for i in range(self._dof_size): + self._init_dof_pos[i] = self.low_state.motor_state[dof_idx[i]].q + + def move_to_default_pos(self): + # move to default pos + if self.counter < self._num_step: + alpha = self.counter / self._num_step + for j in range(self._dof_size): + motor_idx = self._dof_idx[j] + target_pos = self._default_pos[j] + self.low_cmd.motor_cmd[motor_idx].q = (self._init_dof_pos[j] * + (1 - alpha) + target_pos * alpha) + self.low_cmd.motor_cmd[motor_idx].dq = 0.0 + self.low_cmd.motor_cmd[motor_idx].kp = self._kps[j] + self.low_cmd.motor_cmd[motor_idx].kd = self._kds[j] + self.low_cmd.motor_cmd[motor_idx].tau = 0.0 + self.send_cmd(self.low_cmd) + self.counter += 1 + else: + self._mode_change = True + self.mode = Mode.damping + + def default_pos_state(self): + if self.remote_controller.button[KeyMap.A] != 1: + for i in range(len(self.config.leg_joint2motor_idx)): + motor_idx = self.config.leg_joint2motor_idx[i] + self.low_cmd.motor_cmd[motor_idx].q = float(self.config.default_angles[i]) + self.low_cmd.motor_cmd[motor_idx].dq = 0.0 + self.low_cmd.motor_cmd[motor_idx].kp = self._kps[i] + self.low_cmd.motor_cmd[motor_idx].kd = self._kds[i] + self.low_cmd.motor_cmd[motor_idx].tau = 0.0 + for i in range(len(self.config.arm_waist_joint2motor_idx)): + motor_idx = self.config.arm_waist_joint2motor_idx[i] + self.low_cmd.motor_cmd[motor_idx].q = float(self.config.arm_waist_target[i]) + self.low_cmd.motor_cmd[motor_idx].dq = 0.0 + self.low_cmd.motor_cmd[motor_idx].kp = self._kps[i] + self.low_cmd.motor_cmd[motor_idx].kd = self._kds[i] + self.low_cmd.motor_cmd[motor_idx].tau = 0.0 + self.send_cmd(self.low_cmd) + else: + self._mode_change = True + self.mode = Mode.policy + + def run_policy(self): + if self.remote_controller.button[KeyMap.select] == 1: + self._mode_change = True + self.mode = Mode.null + return + self.counter += 1 + + # Get current joint positions + for i_mot in range(len(self.motor_joint)): + i_pin = self.pin_from_mot[i_mot] + self.qj[i_pin] = self.low_state.motor_state[i_mot].q + self.cmd[0] = self.remote_controller.ly + self.cmd[1] = self.remote_controller.lx * -1 + self.cmd[2] = self.remote_controller.rx * -1 + delta = np.concatenate([self.cmd, + [1,0,0,0]]) + res_q = self.ikctrl(self.qj, delta, rel=True) + for i_act in range(len(res_q)): + i_mot = self.mot_from_pin_act[i_act] + i_pin = self.pin_from_mot[i_mot] + target_q = ( + self.low_state.motor_state[i_mot].q + res_q[i_act] + ) + target_q = np.clip(target_q, + self.lim_lo_pin[i_pin], + self.lim_hi_pin[i_pin]) + self.low_cmd.motor_cmd[i_mot].q = target_q + self.low_cmd.motor_cmd[i_mot].dq = 0.0 + self.low_cmd.motor_cmd[i_mot].kp = float(self.config.kps[i_mot]) + self.low_cmd.motor_cmd[i_mot].kd = float(self.config.kps[i_mot]) + self.low_cmd.motor_cmd[i_mot].tau = 0.0 + # send the command + self.send_cmd(self.low_cmd) + + def run_wrapper(self): + # print("hello", self.mode, + # self.mode == Mode.zero_torque) + if self.mode == Mode.wait: + if self.low_state.crc != 0: + self.mode = Mode.zero_torque + self.low_cmd.mode_machine = self.mode_machine_ + print("Successfully connected to the robot.") + elif self.mode == Mode.zero_torque: + if self._mode_change: + print("Enter zero torque state.") + print("Waiting for the start signal...") + self._mode_change = False + self.zero_torque_state() + elif self.mode == Mode.default_pos: + if self._mode_change: + print("Moving to default pos.") + self._mode_change = False + self.prepare_default_pos() + self.move_to_default_pos() + elif self.mode == Mode.damping: + if self._mode_change: + print("Enter default pos state.") + print("Waiting for the Button A signal...") + self._mode_change = False + self.default_pos_state() + elif self.mode == Mode.policy: + if self._mode_change: + print("Run policy.") + self._mode_change = False + self.counter = 0 + self.run_policy() + elif self.mode == Mode.null: + self._terminate = True + + # time.sleep(self.config.control_dt) + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("config", type=str, help="config file name in the configs folder", default="g1.yaml") + args = parser.parse_args() + + # Load config + config_path = f"{LEGGED_GYM_ROOT_DIR}/deploy/deploy_real/configs/{args.config}" + config = Config(config_path) + + controller = Controller(config) diff --git a/deploy/deploy_real/ikctrl.py b/deploy/deploy_real/ikctrl.py new file mode 100644 index 0000000..a2cf266 --- /dev/null +++ b/deploy/deploy_real/ikctrl.py @@ -0,0 +1,121 @@ +import os +from typing import Tuple +from contextlib import contextmanager +from pathlib import Path +import time + +import numpy as np +import torch as th + +import pinocchio as pin +import pink +from pink.tasks import FrameTask + + +@contextmanager +def with_dir(d): + d0 = os.getcwd() + try: + os.chdir(d) + yield + finally: + os.chdir(d0) + + +def xyzw2wxyz(q_xyzw: th.Tensor, dim: int = -1): + if isinstance(q_xyzw, np.ndarray): + return np.roll(q_xyzw, 1, axis=dim) + return th.roll(q_xyzw, 1, dims=dim) + + +def wxyz2xyzw(q_wxyz: th.Tensor, dim: int = -1): + if isinstance(q_wxyz, np.ndarray): + return np.roll(q_wxyz, -1, axis=dim) + return th.roll(q_wxyz, -1, dims=dim) + + +def dls_ik( + dpose: np.ndarray, + jac: np.ndarray, + sqlmda: float +): + """ + Arg: + dpose: task-space error (A[..., err]) + jac: jacobian (A[..., err, dof]) + sqlmda: DLS damping factor. + Return: + joint residual (A[..., dof]) + """ + if isinstance(dpose, tuple): + dpose = np.concatenate([dpose[0], dpose[1]], axis=-1) + J = jac + A = J @ J.T + # NOTE(ycho): add to view of diagonal + a = np.einsum('...ii->...i', A) + a += sqlmda + dq = (J.T @ np.linalg.solve(A, dpose[..., None]))[..., 0] + return dq + + +class IKCtrl: + def __init__(self, + urdf_path: str, + joint_names: Tuple[str, ...], + sqlmda: float = 0.05**2): + path = Path(urdf_path) + with with_dir(path.parent): + robot = pin.RobotWrapper.BuildFromURDF(filename=urdf_path, + package_dirs=["."], + root_joint=None) + self.robot = robot + # NOTE(ycho): build index map between pin.q and other set(s) of ordered joints. + larm_from_pin = [] + for j in joint_names: + larm_from_pin.append(robot.index(j) - 1) + self.larm_from_pin = np.asarray(larm_from_pin, dtype=np.int32) + self.task = FrameTask("left_hand_palm_link", + position_cost=1.0, + orientation_cost=0.0) + self.sqlmda = sqlmda + self.cfg = pink.Configuration(robot.model, robot.data, + np.zeros_like(robot.q0)) + + def __call__(self, + q0: np.ndarray, + target_pose: np.ndarray, + rel:bool=False + ): + """ + Arg: + q0: Current robot joints; A[..., 43?] + target_pose: + Policy output. A[..., 7] formatted as (xyz, q_{wxyz}) + Given as world frame absolute pose, for some reason. + Return: + joint residual: A[..., 7] + """ + robot = self.robot + + # source pose + self.cfg.update(q0) + T0 = self.cfg.get_transform_frame_to_world("left_hand_palm_link") + + # target pose + dst_xyz = target_pose[..., 0:3] + dst_quat = pin.Quaternion(wxyz2xyzw(target_pose[..., 3:7])) + T1 = pin.SE3(dst_quat, dst_xyz) + if rel: + T1 = T0 @ T1 + + # jacobian + self.task.set_target(T0) + jac = self.task.compute_jacobian(self.cfg) + jac = jac[:, self.larm_from_pin] + + # error&ik + dT = T1.actInv(T0) + dpose = pin.log(dT).vector + dq = dls_ik(dpose, jac, self.sqlmda) + return dq +