Source code for gym_urbandriving.envs.urbandriving_env

import gym
from copy import deepcopy
from gym_urbandriving.agents import *
import gym_urbandriving as uds
import numpy as np

from gym_urbandriving.assets import Car, TrafficLight, CrosswalkLight
from gym_urbandriving.utils.featurizer import Featurizer
from gym_urbandriving.planning.reward_fn import default_reward_function
class UrbanDrivingEnv(gym.Env):
    """
    This class controls the evolution of a world state. While the
    :class:`PositionState` represents the layout of objects in the scene,
    the :class:`UrbanDrivingEnv` controls the evolution of the scene and
    manages background actors.

    Note
    ----
    This class is used both to represent the true, global state of the
    world, and as a search agent's internal view of the world.

    Parameters
    ----------
    config_data : dict
        JSON config file loaded into a dictionary specifying parameters.
    init_state : PositionState
        The starting state for this environment
        (only needed if no JSON config file is used).
    reward_fn :
        A function which takes in one parameter, the :class:`PositionState`.
    randomize : bool
        Sets whether :func:`env._reset()` returns the environment to the
        initial state, or to a random state generated by the state's
        :func:`randomize()` function.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self,
                 config_data={},
                 init_state=None,
                 reward_fn=default_reward_function,
                 randomize=False):
        self.reward_fn = reward_fn
        self.max_time = 500
        self.observation_type = 'raw'
        if config_data:
            self.featurizer = Featurizer(config_data['agents']['state_space_config'])
            self.init_state = uds.state.PositionState(config_data)
            if config_data['environment']['visualize']:
                self.visualizer = uds.PyGameVisualizer(config_data, (800, 800))
            else:
                self.visualizer = None
            self.max_time = config_data['environment']['max_time']
            self.observation_type = config_data['agents']['state_space']
            self.visualize_lidar = config_data['environment']['visualize_lidar']
            assert(self.observation_type in {'Q-LIDAR', 'raw', 'bmp'})
        else:
            self.init_state = init_state
            self.visualizer = None
        self.lidar_points = {}
        self.randomize = randomize
        self.statics_rendered = False
        self.dynamic_collisions, self.static_collisions, self.last_col = [], [], -1
        if self.init_state:
            self._reset()
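A minimal construction sketch (illustrative, not part of the module): the config dictionary is loaded from a JSON file; only the keys read by ``__init__`` above are certain, and the file path below is an assumption about your checkout.

    import json
    import gym_urbandriving as uds

    # Load a JSON config; the exact path is an assumption.
    with open('configs/default_config.json') as f:
        config = json.load(f)

    # __init__ reads config['environment'] (visualize, max_time,
    # visualize_lidar) and config['agents'] (state_space,
    # state_space_config); the remaining keys are consumed by PositionState.
    env = uds.UrbanDrivingEnv(config_data=config)
    obs = env.get_initial_observations()   # one observation per controlled car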
    def _step(self, action, background_simplified=False, supervisor=False):
        """
        The step function accepts one control per controlled car in the
        scene. It then queries all the background agents to determine their
        actions, updates the scene, and returns.

        Parameters
        ----------
        action : list
            A list of actions, one per controlled car in the scene.

        Returns
        -------
        list
            Observations, one per controlled car, in the format selected by
            ``observation_type``.
        float
            Reward calculated by :func:`self.reward_fn`.
        bool
            Whether we have timed out (collision-based termination is left
            commented out below).
        dict
            Info dictionary carrying the saved actions.
        """
        assert(self.current_state is not None)

        # Collect actions for all objects.
        background_car_actions = []
        controlled_car_actions = []
        background_traffic_actions = []
        pedestrian_actions = []
        crosswalk_light_actions = []
        self.lidar_points = {}

        #### GET ALL ACTIONS ####
        for agent in self.current_state.bg_agents['background_cars']:
            background_car_actions.append(
                agent.eval_policy(self.current_state, simplified=background_simplified))
        for agent in self.current_state.bg_agents['traffic_lights']:
            background_traffic_actions.append(agent.eval_policy(self.current_state))
        for i, agent in enumerate(self.current_state.bg_agents['controlled_cars']):
            controlled_car_actions.append(
                agent.eval_policy(action[i], self.current_state,
                                  simplified=background_simplified))
        for i, agent in enumerate(self.current_state.bg_agents['pedestrians']):
            pedestrian_actions.append(agent.eval_policy(self.current_state))
        for i, agent in enumerate(self.current_state.bg_agents['crosswalk_lights']):
            crosswalk_light_actions.append(agent.eval_policy(self.current_state))

        #### UPDATE ALL POLICIES ####
        for index, dobj in self.current_state.dynamic_objects['background_cars'].items():
            dobj.step(background_car_actions[int(index)].get_value())
        for index, dobj in self.current_state.dynamic_objects['traffic_lights'].items():
            dobj.step(background_traffic_actions[int(index)])
        for i, dobj in self.current_state.dynamic_objects['controlled_cars'].items():
            dobj.step(controlled_car_actions[int(i)].get_value())
        for i, dobj in self.current_state.dynamic_objects['pedestrians'].items():
            dobj.step(pedestrian_actions[int(i)])
        for i, dobj in self.current_state.dynamic_objects['crosswalk_lights'].items():
            dobj.step(crosswalk_light_actions[int(i)])

        self.current_state.time += 1
        dynamic_coll, static_coll, controlled_car_collisions = self.current_state.get_collisions()
        state = self.current_state
        reward = self.reward_fn(self.current_state)
        # done = (self.current_state.time == self.max_time) or len(dynamic_coll) or len(static_coll)
        done = self.current_state.time == self.max_time

        info_dict = {"saved_actions": action}

        if self.observation_type == 'raw':
            observations = [deepcopy(state) for i in state.dynamic_objects['controlled_cars']]
        elif self.observation_type == 'Q-LIDAR':
            observations = []
            for key in self.current_state.dynamic_objects['controlled_cars'].keys():
                obs = self.featurizer.featurize(self.current_state, key)
                observations.append(obs)
                self.lidar_points[key] = obs
        elif self.observation_type == 'bmp':
            assert(self.visualizer)
            self._render()
            observations = [self.visualizer.get_bitmap()] * len(state.dynamic_objects['controlled_cars'])

        if self.visualizer:
            self._render()
        return observations, reward, done, info_dict
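A hedged sketch of the step protocol: ``_step`` takes a list with one action per controlled car and returns one observation per controlled car. The ``SteeringAction`` wrapper (whose ``get_value()`` the code above consumes) and its argument order are assumptions about this version of the package.

    from gym_urbandriving.actions import SteeringAction  # assumed import path

    done = False
    while not done:
        # One action per controlled car: steer straight, gentle
        # acceleration (argument order assumed).
        actions = [SteeringAction(0.0, 1.0)
                   for _ in env.current_state.dynamic_objects['controlled_cars']]
        observations, reward, done, info = env._step(actions)
    env._reset()   # episode ended on timeout; return to the start state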
    def get_initial_observations(self):
        """
        Returns the initial observations, one per controlled car, in the
        format selected by ``observation_type``.
        """
        if self.observation_type == 'raw':
            observations = [self.current_state] * len(self.current_state.dynamic_objects['controlled_cars'])
        elif self.observation_type == 'Q-LIDAR':
            observations = []
            for key in self.current_state.dynamic_objects['controlled_cars'].keys():
                obs = self.featurizer.featurize(self.current_state, key)
                observations.append(obs)
        elif self.observation_type == 'bmp':
            assert(self.visualizer)
            self._render()
            observations = [self.visualizer.get_bitmap()] * len(self.current_state.dynamic_objects['controlled_cars'])
        return observations
    def _reset(self, new_state=None):
        """
        Resets the environment to its initial state, or to a new state.

        Parameters
        ----------
        new_state : PositionState
            If specified, the environment will reset to this state.
        """
        self.last_col = -1
        if new_state:
            self.init_state = new_state
            self.statics_rendered = False
        if self.randomize:
            self.init_state.randomize()
        self.current_state = deepcopy(self.init_state)
        return
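Because ``_reset`` accepts an optional ``new_state``, a caller can swap in a freshly built scene, which also flags the static background for re-rendering. A small sketch, reusing the ``config`` dictionary from the construction example:

    # Re-seed the environment with a brand-new scene layout.
    fresh_state = uds.state.PositionState(config)
    env._reset(new_state=fresh_state)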
    def _render(self, mode='human', close=False, waypoints=[],
                traffic_trajectories=[], transparent_surface=None):
        """
        If the renderer was specified at construction, renders the current
        state of the world.

        Parameters
        ----------
        mode : str
            For OpenAI Gym compatibility.
        waypoints :
            Extra points to render on top of the scene, for debugging.
        """
        if close:
            return
        if self.visualizer:
            window = [0, self.current_state.dimensions[0],
                      0, self.current_state.dimensions[1]]
            self.visualizer.render(self.current_state, window,
                                   rerender_statics=not self.statics_rendered,
                                   waypoints=waypoints,
                                   traffic_trajectories=traffic_trajectories,
                                   transparent_surface=transparent_surface,
                                   lidar_points=self.lidar_points if self.visualize_lidar else [])
            self.statics_rendered = True
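For debugging a planner it can help to draw extra geometry over the scene. A sketch; the (x, y) point format for ``waypoints`` is an assumption about what ``PyGameVisualizer.render`` accepts:

    # Overlay two hypothetical debug waypoints on the next rendered frame.
    env._render(waypoints=[(450, 375), (500, 375)])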
    def get_current_state(self):
        """Returns the environment's current :class:`PositionState`."""
        return self.current_state

    def get_state_copy(self):
        """Returns a deep copy of the current state."""
        return deepcopy(self.current_state)
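Per the class docstring, the same class also serves as a search agent's internal view of the world. ``get_state_copy`` makes that safe: a planner can roll out candidate actions on a deep copy without mutating the true environment. A sketch, where ``candidate_actions`` is a hypothetical list of per-car actions:

    # Forward-simulate on a copy; the real `env` is untouched by the rollout.
    internal_env = uds.UrbanDrivingEnv(init_state=env.get_state_copy())
    obs, r, done, info = internal_env._step(candidate_actions)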