walk these ways controller tested with FSM in simulation

This commit is contained in:
Rooholla-KhorramBakht 2024-03-19 21:50:19 -04:00
parent cf73faf024
commit 34b778bab9
9 changed files with 345 additions and 706 deletions

View File

@ -0,0 +1,145 @@
import threading
import numpy as np
import time
from enum import Enum
class FSM:
def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):
self.robot = robot
self.remote = remote
self.remote.flushStates()
self.safety = safety_hypervisor
self.user_controller_callback = user_controller_callback
self.state = "damping"
self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12)
self.tracking_kv = np.array(12*[3.])
self.damping_kv = np.array(12*[2.])
self.tracking_complete = True
self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))
self.fsm_dT = 1./50.
self.control_dT = 1./50.
self.dT = self.fsm_dT
self.modes = {"tracking":self.trackingControlUpdate,
"damping" :self.dampingControlUpdate,
"user": self.userControlUpdate,
}
self.setMode("damping")
self.running = True
self.fsm_thread = threading.Thread(target = self.update)
self.fsm_thread.start()
# if the robot is a simulation, create a thread for stepping it
if self.robot.simulated:
self.sim_thread = threading.Thread(target=self.simUpdate)
self.sim_thread.start()
def setMode(self, mode):
assert mode in self.modes.keys(), 'the requested control update mode is not implemented'
self.updateCommands = self.modes[mode]
# print(f'setting mode to {mode}')
def moveTo(self, target, duration=0.5):
# assert self.tracking_complete, 'The previous moveTo command is not completed yet!'
self.q_start = self.robot.getJointStates()['q']
self.q_target = target
self.time = 0.0
self.move_duration = duration
self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \
True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished
self.tracking_complete = False
self.setMode("tracking")
def trackingControlUpdate(self):
self.time +=self.dT
q_des, done = self.q_des(self.time)
self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))
self.tracking_complete = done
def dampingControlUpdate(self):
self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))
def userControlUpdate(self):
if self.user_controller_callback is not None:
self.user_controller_callback(self.robot, self.remote)
def simUpdate(self):
while self.running:
self.robot.step()
time.sleep(self.robot.dt)
def update(self):
while self.running:
getattr(self, self.state)()
time.sleep(self.dT)
self.updateCommands()
def close(self):
self.running = False
# The following functions each are the states of the FSM
def damping(self):
# print('damping')
if self.remote.standUpDownSeq():
self.moveTo(self.robot.prestanding_q, 1)
self.state = "pre_standing"
else:
self.setMode("damping")
self.state = "damping"
def pre_standing(self):
# print("pre_stance")
if self.tracking_complete:
self.moveTo(self.robot.standing_q, duration=1.5)
self.state = 'standing'
else:
self.state = "pre_standing"
def standing(self):
# print("standing")
if self.tracking_complete:
# self.moveTo(robot.standing_q, duration=1)
self.state = 'locked_stance'
else:
self.state = "standing"
def locked_stance(self):
# print("locked_stance")
if self.remote.startSeq():
self.setMode("user")
self.dT = self.control_dT
self.state = "user_loop"
self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))
elif self.remote.standUpDownSeq() or self.robot.overheat():
self.moveTo(self.robot.sitting_q, duration = 1.5)
self.state = "sitting"
def user_loop(self):
# print("user_loop")
if self.safety.unsafe():
self.dT = self.fsm_dT
self.setMode("damping")
elif self.remote.standUpDownSeq() or self.safety.controlTimeout():
self.dT = self.fsm_dT
self.moveTo(self.robot.standing_q, duration = 1)
self.timer = time.time()
self.state = "switch_back_to_locked_stance"
else:
self.state = "user_loop"
def sitting(self):
# print('sitting')
if self.tracking_complete:
self.setMode("damping")
self.state = 'damping'
else:
self.state = "sitting"
def switch_back_to_locked_stance(self):
if time.time()-self.timer > 0.5:
# print("going back to locked stance")
self.state = "locked_stance"

63
Go2Py/robot/remote.py Normal file
View File

@ -0,0 +1,63 @@
import threading
from pynput import keyboard
class BaseRemote:
def __init__(self):
pass
def startSeq(self):
return False
def standUpDownSeq(self):
return False
def flushStates(self):
pass
remote = BaseRemote()
class KeyboardRemote(BaseRemote):
def __init__(self):
super().__init__()
self.start_seq_flag = False
self.stand_up_down_seq_flag = False
self.listener_thread = threading.Thread(target=self._listen_to_keyboard)
self.listener_thread.daemon = True
self.listener_thread.start()
def _on_press(self, key):
try:
if key.char == 's': # Start sequence
self.start_seq_flag = True
elif key.char == 'u': # Stand up/down sequence
self.stand_up_down_seq_flag = True
except AttributeError:
pass # Special keys (like space) will be handled here
def _on_release(self, key):
try:
if key.char == 's': # Start sequence
self.start_seq_flag = False
elif key.char == 'u': # Stand up/down sequence
self.stand_up_down_seq_flag = False
except AttributeError:
pass # Special keys (like space) will be handled here
def _listen_to_keyboard(self):
with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener:
listener.join()
def startSeq(self):
if self.start_seq_flag:
self.start_seq_flag = False
return True
return False
def standUpDownSeq(self):
if self.stand_up_down_seq_flag:
self.stand_up_down_seq_flag = False
return True
return False
def flushStates(self):
self.stand_up_down_seq_flag = False
self.start_seq_flag = False

14
Go2Py/robot/safety.py Normal file
View File

@ -0,0 +1,14 @@
import time
class SafetyHypervisor():
def __init__(self, robot):
self.robot = robot
def unsafe(self):
return False
def controlTimeout(self):
if time.time() - self.robot.latest_command_stamp > 0.1:
print('controller timeout')
return True
else:
return False

View File

@ -8,267 +8,22 @@
"source": [
"from Go2Py.robot.interface.dds import GO2Real\n",
"import time\n",
"robot = GO2Real(mode='lowlevel')"
"from Go2Py.sim.mujoco import Go2Sim\n",
"from Go2Py.robot.fsm import FSM\n",
"from Go2Py.robot.remote import KeyboardRemote\n",
"from Go2Py.robot.safety import SafetyHypervisor\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import threading\n",
"from pynput import keyboard\n",
"class BaseRemote:\n",
" def __init__(self):\n",
" pass\n",
" def startSeq(self):\n",
" return False\n",
" def standUpDownSeq(self):\n",
" return False\n",
"\n",
" def flushStates(self):\n",
" pass\n",
"\n",
"remote = BaseRemote()\n",
"\n",
"class KeyboardRemote(BaseRemote):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.start_seq_flag = False\n",
" self.stand_up_down_seq_flag = False\n",
" self.listener_thread = threading.Thread(target=self._listen_to_keyboard)\n",
" self.listener_thread.daemon = True\n",
" self.listener_thread.start()\n",
"\n",
" def _on_press(self, key):\n",
" try:\n",
" if key.char == 's': # Start sequence\n",
" self.start_seq_flag = True\n",
" elif key.char == 'u': # Stand up/down sequence\n",
" self.stand_up_down_seq_flag = True\n",
" except AttributeError:\n",
" pass # Special keys (like space) will be handled here\n",
"\n",
" \n",
" def _on_release(self, key):\n",
" try:\n",
" if key.char == 's': # Start sequence\n",
" self.start_seq_flag = False\n",
" elif key.char == 'u': # Stand up/down sequence\n",
" self.stand_up_down_seq_flag = False\n",
" except AttributeError:\n",
" pass # Special keys (like space) will be handled here\n",
"\n",
" def _listen_to_keyboard(self):\n",
" with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener:\n",
" listener.join()\n",
"\n",
" def startSeq(self):\n",
" if self.start_seq_flag:\n",
" self.start_seq_flag = False\n",
" return True\n",
" return False\n",
"\n",
" def standUpDownSeq(self):\n",
" if self.stand_up_down_seq_flag:\n",
" self.stand_up_down_seq_flag = False\n",
" return True\n",
" return False\n",
"\n",
" def flushStates(self):\n",
" self.stand_up_down_seq_flag = False\n",
" self.start_seq_flag = False"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"remote = KeyboardRemote()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"class SafetyHypervisor():\n",
" def __init__(self, robot):\n",
" self.robot = robot\n",
"\n",
" def unsafe(self):\n",
" return False\n",
" \n",
" def controlTimeout(self):\n",
" if time.time() - robot.latest_command_stamp > 0.1:\n",
" print('controller timeout')\n",
" return True\n",
" else:\n",
" return False \n",
"\n",
"safety = SafetyHypervisor(robot)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import threading\n",
"import numpy as np\n",
"import time\n",
"from enum import Enum\n",
"\n",
"\n",
"class FSM:\n",
" def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):\n",
" self.robot = robot\n",
" self.remote = remote\n",
" self.remote.flushStates()\n",
" self.safety = safety_hypervisor\n",
" self.user_controller_callback = user_controller_callback\n",
"\n",
" self.state = \"damping\"\n",
"\n",
" # self.tracking_kp = np.array(4*[200, 200, 350.]).reshape(12)\n",
" # self.tracking_kv = np.array(12*[10.])\n",
"\n",
" self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12)\n",
" self.tracking_kv = np.array(12*[3.])\n",
" self.damping_kv = np.array(12*[2.])\n",
"\n",
"\n",
" self.tracking_complete = True\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n",
" self.fsm_dT = 1./50.\n",
" self.control_dT = 1./400.\n",
" self.dT = self.fsm_dT\n",
"\n",
" self.modes = {\"tracking\":self.trackingControlUpdate,\n",
" \"damping\" :self.dampingControlUpdate,\n",
" \"user\": self.userControlUpdate,\n",
" }\n",
" self.setMode(\"damping\")\n",
"\n",
" self.running = True\n",
" self.fsm_thread = threading.Thread(target = self.update)\n",
" self.fsm_thread.start()\n",
" # if the robot is a simulation, create a thread for stepping it\n",
" if robot.simulated:\n",
" self.sim_thread = threading.Thread(target=self.simUpdate)\n",
" self.sim_thread.start()\n",
"\n",
" def setMode(self, mode):\n",
" assert mode in self.modes.keys(), 'the requested control update mode is not implemented'\n",
" self.updateCommands = self.modes[mode]\n",
" # print(f'setting mode to {mode}')\n",
" \n",
" def moveTo(self, target, duration=0.5):\n",
" # assert self.tracking_complete, 'The previous moveTo command is not completed yet!'\n",
" self.q_start = self.robot.getJointStates()['q']\n",
" self.q_target = target\n",
" self.time = 0.0\n",
" self.move_duration = duration\n",
" self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \\\n",
" True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished\n",
" self.tracking_complete = False\n",
" self.setMode(\"tracking\")\n",
"\n",
" def trackingControlUpdate(self):\n",
" self.time +=self.dT\n",
" q_des, done = self.q_des(self.time)\n",
" self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))\n",
" self.tracking_complete = done\n",
"\n",
" def dampingControlUpdate(self):\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n",
"\n",
" def userControlUpdate(self):\n",
" if self.user_controller_callback is not None:\n",
" user_controller_callback(robot, remote)\n",
"\n",
" def simUpdate(self):\n",
" while self.running:\n",
" self.robot.step()\n",
" time.sleep(robot.dt)\n",
" \n",
" def update(self):\n",
" while self.running:\n",
" getattr(self, self.state)()\n",
" time.sleep(self.dT)\n",
" self.updateCommands()\n",
" \n",
" def close(self):\n",
" self.running = False\n",
"\n",
" # The following functions each are the states of the FSM\n",
" def damping(self):\n",
" # print('damping')\n",
" if self.remote.standUpDownSeq():\n",
" self.moveTo(robot.prestanding_q, 1)\n",
" self.state = \"pre_standing\"\n",
" else:\n",
" self.setMode(\"damping\")\n",
" self.state = \"damping\"\n",
"\n",
" def pre_standing(self):\n",
" # print(\"pre_stance\")\n",
" if self.tracking_complete:\n",
" self.moveTo(robot.standing_q, duration=1.5)\n",
" self.state = 'standing'\n",
" else:\n",
" self.state = \"pre_standing\"\n",
" \n",
" def standing(self):\n",
" # print(\"standing\")\n",
" if self.tracking_complete:\n",
" # self.moveTo(robot.standing_q, duration=1)\n",
" self.state = 'locked_stance'\n",
" else:\n",
" self.state = \"standing\"\n",
"\n",
" def locked_stance(self):\n",
" # print(\"locked_stance\")\n",
" if self.remote.startSeq():\n",
" self.setMode(\"user\")\n",
" self.dT = self.control_dT\n",
" self.state = \"user_loop\"\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))\n",
" elif self.remote.standUpDownSeq() or robot.overheat():\n",
" self.moveTo(robot.sitting_q, duration = 1.5)\n",
" self.state = \"sitting\"\n",
"\n",
" def user_loop(self):\n",
" # print(\"user_loop\")\n",
" if self.safety.unsafe():\n",
" self.dT = self.fsm_dT\n",
" self.setMode(\"damping\")\n",
" elif self.remote.standUpDownSeq() or self.safety.controlTimeout():\n",
" self.dT = self.fsm_dT\n",
" self.moveTo(robot.standing_q, duration = 1)\n",
" self.timer = time.time()\n",
" self.state = \"switch_back_to_locked_stance\"\n",
" else:\n",
" self.state = \"user_loop\"\n",
"\n",
" def sitting(self):\n",
" # print('sitting')\n",
" if self.tracking_complete:\n",
" self.setMode(\"damping\")\n",
" self.state = 'damping'\n",
" else:\n",
" self.state = \"sitting\"\n",
"\n",
" def switch_back_to_locked_stance(self):\n",
" if time.time()-self.timer > 0.5:\n",
" # print(\"going back to locked stance\")\n",
" self.state = \"locked_stance\""
"robot = GO2Real(mode='lowlevel')\n",
"remote = KeyboardRemote()\n",
"safety_hypervisor = SafetyHypervisor(robot)"
]
},
{
@ -288,43 +43,6 @@
"source": [
"fsm.close()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'q': array([-0.02495611, 1.26249647, -2.82826662, 0.04563564, 1.2505368 ,\n",
" -2.7933557 , -0.30623949, 1.28283751, -2.82314873, 0.26400229,\n",
" 1.29355574, -2.84276843]),\n",
" 'dq': array([ 0.03875524, 0.0155021 , 0.01011006, 0.00775105, 0.03875524,\n",
" 0.01415409, -0.04263076, -0.00387552, 0.02426415, -0.00387552,\n",
" 0. , -0.02426415]),\n",
" 'ddq': array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]),\n",
" 'tau_est': array([-0.02473828, 0.04947656, -0.04741504, -0.07421485, 0. ,\n",
" 0. , 0.04947656, 0.04947656, 0. , 0.02473828,\n",
" -0.07421485, 0. ]),\n",
" 'temperature': array([38., 32., 32., 36., 32., 33., 41., 33., 33., 41., 33., 33.])}"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"robot.getJointStates()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {

View File

@ -2,22 +2,44 @@
"cells": [
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from Go2Py.sim.mujoco import Go2Sim\n",
"from Go2Py.robot.fsm import FSM\n",
"from Go2Py.robot.remote import KeyboardRemote\n",
"from Go2Py.robot.safety import SafetyHypervisor\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"robot = Go2Sim()\n",
"robot.sitDownReset()"
"remote = KeyboardRemote()\n",
"safety_hypervisor = SafetyHypervisor(robot)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"controller timeout\n"
]
}
],
"source": [
"robot.sitDownReset()\n",
"fsm = FSM(robot, remote, safety_hypervisor)"
]
},
{
@ -26,308 +48,8 @@
"metadata": {},
"outputs": [],
"source": [
"import threading\n",
"from pynput import keyboard\n",
"class BaseRemote:\n",
" def __init__(self):\n",
" pass\n",
" def startSeq(self):\n",
" return False\n",
" def standUpDownSeq(self):\n",
" return False\n",
"\n",
" def flushStates(self):\n",
" pass\n",
"\n",
"remote = BaseRemote()\n",
"\n",
"class KeyboardRemote(BaseRemote):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.start_seq_flag = False\n",
" self.stand_up_down_seq_flag = False\n",
" self.listener_thread = threading.Thread(target=self._listen_to_keyboard)\n",
" self.listener_thread.daemon = True\n",
" self.listener_thread.start()\n",
"\n",
" def _on_press(self, key):\n",
" try:\n",
" if key.char == 's': # Start sequence\n",
" self.start_seq_flag = True\n",
" elif key.char == 'u': # Stand up/down sequence\n",
" self.stand_up_down_seq_flag = True\n",
" except AttributeError:\n",
" pass # Special keys (like space) will be handled here\n",
"\n",
" \n",
" def _on_release(self, key):\n",
" try:\n",
" if key.char == 's': # Start sequence\n",
" self.start_seq_flag = False\n",
" elif key.char == 'u': # Stand up/down sequence\n",
" self.stand_up_down_seq_flag = False\n",
" except AttributeError:\n",
" pass # Special keys (like space) will be handled here\n",
"\n",
" def _listen_to_keyboard(self):\n",
" with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener:\n",
" listener.join()\n",
"\n",
" def startSeq(self):\n",
" if self.start_seq_flag:\n",
" self.start_seq_flag = False\n",
" return True\n",
" return False\n",
"\n",
" def standUpDownSeq(self):\n",
" if self.stand_up_down_seq_flag:\n",
" self.stand_up_down_seq_flag = False\n",
" return True\n",
" return False\n",
"\n",
" def flushStates(self):\n",
" self.stand_up_down_seq_flag = False\n",
" self.start_seq_flag = False"
"fsm.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"remote = KeyboardRemote()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"class SafetyHypervisor():\n",
" def __init__(self, robot):\n",
" self.robot = robot\n",
"\n",
" def unsafe(self):\n",
" return False\n",
" \n",
" def controlTimeout(self):\n",
" if time.time() - robot.latest_command_stamp > 0.1:\n",
" print('controller timeout')\n",
" return True\n",
" else:\n",
" return False \n",
"\n",
"safety = SafetyHypervisor(robot)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import threading\n",
"import numpy as np\n",
"import time\n",
"from enum import Enum\n",
"\n",
"\n",
"class FSM:\n",
" def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):\n",
" self.robot = robot\n",
" self.remote = remote\n",
" self.remote.flushStates()\n",
" self.safety = safety_hypervisor\n",
" self.user_controller_callback = user_controller_callback\n",
"\n",
" self.state = \"damping\"\n",
" self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12)\n",
" self.tracking_kv = np.array(12*[3.])\n",
" self.damping_kv = np.array(12*[2.])\n",
"\n",
"\n",
" self.tracking_complete = True\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n",
" self.fsm_dT = 1./50.\n",
" self.control_dT = 1./400.\n",
" self.dT = self.fsm_dT\n",
"\n",
" self.modes = {\"tracking\":self.trackingControlUpdate,\n",
" \"damping\" :self.dampingControlUpdate,\n",
" \"user\": self.userControlUpdate,\n",
" }\n",
" self.setMode(\"damping\")\n",
"\n",
" self.running = True\n",
" self.fsm_thread = threading.Thread(target = self.update)\n",
" self.fsm_thread.start()\n",
" # if the robot is a simulation, create a thread for stepping it\n",
" if robot.simulated:\n",
" self.sim_thread = threading.Thread(target=self.simUpdate)\n",
" self.sim_thread.start()\n",
"\n",
" def setMode(self, mode):\n",
" assert mode in self.modes.keys(), 'the requested control update mode is not implemented'\n",
" self.updateCommands = self.modes[mode]\n",
" # print(f'setting mode to {mode}')\n",
" \n",
" def moveTo(self, target, duration=0.5):\n",
" # assert self.tracking_complete, 'The previous moveTo command is not completed yet!'\n",
" self.q_start = self.robot.getJointStates()['q']\n",
" self.q_target = target\n",
" self.time = 0.0\n",
" self.move_duration = duration\n",
" self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \\\n",
" True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished\n",
" self.tracking_complete = False\n",
" self.setMode(\"tracking\")\n",
"\n",
" def trackingControlUpdate(self):\n",
" self.time +=self.dT\n",
" q_des, done = self.q_des(self.time)\n",
" self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))\n",
" self.tracking_complete = done\n",
"\n",
" def dampingControlUpdate(self):\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n",
"\n",
" def userControlUpdate(self):\n",
" if self.user_controller_callback is not None:\n",
" user_controller_callback(robot, remote)\n",
"\n",
" def simUpdate(self):\n",
" while self.running:\n",
" self.robot.step()\n",
" time.sleep(robot.dt)\n",
" \n",
" def update(self):\n",
" while self.running:\n",
" getattr(self, self.state)()\n",
" time.sleep(self.dT)\n",
" self.updateCommands()\n",
" \n",
" def close(self):\n",
" self.running = False\n",
"\n",
" # The following functions each are the states of the FSM\n",
" def damping(self):\n",
" # print('damping')\n",
" if self.remote.standUpDownSeq():\n",
" self.moveTo(robot.prestanding_q, 1)\n",
" self.state = \"pre_standing\"\n",
" else:\n",
" self.setMode(\"damping\")\n",
" self.state = \"damping\"\n",
"\n",
" def pre_standing(self):\n",
" # print(\"pre_stance\")\n",
" if self.tracking_complete:\n",
" self.moveTo(robot.standing_q, duration=1.5)\n",
" self.state = 'standing'\n",
" else:\n",
" self.state = \"pre_standing\"\n",
" \n",
" def standing(self):\n",
" # print(\"standing\")\n",
" if self.tracking_complete:\n",
" # self.moveTo(robot.standing_q, duration=1)\n",
" self.state = 'locked_stance'\n",
" else:\n",
" self.state = \"standing\"\n",
"\n",
" def locked_stance(self):\n",
" # print(\"locked_stance\")\n",
" if self.remote.startSeq():\n",
" self.setMode(\"user\")\n",
" self.dT = self.control_dT\n",
" self.state = \"user_loop\"\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))\n",
" elif self.remote.standUpDownSeq() or robot.overheat():\n",
" self.moveTo(robot.sitting_q, duration = 1.5)\n",
" self.state = \"sitting\"\n",
"\n",
" def user_loop(self):\n",
" # print(\"user_loop\")\n",
" if self.safety.unsafe():\n",
" self.dT = self.fsm_dT\n",
" self.setMode(\"damping\")\n",
" elif self.remote.standUpDownSeq() or self.safety.controlTimeout():\n",
" self.dT = self.fsm_dT\n",
" self.moveTo(robot.standing_q, duration = 1)\n",
" self.timer = time.time()\n",
" self.state = \"switch_back_to_locked_stance\"\n",
" else:\n",
" self.state = \"user_loop\"\n",
"\n",
" def sitting(self):\n",
" # print('sitting')\n",
" if self.tracking_complete:\n",
" self.setMode(\"damping\")\n",
" self.state = 'damping'\n",
" else:\n",
" self.state = \"sitting\"\n",
"\n",
" def switch_back_to_locked_stance(self):\n",
" if time.time()-self.timer > 0.5:\n",
" # print(\"going back to locked stance\")\n",
" self.state = \"locked_stance\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm = FSM(robot, remote, safety)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm.moveTo(robot.sitting_q, 1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm.moveTo(robot.standing_q, 2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm.setMode(\"damping\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm.close()\n",
"robot.sitDownReset()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {

View File

@ -1,43 +1,5 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Joystick Control\n",
"As the first step after successfully communicating with the robot's onboard controller, we use a USB joystick (a Logitech Extreme 3D Pro) to command the robot with desired $x$, $y$, and $\\Psi$ velocities.\n",
"\n",
"First, use the Pygame-based joystick class provided with B1Py to connect to the joystick:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"pygame 2.5.2 (SDL 2.28.2, Python 3.8.18)\n",
"Hello from the pygame community. https://www.pygame.org/contribute.html\n"
]
}
],
"source": [
"from Go2Py.joy import Logitech3DPro\n",
"# joy = Logitech3DPro(joy_id=0) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"***Note:*** Initially when you instantiate the joystick, the class performs an offset calibration to maker sure the neutral configuration of the device reads zero. It is important to leave the device untouched during the couple of seconds after calling the cell above. The prompts tells you when you can continue. \n",
"\n",
"Then as explained in [hardware interface documentation](unitree_locomotion_controller_interface.ipynb), instantiate and communication link to high-level controller on the robot. Note that the robot should be standing and ready to walk before we can control it. Furthermore, the the highlevel node in b1py_node ROS2 package (in `deploy/ros2_ws`) should be launched on the robot. "
]
},
{
"cell_type": "code",
"execution_count": 1,
@ -62,29 +24,13 @@
"ros2_exec_manager.start()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, read the joystick values and send them to the robot in a loop:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Duration = 20\n",
"N = Duration//0.01\n",
"for i in range(500):\n",
" cmd = joy.readAnalog()\n",
" vx = cmd['x']\n",
" vy = cmd['y']\n",
" w = cmd['z'] / 2\n",
" body_height = cmd['aux'] / 10\n",
" robot.setCommands(vx,vy,w, bodyHeight=body_height)\n",
" time.sleep(0.01)"
"robot.getJointStates()"
]
},
{
@ -105,31 +51,6 @@
" robot.setCommands(q, dq, kp, kd, tau)\n",
" time.sleep(0.01) "
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'q': array([-0.06477112, 1.24850821, -2.8002429 , 0.04539341, 1.25765204,\n",
" -2.78191853, -0.34121001, 1.27112007, -2.79908991, 0.34284496,\n",
" 1.26930332, -2.8161664 ]),\n",
" 'dq': array([ 0. , 0.01937762, -0.00606604, -0.01162657, 0. ,\n",
" 0.01819811, 0.01937762, 0. , 0.03033019, -0.05813286,\n",
" 0.00775105, 0.00808805])}"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"robot.getJointStates()"
]
}
],
"metadata": {

View File

@ -1,8 +1,15 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Basic Test"
]
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@ -12,7 +19,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@ -22,16 +29,16 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from Go2Py.controllers.walk_these_ways import *"
"from Go2Py.control.walk_these_ways import *"
]
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@ -44,17 +51,9 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"p_gains: [20. 20. 20. 20. 20. 20. 20. 20. 20. 20. 20. 20.]\n"
]
}
],
"outputs": [],
"source": [
"agent = WalkTheseWaysAgent(cfg, command_profile, robot)\n",
"agent = HistoryWrapper(agent)"
@ -62,7 +61,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@ -73,21 +72,9 @@
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"frq: 12.822735624382831 Hz\n",
"frq: 47.954632763194 Hz\n",
"frq: 53.309743511528 Hz\n",
"frq: 54.65674559220214 Hz\n",
"frq: 45.29730547005778 Hz\n"
]
}
],
"outputs": [],
"source": [
"robot.reset()\n",
"obs = agent.reset()\n",
@ -106,12 +93,81 @@
" time.sleep(robot.dt/4)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Full System With FSM"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
"source": [
"from Go2Py.robot.fsm import FSM\n",
"from Go2Py.robot.remote import KeyboardRemote\n",
"from Go2Py.robot.safety import SafetyHypervisor\n",
"from Go2Py.sim.mujoco import Go2Sim\n",
"from Go2Py.control.walk_these_ways import *\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class walkTheseWaysController:\n",
" def __init__(self, robot, remote, checkpoint):\n",
" self.remote = remote\n",
" self.robot = robot\n",
" self.cfg = loadParameters(checkpoint)\n",
" self.policy = Policy(checkpoint)\n",
" self.command_profile = CommandInterface()\n",
" self.agent = WalkTheseWaysAgent(self.cfg, self.command_profile, self.robot)\n",
" self.agent = HistoryWrapper(self.agent)\n",
" self.init()\n",
"\n",
" def init(self):\n",
" self.obs = self.agent.reset()\n",
" self.policy_info = {}\n",
"\n",
" def update(self, robot, remote):\n",
" action = self.policy(self.obs, self.policy_info)\n",
" self.obs, self.ret, self.done, self.info = self.agent.step(action)\n",
" self.command_profile.yaw_vel_cmd = 1.2\n",
" self.command_profile.x_vel_cmd = 0.8\n",
" self.command_profile.y_vel_cmd = 0.0\n",
" self.command_profile.stance_width_cmd=0.2\n",
" self.command_profile.footswing_height_cmd=-0.05\n",
" self.command_profile.step_frequency_cmd = 2.5\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"remote = KeyboardRemote()\n",
"robot = Go2Sim()\n",
"robot.standUpReset()\n",
"safety_hypervisor = SafetyHypervisor(robot)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"checkpoint = \"/home/rstaion/projects/rooholla/locomotion/Go2Py/Go2Py/assets/checkpoints/walk_these_ways/\"\n",
"controller = walkTheseWaysController(robot, remote, checkpoint)\n",
"fsm = FSM(robot, remote, safety_hypervisor, user_controller_callback=controller.update)"
]
}
],
"metadata": {