env.simple_intersection package

Submodules

env.simple_intersection.constants module

env.simple_intersection.constants.DT = 0.1

the time difference btw. the current and the next time steps.

env.simple_intersection.constants.DT_over_2 = 0.05

Constants for state-constrained Runge-Kutta 4 method in vehicles.py

env.simple_intersection.constants.EGO_INDEX = 0

Index of the ego vehicle in the vehicles list

env.simple_intersection.constants.H_CAR_SCALE(w)

Lambda function that takes in car width and returns scale_x for car

env.simple_intersection.constants.H_SPACE_SCALE = 5.565217391304348

Horizontal space scale

env.simple_intersection.constants.H_TILE_SCALE(w)

Lambda function that takes in tile width and returns scale_x for road tile

env.simple_intersection.constants.LANE_SEPARATOR_HALF_WIDTH = 1

Lane separator width

env.simple_intersection.constants.LANE_WIDTH = 22.772727272727273

Lane width

env.simple_intersection.constants.MAX_ACCELERATION = 2.0

maximum acceleration

env.simple_intersection.constants.MAX_NUM_VEHICLES = 6

MAX NUMBER OF VEHICLES

env.simple_intersection.constants.MAX_STEERING_ANGLE = 0.9599310885968813

maximum steering angle - 33 deg. (extreme case – 53 deg.)

env.simple_intersection.constants.MAX_STEERING_ANGLE_RATE = 1.0

maximum rate of change of steering angle

env.simple_intersection.constants.NUM_HPIXELS = 640

Number of pixels in the horizontal direction

env.simple_intersection.constants.NUM_HTILES = 14

Number of horizontal tiles

env.simple_intersection.constants.NUM_VPIXELS = 501

Number of pixels in the vertical direction

env.simple_intersection.constants.NUM_VTILES = 11

Number of horizontal tiles

env.simple_intersection.constants.ROAD_TILE_IMAGE_DIM = 300

Road tile image dimension

env.simple_intersection.constants.VEHICLE_WHEEL_BASE = 1.7

vehicle wheel base ( = L in the paper)

env.simple_intersection.constants.V_CAR_SCALE(h)

Lambda function that takes in car height and returns scale_y for car

env.simple_intersection.constants.V_SPACE_SCALE = 5.566666666666666

Vertical space scale

env.simple_intersection.constants.V_TILE_SCALE(h)

Lambda function that takes in tile height and returns scale_y for road tile

env.simple_intersection.features module

class env.simple_intersection.features.Features(env)

Bases: object

get_features_tuple()

continuous + discrete features.

reset(env)
class env.simple_intersection.features.OtherVehFeatures(rel_x, rel_y, v, acc, waited_time)

Bases: object

env.simple_intersection.features.extract_ego_features(features_tuple, *args)
env.simple_intersection.features.extract_other_veh_features(features_tuple, veh_index, *args)

env.simple_intersection.road_geokinemetry module

class env.simple_intersection.road_geokinemetry.Route(direction, n_lanes, start_pos, end_pos, width, centres, stop_region, near_stop_region, sidewalks)

Bases: object

A class that contains the lane(s) information for each route (‘h’ for horizontal or ‘v’ for vertical).

The current implementation is tested when the number of lanes is “2”.

Properties:
  • direction: ‘h’ for hlanes or ‘v’ for vlanes
  • n_lanes: the number of lanes (has to be multiples of 2)
  • start_pos: the position that the route starts
  • end_pos: the position that the route ends
  • min_pos: the minimum value of the position on the route
  • max_pos: the maximum value of the position on the route
  • middle_pos: the center value of the position on the route
  • length: the length of the route
  • width: the width of each lane
  • centres: an array containing each lane’s center point
  • stop_region: an array defining the stop region on the route
  • near_stop_region: the position before stop_region[1] defining
    the region near the stop region
  • stop_region_centre: the center position of the stop region
  • sidewalks: the side walk points of the route
  • total_width: the width of the route
  • total_centre: the center of the route
pos_range

env.simple_intersection.road_networks module

class env.simple_intersection.road_networks.RoadNetworkCross(image_url)

Bases: env.simple_intersection.shapes.Shape

Road network for the cross scenario.

This scenario has just one horizontal lane and one vertical lane. Relevant constants are used from the constants file.

draw()

Draw the horizontal and vertical lane.

env.simple_intersection.shapes module

class env.simple_intersection.shapes.Image(image_url, anchor=None, tile_scale=False, image_type=None)

Bases: env.simple_intersection.shapes.Shape

Holds an image.

draw()

Draw Image object.

class env.simple_intersection.shapes.Rectangle(xmin, xmax, ymin, ymax, color=(0, 0, 0, 255))

Bases: env.simple_intersection.shapes.Shape

Rectangle using OpenGL.

draw()

Draw Rectangle object using OpenGL.

class env.simple_intersection.shapes.Shape

Bases: object

Shape abstract class.

class env.simple_intersection.shapes.Text(text, x, y, font_size=12, color=(0, 0, 0, 255), multiline=False, width=None)

Bases: env.simple_intersection.shapes.Shape

Holds a pyglet label.

draw()

Draw Text object.

env.simple_intersection.simple_intersection_env module

class env.simple_intersection.simple_intersection_env.SimpleIntersectionEnv(**kwargs)

Bases: env.road_env.RoadEnv, env.env_base.EpisodicEnvBase

Road environment corresponding to the cross scenario.

This scenario has just one horizontal lane and one vertical lane. Relevant constants are used from the constants file.

V2V_ref = 6.0

The vehicle-to-vehicle desired distance in the aggressive driving policy

action_space
aggressive_driving_policy(veh_index, maneuver_opt=None)

Implementation of the aggressive driving policy. Calculate the (veh_index)-th vehicle’s low-level action (a, dot_phi) using aggressive driving policy. If using aggressive driving policy for all vehicles, there is [has to be] no collision.

Parameters:
  • veh_index – the index of the vehicle to calculate the action (a, dot_phi) via aggressive driving policy.
  • maneuver_opt – an option that specifies a maneuver to be executed; it can be ‘stop’, ‘wait’, etc.
Returns:

current low-level action (a, dot_phi) generated by the aggressive driving policy for the current state.

check_ego_collision()

Checks if ego collides with any other vehicle in the environment. Uses bounding boxes to check if a seperating line exists between given vehicle and each of the other vehicles.

Returns: True if ego collides with another vehicle, False otherwise

check_other_veh_collisions()

Checks if other vehicles other than ego collide in the environment. Uses V2V distance for vehicles in same lane, otherwise uses bounding boxes Checks all vehicles other than ego.

Returns: True if a vehicle collides with another vehicle, False otherwise

collision_happened
copy()
cost(u)

Calculate the driving cost of the ego, i.e., the negative reward for the ego-driving.

Parameters:u – the low-level input (a, dot_psi) to the ego vehicle.
Returns:
the weighted sum of squares of the components in
the vector “vec.” Note that a cost is a negative reward.
Return type:cost
cost_normalization_ranges = [[-3.5, 3.5], [-6.2, 6.2], [-10.081044753473748, -10.081044753473748], [-12, 12], [0, 12], [-2, 2], [-16, 16], [-2, 2]]
draw(info=None)

Draw the background, road network and vehicle network. Also show information as passed on by info.

Parameters:info – the information string to be shown on the window
ego
ego_info_text = None

info for displaying on top of ego

generate_scenario(**kwargs)

Randomly generate a road scenario with.

“the N-number of vehicles + an ego vehicle”

(n_others_range[0] <= N <= n_others_range[1]), all of whose speeds are also set randomly (or deterministically depending on the options) between 0 and the speed limit of the road environment. The position and speed of the vehicles are set (some of them randomly) but not in a way that causes inevitable collision or violation. The ego vehicle is always posed on a horizontal lane and there is initially no or only one vehicle posed in the intersection.

TODO: More debugging TODO: Add more comments for each block within the function for comprehensive understanding. TODO: restructure the code by making private method for each part for a certain option.

Parameters:
  • n_others_range – a tuple (n1, n2), which gives the range of the vehicles on the road, excluding the ego vehicle, by n1 <= N <= n2.
  • ego_pos_range – a tuple or list (x1, x2); the ego’s x-position will be sampled uniformly in [x1, x2].
  • ego_v_upper_lim – the ego’s velocity limit. the ego’s speed will be sampled uniformly in [0, ego_v_upper_lim]
  • ego_lane – the lane in which ego will be initialized
  • ego_perturb_lim

    a tuple (da, dtheta), where da is the absolute maximum of the ego’s initial perturbations along the y-axis from the lane centre; dtheta is the absolute maximum of the ego’s heading angle perturbation from the horizontal line (the angle parallel to the horizontal lane). The perturbation along the y-axis is sampled from

    ”uniform(-ego_perturb_lim[0], ego_perturb_lim[0])”

    within the lane boundary. Similarly, the perturbation of the heading angle is sampled from

    ”uniform(-ego_perturb_lim[1], ego_perturb_lim[1]).”

    da is restricted by the width of the road, and dtheta the non- holonomic constraint;

  • ego_heading_towards_lane_centre – If True, then the ego’s heading angle is always initialized towards the lane centre.
  • others_h_pos_range – the range of the other vehicles that are on the horizontal route and are not corresponding to the vehicle generated by the code for a special scenario below.
  • others_v_pos_range – the range of the other vehicles that are on the vertical route and initially not stopped in the stop region.
  • n_others_stopped_in_stop_region – the number of the other vehicles that stopped in the stop region, on the other lanes than that of the ego. The maximum is 3 (as there are 4-lanes including ego’s). When this is true, the waited_count of the stopped cars (except the ego) in the stop region will be set randomly. This option is useful when generating an initial scenario for Wait maneuver.
  • veh_ahead_scenario – if True, the generated scenario has a vehicle ahead of the ego within self.veh_ahead_max. This option is useful in generating an initial scenario for e.g., Follow and Pass.
  • stopped_car_scenario – if True, the generated scenario contains a stopped car located after the intersection but in the same lane as ego’s TODO: this option is not fully tested and need some debugging.
  • allow_vehicles_in_ego_lane – if False, there will be no vehicle in the same lane to the ego’s. This option is useful when you generate an initial scenario like those in Default and Finish maneuvers.
  • v_max_multiplier – a constant between 0 and 1, used in resetting the speeds of the vehicles if necessary to make sure that at least an input to the ego exists to avoid collisions or stop-sign violations. The less v_max_multiplier, more conservative scenario is generated (but when v_max_multipler = 1, it could generate a scenario where a collision is inevitable). 0.9 is a value set by the developer through trial-and- error and exhaustive testing.
  • randomize_special_scenarios – If True, randomizes scenarios such as vehicle_ahead_scenario, stopped_car_scenario and n_others_stopped_in_stop_region
:returns
False if the initial state is Terminal, True if not.
get_V2V_distance(veh_index=0)

Calculate the distance between the target and the nearest adjacent car in the same lane. The distance is given as a bumper-to-bumper one, so being it not positive means that the two cars collide.

Parameters:veh_index

the index of the vehicle whose V2V distance is returned, ranging from 0 to len(self.vehs)

(with 0 for the index of the ego).
Returns:a tuple (i, V2Vdist), where V2Vdist is the V2V distance from the indexed vehicle to the closest vehicle, with its index i, on the same route; i is None and V2Vdist == np.infty when there is no vehicle ahead.
get_features_tuple()

Get/calculate the features wrt. the current state variables.

Returns features tuple

goal_achieved

A property from the base class which is True if the goal of the road scenario is achieved, otherwise False.

This property is used in both step of EpisodicEnvBase and the implementation of the high-level reinforcement learning and execution.

init_APs(update_local_APs=True)

Initialize the global (and local if update_local_APs is true) atomic propositions (APs) of each vehicle using the current state. This method is basically called by “generate_scenario” method. Compared with update_APs, the way of setting ‘highest_priority’ is different.

Parameters:update_local_APs – True if the local APs are also necessary to be initialized; False if not, and only the global APs are required to be initialzied.
is_ego_off_road()

Checks if ego goes off-road.

Returns: True if ego goes off-road, False otherwise

is_terminal()

Check if the environment is at a terminal state or not.

Parameters:env – environment instance

Returns True if the environment has terminated

max_ego_theta = 1.0471975511965976

the maximum of the ego’s heading angle

n_other_vehs = None

the number of the other vehicles, initialized in generate_scenario

n_others_with_higher_priority
normalize_cost = False
normalize_tuple(vec, scale_factor=10)

Normalizes each element in a tuple according to ranges defined in self.cost_normalization_ranges. Normalizes between 0 and 1. And the scales by scale_factor.

Parameters:
  • vec – The tuple to be normalized
  • scale_factor – the scaling factor to multiply the normalized values by
Returns:

The normalized tuple

observation_space
render()

Gym compliant render function.

reset()

Gym compliant reset function.

Reinitialize this environment with whatever parameters it was initially called with.

Returns:the environment state after reset
Return type:new_state
set_ego_info_text(info)
step(u_ego)

Gym compliant step function.

Calculate the next continuous and discrete states of every vehicle using the given action u_ego for ego-vehicle and aggressive driving policy for the other vehicles. The variables in self.vehs will be automatically updated. Then, call the options _step and get the reward and terminal values.

Parameters:u_ego – the low level action [a, dot_phi] of the ego vehicle.
Returns:the new state of the environment R: reward obtained terminal: whether or not the current policy finished, that is, its termination_condition() is satisfied; TODO - also return true when the environment’s state indicates the episode is over. info: information variables
Return type:new_state
termination_condition

In the subclass, specify the condition for termination of the episode (or the maneuver).

update_APs(update_local_APs=True, update_highest_priority=True)

Update the global (and local if update_local_APs is true) atomic propositions (APs) of each vehicle using the current state. This method is basically called by “step” method below after it updates the vehicle’s continuous state. By a global AP, it means that the state information of the other cars has to be used to update the AP (whereas a local AP means that the information of the other cars are not necessary). For comparison and more details, see also “updateLocalAPs” method in vehicles module.

Parameters:update_local_APs – True if the local APs are also necessary to be update; False if not, and only the global APs are required to be updated.
veh_ahead_max = 40.0

The maximum distance within which self.vehs[i].APs[‘veh_ahead’] = True

veh_ahead_min = 2.5

The minimum distance that every two adjacent vehicles has to be apart from each other.

vehicle_network = None

the vehicle_nework for render, initialized in generate_scenario

vehs = None

the list of the vehicles, initialized in generate_scenario

window = None

env.simple_intersection.utilities module

class env.simple_intersection.utilities.BoundingBox

Bases: object

A class that provides utility functions related to bounding boxes.

  • Bounding box: ndarray of shape (4,2) containing the vertices of the
    rectangle in order
static all_same_sign_in_list(test_list)

Check if all elements in a list are of the same sign. Zero does not have a sign.

Parameters:test_list – the python list to be checked
Returns:1 or -1 depending upon the side w.r.t the edge
static does_bounding_box_cross_line(line, bb)

Checks if bounding box crosses a line. Line is represented by an np.ndarray of size (2,2) representing two points in it. Works only for quadrilateral bounding boxes (in 2D).

Parameters:
  • line – np.ndarray Two points representing the line
  • bb – np.ndarray Bounding box
Returns:

true if the bounding box crosses the line

static does_bounding_box_intersect(first_bb, second_bb)

Checks if bounding boxes intersect using separating axis test: Two objects don’t intersect if you can find a line that separates the two objects. Works only for quadrilateral bounding boxes (in 2D).

Parameters:
  • first_bb – np.ndarray First bounding box
  • second_bb – np.ndarray Second bounding box
Returns:

true if there is an intersection

static localize_bounding_box_wrt_line(line, bb)

Returns the side in which a bounding box is w.r.t a line. Line is represented by an np.ndarray of size (2,2) representing two points in it. Works only for quadrilateral bounding boxes (in 2D).

Parameters:
  • line – np.ndarray Two points representing the line
  • bb – np.ndarray Bounding box
Returns:

1 or -1 depending upon the side w.r.t the line. returns 0 if it intersects the line

static localize_point_wrt_line(line, testpoint)

Check the side in which a point lies w.r.t a line.

Parameters:
  • line – np.ndarray Two points representing the line
  • testpoint – the point to be checked
Returns:

1 or -1 depending upon the side w.r.t the edge; returns 0 if it is on the line

env.simple_intersection.utilities.calculate_s(v)

Calculate the distance traveling when the vehicle is maximally de- accelerating for a complete stop.

Parameters:v – the current speed of the vehicle.
Returns:the distance traveling when the vehicle is under the maximum de-acceleration to stop completely.
env.simple_intersection.utilities.calculate_v_max(dist)

Calculate the maximum velocity you can reach at the given position ahead.

Parameters:dist – the distance you travel from the current position.
Returns:the maximum reachable velocity.
env.simple_intersection.utilities.config_Pyglet(pyglet, win_x, win_y)

Configure pyglet and return the window that can be used by other pyglet objects.

Parameters:
  • win_x – x width of the window
  • win_y – y width of the window
Returns:

Pyglet window object

env.simple_intersection.utilities.draw_all_shapes(shapes)

Go through all the shapes and call their draw function.

Parameters:shapes – list of shapes
env.simple_intersection.utilities.get_APs(env, index, *args)

Retrieve atomic proposition data from the env.

Parameters:
  • env – SimpleIntersectionEnv object
  • index – vehicle index
  • args – atomic proposition names (variable arguments)
Returns:

multiline string with the required data

env.simple_intersection.utilities.get_veh_attributes(env, index, *args)

Retrieve vehicle attributes from env.

Parameters:
  • env – SimpleIntersectionEnv object
  • index – vehicle index
  • args – atomic proposition names (variable arguments)
Returns:

multiline string with the required data

env.simple_intersection.utilities.road2image(pos, which_dir)

Transform a 1D point to the 1D point in the graphics coordinate for the same direction (‘h’ or ‘v’).

Parameters:
  • pos – 1D position in x or y-axis of the road coordinate system
  • which_dir – the direction of the coordinate. It has to be either horizontal (‘h’) or vertical (‘v’).
Returns:

the corresponding 1D position in the graphics coordinate.

env.simple_intersection.vehicle_networks module

class env.simple_intersection.vehicle_networks.VehicleNetworkCross(image_url, env)

Bases: env.simple_intersection.shapes.Shape

Vehicle network for the cross scenario.

This scenario has just one horizontal lane and one vertical lane. Relevant constants are used from the constants file.

draw(ego_info_text)

Draw the cars. TODO: add the code for drawing the lines on the road, e.g.,:

:param ego_info: _text: the text to be displayed above ego
pyglet.graphics.draw(2, pyglet.gl.GL_LINES, (“v2f”, (0, 320, 640, 320)))

env.simple_intersection.vehicles module

class env.simple_intersection.vehicles.Vehicle(which_dir, lane=None, v_upper_lim=11.176)

Bases: env.simple_intersection.vehicles.VehicleState

A vehicle class that inherits VehicleState (so, contains the (continuous) state info.), and also have discrete state variables. (i.e., atomic propositions (APs)).

Properties are the same as of VehicleState +
  • APs: the class AtomicPropositions that stores all the APs
    of the vehicle as True or False.
  • waited_count: the number of time steps the vehicle stays
    in the stop region; it is -1 when the car is not in the stop region;
init_local_discrete_var()
is_within_road_boundaries(route)

Checks if this vehicle is within road boundaries given a route.

Parameters:route – route as defined by road_geokinemetry object
Returns:True if vehicle is within road boundaries, False otherwise
random_reset(which_dir, lane=None, v_upper_lim=11.176)

Randomly reset the properties of the class including vehicle’s state variables; the speed is [and the lane number can be] chosen randomly.

The parameters of the method are exactly same as those in the constructor.

reset(which_dir, lane, pos_in_lane, v, psi)

Reset the properties of the class including vehicle’s state variables; this reset method updates all the atomic propositions and the discrete state (i.e., waited_count).

step(u)

Calculate the next state and update the other properties for the given input u. NOTE: the global atomic propositions (‘highest_priority’ and ‘intersection_is_clear’) are not updated.

Parameters:u – a tuple or an array whose first element corresponds to the acceleration and the second element corresponds to the rate of change of the steering angle.
update_local_APs()

Update local atomic propositions (related to the vehicle, road geometry, and road environment); based upon the continuous and the previous discrete states, all the atomic propositions within the class are updated, except the global ones (‘highest_priority’, ‘intersection_is_clear’, and ‘veh_ahead’).

update_waited_count()

Increase waited_count if the vehicle is in the stop region on the same route; otherwise set waited_count = -1.

class env.simple_intersection.vehicles.VehicleState(which_dir, lane, pos_in_lane, v, psi)

Bases: object

A class that defines the vehicle’s (cont.) state variables and provides a mean to calculate the next state.

  • which_dir: the direction of the vehicle. It has to be either horizontal
    (‘h’) or vertical (‘v’);
  • x, y: the x and y position of the vehicle;
  • theta: the heading angle of the vehicle;
  • v: the velocity along the heading angle;
  • psi: the steering angle;
  • (acc, psi_dot): the acceleration and the rate of change of the
    steering angle psi. If u = (u[0], u[1]) in step is in the reasonable range, this is equal to the previous input u at the previous step.
  • method: either ‘Euler’ (Euler method) or ‘RK4’ (Runge-Kutta 4th-order)
get_bounding_box()

Calculate the bounding box of the vehicle.

Returns: the four points of the bounding box depending on the vehicle’s pose (x,y,theta)

step(u)

Calculate the next state and update the other properties for the given input u.

Parameters:u – a tuple or an array whose first element corresponds to the acceleration and the second element corresponds to the rate of change of the steering angle.

Module contents