from .maneuver_base import ManeuverBase
from env.simple_intersection.constants import *
import env.simple_intersection.road_geokinemetry as rd
from env.simple_intersection.features import extract_ego_features, extract_other_veh_features
from model_checker.simple_intersection import LTLProperty

import numpy as np

# TODO: separate out into different files?? is it really needed?


class KeepLane(ManeuverBase):
    """Maneuver that keeps the ego in its current lane, tracking the speed limit."""

    def _init_param(self):
        self._v_ref = rd.speed_limit
        self._target_lane = self.env.ego.APs['lane']

    def _init_LTL_preconditions(self):
        self._LTL_preconditions.append(LTLProperty("G ( not veh_ahead )", 0))

        self._LTL_preconditions.append(
            LTLProperty("G ( not stopped_now )", 200,
                        self._enable_low_level_training_properties))

        self._LTL_preconditions.append(
            LTLProperty(
                "G ( (lane and target_lane) or (not lane and not target_lane) )",
                200, self._enable_low_level_training_properties))

    def generate_learning_scenario(self):
        self.generate_scenario(
            enable_LTL_preconditions=False,
            ego_pos_range=(rd.hlanes.start_pos, rd.hlanes.end_pos),
            ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 6),
            ego_heading_towards_lane_centre=True)
        # the goal reward and termination are handled by the SimpleIntersectionEnv
        self.env._terminate_in_goal = True
        self.env._reward_in_goal = 200
        self._enable_low_level_training_properties = True

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return extract_ego_features(features_tuple, 'pos_near_stop_region',
                                    'v', 'v_ref', 'e_y', 'psi', 'theta',
                                    'acc', 'psi_dot')


class Stop(ManeuverBase):
    """Maneuver that decelerates the ego and stops it inside the stop region
    before the intersection."""

    _terminate_in_goal = True
    _reward_in_goal = None

    def _init_param(self):
        self._set_v_ref()
        self._target_lane = self.env.ego.APs['lane']

    def _init_LTL_preconditions(self):
        self._LTL_preconditions.append(
            LTLProperty("G ( not has_stopped_in_stop_region )",
                        self._penalty(self._reward_in_goal)))

        self._LTL_preconditions.append(
            LTLProperty(
                "G ( (before_but_close_to_stop_region or in_stop_region) U has_stopped_in_stop_region )",
                0))

        self._LTL_preconditions.append(
            LTLProperty("G ( not stopped_now U in_stop_region )", 200,
                        self._enable_low_level_training_properties))

        self._LTL_preconditions.append(
            LTLProperty(
                "G ( (lane and target_lane) or (not lane and not target_lane) )",
                200, self._enable_low_level_training_properties))

    def _update_param(self):
        self._set_v_ref()

    def _set_v_ref(self):
        self._v_ref = rd.speed_limit
        x = self.env.ego.x
        if x <= rd.hlanes.near_stop_region:
            self._v_ref = rd.speed_limit
        elif x <= rd.hlanes.stop_region_centre:
            self._v_ref = -(rd.speed_limit / abs(rd.hlanes.near_stop_region)) \
                * (x - rd.hlanes.stop_region_centre)
        else:
            self._v_ref = 0

    def generate_learning_scenario(self):
        self.generate_scenario(
            ego_pos_range=(rd.hlanes.near_stop_region,
                           -rd.intersection_width_w_offset / 2),
            ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 6),
            ego_heading_towards_lane_centre=True)
        self._reward_in_goal = 200
        self._enable_low_level_training_properties = True

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return extract_ego_features(features_tuple, 'pos_near_stop_region',
                                    'v', 'v_ref', 'e_y', 'psi', 'theta',
                                    'acc', 'psi_dot', 'not_in_stop_region')
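
# Note on Stop._set_v_ref above: inside the ramp-down branch the reference
# speed follows
#
#     v_ref(x) = -(speed_limit / |near_stop_region|) * (x - stop_region_centre),
#
# a linear ramp that reaches exactly 0 at x = stop_region_centre and stays at 0
# past it (the final else branch), so the maneuver's velocity-tracking target
# is zero at the centre of the stop region. The geometry constants come from
# env.simple_intersection.road_geokinemetry (imported above as rd).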


class Wait(ManeuverBase):
    """Maneuver that waits, stopped in the stop region, until the ego has the
    highest priority to proceed through the intersection."""

    _terminate_in_goal = True
    _reward_in_goal = None

    def _init_LTL_preconditions(self):
        self._LTL_preconditions.append(
            LTLProperty(
                "G ( (in_stop_region and stopped_now) U highest_priority )",
                0))

        self._LTL_preconditions.append(
            LTLProperty("G ( not (in_intersection and highest_priority) )",
                        self._penalty(self._reward_in_goal)))

    def _init_param(self):
        ego = self.env.ego
        self._v_ref = rd.speed_limit if self.env.ego.APs['highest_priority'] else 0
        self._target_lane = ego.APs['lane']
        self._ego_stop_count = 0

    def _update_param(self):
        if self.env.ego.APs['highest_priority']:
            self._v_ref = rd.speed_limit
        if self._enable_low_level_training_properties:
            if self.env.n_others_with_higher_priority == 0:
                self._ego_stop_count += 1

    def generate_learning_scenario(self):
        n_others = np.random.randint(0, 3)
        self.generate_scenario(
            enable_LTL_preconditions=True,
            timeout=62,
            n_others_range=(n_others, n_others),
            ego_pos_range=rd.hlanes.stop_region,
            n_others_stopped_in_stop_region=n_others,
            ego_v_upper_lim=0,
            ego_perturb_lim=(rd.hlanes.width / 4, np.pi / 6),
            ego_heading_towards_lane_centre=True)

        max_waited_count = 0
        for veh in self.env.vehs[1:]:
            max_waited_count = max(max_waited_count, veh.waited_count)

        self._extra_action_weights_flag = False
        self.env.ego.waited_count = np.random.randint(0, max_waited_count + 21)
        self.env.init_APs(False)

        self._reward_in_goal = 200
        self._extra_r_on_timeout = -200
        self._enable_low_level_training_properties = True
        self._ego_stop_count = 0

    @property
    def extra_termination_condition(self):
        if self._enable_low_level_training_properties:  # activated only for the low-level training.
            if self._ego_stop_count >= 50:
                self._extra_r_terminal = -200
                return True
            else:
                self._extra_r_terminal = None
                return False

        return False

    @staticmethod
    def _features_dim_reduction(features_tuple):
        return extract_ego_features(
            features_tuple, 'v', 'v_ref', 'psi', 'theta', 'acc', 'psi_dot',
            'pos_stop_region', 'intersection_is_clear', 'highest_priority')


class ChangeLane(ManeuverBase):
    """Maneuver that moves the ego onto the other (adjacent) lane."""

    min_y_distance = rd.hlanes.width / 4

    _terminate_in_goal = True
    _reward_in_goal = None

    _violation_penalty_in_low_level_training = None

    high_level_extra_reward = -20

    def _init_param(self):
        self._v_ref = rd.speed_limit
        self._target_lane = not self.env.ego.APs['lane']
        self._terminate_in_goal = True

    def _init_LTL_preconditions(self):
        self._LTL_preconditions.append(
            LTLProperty("G ( on_route and not over_speed_limit )",
                        self._violation_penalty_in_low_level_training,
                        self._enable_low_level_training_properties))

        self._LTL_preconditions.append(
            LTLProperty("G ( not stopped_now )",
                        self._violation_penalty_in_low_level_training,
                        self._enable_low_level_training_properties))

    @property
    def goal_achieved(self):
        ego = self.env.ego
        APs = self.env.ego.APs
        on_other_lane = APs['lane'] == self._target_lane
        achieved_y_displacement = np.sign(ego.y) * \
            (ego.y - rd.hlanes.centres[APs['target_lane']]) >= - self.min_y_distance
        return on_other_lane and APs['on_route'] and \
            achieved_y_displacement and APs['parallel_to_lane']

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    def generate_learning_scenario(self):
        self.generate_scenario(
            enable_LTL_preconditions=False,
            timeout=15,
            ego_pos_range=(rd.hlanes.start_pos, rd.hlanes.end_pos),
            ego_lane=np.random.choice([0, 1]),
            ego_perturb_lim=(rd.hlanes.width / 5, np.pi / 6))
        # print('our range was %s, %s, ego at %s' % (before_intersection, after_intersection, self.env.ego.x))
        self._reward_in_goal = 200
        self._violation_penalty_in_low_level_training = 200
        self._enable_low_level_training_properties = True

    # TODO: It is not a good idea to specify features by numbers, as the list
    # of features is ever changing. We should specify them by strings.
    @staticmethod
    def _features_dim_reduction(features_tuple):
        return extract_ego_features(features_tuple, 'v', 'v_ref', 'e_y',
                                    'psi', 'v tan(psi/L)', 'theta', 'lane',
                                    'acc', 'psi_dot')
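
# All maneuvers in this module register their preconditions through the same
# pattern: append LTLProperty(formula, penalty[, enable_flag]) inside
# _init_LTL_preconditions, where the optional third argument gates whether the
# property is active (here it is only switched on for low-level training).
# A minimal sketch of a new maneuver following that pattern (hypothetical
# class name and formula, shown for illustration only, not part of the API):
#
#     class Halt(ManeuverBase):
#         def _init_LTL_preconditions(self):
#             self._LTL_preconditions.append(
#                 LTLProperty("G ( not in_intersection )", 200,
#                             self._enable_low_level_training_properties))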


class Follow(ManeuverBase):
    """Maneuver that follows the vehicle ahead in the ego's lane."""

    _target_veh_i = None
    _penalty_for_out_of_follow_range = None

    def _init_LTL_preconditions(self):
        self._LTL_preconditions.append(
            LTLProperty("G ( veh_ahead )",
                        self._penalty_for_out_of_follow_range))

        self._LTL_preconditions.append(
            LTLProperty(
                "G ( (lane and target_lane) or (not lane and not target_lane) )",
                self._penalty_for_out_of_follow_range))

        self._LTL_preconditions.append(
            LTLProperty("G ( not stopped_now U veh_ahead_stopped_now )", 200,
                        self._enable_low_level_training_properties))

        self._LTL_preconditions.append(
            LTLProperty("G ( not veh_ahead_too_close )", 200,
                        self._enable_low_level_training_properties))

    def generate_learning_scenario(self):
        self.generate_scenario(
            enable_LTL_preconditions=False,
            n_others_range=(1, 1),
            ego_perturb_lim=(rd.hlanes.width / 2, np.pi / 4),
            veh_ahead_scenario=True)
        self.env._terminate_in_goal = False
        self._penalty_for_out_of_follow_range = 200
        self._enable_low_level_training_properties = True

    def _update_param(self):
        self._target_veh_i, _ = self.env.get_V2V_distance()

    def _low_level_manual_policy(self):
        return self.env.aggressive_driving_policy(EGO_INDEX)

    def _features_dim_reduction(self, features_tuple):
        ego_features = extract_ego_features(
            features_tuple, 'v', 'v_ref', 'e_y', 'psi', 'v tan(psi/L)',
            'theta', 'lane', 'e_y,lane', 'acc', 'psi_dot')

        if self._target_veh_i is not None:
            return ego_features + extract_other_veh_features(
                features_tuple, self._target_veh_i,
                'rel_x', 'rel_y', 'v', 'acc')
        else:
            return ego_features + (0.0, 0.0, 0.0, 0.0)
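
# Note on Follow._features_dim_reduction: when no target vehicle has been
# identified yet (self._target_veh_i is None), the trailing (0.0, 0.0, 0.0, 0.0)
# pads the result so the reduced feature tuple always has the same length
# (10 ego features + 4 target-vehicle features).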