arm ik ctrl

This commit is contained in:
Yoonyoung Cho 2025-02-12 18:04:18 +09:00
parent 0a0f4673a7
commit 52a50f4f33
4 changed files with 526 additions and 0 deletions

View File

@ -40,6 +40,11 @@ class Config:
self.arm_waist_kps = []
self.arm_waist_kds = []
self.arm_waist_target = []
if 'motor_joint' in config:
self.motor_joint = config['motor_joint']
else:
self.motor_joint=[]
self.ang_vel_scale = config["ang_vel_scale"]
self.dof_pos_scale = config["dof_pos_scale"]

View File

@ -0,0 +1,102 @@
#
control_dt: 0.02
msg_type: "hg" # "hg" or "go"
imu_type: "pelvis" # "torso" or "pelvis"
lowcmd_topic: "rt/lowcmd"
lowstate_topic: "rt/lowstate"
policy_path: "{LEGGED_GYM_ROOT_DIR}/deploy/pre_train/g1/policy.pt"
motor_joint: [
'left_hip_pitch_joint'
'left_hip_roll_joint'
'left_hip_yaw_joint'
'left_knee_joint'
'left_ankle_pitch_joint'
'left_ankle_roll_joint'
'right_hip_pitch_joint'
'right_hip_roll_joint'
'right_hip_yaw_joint'
'right_knee_joint'
'right_ankle_pitch_joint'
'right_ankle_roll_joint'
'waist_yaw_joint'
'waist_roll_joint'
'waist_pitch_joint'
'left_shoulder_pitch_joint'
'left_shoulder_roll_joint'
'left_shoulder_yaw_joint'
'left_elbow_joint'
'left_wrist_roll_joint'
'left_wrist_pitch_joint'
'left_wrist_yaw_joint'
'right_shoulder_pitch_joint'
'right_shoulder_roll_joint'
'right_shoulder_yaw_joint'
'right_elbow_joint'
'right_wrist_roll_joint'
'right_wrist_pitch_joint'
'right_wrist_yaw_joint'
]
left_arm_joint2motor_idx: [15,16,17,18,19,20,21]
all_joint2motor_idx: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
22, 23, 24, 25, 26, 27, 28]
# kps: [100, 100, 100, 150, 40, 40, 100, 100, 100, 150, 40, 40]
# kds: [2, 2, 2, 4, 2, 2, 2, 2, 2, 4, 2, 2]
# default_angles: [-0.1, 0.0, 0.0, 0.3, -0.2, 0.0,
# -0.1, 0.0, 0.0, 0.3, -0.2, 0.0]
kps: [
100, 100, 100, 150, 40, 40, 100, 100, 100, 150, 40, 40,
150, 150, 150,
100, 100, 50, 50, 20, 20, 20,
100, 100, 50, 50, 20, 20, 20
]
kds: [
2, 2, 2, 4, 2, 2, 2, 2, 2, 4, 2, 2,
3, 3, 3,
2, 2, 2, 2, 1, 1, 1,
2, 2, 2, 2, 1, 1, 1
]
default_angles: [
-0.2, 0.0, 0.0, 0.42, -0.23, 0.0,
-0.2, 0.0, 0.0, 0.42, -0.23, 0.0,
0., 0., 0.,
0.35, 0.16, 0., 0.87, 0., 0., 0.,
0.35, -0.16, 0., 0.87, 0., 0., 0.,
]
# arm_waist_joint2motor_idx: [12, 13, 14,
# 15, 16, 17, 18, 19, 20, 21,
# 22, 23, 24, 25, 26, 27, 28]
# arm_waist_kps: [300, 300, 300,
# 100, 100, 50, 50, 20, 20, 20,
# 100, 100, 50, 50, 20, 20, 20]
# arm_waist_kds: [3, 3, 3,
# 2, 2, 2, 2, 1, 1, 1,
# 2, 2, 2, 2, 1, 1, 1]
# arm_waist_target: [ 0, 0, 0,
# 0, 0, 0, 0, 0, 0, 0,
# 0, 0, 0, 0, 0, 0, 0]
# ang_vel_scale: 0.25
ang_vel_scale: 1.0
dof_pos_scale: 1.0
# dof_vel_scale: 0.05
dof_vel_scale: 1.0
# action_scale: 0.25
action_scale: 0.5
# cmd_scale: [2.0, 2.0, 0.25]
cmd_scale: [0.0, 0.0, 0.0]
# num_actions: 12
num_actions: 29
# num_obs: 47
num_obs: 96
# max_cmd: [0.8, 0.5, 1.57]
max_cmd: [1.0, 1.0, 1.0]

View File

@ -0,0 +1,298 @@
from legged_gym import LEGGED_GYM_ROOT_DIR
from typing import Union
import numpy as np
import time
import torch
import rclpy as rp
from unitree_hg.msg import LowCmd as LowCmdHG, LowState as LowStateHG
from unitree_go.msg import LowCmd as LowCmdGo, LowState as LowStateGo
from common.command_helper_ros import create_damping_cmd, create_zero_cmd, init_cmd_hg, init_cmd_go, MotorMode
from common.rotation_helper import get_gravity_orientation, transform_imu_data
from common.remote_controller import RemoteController, KeyMap
from config import Config
from common.crc import CRC
from enum import Enum
from ikctrl import IKCtrl
class Mode(Enum):
wait = 0
zero_torque = 1
default_pos = 2
damping = 3
policy = 4
null = 5
class Controller:
def __init__(self, config: Config) -> None:
self.config = config
self.remote_controller = RemoteController()
act_joint = ["left_shoulder_pitch_joint",
"left_shoulder_roll_joint",
"left_shoulder_yaw_joint",
"left_elbow_joint",
"left_wrist_roll_joint",
"left_wrist_pitch_joint",
"left_wrist_yaw_joint"]
self.ikctrl = IKCtrl('/input/unitree_ros/robots/g1_description/g1_29dof_with_hand_rev_1_0.urdf',
act_joint)
self.lim_lo_pin = self.ikctrl.robot.model.lowerPositionLimit
self.lim_hi_pin = self.ikctrl.robot.model.upperPositionLimit
self.pin_from_mot = np.zeros(29) # FIXME(ycho): hardcoded
self.mot_from_pin = np.zeros(43) # FIXME(ycho): hardcoded
self.mot_from_pin_act = np.zeros(7) # FIXME(ycho): hardcoded
for i_mot, j in enumerate( self.config.motor_joint ):
i_pin = (self.ikctrl.robot.index(j) - 1)
self.pin_from_mot[i_mot] = i_pin
self.mot_from_pin[i_pin] = i_mot
if j in act_joint:
i_act = act_joint.index(j)
self.mot_from_pin_act[i_act] = i_mot
# Initialize the policy network
self.policy = torch.jit.load(config.policy_path)
# Initializing process variables
self.qj = np.zeros(43, dtype=np.float32)
self.dqj = np.zeros(43, dtype=np.float32)
self.action = np.zeros(7, dtype=np.float32)
self.target_dof_pos = config.default_angles.copy()
self.obs = np.zeros(config.num_obs, dtype=np.float32)
self.cmd = np.array([0.0, 0, 0])
self.counter = 0
rp.init()
self._node = rp.create_node("low_level_cmd_sender")
if config.msg_type == "hg":
# g1 and h1_2 use the hg msg type
self.low_cmd = LowCmdHG()
self.low_state = LowStateHG()
self.lowcmd_publisher_ = self._node.create_publisher(LowCmdHG,
'lowcmd', 10)
self.lowstate_subscriber = self._node.create_subscription(LowStateHG,
'lowstate', self.LowStateHgHandler, 10)
self.mode_pr_ = MotorMode.PR
self.mode_machine_ = 0
# self.lowcmd_publisher_ = ChannelPublisher(config.lowcmd_topic, LowCmdHG)
# self.lowcmd_publisher_.Init()
# self.lowstate_subscriber = ChannelSubscriber(config.lowstate_topic, LowStateHG)
# self.lowstate_subscriber.Init(self.LowStateHgHandler, 10)
elif config.msg_type == "go":
raise ValueError(f"{config.msg_type} is not implemented yet.")
else:
raise ValueError("Invalid msg_type")
# wait for the subscriber to receive data
# self.wait_for_low_state()
# Initialize the command msg
if config.msg_type == "hg":
init_cmd_hg(self.low_cmd, self.mode_machine_, self.mode_pr_)
elif config.msg_type == "go":
init_cmd_go(self.low_cmd, weak_motor=self.config.weak_motor)
self.mode = Mode.wait
self._mode_change = True
self._timer = self._node.create_timer(self.config.control_dt, self.run_wrapper)
self._terminate = False
try:
rp.spin(self._node)
except KeyboardInterrupt:
print("KeyboardInterrupt")
finally:
self._node.destroy_timer(self._timer)
create_damping_cmd(self.low_cmd)
self.send_cmd(self.low_cmd)
self._node.destroy_node()
rp.shutdown()
print("Exit")
def LowStateHgHandler(self, msg: LowStateHG):
self.low_state = msg
self.mode_machine_ = self.low_state.mode_machine
self.remote_controller.set(self.low_state.wireless_remote)
def LowStateGoHandler(self, msg: LowStateGo):
self.low_state = msg
self.remote_controller.set(self.low_state.wireless_remote)
def send_cmd(self, cmd: Union[LowCmdGo, LowCmdHG]):
cmd.mode_machine = self.mode_machine_
cmd.crc = CRC().Crc(cmd)
size = len(cmd.motor_cmd)
# print(cmd.mode_machine)
# for i in range(size):
# print(i, cmd.motor_cmd[i].q,
# cmd.motor_cmd[i].dq,
# cmd.motor_cmd[i].kp,
# cmd.motor_cmd[i].kd,
# cmd.motor_cmd[i].tau)
self.lowcmd_publisher_.publish(cmd)
def wait_for_low_state(self):
while self.low_state.crc == 0:
print(self.low_state)
time.sleep(self.config.control_dt)
print("Successfully connected to the robot.")
def zero_torque_state(self):
if self.remote_controller.button[KeyMap.start] == 1:
self._mode_change = True
self.mode = Mode.default_pos
else:
create_zero_cmd(self.low_cmd)
self.send_cmd(self.low_cmd)
def prepare_default_pos(self):
# move time 2s
total_time = 2
self.counter = 0
self._num_step = int(total_time / self.config.control_dt)
dof_idx = self.config.leg_joint2motor_idx + self.config.arm_waist_joint2motor_idx
kps = self.config.kps + self.config.arm_waist_kps
kds = self.config.kds + self.config.arm_waist_kds
self._kps = [float(kp) for kp in kps]
self._kds = [float(kd) for kd in kds]
self._default_pos = np.concatenate((self.config.default_angles, self.config.arm_waist_target), axis=0)
self._dof_size = len(dof_idx)
self._dof_idx = dof_idx
# record the current pos
self._init_dof_pos = np.zeros(self._dof_size,
dtype=np.float32)
for i in range(self._dof_size):
self._init_dof_pos[i] = self.low_state.motor_state[dof_idx[i]].q
def move_to_default_pos(self):
# move to default pos
if self.counter < self._num_step:
alpha = self.counter / self._num_step
for j in range(self._dof_size):
motor_idx = self._dof_idx[j]
target_pos = self._default_pos[j]
self.low_cmd.motor_cmd[motor_idx].q = (self._init_dof_pos[j] *
(1 - alpha) + target_pos * alpha)
self.low_cmd.motor_cmd[motor_idx].dq = 0.0
self.low_cmd.motor_cmd[motor_idx].kp = self._kps[j]
self.low_cmd.motor_cmd[motor_idx].kd = self._kds[j]
self.low_cmd.motor_cmd[motor_idx].tau = 0.0
self.send_cmd(self.low_cmd)
self.counter += 1
else:
self._mode_change = True
self.mode = Mode.damping
def default_pos_state(self):
if self.remote_controller.button[KeyMap.A] != 1:
for i in range(len(self.config.leg_joint2motor_idx)):
motor_idx = self.config.leg_joint2motor_idx[i]
self.low_cmd.motor_cmd[motor_idx].q = float(self.config.default_angles[i])
self.low_cmd.motor_cmd[motor_idx].dq = 0.0
self.low_cmd.motor_cmd[motor_idx].kp = self._kps[i]
self.low_cmd.motor_cmd[motor_idx].kd = self._kds[i]
self.low_cmd.motor_cmd[motor_idx].tau = 0.0
for i in range(len(self.config.arm_waist_joint2motor_idx)):
motor_idx = self.config.arm_waist_joint2motor_idx[i]
self.low_cmd.motor_cmd[motor_idx].q = float(self.config.arm_waist_target[i])
self.low_cmd.motor_cmd[motor_idx].dq = 0.0
self.low_cmd.motor_cmd[motor_idx].kp = self._kps[i]
self.low_cmd.motor_cmd[motor_idx].kd = self._kds[i]
self.low_cmd.motor_cmd[motor_idx].tau = 0.0
self.send_cmd(self.low_cmd)
else:
self._mode_change = True
self.mode = Mode.policy
def run_policy(self):
if self.remote_controller.button[KeyMap.select] == 1:
self._mode_change = True
self.mode = Mode.null
return
self.counter += 1
# Get current joint positions
for i_mot in range(len(self.motor_joint)):
i_pin = self.pin_from_mot[i_mot]
self.qj[i_pin] = self.low_state.motor_state[i_mot].q
self.cmd[0] = self.remote_controller.ly
self.cmd[1] = self.remote_controller.lx * -1
self.cmd[2] = self.remote_controller.rx * -1
delta = np.concatenate([self.cmd,
[1,0,0,0]])
res_q = self.ikctrl(self.qj, delta, rel=True)
for i_act in range(len(res_q)):
i_mot = self.mot_from_pin_act[i_act]
i_pin = self.pin_from_mot[i_mot]
target_q = (
self.low_state.motor_state[i_mot].q + res_q[i_act]
)
target_q = np.clip(target_q,
self.lim_lo_pin[i_pin],
self.lim_hi_pin[i_pin])
self.low_cmd.motor_cmd[i_mot].q = target_q
self.low_cmd.motor_cmd[i_mot].dq = 0.0
self.low_cmd.motor_cmd[i_mot].kp = float(self.config.kps[i_mot])
self.low_cmd.motor_cmd[i_mot].kd = float(self.config.kps[i_mot])
self.low_cmd.motor_cmd[i_mot].tau = 0.0
# send the command
self.send_cmd(self.low_cmd)
def run_wrapper(self):
# print("hello", self.mode,
# self.mode == Mode.zero_torque)
if self.mode == Mode.wait:
if self.low_state.crc != 0:
self.mode = Mode.zero_torque
self.low_cmd.mode_machine = self.mode_machine_
print("Successfully connected to the robot.")
elif self.mode == Mode.zero_torque:
if self._mode_change:
print("Enter zero torque state.")
print("Waiting for the start signal...")
self._mode_change = False
self.zero_torque_state()
elif self.mode == Mode.default_pos:
if self._mode_change:
print("Moving to default pos.")
self._mode_change = False
self.prepare_default_pos()
self.move_to_default_pos()
elif self.mode == Mode.damping:
if self._mode_change:
print("Enter default pos state.")
print("Waiting for the Button A signal...")
self._mode_change = False
self.default_pos_state()
elif self.mode == Mode.policy:
if self._mode_change:
print("Run policy.")
self._mode_change = False
self.counter = 0
self.run_policy()
elif self.mode == Mode.null:
self._terminate = True
# time.sleep(self.config.control_dt)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("config", type=str, help="config file name in the configs folder", default="g1.yaml")
args = parser.parse_args()
# Load config
config_path = f"{LEGGED_GYM_ROOT_DIR}/deploy/deploy_real/configs/{args.config}"
config = Config(config_path)
controller = Controller(config)

View File

@ -0,0 +1,121 @@
import os
from typing import Tuple
from contextlib import contextmanager
from pathlib import Path
import time
import numpy as np
import torch as th
import pinocchio as pin
import pink
from pink.tasks import FrameTask
@contextmanager
def with_dir(d):
d0 = os.getcwd()
try:
os.chdir(d)
yield
finally:
os.chdir(d0)
def xyzw2wxyz(q_xyzw: th.Tensor, dim: int = -1):
if isinstance(q_xyzw, np.ndarray):
return np.roll(q_xyzw, 1, axis=dim)
return th.roll(q_xyzw, 1, dims=dim)
def wxyz2xyzw(q_wxyz: th.Tensor, dim: int = -1):
if isinstance(q_wxyz, np.ndarray):
return np.roll(q_wxyz, -1, axis=dim)
return th.roll(q_wxyz, -1, dims=dim)
def dls_ik(
dpose: np.ndarray,
jac: np.ndarray,
sqlmda: float
):
"""
Arg:
dpose: task-space error (A[..., err])
jac: jacobian (A[..., err, dof])
sqlmda: DLS damping factor.
Return:
joint residual (A[..., dof])
"""
if isinstance(dpose, tuple):
dpose = np.concatenate([dpose[0], dpose[1]], axis=-1)
J = jac
A = J @ J.T
# NOTE(ycho): add to view of diagonal
a = np.einsum('...ii->...i', A)
a += sqlmda
dq = (J.T @ np.linalg.solve(A, dpose[..., None]))[..., 0]
return dq
class IKCtrl:
def __init__(self,
urdf_path: str,
joint_names: Tuple[str, ...],
sqlmda: float = 0.05**2):
path = Path(urdf_path)
with with_dir(path.parent):
robot = pin.RobotWrapper.BuildFromURDF(filename=urdf_path,
package_dirs=["."],
root_joint=None)
self.robot = robot
# NOTE(ycho): build index map between pin.q and other set(s) of ordered joints.
larm_from_pin = []
for j in joint_names:
larm_from_pin.append(robot.index(j) - 1)
self.larm_from_pin = np.asarray(larm_from_pin, dtype=np.int32)
self.task = FrameTask("left_hand_palm_link",
position_cost=1.0,
orientation_cost=0.0)
self.sqlmda = sqlmda
self.cfg = pink.Configuration(robot.model, robot.data,
np.zeros_like(robot.q0))
def __call__(self,
q0: np.ndarray,
target_pose: np.ndarray,
rel:bool=False
):
"""
Arg:
q0: Current robot joints; A[..., 43?]
target_pose:
Policy output. A[..., 7] formatted as (xyz, q_{wxyz})
Given as world frame absolute pose, for some reason.
Return:
joint residual: A[..., 7]
"""
robot = self.robot
# source pose
self.cfg.update(q0)
T0 = self.cfg.get_transform_frame_to_world("left_hand_palm_link")
# target pose
dst_xyz = target_pose[..., 0:3]
dst_quat = pin.Quaternion(wxyz2xyzw(target_pose[..., 3:7]))
T1 = pin.SE3(dst_quat, dst_xyz)
if rel:
T1 = T0 @ T1
# jacobian
self.task.set_target(T0)
jac = self.task.compute_jacobian(self.cfg)
jac = jac[:, self.larm_from_pin]
# error&ik
dT = T1.actInv(T0)
dpose = pin.log(dT).vector
dq = dls_ik(dpose, jac, self.sqlmda)
return dq