maneuver_base.py
import numpy as np
import gym
import env.simple_intersection.road_geokinemetry as rd
from env.simple_intersection.constants import DT, MAX_ACCELERATION, MAX_STEERING_ANGLE_RATE, MAX_STEERING_ANGLE
from env import EpisodicEnvBase


class ManeuverBase(EpisodicEnvBase):
    """The abstract class from which each maneuver is defined and inherited.

    In all of the maneuvers, we assume that the ego-vehicle is supposed
    to drive on the 'h'orizontal route.
    """

    trained_policy = None

    #: learning mode of the low-level policy.
    # It can be 'training', or 'testing' for validation after learning.
    # In reset, the scenario is generated depending on learning_mode.
    learning_mode = 'training'

    #: timeout (i.e., the time horizon for termination).
    # By default, the timeout horizon is 1 (in time units); it is converted
    # to a step count using DT in the constructor.
    timeout = 1

    #: the option-specific weight vector for the cost of driving, which is
    # assigned to self.env.cost_weights in every step call (with the action
    # weights increased in some edge cases if _extra_action_weights_flag is
    # True); note that a cost is defined as a negative reward, so it is
    # subtracted from the reward.
    _cost_weights = (10.0 * 1e-3, 10.0 * 1e-3, 0.25 * 1e-3, 1.0 * 1e-3,
                     100.0 * 1e-3, 0.1 * 1e-3, 0.25 * 1e-3, 0.1 * 1e-3)

    _extra_r_terminal = None
    _extra_r_on_timeout = None

    #: if False, _cost_weights is used without modification; if True, the
    # action parts of _cost_weights are increased in some edge cases
    # (see the step method).
    _extra_action_weights_flag = True

    #: the extra weights on the actions added to _cost_weights
    # for some edge cases when _extra_action_weights_flag = True.
    _extra_action_weights = (100, 20)

    #: if True, a reward equal to _extra_r_on_timeout is given when the maneuver times out.
    _give_reward_on_timeout = True

    #: flag that enables the properties used only for low-level policy
    # training, if any; this flag is useful in low-level policy learning
    # (see maneuvers.py).
    _enable_low_level_training_properties = False

    #: the additional reward given to the high-level learner for choosing
    # the maneuver (each maneuver is an action in the high-level);
    # plays no role in low-level training.
    high_level_extra_reward = 0

    def __init__(self, env):
        """Constructor for the maneuver.

        Args:
            env: the SimpleIntersectionEnv reference
        """

        super().__init__()

        #: the associated environment
        # TODO: make a flag parameter for the deep-copy option of env.
        self.env = env
        self._reset_model_checker(env.ego.APs)

        #: target_lane for each maneuver is the current lane by default
        # and is assigned to self.env.target_lane in every step call.
        self._target_lane = self.env.ego.APs['lane']

        #: the reference velocity for each maneuver is the speed limit
        # of the road by default and is assigned to self.env.v_ref in
        # every step call.
        self._v_ref = rd.speed_limit

        # (re-)initialize the parameters including _target_lane, _v_ref,
        # _cost_weights, if necessary in the subclass.
        self._init_param()

        #: time_out_horizon is the maximal number of steps after which a
        #  maneuver must terminate (i.e., the timeout divided by DT).
        self.__time_out_horizon = np.round(self.timeout / DT)
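        # e.g., if DT were 0.1 (an illustrative value; the actual DT comes
        # from env.simple_intersection.constants), timeout = 1 would give a
        # horizon of 10 steps.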
        self.__time_out_count = 0

    def step(self, u_ego):
        """The main function, increases timeout count and calculates the reward
        based on model- and collision-checking.

        Returns: a tuple (reduced_features_tuple, R, terminal, info), where
            reduced_features_tuple: the features tuple after _features_dim_reduction;
            R: the reward for the transition;
            terminal: True if the next state is the terminal state, False if not;
            info: log info.
        """

        # Assign the maneuver-specific variables to the corresponding ones in self.env.
        self.env.v_ref = self._v_ref
        self.env.target_lane = self._target_lane

        if self._extra_action_weights_flag:
            ego = self.env.ego
            weights = list(self._cost_weights)
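            # Edge case: the vehicle is already stopped (ego.v <= 0) but a
            # further negative acceleration is commanded; increase the
            # acceleration-cost weight in proportion to the commanded value.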
            if ego.v <= 0 and u_ego[0] < 0:
                weights[4] += self._extra_action_weights[0] * abs(
                    u_ego[0]) / MAX_ACCELERATION

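            # Edge case: the steering angle is already at (or beyond) its
            # limit and the commanded steering rate pushes it further in the
            # same direction; increase the steering-rate cost weight in
            # proportion to the commanded rate.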
            if abs(ego.psi) >= MAX_STEERING_ANGLE and \
                np.sign(ego.psi) == np.sign(u_ego[1]):
                weights[6] += self._extra_action_weights[1] * abs(
                    u_ego[1]) / MAX_STEERING_ANGLE_RATE

            self.env.cost_weights = tuple(weights)

        else:
            self.env.cost_weights = self._cost_weights

        info = dict()

        features, env_r, env_terminal, env_info = self.env.step(u_ego)

        info.update(env_info)
        # _mc_AP has to be set before super().step.
        self._set_mc_AP(self.env.ego.APs)

        self.__time_out_count += 1

        # the super class reward = self._r_terminal
        # which is added to the final reward (see below).
        _, _, super_terminal, super_info = super().step(u_ego)

        info.update(super_info)
        terminal = env_terminal or super_terminal

        if env_terminal:
            if self.env.r_terminal is not None:
                # Remove the terminal reward from env_r as it is re-evaluated
                # through _terminal_reward_superposition below.
                env_r -= self.env.r_terminal
            self._terminal_reward_superposition(self.env.r_terminal)
        self._update_param()

        extra_termination_cond = self.extra_termination_condition
        if self.timeout_happened or extra_termination_cond:
            if extra_termination_cond:
                # in this case, no additional reward by default
                # (i.e., self._extra_r_terminal = None by default).
                self._terminal_reward_superposition(self._extra_r_terminal)
                info['maneuver_termination_reason'] = 'extra_termination_condition'
            if self.timeout_happened:
                if self._give_reward_on_timeout:
                    # in this case, no additional reward by default
                    # (i.e., self._extra_r_on_timeout = None by default).
                    self._terminal_reward_superposition(
                        self._extra_r_on_timeout)
                info['maneuver_termination_reason'] = 'timeout'
            terminal = True

        reward = env_r

        if self._r_terminal is not None:
            reward += self._r_terminal

        return self._features_dim_reduction(features), reward, terminal, info
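
    # A hedged sketch of how a maneuver instance is typically driven during
    # low-level policy learning (the surrounding objects are illustrative;
    # only reset, step and low_level_policy are defined in this module):
    #
    #     obs = maneuver.reset()
    #     done = False
    #     while not done:
    #         u_ego = maneuver.low_level_policy(obs)  # action = (a, dot_psi)
    #         obs, reward, done, info = maneuver.step(u_ego)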

    def get_reduced_feature_length(self):
        """Get the length of the features tuple after applying
        _features_dim_reduction.

        :return: the length of the reduced features tuple.
        """
        return len(self.get_reduced_features_tuple())

    def get_reduced_features_tuple(self):
        return self._features_dim_reduction(self.env.get_features_tuple())

    @property
    def observation_space(self):
        length = self.get_reduced_feature_length()
        return gym.spaces.Box(
            low=-np.finfo(np.float32).max,
            high=np.finfo(np.float32).max,
            shape=(length, ))

    @property
    def action_space(self):
        return gym.spaces.Box(
            low=np.array([-MAX_ACCELERATION, -MAX_STEERING_ANGLE_RATE]),
            high=np.array([MAX_ACCELERATION, MAX_STEERING_ANGLE_RATE]))

    def reset(self):
        """Gym compliant reset function.

        Reset the environment as specified by the
        generate_learning_scenario() in this class and the maneuver,
        and then return the initial features tuple.

        Returns:
             features tuple after _features_dim_reduction

        Reinitialize the maneuver and the environment with whatever parameters
        it was initially called with. This will be implemented in each subclass maneuver.
        (to reset the maneuver part within the implementation of this method
        in the subclass, use reset_maneuver below after resetting the environment).
        Use this function only for the low-level policy learning of each maneuver.
        """
        if self.learning_mode == 'training':
            self.generate_learning_scenario()
        elif self.learning_mode == 'testing':
            self.generate_validation_scenario()
        else:
            raise ValueError(
                "learning_mode has to be either training or testing")

        self.reset_maneuver()

        # TODO: emit this warning via a logger, not a direct print.
        if not self.initiation_condition:
            print('\nWarning: the maneuver ' + self.__class__.__name__ +
                  ' is ready but the initiation condition is not satisfied.')

        return self._features_dim_reduction(self.env.get_features_tuple())

    def reset_maneuver(self):
        self.__init__(self.env)

    def render(self, mode='human'):
        self.env.render()  # simply delegate to self.env.render()

    def set_low_level_trained_policy(self, trained_policy):
        """Sets the trained policy as a function which takes in a feature
        vector and returns an action (a, dot_psi).

        By default, trained_policy is None.
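
        A minimal illustrative use (the 'agent' object below is a
        hypothetical stand-in for a trained low-level learner, not defined
        in this module):

            maneuver.set_low_level_trained_policy(
                lambda features: agent.forward(np.asarray(features)))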
        """
        self.trained_policy = trained_policy

    @property
    def termination_condition(self):
        return self.extra_termination_condition or self.timeout_happened or \
               super().termination_condition or self.env.termination_condition

    # A series of virtual methods that may need to be overridden
    # (if necessary) in each subclass for each specific maneuver.

    @property
    def extra_termination_condition(self):
        return False

    @staticmethod
    def _features_dim_reduction(features_tuple):
        """Reduce the dimension of the features in step and reset.

        Param: features_tuple: a tuple obtained by, e.g., self.env.get_features_tuple()
        Return: the reduced features tuple (by default, features_tuple itself is returned).
        """
        return features_tuple
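
    # A subclass typically overrides _features_dim_reduction to keep only the
    # features relevant to its maneuver; the slice below is purely illustrative
    # (the actual feature layout is defined by the environment):
    #
    #     @staticmethod
    #     def _features_dim_reduction(features_tuple):
    #         return features_tuple[0:6]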

    # TODO: Determine whether this method should depend on the external
    # features_tuple or, for simplicity, define and use a features_tuple
    # within the class.
    def low_level_policy(self, reduced_features_tuple):
        """The low-level policy as a map from a feature vector to an action
        (a, dot_psi).

        By default, it calls _low_level_manual_policy (to be implemented in
        the subclass) unless a trained policy has been set via
        set_low_level_trained_policy.
        """
        if self.trained_policy is None:
            return self._low_level_manual_policy()
        else:
            return self.trained_policy(reduced_features_tuple)

    def _low_level_manual_policy(self):
        """The manually-defined low-level policy as a map from a feature
        vector to an action (a, dot_psi).

        low_level_policy will call this manual policy unless a trained policy
        has been set. Implement this in the subclass whenever necessary.
        """
        raise NotImplementedError(self.__class__.__name__ +
                                  "._low_level_manual_policy is not implemented.")

    def generate_learning_scenario(self):
        raise NotImplementedError(self.__class__.__name__ +
                                  ".generate_learning_scenario is not implemented.")

    # Override this method in the subclass if some customization is needed.
    def generate_validation_scenario(self):
        self.generate_learning_scenario()
        self._enable_low_level_training_properties = False

    def generate_scenario(self,
                          enable_LTL_preconditions=True,
                          timeout=np.inf,
                          **kwargs):
        """generates the scenario for low-level policy learning and validation.
        This method will be used in generate_learning_scenario and
        generate_validation_scenario in the subclasses.

        Param:
            enable_LTL_preconditions: whether to enable LTL preconditions in the maneuver or not
            timeout: the timeout for the scenario (which is infinity by default)
            **kwargs: the arguments of generate_scenario of the environment.
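
        A typical call from a subclass (the parameter values below are
        illustrative assumptions, not defaults of this method):

            self.generate_scenario(enable_LTL_preconditions=False,
                                   timeout=15,
                                   n_others_range=(0, 3))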
        """

        kwargs.setdefault('n_others_range', (0, 0))
        kwargs.setdefault('ego_pos_range',
                          (rd.hlanes.start_pos, rd.hlanes.end_pos))

        while not self.env.generate_scenario(**kwargs):
            pass

        if enable_LTL_preconditions:
            self.env.enable_LTL_preconditions()
        else:
            self.env.disable_LTL_preconditions()

        self.timeout = timeout

    @property
    def initiation_condition(self):
        """this method specifies the initiation condition (or in a technical
        term, initiation set) of the maneuver.

        Returns True if the condition is satisfied, and False otherwise.
        """

        return not (self.env.termination_condition or self.violation_happened) and \
               self.extra_initiation_condition

    @property
    def extra_initiation_condition(self):
        return True

    @property
    def timeout_happened(self):
        return self.__time_out_count >= self.__time_out_horizon

    @property
    def goal_achieved(self):
        """Check whether the ego vehicle achieves the goal of the maneuver or
        not.

        By default, there is no goal, so the ego vehicle never achieves
        it (i.e., goal_achieved is always False).
        """

        return False

    def _init_param(self):
        """Initialize parameters in the constructor and 'ready' methods before
        applying the maneuver."""

        return  # do nothing unless specified in the subclass

    def _update_param(self):
        """Update the parameters in the gym-compliant 'step' method above."""

        return  # do nothing unless specified in the subclass