dds lowlevel interface is added

This commit is contained in:
Rooholla-KhorramBakht 2024-03-10 18:27:42 -04:00
parent afc8ba24ac
commit 2eb931c689
28 changed files with 1106 additions and 28 deletions

19
Go2Py/.idlpy_manifest Normal file
View File

@ -0,0 +1,19 @@
Go2pyLowCmd
msgs
BmsState_
unitree_go
IMUState_
unitree_go
LowState_
unitree_go
MotorState_
unitree_go

View File

@ -1,25 +0,0 @@
#!/bin/bash
GREEN='\033[0;32m'
NC='\033[0m' # No Color
echo -e "${GREEN} Starting DDS type generation...${NC}"
# Clean
rm -r msgs
# Make
for file in idl/*.idl
do
echo "Processing $file file..."
idlc -l py $file
done
mkdir msgs/cpp
cd msgs/cpp
for file in ../../idl/*.idl
do
echo "Processing $file file..."
idlc -l ../../idl/libcycloneddsidlcxx.so.0.10.2 $file
done
cd ../..
# rm -r ../cpp_bridge/include/go2py
# mv msgs/cpp ../cpp_bridge/include/go2py
echo -e "${GREEN} Done with DDS type generation${NC}"

View File

@ -0,0 +1,173 @@
import struct
import threading
import time
import numpy as np
import numpy.linalg as LA
from scipy.spatial.transform import Rotation as R
from cyclonedds.domain import DomainParticipant
from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.sub import DataReader
from cyclonedds.util import duration
from Go2Py.unitree_go.msg.dds_ import LowState_
from threading import Thread
class GO2Real():
def __init__(
self,
mode = 'lowlevel', # 'highlevel' or 'lowlevel'
vx_max=0.5,
vy_max=0.4,
ωz_max=0.5,
):
assert mode in ['highlevel', 'lowlevel'], "mode should be either 'highlevel' or 'lowlevel'"
self.mode = mode
if self.mode == 'highlevel':
raise NotImplementedError('DDS interface for the highlevel commands is not implemented yet. Please use our ROS2 interface.')
self.highcmd_topic_name = "rt/go2/twist_cmd"
self.lowcmd_topic_name = "rt/go2/lowcmd"
self.lowstate_topic_name = "rt/lowstate"
self.participant = DomainParticipant()
self.lowstate_topic = Topic(self.participant, self.lowstate_topic_name, LowState_)
self.lowstate_reader = DataReader(self.participant, self.lowstate_topic)
self.lowcmd_topic = Topic(self.participant, self.lowcmd_topic_name, Go2pyLowCmd_)
self.lowcmd_writer = DataWriter(self.participant, self.lowcmd_topic)
self.state = None
self.setCommands = {'lowlevel':self.setCommandsLow}[self.mode]
self.lowstate_thread = Thread(target = self.lowstate_update)
self.running = True
self.lowstate_thread.start()
def lowstate_update(self):
"""
Retrieve the state of the robot
"""
while self.running:
for msg in self.lowstate_reader.take_iter(timeout=duration(milliseconds=100.)):
self.state = msg
def getIMU(self):
accel = self.state.imu_state.accelerometer
gyro = self.state.imu_state.gyroscope
quat = self.state.imu_state.quaternion
rpy = self.state.imu_state.rpy
temp = self.state.imu_state.temperature
return {'accel':accel, 'gyro':gyro, 'quat':quat, "rpy":rpy, 'temp':temp}
def getFootContacts(self):
"""Returns the raw foot contact forces"""
footContacts = self.state.foot_force
return np.array(footContacts)
def getJointStates(self):
"""Returns the joint angles (q) and velocities (dq) of the robot"""
motor_state = np.array([[self.state.motor_state[i].q,
robot.state.motor_state[i].dq,
robot.state.motor_state[i].ddq,
robot.state.motor_state[i].tau_est,
robot.state.motor_state[i].temperature] for i in range(12)])
return {'q':motor_state[:,0],
'dq':motor_state[:,1],
'ddq':motor_state[:,2],
'tau_est':motor_state[:,3],
'temperature':motor_state[:,4]}
def getRemoteState(self):
"""A method to get the state of the wireless remote control.
Returns a xRockerBtn object:
- head: [head1, head2]
- keySwitch: xKeySwitch object
- lx: float
- rx: float
- ry: float
- L2: float
- ly: float
"""
wirelessRemote = self.state.wireless_remote[:24]
binary_data = bytes(wirelessRemote)
format_str = "<2BH5f"
data = struct.unpack(format_str, binary_data)
head = list(data[:2])
lx = data[3]
rx = data[4]
ry = data[5]
L2 = data[6]
ly = data[7]
_btn = bin(data[2])[2:].zfill(16)
btn = [int(char) for char in _btn]
btn.reverse()
keySwitch = xKeySwitch(*btn)
rockerBtn = xRockerBtn(head, keySwitch, lx, rx, ry, L2, ly)
return rockerBtn
def getCommandFromRemote(self):
"""Do not use directly for control!!!"""
rockerBtn = self.getRemoteState()
lx = rockerBtn.lx
ly = rockerBtn.ly
rx = rockerBtn.rx
v_x = ly * self.vx_max
v_y = lx * self.vy_max
ω = rx * self.ωz_max
return v_x, v_y, ω
def getBatteryState(self):
"""Returns the battery percentage of the robot"""
batteryState = self.state.bms_state
return batteryState.soc
def setCommandsHigh(self, v_x, v_y, ω_z, bodyHeight=0.0, footRaiseHeight=0.0, mode=2):
self.cmd_watchdog_timer = time.time()
_v_x, _v_y, _ω_z = self.clip_velocity(v_x, v_y, ω_z)
self.highcmd.header.stamp = self.get_clock().now().to_msg()
self.highcmd.header.frame_id = "base_link"
self.highcmd.twist.linear.x = _v_x
self.highcmd.twist.linear.y = _v_y
self.highcmd.twist.angular.z = _ω_z
self.highcmd_publisher.publish(self.highcmd)
def setCommandsLow(self, q, dq, kp, kd, tau_ff):
assert q.size == dq.size == kp.size == kd.size == tau_ff.size == 12, "q, dq, kp, kd, tau_ff should have size 12"
lowcmd = Go2pyLowCmd_(
q,
dq,
kp,
kd,
tau_ff
)
self.lowcmd_writer.write(lowcmd)
def close(self):
self.running = False
def check_calf_collision(self, q):
self.pin_robot.update(q)
in_collision = self.pin_robot.check_calf_collision(q)
return in_collision
def clip_velocity(self, v_x, v_y, ω_z):
_v = np.array([[v_x], [v_y]])
_scale = np.sqrt(_v.T @ self.P_v_max @ _v)[0, 0]
if _scale > 1.0:
scale = 1.0 / _scale
else:
scale = 1.0
return scale * v_x, scale * v_y, np.clip(ω_z, self.ωz_min, self.ωz_max)

View File

@ -12,7 +12,7 @@ from rclpy.executors import MultiThreadedExecutor
from geometry_msgs.msg import TransformStamped
from Go2Py.joy import xKeySwitch, xRockerBtn
from geometry_msgs.msg import TwistStamped
from unitree_go.msg import LowState, Go2pyLowCmd
from Go2Py.msgs.unitree_go.msg import LowState, Go2pyLowCmd
from nav_msgs.msg import Odometry

View File

@ -0,0 +1,19 @@
BmsState_
msg
Go2pyLowCmd
msg
IMUState_
msg
LowState_
msg
MotorState_
msg

View File

@ -0,0 +1,9 @@
"""
Generated by Eclipse Cyclone DDS idlc Python Backend
Cyclone DDS IDL version: v0.11.0
Module: Go2Py.unitree_go
"""
from . import msg
__all__ = ["msg", ]

View File

@ -0,0 +1,19 @@
BmsState_
dds_
Go2pyLowCmd
dds_
IMUState_
dds_
LowState_
dds_
MotorState_
dds_

View File

@ -0,0 +1,9 @@
"""
Generated by Eclipse Cyclone DDS idlc Python Backend
Cyclone DDS IDL version: v0.11.0
Module: Go2Py.unitree_go.msg
"""
from . import dds_
__all__ = ["dds_", ]

View File

@ -0,0 +1,19 @@
BmsState_
BmsState_
Go2pyLowCmd
Go2pyLowCmd_
IMUState_
IMUState_
LowState_
LowState_
MotorState_
MotorState_

View File

@ -0,0 +1,35 @@
"""
Generated by Eclipse Cyclone DDS idlc Python Backend
Cyclone DDS IDL version: v0.11.0
Module: Go2Py.unitree_go.msg.dds_
IDL file: BmsState_.idl
"""
from enum import auto
from typing import TYPE_CHECKING, Optional
from dataclasses import dataclass
import cyclonedds.idl as idl
import cyclonedds.idl.annotations as annotate
import cyclonedds.idl.types as types
# root module import for resolving types
import Go2Py.unitree_go
@dataclass
@annotate.final
@annotate.autoid("sequential")
class BmsState_(idl.IdlStruct, typename="Go2Py.unitree_go.msg.dds_.BmsState_"):
version_high: types.uint8
version_low: types.uint8
status: types.uint8
soc: types.uint8
current: types.int32
cycle: types.uint16
bq_ntc: types.array[types.uint8, 2]
mcu_ntc: types.array[types.uint8, 2]
cell_vol: types.array[types.uint16, 15]

View File

@ -0,0 +1,30 @@
"""
Generated by Eclipse Cyclone DDS idlc Python Backend
Cyclone DDS IDL version: v0.11.0
Module: Go2Py.unitree_go.msg.dds_
IDL file: Go2pyLowCmd.idl
"""
from enum import auto
from typing import TYPE_CHECKING, Optional
from dataclasses import dataclass
import cyclonedds.idl as idl
import cyclonedds.idl.annotations as annotate
import cyclonedds.idl.types as types
# root module import for resolving types
import Go2Py.unitree_go
@dataclass
@annotate.final
@annotate.autoid("sequential")
class Go2pyLowCmd_(idl.IdlStruct, typename="unitree_go.msg.dds_.Go2pyLowCmd_"):
q: types.array[types.float32, 12]
dq: types.array[types.float32, 12]
kp: types.array[types.float32, 12]
kd: types.array[types.float32, 12]
tau: types.array[types.float32, 12]

View File

@ -0,0 +1,31 @@
"""
Generated by Eclipse Cyclone DDS idlc Python Backend
Cyclone DDS IDL version: v0.11.0
Module: Go2Py.unitree_go.msg.dds_
IDL file: IMUState_.idl
"""
from enum import auto
from typing import TYPE_CHECKING, Optional
from dataclasses import dataclass
import cyclonedds.idl as idl
import cyclonedds.idl.annotations as annotate
import cyclonedds.idl.types as types
# root module import for resolving types
import Go2Py.unitree_go
@dataclass
@annotate.final
@annotate.autoid("sequential")
class IMUState_(idl.IdlStruct, typename="Go2Py.unitree_go.msg.dds_.IMUState_"):
quaternion: types.array[types.float32, 4]
gyroscope: types.array[types.float32, 3]
accelerometer: types.array[types.float32, 3]
rpy: types.array[types.float32, 3]
temperature: types.uint8

View File

@ -0,0 +1,48 @@
"""
Generated by Eclipse Cyclone DDS idlc Python Backend
Cyclone DDS IDL version: v0.11.0
Module: Go2Py.unitree_go.msg.dds_
IDL file: LowState_.idl
"""
from enum import auto
from typing import TYPE_CHECKING, Optional
from dataclasses import dataclass
import cyclonedds.idl as idl
import cyclonedds.idl.annotations as annotate
import cyclonedds.idl.types as types
# root module import for resolving types
import Go2Py.unitree_go
@dataclass
@annotate.final
@annotate.autoid("sequential")
class LowState_(idl.IdlStruct, typename="Go2Py.unitree_go.msg.dds_.LowState_"):
head: types.array[types.uint8, 2]
level_flag: types.uint8
frame_reserve: types.uint8
sn: types.array[types.uint32, 2]
version: types.array[types.uint32, 2]
bandwidth: types.uint16
imu_state: 'Go2Py.unitree_go.msg.dds_.IMUState_'
motor_state: types.array['Go2Py.unitree_go.msg.dds_.MotorState_', 20]
bms_state: 'Go2Py.unitree_go.msg.dds_.BmsState_'
foot_force: types.array[types.int16, 4]
foot_force_est: types.array[types.int16, 4]
tick: types.uint32
wireless_remote: types.array[types.uint8, 40]
bit_flag: types.uint8
adc_reel: types.float32
temperature_ntc1: types.uint8
temperature_ntc2: types.uint8
power_v: types.float32
power_a: types.float32
fan_frequency: types.array[types.uint16, 4]
reserve: types.uint32
crc: types.uint32

View File

@ -0,0 +1,37 @@
"""
Generated by Eclipse Cyclone DDS idlc Python Backend
Cyclone DDS IDL version: v0.11.0
Module: Go2Py.unitree_go.msg.dds_
IDL file: MotorState_.idl
"""
from enum import auto
from typing import TYPE_CHECKING, Optional
from dataclasses import dataclass
import cyclonedds.idl as idl
import cyclonedds.idl.annotations as annotate
import cyclonedds.idl.types as types
# root module import for resolving types
import Go2Py.unitree_go
@dataclass
@annotate.final
@annotate.autoid("sequential")
class MotorState_(idl.IdlStruct, typename="Go2Py.unitree_go.msg.dds_.MotorState_"):
mode: types.uint8
q: types.float32
dq: types.float32
ddq: types.float32
tau_est: types.float32
q_raw: types.float32
dq_raw: types.float32
ddq_raw: types.float32
temperature: types.uint8
lost: types.uint32
reserve: types.array[types.uint32, 2]

View File

@ -0,0 +1,13 @@
"""
Generated by Eclipse Cyclone DDS idlc Python Backend
Cyclone DDS IDL version: v0.11.0
Module: Go2Py.unitree_go.msg.dds_
"""
from ._BmsState_ import BmsState_
from ._Go2pyLowCmd import Go2pyLowCmd_
from ._IMUState_ import IMUState_
from ._LowState_ import LowState_
from ._MotorState_ import MotorState_
__all__ = ["BmsState_", "Go2pyLowCmd_", "IMUState_", "LowState_", "MotorState_", ]

384
dds_test.ipynb Normal file
View File

@ -0,0 +1,384 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cyclonedds.domain import DomainParticipant\n",
"from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_\n",
"from cyclonedds.topic import Topic\n",
"from cyclonedds.pub import DataWriter"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"participant = DomainParticipant()\n",
"topic = Topic(participant, \"rt/go2/lowcmd\", Go2pyLowCmd_)\n",
"writer = DataWriter(participant, topic)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"msg = Go2pyLowCmd_(np.zeros(12),\n",
"np.zeros(12),np.zeros(12),np.zeros(12),np.zeros(12))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"while True:\n",
" writer.write(msg)\n",
" time.sleep(0.001)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from Go2Py.std_msgs.msg.dds_ import String_\n",
"participant = DomainParticipant()\n",
"topic = Topic(participant, \"rt/test\", String_)\n",
"writer = DataWriter(participant, topic)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"msg = String_(\"Hello\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"while True:\n",
" writer.write(msg)\n",
" time.sleep(0.001)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cyclonedds.domain import DomainParticipant\n",
"from cyclonedds.topic import Topic\n",
"from cyclonedds.sub import DataReader\n",
"from cyclonedds.util import duration\n",
"from Go2Py.unitree_go.msg.dds_ import LowState_"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"participant = DomainParticipant()\n",
"topic = Topic(participant, 'rt/lowstate', LowState_)\n",
"reader = DataReader(participant, topic)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for msg in reader.take_iter(timeout=duration(milliseconds=100.)):\n",
" print(msg)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"reader.take(timeout=duration(milliseconds=100.))"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import struct\n",
"import threading\n",
"import time\n",
"import numpy as np\n",
"import numpy.linalg as LA\n",
"from scipy.spatial.transform import Rotation as R\n",
"\n",
"from cyclonedds.domain import DomainParticipant\n",
"from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_\n",
"from cyclonedds.topic import Topic\n",
"from cyclonedds.pub import DataWriter\n",
"\n",
"from cyclonedds.domain import DomainParticipant\n",
"from cyclonedds.topic import Topic\n",
"from cyclonedds.sub import DataReader\n",
"from cyclonedds.util import duration\n",
"from Go2Py.unitree_go.msg.dds_ import LowState_\n",
"from threading import Thread"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class GO2Real():\n",
" def __init__(\n",
" self,\n",
" mode = 'lowlevel', # 'highlevel' or 'lowlevel'\n",
" vx_max=0.5,\n",
" vy_max=0.4,\n",
" ωz_max=0.5,\n",
" ):\n",
" assert mode in ['highlevel', 'lowlevel'], \"mode should be either 'highlevel' or 'lowlevel'\"\n",
" self.mode = mode\n",
" if self.mode == 'highlevel':\n",
" raise NotImplementedError('DDS interface for the highlevel commands is not implemented yet. Please use our ROS2 interface.')\n",
"\n",
" self.highcmd_topic_name = \"rt/go2/twist_cmd\"\n",
" self.lowcmd_topic_name = \"rt/go2/lowcmd\"\n",
" self.lowstate_topic_name = \"rt/lowstate\"\n",
"\n",
" self.participant = DomainParticipant()\n",
" self.lowstate_topic = Topic(self.participant, self.lowstate_topic_name, LowState_)\n",
" self.lowstate_reader = DataReader(self.participant, self.lowstate_topic)\n",
" \n",
" self.lowcmd_topic = Topic(self.participant, self.lowcmd_topic_name, Go2pyLowCmd_)\n",
" self.lowcmd_writer = DataWriter(self.participant, self.lowcmd_topic)\n",
"\n",
" self.state = None\n",
" self.setCommands = {'lowlevel':self.setCommandsLow}[self.mode]\n",
" self.lowstate_thread = Thread(target = self.lowstate_update)\n",
" self.running = True\n",
" self.lowstate_thread.start()\n",
"\n",
" def lowstate_update(self):\n",
" \"\"\"\n",
" Retrieve the state of the robot\n",
" \"\"\"\n",
" while self.running:\n",
" for msg in self.lowstate_reader.take_iter(timeout=duration(milliseconds=100.)):\n",
" self.state = msg\n",
"\n",
" def getIMU(self):\n",
" accel = self.state.imu_state.accelerometer\n",
" gyro = self.state.imu_state.gyroscope\n",
" quat = self.state.imu_state.quaternion\n",
" rpy = self.state.imu_state.rpy\n",
" temp = self.state.imu_state.temperature\n",
" return {'accel':accel, 'gyro':gyro, 'quat':quat, \"rpy\":rpy, 'temp':temp}\n",
"\n",
" def getFootContacts(self):\n",
" \"\"\"Returns the raw foot contact forces\"\"\"\n",
" footContacts = self.state.foot_force \n",
" return np.array(footContacts)\n",
"\n",
" def getJointStates(self):\n",
" \"\"\"Returns the joint angles (q) and velocities (dq) of the robot\"\"\"\n",
" motor_state = np.array([[self.state.motor_state[i].q,\n",
" robot.state.motor_state[i].dq,\n",
" robot.state.motor_state[i].ddq,\n",
" robot.state.motor_state[i].tau_est,\n",
" robot.state.motor_state[i].temperature] for i in range(12)])\n",
" return {'q':motor_state[:,0], \n",
" 'dq':motor_state[:,1],\n",
" 'ddq':motor_state[:,2],\n",
" 'tau_est':motor_state[:,3],\n",
" 'temperature':motor_state[:,4]}\n",
"\n",
" def getRemoteState(self):\n",
" \"\"\"A method to get the state of the wireless remote control. \n",
" Returns a xRockerBtn object: \n",
" - head: [head1, head2]\n",
" - keySwitch: xKeySwitch object\n",
" - lx: float\n",
" - rx: float\n",
" - ry: float\n",
" - L2: float\n",
" - ly: float\n",
" \"\"\"\n",
" wirelessRemote = self.state.wireless_remote[:24]\n",
" binary_data = bytes(wirelessRemote)\n",
"\n",
" format_str = \"<2BH5f\"\n",
" data = struct.unpack(format_str, binary_data)\n",
"\n",
" head = list(data[:2])\n",
" lx = data[3]\n",
" rx = data[4]\n",
" ry = data[5]\n",
" L2 = data[6]\n",
" ly = data[7]\n",
"\n",
" _btn = bin(data[2])[2:].zfill(16)\n",
" btn = [int(char) for char in _btn]\n",
" btn.reverse()\n",
"\n",
" keySwitch = xKeySwitch(*btn)\n",
" rockerBtn = xRockerBtn(head, keySwitch, lx, rx, ry, L2, ly)\n",
" return rockerBtn\n",
"\n",
" def getCommandFromRemote(self):\n",
" \"\"\"Do not use directly for control!!!\"\"\"\n",
" rockerBtn = self.getRemoteState()\n",
"\n",
" lx = rockerBtn.lx\n",
" ly = rockerBtn.ly\n",
" rx = rockerBtn.rx\n",
"\n",
" v_x = ly * self.vx_max\n",
" v_y = lx * self.vy_max\n",
" ω = rx * self.ωz_max\n",
" \n",
" return v_x, v_y, ω\n",
"\n",
" def getBatteryState(self):\n",
" \"\"\"Returns the battery percentage of the robot\"\"\"\n",
" batteryState = self.state.bms_state\n",
" return batteryState.soc\n",
"\n",
" def setCommandsHigh(self, v_x, v_y, ω_z, bodyHeight=0.0, footRaiseHeight=0.0, mode=2):\n",
" self.cmd_watchdog_timer = time.time()\n",
" _v_x, _v_y, _ω_z = self.clip_velocity(v_x, v_y, ω_z)\n",
" self.highcmd.header.stamp = self.get_clock().now().to_msg()\n",
" self.highcmd.header.frame_id = \"base_link\"\n",
" self.highcmd.twist.linear.x = _v_x\n",
" self.highcmd.twist.linear.y = _v_y\n",
" self.highcmd.twist.angular.z = _ω_z\n",
" self.highcmd_publisher.publish(self.highcmd)\n",
"\n",
" def setCommandsLow(self, q, dq, kp, kd, tau_ff):\n",
" assert q.size == dq.size == kp.size == kd.size == tau_ff.size == 12, \"q, dq, kp, kd, tau_ff should have size 12\"\n",
" lowcmd = Go2pyLowCmd_(\n",
" q,\n",
" dq, \n",
" kp,\n",
" kd,\n",
" tau_ff\n",
" )\n",
" self.lowcmd_writer.write(lowcmd)\n",
"\n",
" def close(self):\n",
" self.running = False\n",
"\n",
" def check_calf_collision(self, q):\n",
" self.pin_robot.update(q)\n",
" in_collision = self.pin_robot.check_calf_collision(q)\n",
" return in_collision\n",
"\n",
" def clip_velocity(self, v_x, v_y, ω_z):\n",
" _v = np.array([[v_x], [v_y]])\n",
" _scale = np.sqrt(_v.T @ self.P_v_max @ _v)[0, 0]\n",
"\n",
" if _scale > 1.0:\n",
" scale = 1.0 / _scale\n",
" else:\n",
" scale = 1.0\n",
"\n",
" return scale * v_x, scale * v_y, np.clip(ω_z, self.ωz_min, self.ωz_max)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"robot = GO2Real()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"64"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"robot.getBatteryState()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"BmsState_(version_high=1, version_low=9, status=8, soc=64, current=-1716, cycle=9, bq_ntc=b'\\x1e\\x1c', mcu_ntc=b' \\x1f', cell_vol=[3843, 3848, 3849, 3849, 3848, 3846, 3845, 3839, 0, 0, 0, 0, 0, 0, 0])"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"robot.state.bms_state"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

View File

@ -0,0 +1,51 @@
// generated from rosidl_generator_dds_idl/resource/idl.idl.em
// with input from unitree_go:msg/BmsState.idl
// generated code does not contain a copyright notice
#ifndef __unitree_go__msg__bms_state__idl__
#define __unitree_go__msg__bms_state__idl__
module Go2Py {
module unitree_go {
module msg {
module dds_ {
struct BmsState_ {
octet version_high; //电池版本
octet version_low; //电池版本
// 0SAFE,(未开启电池)
// 1WAKE_UP,(唤醒事件)
// 6PRECHG, (电池预冲电中)
// 7CHG, (电池正常充电中)
// 8DCHG, (电池正常放电中)
// 9SELF_DCHG, (电池自放电中)
// 11ALARM, (电池存在警告)
// 12RESET_ALARM, (等待按键复位警告中)
// 13AUTO_RECOVERY (复位中)
octet status; //电池状态信息。
octet soc; //电池电量信息类型uint8_t(范围1% - 100%)
long current; //充放电信息:(正:代表充电,负代表放电)可按照实际数值显示
unsigned short cycle; //充电循环次数
octet bq_ntc[2]; //电池内部两个NTC的温度int8_t范围-100 - 150。 0- BAT1; 1- BAT2
octet mcu_ntc[2]; //电池NTC数组0 - RES1 - MOS int8_t范围-100 - 150
unsigned short cell_vol[15]; //电池内部15节电池的电压。
};
}; // module dds_
}; // module msg
}; // module unitree_go
};
#endif // __unitree_go__msg__bms_state__idl__

View File

@ -0,0 +1,13 @@
module unitree_go {
module msg {
module dds_{
@final struct Go2pyLowCmd_ {
float q[12];
float dq[12];
float kp[12];
float kd[12];
float tau[12];
};
};
};
};

View File

@ -0,0 +1,36 @@
// generated from rosidl_generator_dds_idl/resource/idl.idl.em
// with input from unitree_go:msg/IMUState.idl
// generated code does not contain a copyright notice
#ifndef __unitree_go__msg__imu_state__idl__
#define __unitree_go__msg__imu_state__idl__
module Go2Py {
module unitree_go {
module msg {
module dds_ {
struct IMUState_ {
float quaternion[4]; //四元数数据
float gyroscope[3]; //角速度信息0 -> x ,0 -> y ,0 -> z
float accelerometer[3]; //加速度信息0 -> x ,0 -> y ,0 -> z
float rpy[3]; //欧拉角信息:默认为弧度值(可按照实际情况改为角度值),可按照实际数值显示(弧度值范围:-7 - +7显示3位小数数组0-roll翻滚角1-pitch俯仰角2-yaw偏航角
octet temperature; //IMU 温度信息(摄氏度)。
};
}; // module dds_
}; // module msg
}; // module unitree_go
};
#endif // __unitree_go__msg__imu_state__idl__

View File

@ -0,0 +1,74 @@
// generated from rosidl_generator_dds_idl/resource/idl.idl.em
// with input from unitree_go:msg/LowState.idl
// generated code does not contain a copyright notice
#include "BmsState_.idl"
#include "IMUState_.idl"
#include "MotorState_.idl"
#ifndef __unitree_go__msg__low_state__idl__
#define __unitree_go__msg__low_state__idl__
module Go2Py {
module unitree_go {
module msg {
module dds_ {
struct LowState_ {
octet head[2]; //帧头数据校验用0xFE,0xEF
octet level_flag; //沿用的,但是目前不用。
octet frame_reserve; //沿用的,但是目前不用。
unsigned long sn[2]; //已经改为文件存储形式,目前没用。
unsigned long version[2]; //沿用的,但是目前不用。
unsigned short bandwidth; //沿用的,但是目前不用。。
unitree_go::msg::dds_::IMUState_ imu_state; //IMU数据信息。
// FR_0 -> 0 , FR_1 -> 1 , FR_2 -> 2 电机顺序目前只用12电机后面保留。
// FL_0 -> 3 , FL_1 -> 4 , FL_2 -> 5
// RR_0 -> 6 , RR_1 -> 7 , RR_2 -> 8
// RL_0 -> 9 , RL_1 -> 10 , RL_2 -> 11
unitree_go::msg::dds_::MotorState_ motor_state[20]; //电机总数据。
unitree_go::msg::dds_::BmsState_ bms_state; //电池总数据。
short foot_force[4]; //足端力范围0-4095可按照实际数值显示。数组0-FR1-FL2-RR, 3-RL
short foot_force_est[4]; //沿用的,但是目前不用。
unsigned long tick; //1ms计时用按照1ms递增。
octet wireless_remote[40]; //遥控器原始数据。
//&0x80 - 电机 超时标志 1-超时 0-正常
//&0x40 - 小Mcu 超时标志 1-超时 0-正常
//&0x20 - 遥控器 超时标志 1-超时 0-正常
//&0x10 - 电池 超时标志 1-超时 0-正常
//&0x04 - 自动充电 自动充电状态标志 1-不充电 0-充电
//&0x02 - 板载电流错误标志 错误标志 1-板载电流异常 0-正常
//&0x01 - 运控命令超时 超时标志 1-超时 0-正常
octet bit_flag; //各个组件状态显示
float adc_reel; //卷线器电流范围0 - 3A
octet temperature_ntc1; //主板中心温度值(范围:-20 - 100℃
octet temperature_ntc2; //自动充电温度(范围:-20 - 100℃
float power_v; //此电压值为主板电压 -> 电池电压 。
float power_a; //此电流值为主板电流值 -> 电机电流。
unsigned short fan_frequency[4]; //风扇转速目前可按照实际数值显示0-100000-左后转速 , 1-右后转速2-前转速,单位转/分钟堵转检测3-&0x01左后堵转 , &0x02右后堵转&0x04前堵转
unsigned long reserve; //保留位。
unsigned long crc; //数据CRC校验用。
};
}; // module dds_
}; // module msg
}; // module unitree_go
};
#endif // __unitree_go__msg__low_state__idl__

View File

@ -0,0 +1,40 @@
// generated from rosidl_generator_dds_idl/resource/idl.idl.em
// with input from unitree_go:msg/MotorState.idl
// generated code does not contain a copyright notice
#ifndef __unitree_go__msg__motor_state__idl__
#define __unitree_go__msg__motor_state__idl__
module Go2Py {
module unitree_go {
module msg {
module dds_ {
struct MotorState_ {
octet mode; //电机控制模式Foc模式工作模式-> 0x01 stop模式待机模式-> 0x00。
float q; //关机反馈位置信息:默认为弧度值(可按照实际情况改为角度值),可按照实际数值显示(弧度值范围:-7 - +7显示3位小数
float dq; //关节反馈速度
float ddq; //关节反馈加速度
float tau_est; //关节反馈力矩
float q_raw; //沿用的,但是目前不用。
float dq_raw; //沿用的,但是目前不用。
float ddq_raw; //沿用的,但是目前不用。
octet temperature; //电机温度信息类型int8_t ,可按照实际数值显示(范围:-100 - 150
unsigned long lost; //电机丢包信息可按照实际数值显示范围0-9999999999
unsigned long reserve[2]; //当前电机通信频率+电机错误标志位数组0-电机错误标志位范围0-255可按照实际数值显示1-当前电机通信频率范围0-800可按照实际数值显示
};
}; // module dds_
}; // module msg
}; // module unitree_go
};
#endif // __unitree_go__msg__motor_state__idl__

29
msg_sources/make_msgs.sh Executable file
View File

@ -0,0 +1,29 @@
#!/bin/bash
GREEN='\033[0;32m'
NC='\033[0m' # No Color
echo -e "${GREEN} Starting DDS type generation...${NC}"
# Clean
# rm -r Go2Py/go2py_msgs/dds_
# rm -r ../Go2Py/unitree_go
# Make
for file in idls/*.idl
do
echo "Processing $file file..."
idlc -l py $file
done
mv Go2Py/unitree_go ../Go2Py
# rm -r Go2Py
# mkdir msgs/cpp
# cd msgs/cpp
# for file in ../../idl/*.idl
# do
# echo "Processing $file file..."
# idlc -l ../../idl/libcycloneddsidlcxx.so.0.10.2 $file
# done
# cd ../..
# # rm -r ../cpp_bridge/include/go2py
# # mv msgs/cpp ../cpp_bridge/include/go2py
# echo -e "${GREEN} Done with DDS type generation${NC}"

View File

@ -1,3 +1,18 @@
from setuptools import setup
# from setuptools import setup
setup()
# setup()
from setuptools import find_packages
from distutils.core import setup
setup(
name='Go2Py',
version='1.0.0',
author='Gabriel Margolis',
license="BSD-3-Clause",
packages=find_packages(),
author_email='gmargo@mit.edu',
description='Toolkit for deployment of sim-to-real RL on the Unitree Go1.',
install_requires=[
]
)