import numpy as np
import gym
import env.simple_intersection.road_geokinemetry as rd
from env.simple_intersection.constants import DT, MAX_ACCELERATION, MAX_STEERING_ANGLE_RATE, MAX_STEERING_ANGLE
from env import EpisodicEnvBase


class ManeuverBase(EpisodicEnvBase):
    """The abstract class from which each maneuver is defined and inherited.

    In all of the maneuvers, we assume that the ego-vehicle is supposed
    to drive on the 'h'orizontal route.
    """

    trained_policy = None

    #: learning mode of the low-level policy.
    # It is either 'training' or, for validation after learning, 'testing'.
    # In reset, the scenario is generated depending on learning_mode.
    learning_mode = 'training'

    #: timeout (i.e., time horizon for termination)
    # By default, the time-out horizon is 1.
    timeout = 1

    #: the option-specific weight vector for the cost of driving, which is
    # assigned to self.env.cost_weights in every step call
    # (with some modification of the action weights if
    # _extra_action_weights_flag = True); note that a cost is defined
    # as a negative reward, so each weighted cost term is subtracted
    # from the reward.
    # TODO: either remove _cost_weights from ManeuverBase or keep it here to
    # provide additional functionality (see the other TODOs in
    # simple_intersection_env regarding _cost_weights).
    _cost_weights = (10.0 * 1e-3, 10.0 * 1e-3, 0.25 * 1e-3, 1.0 * 1e-3,
                     100.0 * 1e-3, 0.1 * 1e-3, 0.05 * 1e-3, 0.1 * 1e-3)

    _extra_r_terminal = None
    _extra_r_on_timeout = None

    #: the flag that is False when _cost_weights is used without
    # modification; if True, the action parts of _cost_weights
    # are increased in some edge cases (see the step method).
    _extra_action_weights_flag = False

    #: the extra weights on the actions added to _cost_weights
    # for some edge cases when _extra_action_weights_flag = True.
    _extra_action_weights = (100, 20)

    #: gives a reward equal to _extra_r_on_timeout when the maneuver times out
    _give_reward_on_timeout = True

    #: enables the properties used only for low-level policy training, if any;
    # this flag is useful in low-level policy learning
    # (see maneuvers.py).
    _enable_low_level_training_properties = False

    #: the additional reward given to the high-level learner for choosing
    # the maneuver (each maneuver is an action at the high level);
    # plays no role in low-level training.
    high_level_extra_reward = 0

    def __init__(self, env):
        """Constructor for the maneuver.

        Args:
            env: the SimpleIntersectionEnv reference
        """

        super().__init__()

        #: the associated environment
        # TODO: make a flag parameter for the deep-copy option of env.
        self.env = env
        self._reset_model_checker(env.ego.APs)

        #: target_lane for each maneuver is the current lane
        # by default and is assigned to self.env.target_lane
        # in every step call.
        self._target_lane = self.env.ego.APs['lane']

        #: the reference velocity for each maneuver is
        # the speed limit of the road by default and is assigned
        # to self.env.v_ref in every step call.
        self._v_ref = rd.speed_limit

        # (re-)initialize the parameters including _target_lane, _v_ref,
        # _cost_weights, if necessary in the subclass.
        self._init_param()

        #: __time_out_horizon is the maximal time at which a maneuver
        # must terminate, expressed in time steps (i.e., timeout / DT).
        self.__time_out_horizon = np.round(self.timeout / DT)
        self.__time_out_count = 0

    def step(self, u_ego):
        """The main function, increases timeout count and calculates the reward
        based on model- and collision-checking.

        Returns: a tuple (reduced_features_tuple, R, terminal, info), where
            reduced_features_tuple: the features tuple after _features_dim_reduction;
            R: the reward for the transition;
            terminal: True if the next state is the terminal state, False if not;
            info: log info.
        """

        # Copy the maneuver-specific variables into self.env.
        self.env.v_ref = self._v_ref
        self.env.target_lane = self._target_lane

        if self._extra_action_weights_flag:
            ego = self.env.ego
            weights = list(self._cost_weights)
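            # Edge case: the ego vehicle is already stopped (or rolling backwards)
            # but the acceleration command is still negative; penalize the
            # acceleration part of the action more heavily.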
            if ego.v <= 0 and u_ego[0] < 0:
                weights[4] += self._extra_action_weights[0] * abs(
                    u_ego[0]) / MAX_ACCELERATION

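            # Edge case: the steering angle is saturated and the steering-rate
            # command pushes it further in the same direction; penalize the
            # steering-rate part of the action more heavily.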
            if abs(ego.psi) >= MAX_STEERING_ANGLE and \
                np.sign(ego.psi) == np.sign(u_ego[1]):
                weights[6] += self._extra_action_weights[1] * abs(
                    u_ego[1]) / MAX_STEERING_ANGLE_RATE

            self.env.cost_weights = tuple(weights)

        else:
            self.env.cost_weights = self._cost_weights

        info = dict()

        features, env_r, env_terminal, env_info = self.env.step(u_ego)

        info.update(env_info)
        # _mc_AP has to be set before super().step.
        self._set_mc_AP(self.env.ego.APs)

        self.__time_out_count += 1

        # the super class computes the reward self._r_terminal,
        # which is added to the final reward (see below).
        _, _, super_terminal, super_info = super().step(u_ego)

        info.update(super_info)
        terminal = env_terminal or super_terminal

        if env_terminal:
            if self.env.r_terminal is not None:
                # Remove the terminal reward from env_r as it is re-evaluated
                # through _terminal_reward_superposition below.
                env_r -= self.env.r_terminal
            self._terminal_reward_superposition(self.env.r_terminal)
        self._update_param()

        extra_termination_cond = self.extra_termination_condition
        if self.timeout_happened or extra_termination_cond:
            if extra_termination_cond:
                # in this case, no additional reward by default
                # (i.e., self._extra_r_terminal = None by default).
                self._terminal_reward_superposition(self._extra_r_terminal)
                info['maneuver_termination_reason'] = 'extra_termination_condition'
            if self.timeout_happened:
                if self._give_reward_on_timeout:
                    # in this case, no additional reward by default
                    # (i.e., self._extra_r_on_timeout = None by default).
                    self._terminal_reward_superposition(
                        self._extra_r_on_timeout)
                info['maneuver_termination_reason'] = 'timeout'
            terminal = True

        reward = env_r

        if self._r_terminal is not None:
            reward += self._r_terminal

        return self._features_dim_reduction(features), reward, terminal, info

    def get_reduced_feature_length(self):
        """get the length of the feature tuple after applying
        _features_dim_reduction.

        :return: the length of the reduced features tuple.
        """
        return len(self.get_reduced_features_tuple())

    def get_reduced_features_tuple(self):
        return self._features_dim_reduction(self.env.get_features_tuple())

    @property
    def observation_space(self):
        length = self.get_reduced_feature_length()
        return gym.spaces.Box(
            low=-np.finfo(np.float32).max,
            high=np.finfo(np.float32).max,
            shape=(length, ))

    @property
    def action_space(self):
        return gym.spaces.Box(
            low=np.array([-MAX_ACCELERATION, -MAX_STEERING_ANGLE_RATE]),
            high=np.array([MAX_ACCELERATION, MAX_STEERING_ANGLE_RATE]))

    def reset(self):
        """Gym compliant reset function.

        Reset the environment as specified by the
        generate_learning_scenario() in this class and the maneuver,
        and then return the initial features tuple.

        Returns:
             features tuple after _features_dim_reduction

        Reinitialize the maneuver and the environment with whatever parameters
        it was initially called with. This will be implemented in each subclass maneuver.
        (to reset the maneuver part within the implementation of this method
        in the subclass, use reset_maneuver below after resetting the environment).
        Use this function only for the low-level policy learning of each maneuver.
        """
        if self.learning_mode == 'training':
            self.generate_learning_scenario()
        elif self.learning_mode == 'testing':
            self.generate_validation_scenario()
        else:
            raise ValueError(
                "learning_mode has to be either 'training' or 'testing'.")

        self.reset_maneuver()

        # TODO: emit this warning through a logger, not a direct print.
        if not self.initiation_condition:
            print('\nWarning: the maneuver ' + self.__class__.__name__ +
                  ' is ready but the initiation condition is not satisfied.')

        return self._features_dim_reduction(self.env.get_features_tuple())

    def reset_maneuver(self):
        self.__init__(self.env)

    def render(self, mode='human'):
        self.env.render()  # simply delegates to self.env.render()

    def set_low_level_trained_policy(self, trained_policy):
        """Sets the trained policy as a function which takes in feature vector
        and returns an action (a, dot_psi).

        By default, trained_policy is None
        """
        self.trained_policy = trained_policy

    @property
    def termination_condition(self):
        return self.extra_termination_condition or self.timeout_happened or \
               super().termination_condition or self.env.termination_condition

    # A series of virtual methods that may need to be overridden
    # (if necessary) in each subclass for each specific maneuver.

    @property
    def extra_termination_condition(self):
        return False

    @staticmethod
    def _features_dim_reduction(features_tuple):
        """Reduce the dimension of the features in step and reset.

        Param: features_tuple: a tuple obtained by, e.g., self.env.get_features_tuple()
        Return: the reduced features tuple (by default, features_tuple itself).
        """
        return features_tuple

    # TODO: determine whether this method should depend on the external features_tuple or, for simplicity, define and use a features_tuple within the class.
    def low_level_policy(self, reduced_features_tuple):
        """the low level policy as a map from a feature vector to an action (a,
        dot_psi).

        By default, it'll call low_level_manual_policy below if it's
        implemented in the subclass.
        """
        if self.trained_policy is None:
            return self._low_level_manual_policy()
        else:
            return self.trained_policy(reduced_features_tuple)

    def _low_level_manual_policy(self):
        """the manually-defined low level policy as a map from a feature vector
        to an action (a, dot_psi).

        _low_level_policy will call this manual policy unless modified
        in the subclass. Implement this in the subclass whenever
        necessary.
        """
        raise NotImplementedError(self.__class__.__name__ +
                                  "._low_level_manual_policy is not implemented.")

    def generate_learning_scenario(self):
        raise NotImplementedError(self.__class__.__name__ +
                                  ".generate_learning_scenario is not implemented.")

    # Override this method in the subclass if some customization is needed.
    def generate_validation_scenario(self):
        self.generate_learning_scenario()
        self._enable_low_level_training_properties = False

    def generate_scenario(self,
                          enable_LTL_preconditions=True,
                          timeout=np.infty,
                          **kwargs):
        """generates the scenario for low-level policy learning and validation.
        This method will be used in generate_learning_scenario and
        generate_validation_scenario in the subclasses.

        Param:
            enable_LTL_preconditions: whether to enable LTL preconditions in the maneuver or not
            timeout: the timeout for the scenario (which is infinity by default)
            **kwargs: the arguments of generate_scenario of the environment.
        """

        kwargs.setdefault('n_others_range', (0, 0))
        kwargs.setdefault('ego_pos_range',
                          (rd.hlanes.start_pos, rd.hlanes.end_pos))

        # retry until the environment successfully generates a scenario.
        while not self.env.generate_scenario(**kwargs):
            pass

        if enable_LTL_preconditions:
            self.env.enable_LTL_preconditions()
        else:
            self.env.disable_LTL_preconditions()

        self.timeout = timeout

    @property
    def initiation_condition(self):
        """this method specifies the initiation condition (or in a technical
        term, initiation set) of the maneuver.

        Returns True if the condition is satisfied, and False otherwise.
        """

        return not self.termination_condition and self.extra_initiation_condition

    @property
    def extra_initiation_condition(self):
        return True

    @property
    def timeout_happened(self):
        return self.__time_out_count >= self.__time_out_horizon

    @property
    def goal_achieved(self):
        """Check whether the ego vehicle achieves the goal of the maneuver or
        not.

        By default, there is no goal, so the ego vehicle never achieves
        it (i.e., goal_achieved is always False).
        """

        return False

    def _init_param(self):
        """Initialize parameters in the constructor and 'ready' methods before
        applying the maneuver."""

        return  # do nothing unless specified in the subclass

    def _update_param(self):
        """Update the parameters in the gym-compliant 'step' method above."""

        return  # do nothing unless specified in the subclass
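
# An illustrative sketch of how a concrete maneuver can be derived from
# ManeuverBase (the class name ExampleKeepLane and the numbers below are
# hypothetical; see maneuvers.py for the actual maneuvers):
#
#     class ExampleKeepLane(ManeuverBase):
#
#         def _init_param(self):
#             self._v_ref = rd.speed_limit                  # cruise at the speed limit
#             self._target_lane = self.env.ego.APs['lane']  # stay on the current lane
#
#         def generate_learning_scenario(self):
#             # delegate to the generic generator with a finite timeout
#             self.generate_scenario(enable_LTL_preconditions=False, timeout=20)
#
#         @staticmethod
#         def _features_dim_reduction(features_tuple):
#             return features_tuple  # or a slice of it, e.g. features_tuple[0:12]
#
# A low-level training rollout then follows the usual gym loop, assuming a
# trained or manual low-level policy is available:
#
#     maneuver = ExampleKeepLane(env)
#     obs = maneuver.reset()
#     done = False
#     while not done:
#         obs, reward, done, info = maneuver.step(maneuver.low_level_policy(obs))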