2024-03-20 09:50:19 +08:00
|
|
|
import threading
|
|
|
|
import numpy as np
|
|
|
|
import time
|
|
|
|
from enum import Enum
|
|
|
|
|
|
|
|
|
|
|
|
class FSM:
|
|
|
|
def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):
|
|
|
|
self.robot = robot
|
|
|
|
self.remote = remote
|
|
|
|
self.remote.flushStates()
|
|
|
|
self.safety = safety_hypervisor
|
|
|
|
self.user_controller_callback = user_controller_callback
|
|
|
|
|
|
|
|
self.state = "damping"
|
2024-05-21 06:45:59 +08:00
|
|
|
self.tracking_kp = np.array(4 * [150, 150, 150.]).reshape(12)
|
|
|
|
self.tracking_kv = np.array(12 * [3.])
|
|
|
|
self.damping_kv = np.array(12 * [2.])
|
2024-03-20 09:50:19 +08:00
|
|
|
|
|
|
|
self.tracking_complete = True
|
2024-05-21 06:45:59 +08:00
|
|
|
self.robot.setCommands(
|
|
|
|
np.zeros(12),
|
|
|
|
np.zeros(12),
|
|
|
|
np.zeros(12),
|
|
|
|
self.damping_kv,
|
|
|
|
np.zeros(12))
|
|
|
|
self.fsm_dT = 1. / 50.
|
|
|
|
self.control_dT = 1. / 50.
|
2024-03-20 09:50:19 +08:00
|
|
|
self.dT = self.fsm_dT
|
|
|
|
|
2024-05-21 06:45:59 +08:00
|
|
|
self.modes = {"tracking": self.trackingControlUpdate,
|
|
|
|
"damping": self.dampingControlUpdate,
|
2024-03-20 09:50:19 +08:00
|
|
|
"user": self.userControlUpdate,
|
2024-05-21 06:45:59 +08:00
|
|
|
}
|
2024-03-20 09:50:19 +08:00
|
|
|
self.setMode("damping")
|
|
|
|
|
|
|
|
self.running = True
|
2024-05-21 06:45:59 +08:00
|
|
|
self.fsm_thread = threading.Thread(target=self.update)
|
2024-03-20 09:50:19 +08:00
|
|
|
self.fsm_thread.start()
|
|
|
|
# if the robot is a simulation, create a thread for stepping it
|
|
|
|
if self.robot.simulated:
|
|
|
|
self.sim_thread = threading.Thread(target=self.simUpdate)
|
|
|
|
self.sim_thread.start()
|
|
|
|
|
|
|
|
def setMode(self, mode):
|
|
|
|
assert mode in self.modes.keys(), 'the requested control update mode is not implemented'
|
|
|
|
self.updateCommands = self.modes[mode]
|
|
|
|
# print(f'setting mode to {mode}')
|
2024-05-21 06:45:59 +08:00
|
|
|
|
2024-03-20 09:50:19 +08:00
|
|
|
def moveTo(self, target, duration=0.5):
|
|
|
|
# assert self.tracking_complete, 'The previous moveTo command is not completed yet!'
|
|
|
|
self.q_start = self.robot.getJointStates()['q']
|
|
|
|
self.q_target = target
|
|
|
|
self.time = 0.0
|
|
|
|
self.move_duration = duration
|
2024-05-21 06:45:59 +08:00
|
|
|
self.q_des = lambda t: [self.q_start +
|
|
|
|
np.clip((t) /
|
|
|
|
self.move_duration, 0, 1) *
|
|
|
|
(self.q_target -
|
|
|
|
self.q_start), True if np.clip((t) /
|
|
|
|
self.move_duration, 0, 1) == 1 else False] # q_des(t), Movement finished
|
2024-03-20 09:50:19 +08:00
|
|
|
self.tracking_complete = False
|
|
|
|
self.setMode("tracking")
|
|
|
|
|
|
|
|
def trackingControlUpdate(self):
|
2024-05-21 06:45:59 +08:00
|
|
|
self.time += self.dT
|
2024-03-20 09:50:19 +08:00
|
|
|
q_des, done = self.q_des(self.time)
|
2024-05-21 06:45:59 +08:00
|
|
|
self.robot.setCommands(
|
|
|
|
q_des,
|
|
|
|
np.zeros(12),
|
|
|
|
self.tracking_kp,
|
|
|
|
self.tracking_kv,
|
|
|
|
np.zeros(12))
|
2024-03-20 09:50:19 +08:00
|
|
|
self.tracking_complete = done
|
|
|
|
|
|
|
|
def dampingControlUpdate(self):
|
2024-05-21 06:45:59 +08:00
|
|
|
self.robot.setCommands(
|
|
|
|
np.zeros(12),
|
|
|
|
np.zeros(12),
|
|
|
|
np.zeros(12),
|
|
|
|
self.damping_kv,
|
|
|
|
np.zeros(12))
|
2024-03-20 09:50:19 +08:00
|
|
|
|
|
|
|
def userControlUpdate(self):
|
|
|
|
if self.user_controller_callback is not None:
|
|
|
|
self.user_controller_callback(self.robot, self.remote)
|
|
|
|
|
|
|
|
def simUpdate(self):
|
|
|
|
while self.running:
|
|
|
|
self.robot.step()
|
|
|
|
time.sleep(self.robot.dt)
|
2024-05-21 06:45:59 +08:00
|
|
|
|
2024-03-20 09:50:19 +08:00
|
|
|
def update(self):
|
|
|
|
while self.running:
|
|
|
|
getattr(self, self.state)()
|
|
|
|
time.sleep(self.dT)
|
|
|
|
self.updateCommands()
|
2024-05-21 06:45:59 +08:00
|
|
|
|
2024-03-20 09:50:19 +08:00
|
|
|
def close(self):
|
|
|
|
self.running = False
|
|
|
|
|
|
|
|
# The following functions each are the states of the FSM
|
|
|
|
def damping(self):
|
|
|
|
# print('damping')
|
|
|
|
if self.remote.standUpDownSeq():
|
|
|
|
self.moveTo(self.robot.prestanding_q, 1)
|
|
|
|
self.state = "pre_standing"
|
|
|
|
else:
|
|
|
|
self.setMode("damping")
|
|
|
|
self.state = "damping"
|
|
|
|
|
|
|
|
def pre_standing(self):
|
|
|
|
# print("pre_stance")
|
|
|
|
if self.tracking_complete:
|
|
|
|
self.moveTo(self.robot.standing_q, duration=1.5)
|
|
|
|
self.state = 'standing'
|
|
|
|
else:
|
|
|
|
self.state = "pre_standing"
|
2024-05-21 06:45:59 +08:00
|
|
|
|
2024-03-20 09:50:19 +08:00
|
|
|
def standing(self):
|
|
|
|
# print("standing")
|
|
|
|
if self.tracking_complete:
|
|
|
|
# self.moveTo(robot.standing_q, duration=1)
|
|
|
|
self.state = 'locked_stance'
|
|
|
|
else:
|
|
|
|
self.state = "standing"
|
|
|
|
|
|
|
|
def locked_stance(self):
|
|
|
|
# print("locked_stance")
|
|
|
|
if self.remote.startSeq():
|
|
|
|
self.setMode("user")
|
|
|
|
self.dT = self.control_dT
|
|
|
|
self.state = "user_loop"
|
2024-05-21 06:45:59 +08:00
|
|
|
self.robot.setCommands(
|
|
|
|
np.zeros(12),
|
|
|
|
np.zeros(12),
|
|
|
|
np.zeros(12),
|
|
|
|
np.zeros(12),
|
|
|
|
np.zeros(12))
|
2024-03-20 09:50:19 +08:00
|
|
|
elif self.remote.standUpDownSeq() or self.robot.overheat():
|
2024-05-21 06:45:59 +08:00
|
|
|
self.moveTo(self.robot.sitting_q, duration=1.5)
|
2024-03-20 09:50:19 +08:00
|
|
|
self.state = "sitting"
|
|
|
|
|
|
|
|
def user_loop(self):
|
|
|
|
# print("user_loop")
|
|
|
|
if self.safety.unsafe():
|
|
|
|
self.dT = self.fsm_dT
|
|
|
|
self.setMode("damping")
|
|
|
|
elif self.remote.standUpDownSeq() or self.safety.controlTimeout():
|
|
|
|
self.dT = self.fsm_dT
|
2024-05-21 06:45:59 +08:00
|
|
|
self.moveTo(self.robot.standing_q, duration=1)
|
2024-03-20 09:50:19 +08:00
|
|
|
self.timer = time.time()
|
|
|
|
self.state = "switch_back_to_locked_stance"
|
|
|
|
else:
|
|
|
|
self.state = "user_loop"
|
|
|
|
|
|
|
|
def sitting(self):
|
|
|
|
# print('sitting')
|
|
|
|
if self.tracking_complete:
|
|
|
|
self.setMode("damping")
|
|
|
|
self.state = 'damping'
|
|
|
|
else:
|
|
|
|
self.state = "sitting"
|
|
|
|
|
|
|
|
def switch_back_to_locked_stance(self):
|
2024-05-21 06:45:59 +08:00
|
|
|
if time.time() - self.timer > 0.5:
|
2024-03-20 09:50:19 +08:00
|
|
|
# print("going back to locked stance")
|
2024-05-21 06:45:59 +08:00
|
|
|
self.state = "locked_stance"
|