mcts_maneuvers.py
from .maneuver_base import ManeuverBase
from env.simple_intersection.constants import *
import env.simple_intersection.road_geokinemetry as rd
from env.simple_intersection.features import extract_ego_features, extract_other_veh_features
from model_checker.simple_intersection import LTLProperty

import numpy as np
# TODO: Separate these maneuvers into different files? Is it really needed?


class MCTSKeepLane(ManeuverBase):
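    """Keep-lane maneuver used in the MCTS option set.

    Tracks the current lane at the speed limit; it is only applicable when no
    vehicle is ahead of the ego.
    """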

    def _init_param(self):
        self._v_ref = rd.speed_limit
        self._target_lane = self.env.ego.APs['lane']
        self._extra_action_weights_flag = True

    @property
    def extra_initiation_condition(self):
        return not self.env.ego.APs['veh_ahead']

    def generate_learning_scenario(self):
        self.generate_scenario(enable_LTL_preconditions=False,
                               ego_pos_range=(rd.intersection_width_w_offset/2, rd.hlanes.end_pos),
                               ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 12),
                               ego_heading_towards_lane_centre=True)
        self.env._terminate_in_goal = True
        self.env._reward_in_goal = 200

    def generate_validation_scenario(self):
        self.generate_scenario(enable_LTL_preconditions=False, timeout=10,
                               ego_perturb_lim=(0, 0),
                               ego_heading_towards_lane_centre=True)
        self.env._terminate_in_goal = True
        self.env._reward_in_goal = 200

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return features_tuple[0:5] + features_tuple[6:7] + features_tuple[9:11]


class MCTSStop(ManeuverBase):
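    """Stop maneuver: decelerate and come to a stop in the stop region.

    The reference speed ramps linearly down to zero towards the centre of the
    stop region. The goal is achieved once the ego is stopped inside the stop
    region; the maneuver can only initiate close to or inside the stop region,
    and only if the ego has not already stopped there.
    """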

    _terminate_in_goal = True
    _reward_in_goal = None

    def _init_param(self):
        self._set_v_ref()
        self._target_lane = self.env.ego.APs['lane']
        self._extra_action_weights_flag = True

    def _update_param(self):
        self._set_v_ref()

    def _set_v_ref(self):
        x = self.env.ego.x
        if x <= rd.hlanes.near_stop_region:
            self._v_ref = rd.speed_limit
        elif x <= rd.hlanes.stop_region_centre:
            # Ramp the reference speed linearly down to zero at the centre of the stop region.
            self._v_ref = -(rd.speed_limit / abs(rd.hlanes.near_stop_region)) * (x - rd.hlanes.stop_region_centre)
        else:
            self._v_ref = 0

    @property
    def goal_achieved(self):
        APs = self.env.ego.APs
        return APs['stopped_now'] and APs['in_stop_region']

    @property
    def extra_initiation_condition(self):
        APs = self.env.ego.APs
        return not APs['has_stopped_in_stop_region'] and \
               (APs['before_but_close_to_stop_region'] or
                APs['in_stop_region'])

    def generate_learning_scenario(self):
        self.generate_scenario(ego_pos_range=(rd.hlanes.start_pos, - rd.intersection_width_w_offset / 2),
                               ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 6),
                               ego_heading_towards_lane_centre=True)

        self._reward_in_goal = 200

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return features_tuple[0:5] + features_tuple[6:7] + features_tuple[9:11] + features_tuple[12:13]


class MCTSWait(ManeuverBase):
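    """Wait maneuver: hold at the stop region until the ego has priority.

    The reference speed is zero until the ego becomes the highest-priority
    vehicle, at which point it is released to the speed limit. The goal is
    being inside the intersection with the highest priority; entering the
    intersection without priority or exceeding the speed limit terminates the
    maneuver with a penalty.
    """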

    _penalty_in_violation = 200

    _terminate_in_goal = True
    _reward_in_goal = None

    def _init_param(self):
        ego = self.env.ego
        self._v_ref = 0.0
        self._target_lane = ego.APs['lane']

        #self._LTL_preconditions.append(
        #    LTLProperty("G ( has_stopped_in_stop_region => (in_stop_region or in_intersection))",
        #                self._penalty_in_violation))

        # Count how many other vehicles have waited longer than the ego,
        # i.e., how many currently have higher priority.
        self._n_others_with_higher_priority = 0
        for veh in self.env.vehs[1:]:
            if veh.waited_count > ego.waited_count:
                self._n_others_with_higher_priority += 1

    def _update_param(self):
        if self.env.ego.APs['highest_priority']:
            self._v_ref = rd.speed_limit

    def generate_learning_scenario(self):
        n_others = np.random.randint(0, 3)
        self.generate_scenario(enable_LTL_preconditions=True, timeout=62,
                               n_others_range=(n_others, n_others),
                               ego_pos_range=rd.hlanes.stop_region,
                               n_others_stopped_in_stop_region=n_others,
                               ego_v_upper_lim=0, ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 6),
                               ego_heading_towards_lane_centre=True)

        max_waited_count = 0
        for veh in self.env.vehs[1:]:
            max_waited_count = max(max_waited_count, veh.waited_count)

        # Randomize the ego's waited count so that scenarios with both lower and
        # higher priority than the other vehicles are generated, then
        # re-initialize the atomic propositions to reflect the change.
        self.env.ego.waited_count = np.random.randint(0, max_waited_count + 21)
        self.env.init_APs(False)

        self._reward_in_goal = 200

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    @property
    def goal_achieved(self):
        APs = self.env.ego.APs
        return APs['in_intersection'] and APs['highest_priority']

    @property
    def extra_initiation_condition(self):
        APs = self.env.ego.APs
        return (APs['stopped_now'] and APs['in_stop_region'])

    @property
    def extra_termination_condition(self):
        APs = self.env.ego.APs
        cond = (APs['in_intersection'] and not APs['highest_priority']) \
            or APs['over_speed_limit']
        if cond:
            self._extra_r_terminal = -200
        else:
            self._extra_r_terminal = None
        return cond

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return tuple(features_tuple[i] for i in [1, 2, 4, 6, 9, 10, 11, 18, 19])


class MCTSChangeLane(ManeuverBase):
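    """Lane-change maneuver: move the ego to the other lane.

    The goal requires reaching the target lane parallel to it, staying on
    route, covering at least min_y_distance laterally, and travelling at most
    max_goal_distance longitudinally from the starting position. The maneuver
    cannot initiate inside the intersection or the stop region.
    """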

    max_goal_distance = 21
    min_y_distance = rd.hlanes.width * 0.8

    _terminate_in_goal = True
    _reward_in_goal = None

    def _init_param(self):
        self._init_x = self.env.ego.x
        self._init_y = self.env.ego.y
        self._v_ref = rd.speed_limit
        self._target_lane = not self.env.ego.APs['lane']

        self._terminate_in_goal = True

    @property
    def goal_achieved(self):
        APs = self.env.ego.APs
        on_other_lane = APs['lane'] == self._target_lane
        within_goal_distance = abs(self.env.ego.x - self._init_x) <= self.max_goal_distance
        achieved_y_displacement = abs(self.env.ego.y - self._init_y) >= self.min_y_distance
        return on_other_lane and APs['on_route'] and achieved_y_displacement and \
                APs['parallel_to_lane'] and within_goal_distance

    @property
    def extra_initiation_condition(self):
        return not self.env.ego.APs['in_intersection'] and not self.env.ego.APs['in_stop_region']

    @property
    def extra_termination_condition(self):
        APs = self.env.ego.APs
        cond1 = not APs['on_route'] or APs['over_speed_limit']
        cond2 = APs['in_stop_region'] or APs['in_intersection']
        if cond1 or cond2:
            self._extra_r_terminal = -200
        else:
            self._extra_r_terminal = None
        return cond1 or cond2

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    def generate_learning_scenario(self):
        before_intersection = (rd.hlanes.start_pos, rd.hlanes.stop_region[0]-self.max_goal_distance)
        after_intersection = (rd.intersection_width_w_offset / 2, rd.hlanes.end_pos-self.max_goal_distance)
        random_placement_index = np.random.choice([0, 1])
        random_h_pos = before_intersection if random_placement_index == 0 else after_intersection
        self.generate_scenario(enable_LTL_preconditions=False, timeout=15,
                               ego_pos_range=random_h_pos,
                               ego_lane=np.random.choice([0, 1]),
                               ego_perturb_lim=(rd.hlanes.width/5, np.pi/6))
        # print('our range was %s, %s, ego at %s' % (before_intersection, after_intersection, self.env.ego.x))
        self._reward_in_goal = 200

    # TODO: It is not a good idea to specify features by numbers, as the list
    # of features is ever changing. We should specify them by strings.
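    # A name-based selection could look roughly like the sketch below; this assumes
    # features.py exposed an ordered list of feature names (FEATURE_NAMES and the
    # names shown here are hypothetical):
    #
    #     indices = [FEATURE_NAMES.index(name)
    #                for name in ('pos_x', 'pos_y', 'heading', 'velocity')]
    #     return tuple(features_tuple[i] for i in indices)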
    @staticmethod
    def _features_dim_reduction(features_tuple):
        return tuple(features_tuple[i] for i in [1, 2, 3, 4, 5, 6, 7, 9, 10])


class MCTSFollow(ManeuverBase):
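    """Follow maneuver: keep a bounded distance behind the vehicle ahead.

    Terminates (with an optional penalty) when no vehicle is ahead, when the
    ego leaves the target lane, or when the vehicle-to-vehicle distance moves
    outside the [veh_ahead_min, veh_ahead_max] band plus a small buffer.
    """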

    __veh_ahead_buffer = 0.5

    _penalty_for_out_of_follow_range = None

    def _init_param(self):
        self._update_param()
        self._terminate_in_goal = True

        if self._target_veh_i is None:
            pass # print("\nWarning: initially, no vehicle is ahead of the ego in Follow maneuver.")

    def _update_param(self):
        self._target_veh_i, self._V2V_dist = self.env.get_V2V_distance()

    def generate_learning_scenario(self):
        self.generate_scenario(enable_LTL_preconditions=False,
                               n_others_range=(1, 1),
                               ego_perturb_lim=(rd.hlanes.width/2, np.pi/4),
                               veh_ahead_scenario=True)
        self._penalty_for_out_of_follow_range = 100

    def generate_validation_scenario(self):
        self.generate_scenario(enable_LTL_preconditions=False,
                               n_others_range=(1, 1),
                               ego_perturb_lim=(0, 0),
                               veh_ahead_scenario=True)
        self._penalty_for_out_of_follow_range = 100

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    @property
    def extra_initiation_condition(self):
        return self.env.ego.APs['veh_ahead']

    @property
    def extra_termination_condition(self):
        condition1 = self._target_veh_i is None

        condition2 = self.env.ego.APs['lane'] != self._target_lane

        condition3 = self._V2V_dist > self.env.veh_ahead_max + self.__veh_ahead_buffer or \
                     self._V2V_dist < self.env.veh_ahead_min - self.__veh_ahead_buffer

        if condition1 or condition2 or condition3:
            if self._penalty_for_out_of_follow_range is not None:
                self._extra_r_terminal = -self._penalty_for_out_of_follow_range
            else:
                self._extra_r_terminal = None
            return True
        else:
            self._extra_r_terminal = None
            return False

    def _features_dim_reduction(self, features_tuple):
        # TODO: Remove the magic numbers 20 and 5 using features.py.
        if self._target_veh_i is not None:
            # Keep a subset of the ego features plus the four features of the target vehicle ahead.
            start_of_veh_feature = 20 + (self._target_veh_i - 1) * 5
            return features_tuple[1:11] + features_tuple[start_of_veh_feature:start_of_veh_feature + 4]
        else:
            return features_tuple[1:11] + (0.0, 0.0, 0.0, 0.0)