569 lines
23 KiB
Python
569 lines
23 KiB
Python
"""Adapted from rllab maze_env.py."""
|
|
|
|
import os
|
|
import tempfile
|
|
import xml.etree.ElementTree as ET
|
|
import math
|
|
import numpy as np
|
|
import gym
|
|
from . import maze_env_utils
|
|
from .point import PointEnv
|
|
from gym.utils import seeding
|
|
|
|
# Directory that contains mujoco xml files.
|
|
MODEL_DIR = os.path.join(os.path.dirname(__file__), 'assets')
|
|
|
|
|
|
class PointMazeEnv(gym.Env):
|
|
MODEL_CLASS = PointEnv
|
|
|
|
MAZE_HEIGHT = None
|
|
MAZE_SIZE_SCALING = None
|
|
|
|
def __init__(
|
|
self,
|
|
maze_id=None,
|
|
maze_height=0.5,
|
|
maze_size_scaling=8,
|
|
n_bins=0,
|
|
sensor_range=3.,
|
|
sensor_span=2 * math.pi,
|
|
observe_blocks=False,
|
|
put_spin_near_agent=False,
|
|
top_down_view=False,
|
|
manual_collision=False,
|
|
goal=None,
|
|
EPS=0.25,
|
|
max_episode_steps=2000,
|
|
*args,
|
|
**kwargs):
|
|
self._maze_id = maze_id
|
|
|
|
model_cls = self.__class__.MODEL_CLASS
|
|
if model_cls is None:
|
|
raise "MODEL_CLASS unspecified!"
|
|
xml_path = os.path.join(MODEL_DIR, model_cls.FILE)
|
|
self.tree = tree = ET.parse(xml_path)
|
|
self.worldbody = worldbody = tree.find(".//worldbody")
|
|
self.visualize_goal = False
|
|
self.max_episode_steps = max_episode_steps
|
|
self.t = 0
|
|
self.MAZE_HEIGHT = height = maze_height
|
|
self.MAZE_SIZE_SCALING = size_scaling = maze_size_scaling
|
|
self._n_bins = n_bins
|
|
self._sensor_range = sensor_range * size_scaling
|
|
self._sensor_span = sensor_span
|
|
self._observe_blocks = observe_blocks
|
|
self._put_spin_near_agent = put_spin_near_agent
|
|
self._top_down_view = top_down_view
|
|
self._manual_collision = manual_collision
|
|
|
|
self.MAZE_STRUCTURE = structure = maze_env_utils.construct_maze(
|
|
maze_id=self._maze_id)
|
|
# Elevate the maze to allow for falling.
|
|
self.elevated = any(-1 in row for row in structure)
|
|
self.blocks = any(
|
|
any(maze_env_utils.can_move(r) for r in row)
|
|
for row in structure) # Are there any movable blocks?
|
|
|
|
torso_x, torso_y = self._find_robot() # x, y coordinates
|
|
self._init_torso_x = torso_x
|
|
self._init_torso_y = torso_y
|
|
self._init_positions = [
|
|
(x - torso_x, y - torso_y)
|
|
for x, y in self._find_all_robots()]
|
|
|
|
self._view = np.zeros([5, 5, 3])
|
|
|
|
height_offset = 0.
|
|
if self.elevated:
|
|
height_offset = height * size_scaling
|
|
torso = tree.find(".//body[@name='torso']")
|
|
torso.set('pos', '0 0 %.2f' % (0.75 + height_offset))
|
|
if self.blocks:
|
|
default = tree.find(".//default")
|
|
default.find('.//geom').set('solimp', '.995 .995 .01')
|
|
|
|
self.movable_blocks = []
|
|
for i in range(len(structure)):
|
|
for j in range(len(structure[0])):
|
|
struct = structure[i][j]
|
|
if struct == 'r' and self._put_spin_near_agent:
|
|
struct = maze_env_utils.Move.SpinXY
|
|
if self.elevated and struct not in [-1]:
|
|
# Create elevated platform.
|
|
ET.SubElement(
|
|
worldbody, "geom",
|
|
name="elevated_%d_%d" % (i, j),
|
|
pos="%f %f %f" % (j * size_scaling - torso_x,
|
|
i * size_scaling - torso_y,
|
|
height / 2 * size_scaling),
|
|
size="%f %f %f" % (0.5 * size_scaling,
|
|
0.5 * size_scaling,
|
|
height / 2 * size_scaling),
|
|
type="box",
|
|
material="",
|
|
contype="1",
|
|
conaffinity="1",
|
|
rgba="0.9 0.9 0.9 1",
|
|
)
|
|
if struct == 1: # Unmovable block.
|
|
# Offset all coordinates so that robot starts at the origin
|
|
ET.SubElement(
|
|
worldbody, "geom",
|
|
name="block_%d_%d" % (i, j),
|
|
pos="%f %f %f" % (j * size_scaling - torso_x,
|
|
i * size_scaling - torso_y,
|
|
height_offset +
|
|
height / 2 * size_scaling),
|
|
size="%f %f %f" % (0.5 * size_scaling,
|
|
0.5 * size_scaling,
|
|
height / 2 * size_scaling),
|
|
type="box",
|
|
material="",
|
|
contype="1",
|
|
conaffinity="1",
|
|
rgba="0.4 0.4 0.4 1",
|
|
)
|
|
elif maze_env_utils.can_move(struct):
|
|
name = "movable_%d_%d" % (i, j)
|
|
self.movable_blocks.append((name, struct))
|
|
falling = maze_env_utils.can_move_z(struct)
|
|
spinning = maze_env_utils.can_spin(struct)
|
|
x_offset = 0.25 * size_scaling if spinning else 0.0
|
|
y_offset = 0.0
|
|
shrink = 0.1 if spinning else 0.99 if falling else 1.0
|
|
height_shrink = 0.1 if spinning else 1.0
|
|
_x = j * size_scaling - torso_x + x_offset
|
|
_y = i * size_scaling - torso_y + y_offset
|
|
_z = height / 2 * size_scaling * height_shrink
|
|
movable_body = ET.SubElement(
|
|
worldbody, "body",
|
|
name=name,
|
|
pos="%f %f %f" % (_x, _y, height_offset + _z),
|
|
)
|
|
ET.SubElement(
|
|
movable_body, "geom",
|
|
name="block_%d_%d" % (i, j),
|
|
pos="0 0 0",
|
|
size="%f %f %f" % (0.5 * size_scaling * shrink,
|
|
0.5 * size_scaling * shrink,
|
|
_z),
|
|
type="box",
|
|
material="",
|
|
mass="0.001" if falling else "0.0002",
|
|
contype="1",
|
|
conaffinity="1",
|
|
rgba="0.9 0.1 0.1 1"
|
|
)
|
|
if maze_env_utils.can_move_x(struct):
|
|
ET.SubElement(
|
|
movable_body, "joint",
|
|
armature="0",
|
|
axis="1 0 0",
|
|
damping="0.0",
|
|
limited="true" if falling else "false",
|
|
range="%f %f" % (-size_scaling, size_scaling),
|
|
margin="0.01",
|
|
name="movable_x_%d_%d" % (i, j),
|
|
pos="0 0 0",
|
|
type="slide"
|
|
)
|
|
if maze_env_utils.can_move_y(struct):
|
|
ET.SubElement(
|
|
movable_body, "joint",
|
|
armature="0",
|
|
axis="0 1 0",
|
|
damping="0.0",
|
|
limited="true" if falling else "false",
|
|
range="%f %f" % (-size_scaling, size_scaling),
|
|
margin="0.01",
|
|
name="movable_y_%d_%d" % (i, j),
|
|
pos="0 0 0",
|
|
type="slide"
|
|
)
|
|
if maze_env_utils.can_move_z(struct):
|
|
ET.SubElement(
|
|
movable_body, "joint",
|
|
armature="0",
|
|
axis="0 0 1",
|
|
damping="0.0",
|
|
limited="true",
|
|
range="%f 0" % (-height_offset),
|
|
margin="0.01",
|
|
name="movable_z_%d_%d" % (i, j),
|
|
pos="0 0 0",
|
|
type="slide"
|
|
)
|
|
if maze_env_utils.can_spin(struct):
|
|
ET.SubElement(
|
|
movable_body, "joint",
|
|
armature="0",
|
|
axis="0 0 1",
|
|
damping="0.0",
|
|
limited="false",
|
|
name="spinable_%d_%d" % (i, j),
|
|
pos="0 0 0",
|
|
type="ball"
|
|
)
|
|
|
|
torso = tree.find(".//body[@name='torso']")
|
|
geoms = torso.findall(".//geom")
|
|
for geom in geoms:
|
|
if 'name' not in geom.attrib:
|
|
raise Exception("Every geom of the torso must have a name "
|
|
"defined")
|
|
|
|
_, file_path = tempfile.mkstemp(text=True, suffix='.xml')
|
|
tree.write(file_path)
|
|
|
|
self.wrapped_env = model_cls(*args, file_path=file_path, **kwargs)
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
self.GOAL = goal
|
|
if self.GOAL is not None:
|
|
self.GOAL = self.unwrapped._rowcol_to_xy(*self.GOAL)
|
|
self.EPS = EPS
|
|
|
|
def get_ori(self):
|
|
return self.wrapped_env.get_ori()
|
|
|
|
def get_top_down_view(self):
|
|
self._view = np.zeros_like(self._view)
|
|
|
|
def valid(row, col):
|
|
return self._view.shape[0] > row >= 0 \
|
|
and self._view.shape[1] > col >= 0
|
|
|
|
def update_view(x, y, d, row=None, col=None):
|
|
if row is None or col is None:
|
|
x = x - self._robot_x
|
|
y = y - self._robot_y
|
|
|
|
row, col = self._xy_to_rowcol(x, y)
|
|
update_view(x, y, d, row=row, col=col)
|
|
return
|
|
|
|
row, row_frac, col, col_frac = int(row), row % 1, int(col), col % 1
|
|
if row_frac < 0:
|
|
row_frac += 1
|
|
if col_frac < 0:
|
|
col_frac += 1
|
|
|
|
if valid(row, col):
|
|
self._view[row, col, d] += (
|
|
(min(1., row_frac + 0.5) - max(0., row_frac - 0.5)) *
|
|
(min(1., col_frac + 0.5) - max(0., col_frac - 0.5)))
|
|
if valid(row - 1, col):
|
|
self._view[row - 1, col, d] += (
|
|
(max(0., 0.5 - row_frac)) *
|
|
(min(1., col_frac + 0.5) - max(0., col_frac - 0.5)))
|
|
if valid(row + 1, col):
|
|
self._view[row + 1, col, d] += (
|
|
(max(0., row_frac - 0.5)) *
|
|
(min(1., col_frac + 0.5) - max(0., col_frac - 0.5)))
|
|
if valid(row, col - 1):
|
|
self._view[row, col - 1, d] += (
|
|
(min(1., row_frac + 0.5) - max(0., row_frac - 0.5)) *
|
|
(max(0., 0.5 - col_frac)))
|
|
if valid(row, col + 1):
|
|
self._view[row, col + 1, d] += (
|
|
(min(1., row_frac + 0.5) - max(0., row_frac - 0.5)) *
|
|
(max(0., col_frac - 0.5)))
|
|
if valid(row - 1, col - 1):
|
|
self._view[row - 1, col - 1, d] += (
|
|
(max(0., 0.5 - row_frac)) * max(0., 0.5 - col_frac))
|
|
if valid(row - 1, col + 1):
|
|
self._view[row - 1, col + 1, d] += (
|
|
(max(0., 0.5 - row_frac)) * max(0., col_frac - 0.5))
|
|
if valid(row + 1, col + 1):
|
|
self._view[row + 1, col + 1, d] += (
|
|
(max(0., row_frac - 0.5)) * max(0., col_frac - 0.5))
|
|
if valid(row + 1, col - 1):
|
|
self._view[row + 1, col - 1, d] += (
|
|
(max(0., row_frac - 0.5)) * max(0., 0.5 - col_frac))
|
|
|
|
# Draw ant.
|
|
robot_x, robot_y = self.wrapped_env.get_body_com("torso")[:2]
|
|
self._robot_x = robot_x
|
|
self._robot_y = robot_y
|
|
self._robot_ori = self.get_ori()
|
|
|
|
structure = self.MAZE_STRUCTURE
|
|
size_scaling = self.MAZE_SIZE_SCALING
|
|
|
|
# Draw immovable blocks and chasms.
|
|
for i in range(len(structure)):
|
|
for j in range(len(structure[0])):
|
|
if structure[i][j] == 1: # Wall.
|
|
update_view(j * size_scaling - self._init_torso_x,
|
|
i * size_scaling - self._init_torso_y,
|
|
0)
|
|
if structure[i][j] == -1: # Chasm.
|
|
update_view(j * size_scaling - self._init_torso_x,
|
|
i * size_scaling - self._init_torso_y,
|
|
1)
|
|
|
|
# Draw movable blocks.
|
|
for block_name, block_type in self.movable_blocks:
|
|
block_x, block_y = self.wrapped_env.get_body_com(block_name)[:2]
|
|
update_view(block_x, block_y, 2)
|
|
|
|
import cv2
|
|
cv2.imshow('x.jpg', cv2.resize(
|
|
np.uint8(self._view * 255), (512, 512),
|
|
interpolation=cv2.INTER_CUBIC))
|
|
cv2.waitKey(0)
|
|
|
|
return self._view
|
|
|
|
def get_range_sensor_obs(self):
|
|
"""Returns egocentric range sensor observations of maze."""
|
|
robot_x, robot_y, robot_z = self.wrapped_env.get_body_com("torso")[:3]
|
|
ori = self.get_ori()
|
|
|
|
structure = self.MAZE_STRUCTURE
|
|
size_scaling = self.MAZE_SIZE_SCALING
|
|
height = self.MAZE_HEIGHT
|
|
|
|
segments = []
|
|
# Get line segments (corresponding to outer boundary) of each immovable
|
|
# block or drop-off.
|
|
for i in range(len(structure)):
|
|
for j in range(len(structure[0])):
|
|
if structure[i][j] in [1, -1]: # There's a wall or drop-off.
|
|
cx = j * size_scaling - self._init_torso_x
|
|
cy = i * size_scaling - self._init_torso_y
|
|
x1 = cx - 0.5 * size_scaling
|
|
x2 = cx + 0.5 * size_scaling
|
|
y1 = cy - 0.5 * size_scaling
|
|
y2 = cy + 0.5 * size_scaling
|
|
struct_segments = [
|
|
((x1, y1), (x2, y1)),
|
|
((x2, y1), (x2, y2)),
|
|
((x2, y2), (x1, y2)),
|
|
((x1, y2), (x1, y1)),
|
|
]
|
|
for seg in struct_segments:
|
|
segments.append(dict(
|
|
segment=seg,
|
|
type=structure[i][j],
|
|
))
|
|
|
|
for block_name, block_type in self.movable_blocks:
|
|
block_x, block_y, block_z = \
|
|
self.wrapped_env.get_body_com(block_name)[:3]
|
|
if (block_z + height * size_scaling / 2 >= robot_z and
|
|
robot_z >= block_z - height * size_scaling / 2):
|
|
# Block in view.
|
|
x1 = block_x - 0.5 * size_scaling
|
|
x2 = block_x + 0.5 * size_scaling
|
|
y1 = block_y - 0.5 * size_scaling
|
|
y2 = block_y + 0.5 * size_scaling
|
|
struct_segments = [
|
|
((x1, y1), (x2, y1)),
|
|
((x2, y1), (x2, y2)),
|
|
((x2, y2), (x1, y2)),
|
|
((x1, y2), (x1, y1)),
|
|
]
|
|
for seg in struct_segments:
|
|
segments.append(dict(
|
|
segment=seg,
|
|
type=block_type,
|
|
))
|
|
|
|
# 3 for wall, drop-off, block
|
|
sensor_readings = np.zeros((self._n_bins, 3))
|
|
for ray_idx in range(self._n_bins):
|
|
ray_ori = (ori - self._sensor_span * 0.5 + (
|
|
2 * ray_idx + 1.0) /
|
|
(2 * self._n_bins) * self._sensor_span)
|
|
ray_segments = []
|
|
# Get all segments that intersect with ray.
|
|
for seg in segments:
|
|
p = maze_env_utils.ray_segment_intersect(
|
|
ray=((robot_x, robot_y), ray_ori),
|
|
segment=seg["segment"])
|
|
if p is not None:
|
|
ray_segments.append(dict(
|
|
segment=seg["segment"],
|
|
type=seg["type"],
|
|
ray_ori=ray_ori,
|
|
distance=maze_env_utils.point_distance(
|
|
p, (robot_x, robot_y)),
|
|
))
|
|
if len(ray_segments) > 0:
|
|
# Find out which segment is intersected first.
|
|
first_seg = sorted(
|
|
ray_segments, key=lambda x: x["distance"])[0]
|
|
seg_type = first_seg["type"]
|
|
idx = (0 if seg_type == 1 else # Wall.
|
|
1 if seg_type == -1 else # Drop-off.
|
|
2 if maze_env_utils.can_move(seg_type) else # Block.
|
|
None)
|
|
if first_seg["distance"] <= self._sensor_range:
|
|
sensor_readings[ray_idx][idx] = \
|
|
(self._sensor_range - first_seg[
|
|
"distance"]) / self._sensor_range
|
|
return sensor_readings
|
|
|
|
def _get_obs(self):
|
|
wrapped_obs = self.wrapped_env._get_obs()
|
|
if self._top_down_view:
|
|
self.get_top_down_view()
|
|
|
|
if self._observe_blocks:
|
|
additional_obs = []
|
|
for block_name, block_type in self.movable_blocks:
|
|
additional_obs.append(
|
|
self.wrapped_env.get_body_com(block_name))
|
|
wrapped_obs = np.concatenate([wrapped_obs[:3]] + additional_obs +
|
|
[wrapped_obs[3:]])
|
|
|
|
self.get_range_sensor_obs()
|
|
return wrapped_obs
|
|
|
|
def seed(self, seed=None):
|
|
self.np_random, seed = seeding.np_random(seed)
|
|
return [seed]
|
|
|
|
def reset(self, goal=None):
|
|
self.goal = goal
|
|
|
|
if self.visualize_goal: # remove the prev goal and add a new goal
|
|
goal_x, goal_y = goal[0], goal[1]
|
|
size_scaling = self.MAZE_SIZE_SCALING
|
|
# remove the original goal
|
|
try:
|
|
self.worldbody.remove(self.goal_element)
|
|
except AttributeError:
|
|
pass
|
|
# offset all coordinates so that robot starts at the origin
|
|
self.goal_element = \
|
|
ET.SubElement(
|
|
self.worldbody, "geom",
|
|
name="goal_%d_%d" % (goal_x, goal_y),
|
|
pos="%f %f %f" % (goal_x,
|
|
goal_y,
|
|
self.MAZE_HEIGHT / 2 * size_scaling),
|
|
# smaller than the block to prevent collision
|
|
size="%f %f %f" % (0.1 * size_scaling,
|
|
0.1 * size_scaling,
|
|
self.MAZE_HEIGHT / 2 * size_scaling),
|
|
type="box",
|
|
material="",
|
|
contype="1",
|
|
conaffinity="1",
|
|
rgba="1.0 0.0 0.0 0.5"
|
|
)
|
|
# Note: running the lines below will make the robot position wrong!
|
|
# (because the graph is rebuilt)
|
|
torso = self.tree.find(".//body[@name='torso']")
|
|
geoms = torso.findall(".//geom")
|
|
for geom in geoms:
|
|
if 'name' not in geom.attrib:
|
|
raise Exception("Every geom of the torso must have a name "
|
|
"defined")
|
|
_, file_path = tempfile.mkstemp(text=True, suffix='.xml')
|
|
self.tree.write(file_path)
|
|
# here we write a temporal file with the robot specifications.
|
|
# Why not the original one??
|
|
|
|
model_cls = self.__class__.MODEL_CLASS
|
|
# file to the robot specifications; model_cls is AntEnv
|
|
self.wrapped_env = model_cls(
|
|
*self.args, file_path=file_path, **self.kwargs)
|
|
|
|
self.t = 0
|
|
self.trajectory = []
|
|
self.wrapped_env.reset()
|
|
if len(self._init_positions) > 1:
|
|
xy = self._init_positions[self.np_random.randint(
|
|
len(self._init_positions))]
|
|
self.wrapped_env.set_xy(xy)
|
|
return self._get_obs()
|
|
|
|
@property
|
|
def viewer(self):
|
|
return self.wrapped_env.viewer
|
|
|
|
def render(self, *args, **kwargs):
|
|
return self.wrapped_env.render(*args, **kwargs)
|
|
|
|
@property
|
|
def observation_space(self):
|
|
shape = self._get_obs().shape
|
|
high = np.inf * np.ones(shape)
|
|
low = -high
|
|
return gym.spaces.Box(low, high)
|
|
|
|
@property
|
|
def action_space(self):
|
|
return self.wrapped_env.action_space
|
|
|
|
def _find_robot(self):
|
|
structure = self.MAZE_STRUCTURE
|
|
size_scaling = self.MAZE_SIZE_SCALING
|
|
for i in range(len(structure)):
|
|
for j in range(len(structure[0])):
|
|
if structure[i][j] == 'r':
|
|
return j * size_scaling, i * size_scaling
|
|
assert False, 'No robot in maze specification.'
|
|
|
|
def _find_all_robots(self):
|
|
structure = self.MAZE_STRUCTURE
|
|
size_scaling = self.MAZE_SIZE_SCALING
|
|
coords = []
|
|
for i in range(len(structure)):
|
|
for j in range(len(structure[0])):
|
|
if structure[i][j] == 'r':
|
|
coords.append((j * size_scaling, i * size_scaling))
|
|
return coords
|
|
|
|
def _is_in_collision(self, pos):
|
|
x, y = pos
|
|
structure = self.MAZE_STRUCTURE
|
|
scale = self.MAZE_SIZE_SCALING
|
|
for i in range(len(structure)):
|
|
for j in range(len(structure[0])):
|
|
if structure[i][j] == 1:
|
|
minx = j * scale - scale * 0.5 - self._init_torso_x
|
|
maxx = j * scale + scale * 0.5 - self._init_torso_x
|
|
miny = i * scale - scale * 0.5 - self._init_torso_y
|
|
maxy = i * scale + scale * 0.5 - self._init_torso_y
|
|
if minx <= x <= maxx and miny <= y <= maxy:
|
|
return True
|
|
return False
|
|
|
|
def _rowcol_to_xy(self, j, i):
|
|
scale = self.MAZE_SIZE_SCALING
|
|
minx = j * scale - scale * 0.5 - self._init_torso_x
|
|
maxx = j * scale + scale * 0.5 - self._init_torso_x
|
|
miny = i * scale - scale * 0.5 - self._init_torso_y
|
|
maxy = i * scale + scale * 0.5 - self._init_torso_y
|
|
return (minx + maxx) / 2, (miny + maxy) / 2
|
|
|
|
def step(self, action):
|
|
self.t += 1
|
|
if self._manual_collision:
|
|
old_pos = self.wrapped_env.get_xy()
|
|
inner_next_obs, inner_reward, inner_done, info = \
|
|
self.wrapped_env.step(action)
|
|
new_pos = self.wrapped_env.get_xy()
|
|
if self._is_in_collision(new_pos):
|
|
self.wrapped_env.set_xy(old_pos)
|
|
else:
|
|
inner_next_obs, inner_reward, inner_done, info = \
|
|
self.wrapped_env.step(action)
|
|
next_obs = self._get_obs()
|
|
done = False
|
|
if self.goal is not None:
|
|
done = bool(((next_obs[:2] - self.goal[:2]) ** 2).sum() < self.EPS)
|
|
|
|
new_pos = self.wrapped_env.get_xy()
|
|
if self._is_in_collision(new_pos) or inner_done:
|
|
done = True
|
|
if self.t >= self.max_episode_steps:
|
|
done = True
|
|
return next_obs, inner_reward, done, info
|