Go2Py_SIM/Go2Py/robot/fsm.py

170 lines
5.5 KiB
Python

import threading
import numpy as np
import time
from enum import Enum
class FSM:
def __init__(self, robot, remote, safety_hypervisor, control_dT=1/50, user_controller_callback=None):
self.robot = robot
self.remote = remote
self.remote.flushStates()
self.safety = safety_hypervisor
self.user_controller_callback = user_controller_callback
self.state = "damping"
self.tracking_kp = np.array(4 * [150, 150, 150.]).reshape(12)
self.tracking_kv = np.array(12 * [3.])
self.damping_kv = np.array(12 * [2.])
self.tracking_complete = True
self.robot.setCommands(
np.zeros(12),
np.zeros(12),
np.zeros(12),
self.damping_kv,
np.zeros(12))
self.fsm_dT = 1. / 50.
self.control_dT = control_dT
self.dT = self.fsm_dT
self.modes = {"tracking": self.trackingControlUpdate,
"damping": self.dampingControlUpdate,
"user": self.userControlUpdate,
}
self.setMode("damping")
self.running = True
self.fsm_thread = threading.Thread(target=self.update)
self.fsm_thread.start()
# if the robot is a simulation, create a thread for stepping it
if self.robot.simulated:
self.sim_thread = threading.Thread(target=self.simUpdate)
self.sim_thread.start()
def setMode(self, mode):
assert mode in self.modes.keys(), 'the requested control update mode is not implemented'
self.updateCommands = self.modes[mode]
# print(f'setting mode to {mode}')
def moveTo(self, target, duration=0.5):
# assert self.tracking_complete, 'The previous moveTo command is not completed yet!'
self.q_start = self.robot.getJointStates()['q']
self.q_target = target
self.time = 0.0
self.move_duration = duration
self.q_des = lambda t: [self.q_start +
np.clip((t) /
self.move_duration, 0, 1) *
(self.q_target -
self.q_start), True if np.clip((t) /
self.move_duration, 0, 1) == 1 else False] # q_des(t), Movement finished
self.tracking_complete = False
self.setMode("tracking")
def trackingControlUpdate(self):
self.time += self.dT
q_des, done = self.q_des(self.time)
self.robot.setCommands(
q_des,
np.zeros(12),
self.tracking_kp,
self.tracking_kv,
np.zeros(12))
self.tracking_complete = done
def dampingControlUpdate(self):
self.robot.setCommands(
np.zeros(12),
np.zeros(12),
np.zeros(12),
self.damping_kv,
np.zeros(12))
def userControlUpdate(self):
if self.user_controller_callback is not None:
self.user_controller_callback(self.robot, self.remote)
def simUpdate(self):
while self.running:
self.robot.step()
time.sleep(self.robot.dt)
def update(self):
while self.running:
getattr(self, self.state)()
time.sleep(self.dT)
self.updateCommands()
def close(self):
self.running = False
# The following functions each are the states of the FSM
def damping(self):
# print('damping')
if self.remote.standUpDownSeq():
self.moveTo(self.robot.prestanding_q, 1)
self.state = "pre_standing"
else:
self.setMode("damping")
self.state = "damping"
def pre_standing(self):
# print("pre_stance")
if self.tracking_complete:
self.moveTo(self.robot.standing_q, duration=1.5)
self.state = 'standing'
else:
self.state = "pre_standing"
def standing(self):
# print("standing")
if self.tracking_complete:
# self.moveTo(robot.standing_q, duration=1)
self.state = 'locked_stance'
else:
self.state = "standing"
def locked_stance(self):
# print("locked_stance")
if self.remote.startSeq():
self.setMode("user")
self.dT = self.control_dT
self.state = "user_loop"
self.robot.setCommands(
np.zeros(12),
np.zeros(12),
np.zeros(12),
np.zeros(12),
np.zeros(12))
elif self.remote.standUpDownSeq() or self.robot.overheat():
self.moveTo(self.robot.sitting_q, duration=1.5)
self.state = "sitting"
def user_loop(self):
# print("user_loop")
if self.safety.unsafe():
self.dT = self.fsm_dT
self.setMode("damping")
elif self.remote.standUpDownSeq() or self.safety.controlTimeout():
self.dT = self.fsm_dT
self.moveTo(self.robot.standing_q, duration=1)
self.timer = time.time()
self.state = "switch_back_to_locked_stance"
else:
self.state = "user_loop"
def sitting(self):
# print('sitting')
if self.tracking_complete:
self.setMode("damping")
self.state = 'damping'
else:
self.state = "sitting"
def switch_back_to_locked_stance(self):
if time.time() - self.timer > 0.5:
# print("going back to locked stance")
self.state = "locked_stance"