2023-10-25 10:34:46 +08:00
|
|
|
|
import time
|
|
|
|
|
import grpc
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
|
|
from robowaiter.proto import GrabSim_pb2
|
|
|
|
|
from robowaiter.proto import GrabSim_pb2_grpc
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
channel = grpc.insecure_channel(
|
|
|
|
|
"localhost:30001",
|
|
|
|
|
options=[
|
|
|
|
|
("grpc.max_send_message_length", 1024 * 1024 * 1024),
|
|
|
|
|
("grpc.max_receive_message_length", 1024 * 1024 * 1024),
|
|
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
stub = GrabSim_pb2_grpc.GrabSimStub(channel)
|
|
|
|
|
|
|
|
|
|
animation_step = [4, 5, 7, 3, 3]
|
|
|
|
|
loc_offset = [-700, -1400]
|
|
|
|
|
|
|
|
|
|
|
2023-11-08 15:40:35 +08:00
|
|
|
|
def init_world(scene_num=1, mapID=11):
|
2023-10-25 10:34:46 +08:00
|
|
|
|
stub.SetWorld(GrabSim_pb2.BatchMap(count=scene_num, mapID=mapID))
|
|
|
|
|
time.sleep(3) # wait for the map to load
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def image_extract(camera_data):
|
|
|
|
|
image = camera_data.images[0]
|
|
|
|
|
return np.frombuffer(image.data, dtype=image.dtype).reshape(
|
|
|
|
|
(image.height, image.width, image.channels)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Scene:
|
|
|
|
|
robot = None
|
2023-11-08 17:37:49 +08:00
|
|
|
|
event_list = []
|
2023-11-08 15:28:01 +08:00
|
|
|
|
|
|
|
|
|
default_state = {
|
|
|
|
|
"map": {
|
|
|
|
|
"2d": None,
|
|
|
|
|
"obj_pos": {}
|
|
|
|
|
},
|
|
|
|
|
"chat_list": [], # 未处理的顾客的对话, (顾客的位置,顾客对话的内容)
|
|
|
|
|
"sub_goal_list": [], # 子目标列表
|
|
|
|
|
"status": None, # 仿真器中的观测信息,见下方详细解释
|
2023-10-25 10:34:46 +08:00
|
|
|
|
}
|
2023-11-08 15:28:01 +08:00
|
|
|
|
"""
|
2023-10-25 10:34:46 +08:00
|
|
|
|
status:
|
|
|
|
|
location: Dict[X: float, Y: float]
|
|
|
|
|
rotation: Dict[Yaw: float]
|
|
|
|
|
joints: List[Dict[name: str, location: Dict[X: float, Y: float, Z: float]]]
|
|
|
|
|
fingers: List[Dict[name: str, location: List[3 * Dict[X: float, Y: float, Z: float]]]]
|
|
|
|
|
objects[:-1]: List[Dict[name: str, location: Dict[X: float, Y: float, Z: float]]]
|
|
|
|
|
objects[-1]: Dict[name: "Hand", boxes: List[Dict[diagonals: List[4 * Dict[X0: float, Y0: float, Z0: float, X1: float, Y1: float, Z1: float]]]]]
|
|
|
|
|
walkers: List[name: str, pose: Dict[X: float, Y: float, Yaw: float], speed: float, target: Dict[X: float, Y: float, Yaw: float]]
|
|
|
|
|
timestamp: int, timestep: int
|
|
|
|
|
collision: str, info: str
|
|
|
|
|
"""
|
|
|
|
|
|
2023-11-08 10:03:40 +08:00
|
|
|
|
def __init__(self,robot=None, sceneID=0):
|
2023-10-25 10:34:46 +08:00
|
|
|
|
self.sceneID = sceneID
|
|
|
|
|
self.use_offset = True
|
2023-10-25 22:12:15 +08:00
|
|
|
|
self.start_time = time.time()
|
|
|
|
|
self.time = 0
|
2023-10-25 10:34:46 +08:00
|
|
|
|
|
|
|
|
|
# init robot
|
2023-11-08 10:03:40 +08:00
|
|
|
|
if robot:
|
|
|
|
|
robot.set_scene(self)
|
|
|
|
|
robot.load_BT()
|
2023-10-25 10:34:46 +08:00
|
|
|
|
self.robot = robot
|
2023-11-08 15:28:01 +08:00
|
|
|
|
self.sub_task_seq = None
|
2023-10-25 10:34:46 +08:00
|
|
|
|
|
2023-11-08 17:14:51 +08:00
|
|
|
|
# myx op
|
|
|
|
|
self.op_dialog = ["","制作咖啡","倒水","夹点心","拖地","擦桌子","关闭窗帘","关筒灯","开大厅灯","搬椅子","打开窗帘","关大厅灯","开筒灯"]
|
|
|
|
|
self.op_act_num = [0,3,4,6,3,2,0,0,0,1,0,0,0]
|
|
|
|
|
self.op_v_list = [[[0.0,0.0]],[[250.0, 310.0]],[[-70.0, 480.0]],[[250.0, 630.0]],[[260.0, 1120.0]],[[300.0, -220.0]],[[0.0, -70.0]]]
|
|
|
|
|
|
2023-10-25 22:12:15 +08:00
|
|
|
|
|
|
|
|
|
def _reset(self):
|
|
|
|
|
# 场景自定义的reset
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def _run(self):
|
|
|
|
|
# 场景自定义的run
|
2023-10-25 10:34:46 +08:00
|
|
|
|
pass
|
|
|
|
|
|
2023-10-25 22:12:15 +08:00
|
|
|
|
def _step(self):
|
|
|
|
|
# 场景自定义的step
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
|
# 基类reset,默认执行仿真器初始化操作
|
|
|
|
|
self.reset_sim()
|
|
|
|
|
|
|
|
|
|
# reset state
|
2023-11-08 15:28:01 +08:00
|
|
|
|
self.state = self.default_state
|
2023-10-25 22:12:15 +08:00
|
|
|
|
print("场景初始化完成")
|
|
|
|
|
self._reset()
|
|
|
|
|
|
|
|
|
|
self.running = True
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
# 基类run
|
|
|
|
|
|
|
|
|
|
self._run()
|
|
|
|
|
|
|
|
|
|
# 运行并由robot打印每步信息
|
|
|
|
|
while True:
|
|
|
|
|
self.step()
|
|
|
|
|
|
|
|
|
|
def step(self):
|
|
|
|
|
# 基类step,默认执行行为树tick操作
|
|
|
|
|
self.time = time.time() - self.start_time
|
|
|
|
|
|
2023-11-08 17:37:49 +08:00
|
|
|
|
self.deal_event()
|
2023-10-25 22:12:15 +08:00
|
|
|
|
self._step()
|
|
|
|
|
self.robot.step()
|
|
|
|
|
|
2023-11-08 17:37:49 +08:00
|
|
|
|
def deal_event(self):
|
|
|
|
|
if len(self.event_list)>0:
|
|
|
|
|
next_event = self.event_list[0]
|
|
|
|
|
t,func = next_event
|
|
|
|
|
if self.time >= t:
|
|
|
|
|
print(f'event: {t}, {func.__name__}')
|
|
|
|
|
self.event_list.pop(0)
|
|
|
|
|
func()
|
|
|
|
|
|
|
|
|
|
def create_chat_event(self,sentence):
|
|
|
|
|
def customer_say():
|
|
|
|
|
print(f'顾客说:{sentence}')
|
|
|
|
|
self.chat_bubble(f'顾客说:{sentence}')
|
|
|
|
|
self.state['chat_list'].append(f'{sentence}')
|
|
|
|
|
|
|
|
|
|
return customer_say
|
2023-10-25 10:34:46 +08:00
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def status(self):
|
|
|
|
|
return stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
|
|
|
|
|
|
|
|
|
|
def reset_sim(self):
|
|
|
|
|
stub.Reset(GrabSim_pb2.ResetParams(scene=self.sceneID))
|
|
|
|
|
|
|
|
|
|
# reset world
|
|
|
|
|
init_world()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def walker_control_generator(self, walkerID, autowalk, speed, X, Y, Yaw):
|
|
|
|
|
if self.use_offset:
|
|
|
|
|
X, Y = X + loc_offset[0], Y + loc_offset[1]
|
|
|
|
|
return GrabSim_pb2.WalkerControls.WControl(
|
|
|
|
|
id=walkerID,
|
|
|
|
|
autowalk=autowalk,
|
|
|
|
|
speed=speed,
|
|
|
|
|
pose=GrabSim_pb2.Pose(X=X, Y=Y, Yaw=Yaw),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def walk_to(self, X, Y, Yaw, velocity=150, dis_limit=100):
|
|
|
|
|
if self.use_offset:
|
|
|
|
|
X, Y = X + loc_offset[0], Y + loc_offset[1]
|
|
|
|
|
stub.Do(
|
|
|
|
|
GrabSim_pb2.Action(
|
|
|
|
|
scene=self.sceneID,
|
|
|
|
|
action=GrabSim_pb2.Action.ActionType.WalkTo,
|
|
|
|
|
values=[X, Y, Yaw, velocity, dis_limit],
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def reachable_check(self, X, Y, Yaw):
|
|
|
|
|
if self.use_offset:
|
|
|
|
|
X, Y = X + loc_offset[0], Y + loc_offset[1]
|
|
|
|
|
navigation_info = stub.Do(
|
|
|
|
|
GrabSim_pb2.Action(
|
|
|
|
|
scene=self.sceneID,
|
|
|
|
|
action=GrabSim_pb2.Action.ActionType.WalkTo,
|
|
|
|
|
values=[X, Y, Yaw],
|
|
|
|
|
)
|
|
|
|
|
).info
|
|
|
|
|
if navigation_info == "Unreachable":
|
|
|
|
|
return False
|
|
|
|
|
else:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def add_walker(self, X, Y, Yaw):
|
|
|
|
|
if self.use_offset:
|
|
|
|
|
X, Y = X + loc_offset[0], Y + loc_offset[1]
|
|
|
|
|
if self.reachable_check(X, Y, Yaw):
|
|
|
|
|
stub.AddWalker(
|
|
|
|
|
GrabSim_pb2.WalkerList(
|
|
|
|
|
walkers=[
|
|
|
|
|
GrabSim_pb2.WalkerList.Walker(
|
|
|
|
|
id=0,
|
|
|
|
|
pose=GrabSim_pb2.Pose(
|
|
|
|
|
X=X, Y=Y, Yaw=Yaw
|
|
|
|
|
), # Parameter id is useless
|
|
|
|
|
)
|
|
|
|
|
],
|
|
|
|
|
scene=self.sceneID,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def remove_walker(self, *args): # take single walkerID or a list of walkerIDs
|
|
|
|
|
remove_list = []
|
|
|
|
|
if isinstance(args[0], list):
|
|
|
|
|
remove_list = args[0]
|
|
|
|
|
else:
|
|
|
|
|
for walkerID in args:
|
|
|
|
|
# walkerID is the index of the walker in status.walkers.
|
|
|
|
|
# Since status.walkers is a list, some walkerIDs would change after removing a walker.
|
|
|
|
|
remove_list.append(walkerID)
|
|
|
|
|
stub.RemoveWalkers(GrabSim_pb2.RemoveList(IDs=remove_list, scene=self.sceneID))
|
|
|
|
|
|
|
|
|
|
def clean_walker(self):
|
|
|
|
|
stub.CleanWalkers(GrabSim_pb2.SceneID(value=self.sceneID))
|
|
|
|
|
|
|
|
|
|
def control_walker(self, control_list):
|
|
|
|
|
stub.ControlWalkers(
|
|
|
|
|
GrabSim_pb2.WalkerControls(controls=control_list, scene=self.sceneID)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def control_joints(self, angles):
|
|
|
|
|
stub.Do(
|
|
|
|
|
GrabSim_pb2.Action(
|
|
|
|
|
scene=self.sceneID,
|
|
|
|
|
action=GrabSim_pb2.Action.ActionType.RotateJoints,
|
|
|
|
|
values=angles,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def add_object(self, type, X, Y, Z, Yaw=0):
|
|
|
|
|
if self.use_offset:
|
|
|
|
|
X, Y = X + loc_offset[0], Y + loc_offset[1]
|
|
|
|
|
stub.AddObjects(
|
|
|
|
|
GrabSim_pb2.ObjectList(
|
|
|
|
|
objects=[
|
|
|
|
|
GrabSim_pb2.ObjectList.Object(x=X, y=Y, yaw=Yaw, z=Z, type=type)
|
|
|
|
|
],
|
|
|
|
|
scene=self.sceneID,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def remove_object(self, *args): # refer to remove_walker
|
|
|
|
|
remove_list = []
|
|
|
|
|
if isinstance(args[0], list):
|
|
|
|
|
remove_list = args[0]
|
|
|
|
|
else:
|
|
|
|
|
for objectID in args:
|
|
|
|
|
remove_list.append(objectID)
|
|
|
|
|
stub.RemoveObjects(GrabSim_pb2.RemoveList(IDs=remove_list, scene=self.sceneID))
|
|
|
|
|
|
|
|
|
|
def clean_object(self):
|
|
|
|
|
stub.CleanObjects(GrabSim_pb2.SceneID(value=self.sceneID))
|
|
|
|
|
|
|
|
|
|
def grasp(self, handID, objectID):
|
|
|
|
|
stub.Do(
|
|
|
|
|
GrabSim_pb2.Action(
|
|
|
|
|
scene=self.sceneID,
|
|
|
|
|
action=GrabSim_pb2.Action.ActionType.Grasp,
|
|
|
|
|
values=[handID, objectID],
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def release(self, handID):
|
|
|
|
|
stub.Do(
|
|
|
|
|
GrabSim_pb2.Action(
|
|
|
|
|
scene=self.sceneID,
|
|
|
|
|
action=GrabSim_pb2.Action.ActionType.Release,
|
|
|
|
|
values=[handID],
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def get_camera_color(self, image_only=True):
|
|
|
|
|
camera_data = stub.Capture(
|
|
|
|
|
GrabSim_pb2.CameraList(
|
|
|
|
|
cameras=[GrabSim_pb2.CameraName.Head_Color], scene=self.sceneID
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if image_only:
|
|
|
|
|
return image_extract(camera_data)
|
|
|
|
|
else:
|
|
|
|
|
return camera_data
|
|
|
|
|
|
|
|
|
|
def get_camera_depth(self, image_only=True):
|
|
|
|
|
camera_data = stub.Capture(
|
|
|
|
|
GrabSim_pb2.CameraList(
|
|
|
|
|
cameras=[GrabSim_pb2.CameraName.Head_Depth], scene=self.sceneID
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if image_only:
|
|
|
|
|
return image_extract(camera_data)
|
|
|
|
|
else:
|
|
|
|
|
return camera_data
|
|
|
|
|
|
|
|
|
|
def get_camera_segment(self, image_only=True):
|
|
|
|
|
camera_data = stub.Capture(
|
|
|
|
|
GrabSim_pb2.CameraList(
|
|
|
|
|
cameras=[GrabSim_pb2.CameraName.Head_Segment], scene=self.sceneID
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if image_only:
|
|
|
|
|
return image_extract(camera_data)
|
|
|
|
|
else:
|
|
|
|
|
return camera_data
|
|
|
|
|
|
|
|
|
|
def chat_bubble(self, message):
|
|
|
|
|
stub.ControlRobot(
|
|
|
|
|
GrabSim_pb2.ControlInfo(
|
|
|
|
|
scene=self.sceneID, type=0, action=1, content=message
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def animation_control(self, animation_type):
|
|
|
|
|
# animation_type: 1:make coffee 2: pour water 3: grab food 4: mop floor 5: clean table
|
|
|
|
|
scene = stub.ControlRobot(
|
|
|
|
|
GrabSim_pb2.ControlInfo(scene=self.sceneID, type=animation_type, action=1)
|
|
|
|
|
)
|
|
|
|
|
if scene.info == "action success":
|
|
|
|
|
for i in range(2, animation_step[animation_type - 1] + 1):
|
|
|
|
|
stub.ControlRobot(
|
|
|
|
|
GrabSim_pb2.ControlInfo(
|
|
|
|
|
scene=self.sceneID, type=animation_type, action=i
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def animation_reset(self):
|
|
|
|
|
stub.ControlRobot(GrabSim_pb2.ControlInfo(scene=self.sceneID, type=0, action=0))
|
|
|
|
|
|
2023-11-08 17:15:23 +08:00
|
|
|
|
def control_robot_action(self, type=0, action=0, message="你好"):
|
|
|
|
|
scene = stub.ControlRobot(
|
|
|
|
|
GrabSim_pb2.ControlInfo(
|
|
|
|
|
scene=self.sceneID, type=type, action=action, content=message
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
if str(scene.info).find("Action Success") > -1:
|
|
|
|
|
print(scene.info)
|
|
|
|
|
return True
|
|
|
|
|
else:
|
|
|
|
|
print(scene.info)
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def op_task_execute(self,task_type):
|
|
|
|
|
self.control_robot_action(0, 1, "开始"+self.op_dialog[task_type]) # 开始制作咖啡
|
|
|
|
|
result = self.control_robot_action(task_type, 1) #
|
|
|
|
|
self.control_robot_action(0, 2)
|
|
|
|
|
if result:
|
|
|
|
|
if self.op_act_num[task_type]>0:
|
|
|
|
|
for i in range(2,2+self.op_act_num[task_type]):
|
|
|
|
|
self.control_robot_action(task_type,i)
|
|
|
|
|
self.control_robot_action(0, 2)
|
|
|
|
|
self.control_robot_action(0, 1, "成功"+self.op_dialog[task_type])
|
|
|
|
|
else:
|
|
|
|
|
self.control_robot_action(0, 1, self.op_dialog[task_type]+"失败")
|
|
|
|
|
|
|
|
|
|
def move_task_area(self,op_type=0):
|
|
|
|
|
scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
|
|
|
|
|
|
|
|
|
|
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
|
|
|
|
|
print("------------------move_task_area----------------------")
|
|
|
|
|
print("position:", walk_value,"开始任务:",self.op_dialog[op_type])
|
|
|
|
|
for walk_v in self.op_v_list[op_type]:
|
|
|
|
|
walk_v = walk_v + [scene.rotation.Yaw, 60, 0]
|
|
|
|
|
action = GrabSim_pb2.Action(
|
|
|
|
|
scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v
|
|
|
|
|
)
|
|
|
|
|
scene = stub.Do(action)
|
|
|
|
|
|