import collections import cv2 import gymnasium as gym import numpy as np import pygame import pymunk import pymunk.pygame_util import shapely.geometry as sg import skimage.transform as st from gymnasium import spaces from pymunk.vec2d import Vec2d from pusht.pymunk_override import DrawOptions def pymunk_to_shapely(body, shapes): geoms = [] for shape in shapes: if isinstance(shape, pymunk.shapes.Poly): verts = [body.local_to_world(v) for v in shape.get_vertices()] verts += [verts[0]] geoms.append(sg.Polygon(verts)) else: raise RuntimeError(f"Unsupported shape type {type(shape)}") geom = sg.MultiPolygon(geoms) return geom class PushTEnv(gym.Env): metadata = {"render.modes": ["human", "rgb_array"], "video.frames_per_second": 10} reward_range = (0.0, 1.0) def __init__( self, legacy=True, # compatibility with original block_cog=None, damping=None, render_action=True, render_size=96, reset_to_state=None, ): self._seed = None self.seed() self.window_size = ws = 512 # The size of the PyGame window self.render_size = render_size self.sim_hz = 100 # Local controller params. self.k_p, self.k_v = 100, 20 # PD control.z self.control_hz = self.metadata["video.frames_per_second"] # legcay set_state for data compatibility self.legacy = legacy # agent_pos, block_pos, block_angle self.observation_space = spaces.Box( low=np.array([0, 0, 0, 0, 0], dtype=np.float64), high=np.array([ws, ws, ws, ws, np.pi * 2], dtype=np.float64), shape=(5,), dtype=np.float64, ) # positional goal for agent self.action_space = spaces.Box( low=np.array([0, 0], dtype=np.float64), high=np.array([ws, ws], dtype=np.float64), shape=(2,), dtype=np.float64, ) self.block_cog = block_cog self.damping = damping self.render_action = render_action """ If human-rendering is used, `self.window` will be a reference to the window that we draw to. `self.clock` will be a clock that is used to ensure that the environment is rendered at the correct framerate in human-mode. They will remain `None` until human-mode is used for the first time. """ self.window = None self.clock = None self.screen = None self.space = None self.teleop = None self.render_buffer = None self.latest_action = None self.reset_to_state = reset_to_state def reset(self): seed = self._seed self._setup() if self.block_cog is not None: self.block.center_of_gravity = self.block_cog if self.damping is not None: self.space.damping = self.damping # use legacy RandomState for compatibility state = self.reset_to_state if state is None: rs = np.random.RandomState(seed=seed) state = np.array( [ rs.randint(50, 450), rs.randint(50, 450), rs.randint(100, 400), rs.randint(100, 400), rs.randn() * 2 * np.pi - np.pi, ] ) self._set_state(state) observation = self._get_obs() return observation def step(self, action): dt = 1.0 / self.sim_hz self.n_contact_points = 0 n_steps = self.sim_hz // self.control_hz if action is not None: self.latest_action = action for _ in range(n_steps): # Step PD control. # self.agent.velocity = self.k_p * (act - self.agent.position) # P control works too. acceleration = self.k_p * (action - self.agent.position) + self.k_v * ( Vec2d(0, 0) - self.agent.velocity ) self.agent.velocity += acceleration * dt # Step physics. self.space.step(dt) # compute reward goal_body = self._get_goal_pose_body(self.goal_pose) goal_geom = pymunk_to_shapely(goal_body, self.block.shapes) block_geom = pymunk_to_shapely(self.block, self.block.shapes) intersection_area = goal_geom.intersection(block_geom).area goal_area = goal_geom.area coverage = intersection_area / goal_area reward = np.clip(coverage / self.success_threshold, 0, 1) done = coverage > self.success_threshold observation = self._get_obs() info = self._get_info() return observation, reward, done, info def render(self, mode): return self._render_frame(mode) def teleop_agent(self): TeleopAgent = collections.namedtuple("TeleopAgent", ["act"]) def act(obs): act = None mouse_position = pymunk.pygame_util.from_pygame(Vec2d(*pygame.mouse.get_pos()), self.screen) if self.teleop or (mouse_position - self.agent.position).length < 30: self.teleop = True act = mouse_position return act return TeleopAgent(act) def _get_obs(self): obs = np.array( tuple(self.agent.position) + tuple(self.block.position) + (self.block.angle % (2 * np.pi),) ) return obs def _get_goal_pose_body(self, pose): mass = 1 inertia = pymunk.moment_for_box(mass, (50, 100)) body = pymunk.Body(mass, inertia) # preserving the legacy assignment order for compatibility # the order here doesn't matter somehow, maybe because CoM is aligned with body origin body.position = pose[:2].tolist() body.angle = pose[2] return body def _get_info(self): n_steps = self.sim_hz // self.control_hz n_contact_points_per_step = int(np.ceil(self.n_contact_points / n_steps)) info = { "pos_agent": np.array(self.agent.position), "vel_agent": np.array(self.agent.velocity), "block_pose": np.array(list(self.block.position) + [self.block.angle]), "goal_pose": self.goal_pose, "n_contacts": n_contact_points_per_step, } return info def _render_frame(self, mode): if self.window is None and mode == "human": pygame.init() pygame.display.init() self.window = pygame.display.set_mode((self.window_size, self.window_size)) if self.clock is None and mode == "human": self.clock = pygame.time.Clock() canvas = pygame.Surface((self.window_size, self.window_size)) canvas.fill((255, 255, 255)) self.screen = canvas draw_options = DrawOptions(canvas) # Draw goal pose. goal_body = self._get_goal_pose_body(self.goal_pose) for shape in self.block.shapes: goal_points = [ pymunk.pygame_util.to_pygame(goal_body.local_to_world(v), draw_options.surface) for v in shape.get_vertices() ] goal_points += [goal_points[0]] pygame.draw.polygon(canvas, self.goal_color, goal_points) # Draw agent and block. self.space.debug_draw(draw_options) if mode == "human": # The following line copies our drawings from `canvas` to the visible window self.window.blit(canvas, canvas.get_rect()) pygame.event.pump() pygame.display.update() # the clock is already ticked during in step for "human" img = np.transpose(np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)) img = cv2.resize(img, (self.render_size, self.render_size)) if self.render_action and self.latest_action is not None: action = np.array(self.latest_action) coord = (action / 512 * 96).astype(np.int32) marker_size = int(8 / 96 * self.render_size) thickness = int(1 / 96 * self.render_size) cv2.drawMarker( img, coord, color=(255, 0, 0), markerType=cv2.MARKER_CROSS, markerSize=marker_size, thickness=thickness, ) return img def close(self): if self.window is not None: pygame.display.quit() pygame.quit() def seed(self, seed=None): if seed is None: seed = np.random.randint(0, 25536) self._seed = seed self.np_random = np.random.default_rng(seed) def _handle_collision(self, arbiter, space, data): self.n_contact_points += len(arbiter.contact_point_set.points) def _set_state(self, state): if isinstance(state, np.ndarray): state = state.tolist() pos_agent = state[:2] pos_block = state[2:4] rot_block = state[4] self.agent.position = pos_agent # setting angle rotates with respect to center of mass # therefore will modify the geometric position # if not the same as CoM # therefore should be modified first. if self.legacy: # for compatibility with legacy data self.block.position = pos_block self.block.angle = rot_block else: self.block.angle = rot_block self.block.position = pos_block # Run physics to take effect self.space.step(1.0 / self.sim_hz) def _set_state_local(self, state_local): agent_pos_local = state_local[:2] block_pose_local = state_local[2:] tf_img_obj = st.AffineTransform(translation=self.goal_pose[:2], rotation=self.goal_pose[2]) tf_obj_new = st.AffineTransform(translation=block_pose_local[:2], rotation=block_pose_local[2]) tf_img_new = st.AffineTransform(matrix=tf_img_obj.params @ tf_obj_new.params) agent_pos_new = tf_img_new(agent_pos_local) new_state = np.array(list(agent_pos_new[0]) + list(tf_img_new.translation) + [tf_img_new.rotation]) self._set_state(new_state) return new_state def _setup(self): self.space = pymunk.Space() self.space.gravity = 0, 0 self.space.damping = 0 self.teleop = False self.render_buffer = [] # Add walls. walls = [ self._add_segment((5, 506), (5, 5), 2), self._add_segment((5, 5), (506, 5), 2), self._add_segment((506, 5), (506, 506), 2), self._add_segment((5, 506), (506, 506), 2), ] self.space.add(*walls) # Add agent, block, and goal zone. self.agent = self.add_circle((256, 400), 15) self.block = self.add_tee((256, 300), 0) self.goal_color = pygame.Color("LightGreen") self.goal_pose = np.array([256, 256, np.pi / 4]) # x, y, theta (in radians) # Add collision handling self.collision_handeler = self.space.add_collision_handler(0, 0) self.collision_handeler.post_solve = self._handle_collision self.n_contact_points = 0 self.max_score = 50 * 100 self.success_threshold = 0.95 # 95% coverage. def _add_segment(self, a, b, radius): shape = pymunk.Segment(self.space.static_body, a, b, radius) shape.color = pygame.Color("LightGray") # https://htmlcolorcodes.com/color-names return shape def add_circle(self, position, radius): body = pymunk.Body(body_type=pymunk.Body.KINEMATIC) body.position = position body.friction = 1 shape = pymunk.Circle(body, radius) shape.color = pygame.Color("RoyalBlue") self.space.add(body, shape) return body def add_box(self, position, height, width): mass = 1 inertia = pymunk.moment_for_box(mass, (height, width)) body = pymunk.Body(mass, inertia) body.position = position shape = pymunk.Poly.create_box(body, (height, width)) shape.color = pygame.Color("LightSlateGray") self.space.add(body, shape) return body def add_tee(self, position, angle, scale=30, color="LightSlateGray", mask=None): if mask is None: mask = pymunk.ShapeFilter.ALL_MASKS() mass = 1 length = 4 vertices1 = [ (-length * scale / 2, scale), (length * scale / 2, scale), (length * scale / 2, 0), (-length * scale / 2, 0), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) vertices2 = [ (-scale / 2, scale), (-scale / 2, length * scale), (scale / 2, length * scale), (scale / 2, scale), ] inertia2 = pymunk.moment_for_poly(mass, vertices=vertices1) body = pymunk.Body(mass, inertia1 + inertia2) shape1 = pymunk.Poly(body, vertices1) shape2 = pymunk.Poly(body, vertices2) shape1.color = pygame.Color(color) shape2.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) shape2.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = (shape1.center_of_gravity + shape2.center_of_gravity) / 2 body.position = position body.angle = angle body.friction = 1 self.space.add(body, shape1, shape2) return body