from .maneuver_base import ManeuverBase
from env.simple_intersection.constants import *
import env.simple_intersection.road_geokinemetry as rd
from env.simple_intersection.features import extract_ego_features, extract_other_veh_features
from model_checker.simple_intersection import LTLProperty

import numpy as np

# TODO: separate out into different files?? is it really needed?


class MCTSKeepLane(ManeuverBase):
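    """Keep the ego in its current lane at the road speed limit.

    The maneuver can only be initiated when no vehicle is ahead of the ego
    ('veh_ahead' is False).
    """
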
    def _init_param(self):
        self._v_ref = rd.speed_limit
        self._target_lane = self.env.ego.APs['lane']
        self._extra_action_weights_flag = True

    @property
    def extra_initiation_condition(self):
        return not self.env.ego.APs['veh_ahead']

    def generate_learning_scenario(self):
        self.generate_scenario(
            enable_LTL_preconditions=False,
            ego_pos_range=(rd.intersection_width_w_offset / 2,
                           rd.hlanes.end_pos),
            ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 12),
            ego_heading_towards_lane_centre=True)
        self.env._terminate_in_goal = True
        self.env._reward_in_goal = 200

    def generate_validation_scenario(self):
        self.generate_scenario(
            enable_LTL_preconditions=False,
            timeout=10,
            ego_perturb_lim=(0, 0),
            ego_heading_towards_lane_centre=True)
        self.env._terminate_in_goal = True
        self.env._reward_in_goal = 200

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return features_tuple[0:5] + features_tuple[6:7] + features_tuple[9:11]


class MCTSStop(ManeuverBase):
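    """Decelerate and stop inside the stop region.

    The reference velocity ramps down linearly from the speed limit to zero
    as the ego approaches the centre of the stop region (see _set_v_ref);
    the goal is achieved once the ego is stopped within the stop region.
    """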

    _terminate_in_goal = True
    _reward_in_goal = None

    def _init_param(self):
        self._set_v_ref()
        self._target_lane = self.env.ego.APs['lane']
        self._extra_action_weights_flag = True

    def _update_param(self):
        self._set_v_ref()

    def _set_v_ref(self):
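        # Reference velocity profile: hold the speed limit before the
        # near-stop region, then decrease linearly to zero at the centre of
        # the stop region, and stay at zero beyond it.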
        self._v_ref = rd.speed_limit
        x = self.env.ego.x
        if x <= rd.hlanes.near_stop_region:
            self._v_ref = rd.speed_limit
        elif x <= rd.hlanes.stop_region_centre:
            self._v_ref = -(rd.speed_limit / abs(rd.hlanes.near_stop_region)
                            ) * (x - rd.hlanes.stop_region_centre)
        else:
            self._v_ref = 0

    @property
    def goal_achieved(self):
        APs = self.env.ego.APs
        return APs['stopped_now'] and APs['in_stop_region']

    @property
    def extra_initiation_condition(self):
        APs = self.env.ego.APs
        return not APs['has_stopped_in_stop_region'] and \
               (APs['before_but_close_to_stop_region'] or
                APs['in_stop_region'])

    def generate_learning_scenario(self):
        self.generate_scenario(
            ego_pos_range=(rd.hlanes.start_pos,
                           -rd.intersection_width_w_offset / 2),
            ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 6),
            ego_heading_towards_lane_centre=True)

        self._reward_in_goal = 200

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return features_tuple[0:5] + features_tuple[6:7] + \
               features_tuple[9:11] + features_tuple[12:13]


class MCTSWait(ManeuverBase):
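    """Wait at the stop region until the ego has the highest crossing priority.

    Priority is tracked via 'waited_count' relative to the other vehicles;
    once the ego holds the highest priority, the reference velocity is reset
    to the speed limit so that it can proceed, and the goal is achieved when
    the ego is in the intersection with the highest priority.
    """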

    _penalty_in_violation = 200

    _terminate_in_goal = True
    _reward_in_goal = None

    def _init_param(self):
        ego = self.env.ego
        self._v_ref = 0.0
        self._target_lane = ego.APs['lane']

        #self._LTL_preconditions.append(
        #    LTLProperty("G ( has_stopped_in_stop_region => (in_stop_region or in_intersection))",
        #                self._penalty_in_violation))

        self._n_others_with_higher_priority = 0
        for veh in self.env.vehs[1:]:
            if veh.waited_count > ego.waited_count:
                self._n_others_with_higher_priority += 1

    def _update_param(self):
        if self.env.ego.APs['highest_priority']:
            self._v_ref = rd.speed_limit

    def generate_learning_scenario(self):
        n_others = np.random.randint(0, 3)
        self.generate_scenario(
            enable_LTL_preconditions=True,
            timeout=62,
            n_others_range=(n_others, n_others),
            ego_pos_range=rd.hlanes.stop_region,
            n_others_stopped_in_stop_region=n_others,
            ego_v_upper_lim=0,
            ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 6),
            ego_heading_towards_lane_centre=True)

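        # Randomise the ego's waited_count relative to the other vehicles so
        # that its crossing priority varies across generated scenarios.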
        max_waited_count = 0
        for veh in self.env.vehs[1:]:
            max_waited_count = max(max_waited_count, veh.waited_count)

        self.env.ego.waited_count = np.random.randint(0, max_waited_count + 21)
        self.env.init_APs(False)

        self._reward_in_goal = 200

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    @property
    def goal_achieved(self):
        APs = self.env.ego.APs
        return APs['in_intersection'] and APs['highest_priority']

    @property
    def extra_initiation_condition(self):
        APs = self.env.ego.APs
        return (APs['stopped_now'] and APs['in_stop_region'])

    @property
    def extra_termination_condition(self):
        APs = self.env.ego.APs
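        # Terminate with a penalty if the ego enters the intersection without
        # the highest priority, or exceeds the speed limit.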
        cond = (APs['in_intersection'] and not APs['highest_priority']) \
            or APs['over_speed_limit']
        if cond:
            self._extra_r_terminal = -200
        else:
            self._extra_r_terminal = None
        return cond

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return tuple(features_tuple[i] for i in \
            [1, 2, 4, 6, 9, 10, 11, 18, 19])


class MCTSChangeLane(ManeuverBase):
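    """Change to the adjacent lane.

    The goal is achieved when the ego is on the target lane, on route and
    parallel to the lane, has moved laterally by at least min_y_distance, and
    has travelled no more than max_goal_distance longitudinally from where
    the maneuver started.
    """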

    max_goal_distance = 21
    min_y_distance = rd.hlanes.width * 0.8

    _terminate_in_goal = True
    _reward_in_goal = None

    def _init_param(self):
        self._init_x = self.env.ego.x
        self._init_y = self.env.ego.y
        self._v_ref = rd.speed_limit
        self._target_lane = not self.env.ego.APs['lane']

        self._terminate_in_goal = True

    @property
    def goal_achieved(self):
        APs = self.env.ego.APs
        on_other_lane = APs['lane'] == self._target_lane
        within_goal_distance = abs(self.env.ego.x -
                                   self._init_x) <= self.max_goal_distance
        achieved_y_displacement = abs(self.env.ego.y -
                                      self._init_y) >= self.min_y_distance
        return on_other_lane and APs['on_route'] and achieved_y_displacement and \
                APs['parallel_to_lane'] and within_goal_distance

    @property
    def extra_initiation_condition(self):
        return not self.env.ego.APs['in_intersection'] and not self.env.ego.APs['in_stop_region']

    @property
    def extra_termination_condition(self):
        APs = self.env.ego.APs
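        # Terminate with a penalty if the ego leaves the route or speeds
        # (cond1), or ends up in the stop region / intersection (cond2).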
        cond1 = not APs['on_route'] or APs['over_speed_limit']
        cond2 = APs['in_stop_region'] or APs['in_intersection']
        if cond1 or cond2:
            self._extra_r_terminal = -200
        else:
            self._extra_r_terminal = None
        return cond1 or cond2

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    def generate_learning_scenario(self):
        before_intersection = (
            rd.hlanes.start_pos,
            rd.hlanes.stop_region[0] - self.max_goal_distance)
        after_intersection = (rd.intersection_width_w_offset / 2,
                              rd.hlanes.end_pos - self.max_goal_distance)
        random_placement_index = np.random.choice([0, 1])
        random_h_pos = before_intersection if random_placement_index == 0 else after_intersection
        self.generate_scenario(
            enable_LTL_preconditions=False,
            timeout=15,
            ego_pos_range=random_h_pos,
            ego_lane=np.random.choice([0, 1]),
            ego_perturb_lim=(rd.hlanes.width / 5, np.pi / 6))
        # print('our range was %s, %s, ego at %s' % (before_intersection, after_intersection, self.env.ego.x))
        self._reward_in_goal = 200

    # TODO: It is not a good idea to specify features by numbers, as the list
    # of features is ever changing. We should specify them by strings.
    @staticmethod
    def _features_dim_reduction(features_tuple):
        return tuple(features_tuple[i] for i in \
            [1, 2, 3, 4, 5, 6, 7, 9, 10])


class MCTSFollow(ManeuverBase):
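    """Follow the vehicle ahead of the ego.

    The maneuver terminates (with a penalty when
    _penalty_for_out_of_follow_range is set) if there is no target vehicle,
    the ego leaves the target lane, or the vehicle-to-vehicle distance falls
    outside [veh_ahead_min, veh_ahead_max] by more than __veh_ahead_buffer.
    """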

    __veh_ahead_buffer = 0.5

    _penalty_for_out_of_follow_range = None

    def _init_param(self):
        self._update_param()
        self._terminate_in_goal = True

        if self._target_veh_i is None:
            pass  # print("\nWarning: initially, no vehicle is ahead of the ego in Follow maneuver.")

    def _update_param(self):
        self._target_veh_i, self._V2V_dist = self.env.get_V2V_distance()

    def generate_learning_scenario(self):
        self.generate_scenario(
            enable_LTL_preconditions=False,
            n_others_range=(1, 1),
            ego_perturb_lim=(rd.hlanes.width / 2, np.pi / 4),
            veh_ahead_scenario=True)
        self._penalty_for_out_of_follow_range = 100

    def generate_validation_scenario(self):
        self.generate_scenario(
            enable_LTL_preconditions=False,
            n_others_range=(1, 1),
            ego_perturb_lim=(0, 0),
            veh_ahead_scenario=True)
        self._penalty_for_out_of_follow_range = 100

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    @property
    def extra_initiation_condition(self):
        return self.env.ego.APs['veh_ahead']

    @property
    def extra_termination_condition(self):
        condition1 = self._target_veh_i is None

        condition2 = self.env.ego.APs['lane'] != self._target_lane

        condition3 = self._V2V_dist > self.env.veh_ahead_max + self.__veh_ahead_buffer or \
                     self._V2V_dist < self.env.veh_ahead_min - self.__veh_ahead_buffer

        if condition1 or condition2 or condition3:
            if self._penalty_for_out_of_follow_range is not None:
                self._extra_r_terminal = -self._penalty_for_out_of_follow_range
            else:
                self._extra_r_terminal = None
            return True
        else:
            self._extra_r_terminal = None
            return False

    def _features_dim_reduction(self, features_tuple):
        # TODO: Remove magic number 19 and 5 using features.py
        if self._target_veh_i is not None:
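            # Each non-ego vehicle contributes 5 features starting at index 20;
            # append the first 4 features of the target vehicle to the selected
            # ego features.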
            start_of_veh_feature = 20 + ((self._target_veh_i - 1) * 5)
            return features_tuple[1:11] + \
                   features_tuple[start_of_veh_feature:start_of_veh_feature + 4]
        else:
            return features_tuple[1:11] + (0.0, 0.0, 0.0, 0.0)