From 34b778bab9636ac852a1d5cc1942352c7d19184d Mon Sep 17 00:00:00 2001 From: Rooholla-KhorramBakht Date: Tue, 19 Mar 2024 21:50:19 -0400 Subject: [PATCH] walk these ways controller tested with FSM in simulation --- Go2Py/{controllers => control}/__init__.py | 0 .../walk_these_ways.py | 0 Go2Py/robot/fsm.py | 145 ++++++++ Go2Py/robot/remote.py | 63 ++++ Go2Py/robot/safety.py | 14 + examples/fsm_real.ipynb | 300 +--------------- examples/fsm_sim.ipynb | 330 ++---------------- examples/lowlevel_ros2_interface.ipynb | 81 +---- examples/walk_these_ways_sim.ipynb | 118 +++++-- 9 files changed, 345 insertions(+), 706 deletions(-) rename Go2Py/{controllers => control}/__init__.py (100%) rename Go2Py/{controllers => control}/walk_these_ways.py (100%) create mode 100644 Go2Py/robot/remote.py create mode 100644 Go2Py/robot/safety.py diff --git a/Go2Py/controllers/__init__.py b/Go2Py/control/__init__.py similarity index 100% rename from Go2Py/controllers/__init__.py rename to Go2Py/control/__init__.py diff --git a/Go2Py/controllers/walk_these_ways.py b/Go2Py/control/walk_these_ways.py similarity index 100% rename from Go2Py/controllers/walk_these_ways.py rename to Go2Py/control/walk_these_ways.py diff --git a/Go2Py/robot/fsm.py b/Go2Py/robot/fsm.py index e69de29..27a885b 100644 --- a/Go2Py/robot/fsm.py +++ b/Go2Py/robot/fsm.py @@ -0,0 +1,145 @@ +import threading +import numpy as np +import time +from enum import Enum + + +class FSM: + def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None): + self.robot = robot + self.remote = remote + self.remote.flushStates() + self.safety = safety_hypervisor + self.user_controller_callback = user_controller_callback + + self.state = "damping" + self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12) + self.tracking_kv = np.array(12*[3.]) + self.damping_kv = np.array(12*[2.]) + + + self.tracking_complete = True + self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12)) + self.fsm_dT = 1./50. + self.control_dT = 1./50. + self.dT = self.fsm_dT + + self.modes = {"tracking":self.trackingControlUpdate, + "damping" :self.dampingControlUpdate, + "user": self.userControlUpdate, + } + self.setMode("damping") + + self.running = True + self.fsm_thread = threading.Thread(target = self.update) + self.fsm_thread.start() + # if the robot is a simulation, create a thread for stepping it + if self.robot.simulated: + self.sim_thread = threading.Thread(target=self.simUpdate) + self.sim_thread.start() + + def setMode(self, mode): + assert mode in self.modes.keys(), 'the requested control update mode is not implemented' + self.updateCommands = self.modes[mode] + # print(f'setting mode to {mode}') + + def moveTo(self, target, duration=0.5): + # assert self.tracking_complete, 'The previous moveTo command is not completed yet!' + self.q_start = self.robot.getJointStates()['q'] + self.q_target = target + self.time = 0.0 + self.move_duration = duration + self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \ + True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished + self.tracking_complete = False + self.setMode("tracking") + + def trackingControlUpdate(self): + self.time +=self.dT + q_des, done = self.q_des(self.time) + self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12)) + self.tracking_complete = done + + def dampingControlUpdate(self): + self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12)) + + def userControlUpdate(self): + if self.user_controller_callback is not None: + self.user_controller_callback(self.robot, self.remote) + + def simUpdate(self): + while self.running: + self.robot.step() + time.sleep(self.robot.dt) + + def update(self): + while self.running: + getattr(self, self.state)() + time.sleep(self.dT) + self.updateCommands() + + def close(self): + self.running = False + + # The following functions each are the states of the FSM + def damping(self): + # print('damping') + if self.remote.standUpDownSeq(): + self.moveTo(self.robot.prestanding_q, 1) + self.state = "pre_standing" + else: + self.setMode("damping") + self.state = "damping" + + def pre_standing(self): + # print("pre_stance") + if self.tracking_complete: + self.moveTo(self.robot.standing_q, duration=1.5) + self.state = 'standing' + else: + self.state = "pre_standing" + + def standing(self): + # print("standing") + if self.tracking_complete: + # self.moveTo(robot.standing_q, duration=1) + self.state = 'locked_stance' + else: + self.state = "standing" + + def locked_stance(self): + # print("locked_stance") + if self.remote.startSeq(): + self.setMode("user") + self.dT = self.control_dT + self.state = "user_loop" + self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12)) + elif self.remote.standUpDownSeq() or self.robot.overheat(): + self.moveTo(self.robot.sitting_q, duration = 1.5) + self.state = "sitting" + + def user_loop(self): + # print("user_loop") + if self.safety.unsafe(): + self.dT = self.fsm_dT + self.setMode("damping") + elif self.remote.standUpDownSeq() or self.safety.controlTimeout(): + self.dT = self.fsm_dT + self.moveTo(self.robot.standing_q, duration = 1) + self.timer = time.time() + self.state = "switch_back_to_locked_stance" + else: + self.state = "user_loop" + + def sitting(self): + # print('sitting') + if self.tracking_complete: + self.setMode("damping") + self.state = 'damping' + else: + self.state = "sitting" + + def switch_back_to_locked_stance(self): + if time.time()-self.timer > 0.5: + # print("going back to locked stance") + self.state = "locked_stance" \ No newline at end of file diff --git a/Go2Py/robot/remote.py b/Go2Py/robot/remote.py new file mode 100644 index 0000000..e6a704e --- /dev/null +++ b/Go2Py/robot/remote.py @@ -0,0 +1,63 @@ +import threading +from pynput import keyboard + +class BaseRemote: + def __init__(self): + pass + def startSeq(self): + return False + def standUpDownSeq(self): + return False + + def flushStates(self): + pass + +remote = BaseRemote() + +class KeyboardRemote(BaseRemote): + def __init__(self): + super().__init__() + self.start_seq_flag = False + self.stand_up_down_seq_flag = False + self.listener_thread = threading.Thread(target=self._listen_to_keyboard) + self.listener_thread.daemon = True + self.listener_thread.start() + + def _on_press(self, key): + try: + if key.char == 's': # Start sequence + self.start_seq_flag = True + elif key.char == 'u': # Stand up/down sequence + self.stand_up_down_seq_flag = True + except AttributeError: + pass # Special keys (like space) will be handled here + + + def _on_release(self, key): + try: + if key.char == 's': # Start sequence + self.start_seq_flag = False + elif key.char == 'u': # Stand up/down sequence + self.stand_up_down_seq_flag = False + except AttributeError: + pass # Special keys (like space) will be handled here + + def _listen_to_keyboard(self): + with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener: + listener.join() + + def startSeq(self): + if self.start_seq_flag: + self.start_seq_flag = False + return True + return False + + def standUpDownSeq(self): + if self.stand_up_down_seq_flag: + self.stand_up_down_seq_flag = False + return True + return False + + def flushStates(self): + self.stand_up_down_seq_flag = False + self.start_seq_flag = False \ No newline at end of file diff --git a/Go2Py/robot/safety.py b/Go2Py/robot/safety.py new file mode 100644 index 0000000..4e2c2fe --- /dev/null +++ b/Go2Py/robot/safety.py @@ -0,0 +1,14 @@ +import time +class SafetyHypervisor(): + def __init__(self, robot): + self.robot = robot + + def unsafe(self): + return False + + def controlTimeout(self): + if time.time() - self.robot.latest_command_stamp > 0.1: + print('controller timeout') + return True + else: + return False \ No newline at end of file diff --git a/examples/fsm_real.ipynb b/examples/fsm_real.ipynb index 7680df9..d1cdc6c 100644 --- a/examples/fsm_real.ipynb +++ b/examples/fsm_real.ipynb @@ -8,267 +8,22 @@ "source": [ "from Go2Py.robot.interface.dds import GO2Real\n", "import time\n", - "robot = GO2Real(mode='lowlevel')" + "from Go2Py.sim.mujoco import Go2Sim\n", + "from Go2Py.robot.fsm import FSM\n", + "from Go2Py.robot.remote import KeyboardRemote\n", + "from Go2Py.robot.safety import SafetyHypervisor\n", + "import numpy as np" ] }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "import threading\n", - "from pynput import keyboard\n", - "class BaseRemote:\n", - " def __init__(self):\n", - " pass\n", - " def startSeq(self):\n", - " return False\n", - " def standUpDownSeq(self):\n", - " return False\n", - "\n", - " def flushStates(self):\n", - " pass\n", - "\n", - "remote = BaseRemote()\n", - "\n", - "class KeyboardRemote(BaseRemote):\n", - " def __init__(self):\n", - " super().__init__()\n", - " self.start_seq_flag = False\n", - " self.stand_up_down_seq_flag = False\n", - " self.listener_thread = threading.Thread(target=self._listen_to_keyboard)\n", - " self.listener_thread.daemon = True\n", - " self.listener_thread.start()\n", - "\n", - " def _on_press(self, key):\n", - " try:\n", - " if key.char == 's': # Start sequence\n", - " self.start_seq_flag = True\n", - " elif key.char == 'u': # Stand up/down sequence\n", - " self.stand_up_down_seq_flag = True\n", - " except AttributeError:\n", - " pass # Special keys (like space) will be handled here\n", - "\n", - " \n", - " def _on_release(self, key):\n", - " try:\n", - " if key.char == 's': # Start sequence\n", - " self.start_seq_flag = False\n", - " elif key.char == 'u': # Stand up/down sequence\n", - " self.stand_up_down_seq_flag = False\n", - " except AttributeError:\n", - " pass # Special keys (like space) will be handled here\n", - "\n", - " def _listen_to_keyboard(self):\n", - " with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener:\n", - " listener.join()\n", - "\n", - " def startSeq(self):\n", - " if self.start_seq_flag:\n", - " self.start_seq_flag = False\n", - " return True\n", - " return False\n", - "\n", - " def standUpDownSeq(self):\n", - " if self.stand_up_down_seq_flag:\n", - " self.stand_up_down_seq_flag = False\n", - " return True\n", - " return False\n", - "\n", - " def flushStates(self):\n", - " self.stand_up_down_seq_flag = False\n", - " self.start_seq_flag = False" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [], - "source": [ - "remote = KeyboardRemote()" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [], - "source": [ - "import time\n", - "class SafetyHypervisor():\n", - " def __init__(self, robot):\n", - " self.robot = robot\n", - "\n", - " def unsafe(self):\n", - " return False\n", - " \n", - " def controlTimeout(self):\n", - " if time.time() - robot.latest_command_stamp > 0.1:\n", - " print('controller timeout')\n", - " return True\n", - " else:\n", - " return False \n", - "\n", - "safety = SafetyHypervisor(robot)" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, - "outputs": [], - "source": [ - "import threading\n", - "import numpy as np\n", - "import time\n", - "from enum import Enum\n", - "\n", - "\n", - "class FSM:\n", - " def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):\n", - " self.robot = robot\n", - " self.remote = remote\n", - " self.remote.flushStates()\n", - " self.safety = safety_hypervisor\n", - " self.user_controller_callback = user_controller_callback\n", - "\n", - " self.state = \"damping\"\n", - "\n", - " # self.tracking_kp = np.array(4*[200, 200, 350.]).reshape(12)\n", - " # self.tracking_kv = np.array(12*[10.])\n", - "\n", - " self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12)\n", - " self.tracking_kv = np.array(12*[3.])\n", - " self.damping_kv = np.array(12*[2.])\n", - "\n", - "\n", - " self.tracking_complete = True\n", - " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n", - " self.fsm_dT = 1./50.\n", - " self.control_dT = 1./400.\n", - " self.dT = self.fsm_dT\n", - "\n", - " self.modes = {\"tracking\":self.trackingControlUpdate,\n", - " \"damping\" :self.dampingControlUpdate,\n", - " \"user\": self.userControlUpdate,\n", - " }\n", - " self.setMode(\"damping\")\n", - "\n", - " self.running = True\n", - " self.fsm_thread = threading.Thread(target = self.update)\n", - " self.fsm_thread.start()\n", - " # if the robot is a simulation, create a thread for stepping it\n", - " if robot.simulated:\n", - " self.sim_thread = threading.Thread(target=self.simUpdate)\n", - " self.sim_thread.start()\n", - "\n", - " def setMode(self, mode):\n", - " assert mode in self.modes.keys(), 'the requested control update mode is not implemented'\n", - " self.updateCommands = self.modes[mode]\n", - " # print(f'setting mode to {mode}')\n", - " \n", - " def moveTo(self, target, duration=0.5):\n", - " # assert self.tracking_complete, 'The previous moveTo command is not completed yet!'\n", - " self.q_start = self.robot.getJointStates()['q']\n", - " self.q_target = target\n", - " self.time = 0.0\n", - " self.move_duration = duration\n", - " self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \\\n", - " True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished\n", - " self.tracking_complete = False\n", - " self.setMode(\"tracking\")\n", - "\n", - " def trackingControlUpdate(self):\n", - " self.time +=self.dT\n", - " q_des, done = self.q_des(self.time)\n", - " self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))\n", - " self.tracking_complete = done\n", - "\n", - " def dampingControlUpdate(self):\n", - " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n", - "\n", - " def userControlUpdate(self):\n", - " if self.user_controller_callback is not None:\n", - " user_controller_callback(robot, remote)\n", - "\n", - " def simUpdate(self):\n", - " while self.running:\n", - " self.robot.step()\n", - " time.sleep(robot.dt)\n", - " \n", - " def update(self):\n", - " while self.running:\n", - " getattr(self, self.state)()\n", - " time.sleep(self.dT)\n", - " self.updateCommands()\n", - " \n", - " def close(self):\n", - " self.running = False\n", - "\n", - " # The following functions each are the states of the FSM\n", - " def damping(self):\n", - " # print('damping')\n", - " if self.remote.standUpDownSeq():\n", - " self.moveTo(robot.prestanding_q, 1)\n", - " self.state = \"pre_standing\"\n", - " else:\n", - " self.setMode(\"damping\")\n", - " self.state = \"damping\"\n", - "\n", - " def pre_standing(self):\n", - " # print(\"pre_stance\")\n", - " if self.tracking_complete:\n", - " self.moveTo(robot.standing_q, duration=1.5)\n", - " self.state = 'standing'\n", - " else:\n", - " self.state = \"pre_standing\"\n", - " \n", - " def standing(self):\n", - " # print(\"standing\")\n", - " if self.tracking_complete:\n", - " # self.moveTo(robot.standing_q, duration=1)\n", - " self.state = 'locked_stance'\n", - " else:\n", - " self.state = \"standing\"\n", - "\n", - " def locked_stance(self):\n", - " # print(\"locked_stance\")\n", - " if self.remote.startSeq():\n", - " self.setMode(\"user\")\n", - " self.dT = self.control_dT\n", - " self.state = \"user_loop\"\n", - " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))\n", - " elif self.remote.standUpDownSeq() or robot.overheat():\n", - " self.moveTo(robot.sitting_q, duration = 1.5)\n", - " self.state = \"sitting\"\n", - "\n", - " def user_loop(self):\n", - " # print(\"user_loop\")\n", - " if self.safety.unsafe():\n", - " self.dT = self.fsm_dT\n", - " self.setMode(\"damping\")\n", - " elif self.remote.standUpDownSeq() or self.safety.controlTimeout():\n", - " self.dT = self.fsm_dT\n", - " self.moveTo(robot.standing_q, duration = 1)\n", - " self.timer = time.time()\n", - " self.state = \"switch_back_to_locked_stance\"\n", - " else:\n", - " self.state = \"user_loop\"\n", - "\n", - " def sitting(self):\n", - " # print('sitting')\n", - " if self.tracking_complete:\n", - " self.setMode(\"damping\")\n", - " self.state = 'damping'\n", - " else:\n", - " self.state = \"sitting\"\n", - "\n", - " def switch_back_to_locked_stance(self):\n", - " if time.time()-self.timer > 0.5:\n", - " # print(\"going back to locked stance\")\n", - " self.state = \"locked_stance\"" + "robot = GO2Real(mode='lowlevel')\n", + "remote = KeyboardRemote()\n", + "safety_hypervisor = SafetyHypervisor(robot)" ] }, { @@ -288,43 +43,6 @@ "source": [ "fsm.close()" ] - }, - { - "cell_type": "code", - "execution_count": 8, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'q': array([-0.02495611, 1.26249647, -2.82826662, 0.04563564, 1.2505368 ,\n", - " -2.7933557 , -0.30623949, 1.28283751, -2.82314873, 0.26400229,\n", - " 1.29355574, -2.84276843]),\n", - " 'dq': array([ 0.03875524, 0.0155021 , 0.01011006, 0.00775105, 0.03875524,\n", - " 0.01415409, -0.04263076, -0.00387552, 0.02426415, -0.00387552,\n", - " 0. , -0.02426415]),\n", - " 'ddq': array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]),\n", - " 'tau_est': array([-0.02473828, 0.04947656, -0.04741504, -0.07421485, 0. ,\n", - " 0. , 0.04947656, 0.04947656, 0. , 0.02473828,\n", - " -0.07421485, 0. ]),\n", - " 'temperature': array([38., 32., 32., 36., 32., 33., 41., 33., 33., 41., 33., 33.])}" - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "robot.getJointStates()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/examples/fsm_sim.ipynb b/examples/fsm_sim.ipynb index c6279fe..c247e1b 100644 --- a/examples/fsm_sim.ipynb +++ b/examples/fsm_sim.ipynb @@ -2,22 +2,44 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from Go2Py.sim.mujoco import Go2Sim\n", + "from Go2Py.robot.fsm import FSM\n", + "from Go2Py.robot.remote import KeyboardRemote\n", + "from Go2Py.robot.safety import SafetyHypervisor\n", "import numpy as np" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "robot = Go2Sim()\n", - "robot.sitDownReset()" + "remote = KeyboardRemote()\n", + "safety_hypervisor = SafetyHypervisor(robot)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "controller timeout\n" + ] + } + ], + "source": [ + "robot.sitDownReset()\n", + "fsm = FSM(robot, remote, safety_hypervisor)" ] }, { @@ -26,308 +48,8 @@ "metadata": {}, "outputs": [], "source": [ - "import threading\n", - "from pynput import keyboard\n", - "class BaseRemote:\n", - " def __init__(self):\n", - " pass\n", - " def startSeq(self):\n", - " return False\n", - " def standUpDownSeq(self):\n", - " return False\n", - "\n", - " def flushStates(self):\n", - " pass\n", - "\n", - "remote = BaseRemote()\n", - "\n", - "class KeyboardRemote(BaseRemote):\n", - " def __init__(self):\n", - " super().__init__()\n", - " self.start_seq_flag = False\n", - " self.stand_up_down_seq_flag = False\n", - " self.listener_thread = threading.Thread(target=self._listen_to_keyboard)\n", - " self.listener_thread.daemon = True\n", - " self.listener_thread.start()\n", - "\n", - " def _on_press(self, key):\n", - " try:\n", - " if key.char == 's': # Start sequence\n", - " self.start_seq_flag = True\n", - " elif key.char == 'u': # Stand up/down sequence\n", - " self.stand_up_down_seq_flag = True\n", - " except AttributeError:\n", - " pass # Special keys (like space) will be handled here\n", - "\n", - " \n", - " def _on_release(self, key):\n", - " try:\n", - " if key.char == 's': # Start sequence\n", - " self.start_seq_flag = False\n", - " elif key.char == 'u': # Stand up/down sequence\n", - " self.stand_up_down_seq_flag = False\n", - " except AttributeError:\n", - " pass # Special keys (like space) will be handled here\n", - "\n", - " def _listen_to_keyboard(self):\n", - " with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener:\n", - " listener.join()\n", - "\n", - " def startSeq(self):\n", - " if self.start_seq_flag:\n", - " self.start_seq_flag = False\n", - " return True\n", - " return False\n", - "\n", - " def standUpDownSeq(self):\n", - " if self.stand_up_down_seq_flag:\n", - " self.stand_up_down_seq_flag = False\n", - " return True\n", - " return False\n", - "\n", - " def flushStates(self):\n", - " self.stand_up_down_seq_flag = False\n", - " self.start_seq_flag = False" + "fsm.close()" ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "remote = KeyboardRemote()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import time\n", - "class SafetyHypervisor():\n", - " def __init__(self, robot):\n", - " self.robot = robot\n", - "\n", - " def unsafe(self):\n", - " return False\n", - " \n", - " def controlTimeout(self):\n", - " if time.time() - robot.latest_command_stamp > 0.1:\n", - " print('controller timeout')\n", - " return True\n", - " else:\n", - " return False \n", - "\n", - "safety = SafetyHypervisor(robot)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import threading\n", - "import numpy as np\n", - "import time\n", - "from enum import Enum\n", - "\n", - "\n", - "class FSM:\n", - " def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):\n", - " self.robot = robot\n", - " self.remote = remote\n", - " self.remote.flushStates()\n", - " self.safety = safety_hypervisor\n", - " self.user_controller_callback = user_controller_callback\n", - "\n", - " self.state = \"damping\"\n", - " self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12)\n", - " self.tracking_kv = np.array(12*[3.])\n", - " self.damping_kv = np.array(12*[2.])\n", - "\n", - "\n", - " self.tracking_complete = True\n", - " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n", - " self.fsm_dT = 1./50.\n", - " self.control_dT = 1./400.\n", - " self.dT = self.fsm_dT\n", - "\n", - " self.modes = {\"tracking\":self.trackingControlUpdate,\n", - " \"damping\" :self.dampingControlUpdate,\n", - " \"user\": self.userControlUpdate,\n", - " }\n", - " self.setMode(\"damping\")\n", - "\n", - " self.running = True\n", - " self.fsm_thread = threading.Thread(target = self.update)\n", - " self.fsm_thread.start()\n", - " # if the robot is a simulation, create a thread for stepping it\n", - " if robot.simulated:\n", - " self.sim_thread = threading.Thread(target=self.simUpdate)\n", - " self.sim_thread.start()\n", - "\n", - " def setMode(self, mode):\n", - " assert mode in self.modes.keys(), 'the requested control update mode is not implemented'\n", - " self.updateCommands = self.modes[mode]\n", - " # print(f'setting mode to {mode}')\n", - " \n", - " def moveTo(self, target, duration=0.5):\n", - " # assert self.tracking_complete, 'The previous moveTo command is not completed yet!'\n", - " self.q_start = self.robot.getJointStates()['q']\n", - " self.q_target = target\n", - " self.time = 0.0\n", - " self.move_duration = duration\n", - " self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \\\n", - " True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished\n", - " self.tracking_complete = False\n", - " self.setMode(\"tracking\")\n", - "\n", - " def trackingControlUpdate(self):\n", - " self.time +=self.dT\n", - " q_des, done = self.q_des(self.time)\n", - " self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))\n", - " self.tracking_complete = done\n", - "\n", - " def dampingControlUpdate(self):\n", - " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n", - "\n", - " def userControlUpdate(self):\n", - " if self.user_controller_callback is not None:\n", - " user_controller_callback(robot, remote)\n", - "\n", - " def simUpdate(self):\n", - " while self.running:\n", - " self.robot.step()\n", - " time.sleep(robot.dt)\n", - " \n", - " def update(self):\n", - " while self.running:\n", - " getattr(self, self.state)()\n", - " time.sleep(self.dT)\n", - " self.updateCommands()\n", - " \n", - " def close(self):\n", - " self.running = False\n", - "\n", - " # The following functions each are the states of the FSM\n", - " def damping(self):\n", - " # print('damping')\n", - " if self.remote.standUpDownSeq():\n", - " self.moveTo(robot.prestanding_q, 1)\n", - " self.state = \"pre_standing\"\n", - " else:\n", - " self.setMode(\"damping\")\n", - " self.state = \"damping\"\n", - "\n", - " def pre_standing(self):\n", - " # print(\"pre_stance\")\n", - " if self.tracking_complete:\n", - " self.moveTo(robot.standing_q, duration=1.5)\n", - " self.state = 'standing'\n", - " else:\n", - " self.state = \"pre_standing\"\n", - " \n", - " def standing(self):\n", - " # print(\"standing\")\n", - " if self.tracking_complete:\n", - " # self.moveTo(robot.standing_q, duration=1)\n", - " self.state = 'locked_stance'\n", - " else:\n", - " self.state = \"standing\"\n", - "\n", - " def locked_stance(self):\n", - " # print(\"locked_stance\")\n", - " if self.remote.startSeq():\n", - " self.setMode(\"user\")\n", - " self.dT = self.control_dT\n", - " self.state = \"user_loop\"\n", - " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))\n", - " elif self.remote.standUpDownSeq() or robot.overheat():\n", - " self.moveTo(robot.sitting_q, duration = 1.5)\n", - " self.state = \"sitting\"\n", - "\n", - " def user_loop(self):\n", - " # print(\"user_loop\")\n", - " if self.safety.unsafe():\n", - " self.dT = self.fsm_dT\n", - " self.setMode(\"damping\")\n", - " elif self.remote.standUpDownSeq() or self.safety.controlTimeout():\n", - " self.dT = self.fsm_dT\n", - " self.moveTo(robot.standing_q, duration = 1)\n", - " self.timer = time.time()\n", - " self.state = \"switch_back_to_locked_stance\"\n", - " else:\n", - " self.state = \"user_loop\"\n", - "\n", - " def sitting(self):\n", - " # print('sitting')\n", - " if self.tracking_complete:\n", - " self.setMode(\"damping\")\n", - " self.state = 'damping'\n", - " else:\n", - " self.state = \"sitting\"\n", - "\n", - " def switch_back_to_locked_stance(self):\n", - " if time.time()-self.timer > 0.5:\n", - " # print(\"going back to locked stance\")\n", - " self.state = \"locked_stance\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fsm = FSM(robot, remote, safety)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fsm.moveTo(robot.sitting_q, 1)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fsm.moveTo(robot.standing_q, 2)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fsm.setMode(\"damping\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "fsm.close()\n", - "robot.sitDownReset()\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/examples/lowlevel_ros2_interface.ipynb b/examples/lowlevel_ros2_interface.ipynb index 2797c24..2450d51 100644 --- a/examples/lowlevel_ros2_interface.ipynb +++ b/examples/lowlevel_ros2_interface.ipynb @@ -1,43 +1,5 @@ { "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Joystick Control\n", - "As the first step after successfully communicating with the robot's onboard controller, we use a USB joystick (a Logitech Extreme 3D Pro) to command the robot with desired $x$, $y$, and $\\Psi$ velocities.\n", - "\n", - "First, use the Pygame-based joystick class provided with B1Py to connect to the joystick:" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "pygame 2.5.2 (SDL 2.28.2, Python 3.8.18)\n", - "Hello from the pygame community. https://www.pygame.org/contribute.html\n" - ] - } - ], - "source": [ - "from Go2Py.joy import Logitech3DPro\n", - "# joy = Logitech3DPro(joy_id=0) " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "***Note:*** Initially when you instantiate the joystick, the class performs an offset calibration to maker sure the neutral configuration of the device reads zero. It is important to leave the device untouched during the couple of seconds after calling the cell above. The prompts tells you when you can continue. \n", - "\n", - "Then as explained in [hardware interface documentation](unitree_locomotion_controller_interface.ipynb), instantiate and communication link to high-level controller on the robot. Note that the robot should be standing and ready to walk before we can control it. Furthermore, the the highlevel node in b1py_node ROS2 package (in `deploy/ros2_ws`) should be launched on the robot. " - ] - }, { "cell_type": "code", "execution_count": 1, @@ -62,29 +24,13 @@ "ros2_exec_manager.start()" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Finally, read the joystick values and send them to the robot in a loop:" - ] - }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "Duration = 20\n", - "N = Duration//0.01\n", - "for i in range(500):\n", - " cmd = joy.readAnalog()\n", - " vx = cmd['x']\n", - " vy = cmd['y']\n", - " w = cmd['z'] / 2\n", - " body_height = cmd['aux'] / 10\n", - " robot.setCommands(vx,vy,w, bodyHeight=body_height)\n", - " time.sleep(0.01)" + "robot.getJointStates()" ] }, { @@ -105,31 +51,6 @@ " robot.setCommands(q, dq, kp, kd, tau)\n", " time.sleep(0.01) " ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'q': array([-0.06477112, 1.24850821, -2.8002429 , 0.04539341, 1.25765204,\n", - " -2.78191853, -0.34121001, 1.27112007, -2.79908991, 0.34284496,\n", - " 1.26930332, -2.8161664 ]),\n", - " 'dq': array([ 0. , 0.01937762, -0.00606604, -0.01162657, 0. ,\n", - " 0.01819811, 0.01937762, 0. , 0.03033019, -0.05813286,\n", - " 0.00775105, 0.00808805])}" - ] - }, - "execution_count": 2, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "robot.getJointStates()" - ] } ], "metadata": { diff --git a/examples/walk_these_ways_sim.ipynb b/examples/walk_these_ways_sim.ipynb index 8a68e3b..25d3211 100644 --- a/examples/walk_these_ways_sim.ipynb +++ b/examples/walk_these_ways_sim.ipynb @@ -1,8 +1,15 @@ { "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Basic Test" + ] + }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -12,7 +19,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -22,16 +29,16 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "from Go2Py.controllers.walk_these_ways import *" + "from Go2Py.control.walk_these_ways import *" ] }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -44,17 +51,9 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "p_gains: [20. 20. 20. 20. 20. 20. 20. 20. 20. 20. 20. 20.]\n" - ] - } - ], + "outputs": [], "source": [ "agent = WalkTheseWaysAgent(cfg, command_profile, robot)\n", "agent = HistoryWrapper(agent)" @@ -62,7 +61,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -73,21 +72,9 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "frq: 12.822735624382831 Hz\n", - "frq: 47.954632763194 Hz\n", - "frq: 53.309743511528 Hz\n", - "frq: 54.65674559220214 Hz\n", - "frq: 45.29730547005778 Hz\n" - ] - } - ], + "outputs": [], "source": [ "robot.reset()\n", "obs = agent.reset()\n", @@ -106,12 +93,81 @@ " time.sleep(robot.dt/4)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Full System With FSM" + ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "from Go2Py.robot.fsm import FSM\n", + "from Go2Py.robot.remote import KeyboardRemote\n", + "from Go2Py.robot.safety import SafetyHypervisor\n", + "from Go2Py.sim.mujoco import Go2Sim\n", + "from Go2Py.control.walk_these_ways import *\n", + "import numpy as np" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class walkTheseWaysController:\n", + " def __init__(self, robot, remote, checkpoint):\n", + " self.remote = remote\n", + " self.robot = robot\n", + " self.cfg = loadParameters(checkpoint)\n", + " self.policy = Policy(checkpoint)\n", + " self.command_profile = CommandInterface()\n", + " self.agent = WalkTheseWaysAgent(self.cfg, self.command_profile, self.robot)\n", + " self.agent = HistoryWrapper(self.agent)\n", + " self.init()\n", + "\n", + " def init(self):\n", + " self.obs = self.agent.reset()\n", + " self.policy_info = {}\n", + "\n", + " def update(self, robot, remote):\n", + " action = self.policy(self.obs, self.policy_info)\n", + " self.obs, self.ret, self.done, self.info = self.agent.step(action)\n", + " self.command_profile.yaw_vel_cmd = 1.2\n", + " self.command_profile.x_vel_cmd = 0.8\n", + " self.command_profile.y_vel_cmd = 0.0\n", + " self.command_profile.stance_width_cmd=0.2\n", + " self.command_profile.footswing_height_cmd=-0.05\n", + " self.command_profile.step_frequency_cmd = 2.5\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "remote = KeyboardRemote()\n", + "robot = Go2Sim()\n", + "robot.standUpReset()\n", + "safety_hypervisor = SafetyHypervisor(robot)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "checkpoint = \"/home/rstaion/projects/rooholla/locomotion/Go2Py/Go2Py/assets/checkpoints/walk_these_ways/\"\n", + "controller = walkTheseWaysController(robot, remote, checkpoint)\n", + "fsm = FSM(robot, remote, safety_hypervisor, user_controller_callback=controller.update)" + ] } ], "metadata": {