from .controller_base import ControllerBase


class ManualPolicy(ControllerBase):
    """Controller that walks a hand-specified graph of low-level policies.

    Nodes are keys into ``low_level_policies``; the edges leaving a node are
    the entries of ``transition_adj`` for that node.
    """

    def __init__(self, env, low_level_policies, transition_adj,
                 start_node_alias):
        """Set up the controller and remember the transition graph.

        Args:
            env: env instance, forwarded to ``ControllerBase``.
            low_level_policies: low level policies, indexed by node. Each
                entry must expose ``termination_condition`` and
                ``initiation_condition`` attributes.
            transition_adj: adjacency container indexed by node; each entry
                is an iterable of the nodes reachable from that node.
            start_node_alias: alias of the starting node, forwarded to
                ``ControllerBase``.
        """
        super(ManualPolicy, self).__init__(env, low_level_policies,
                                           start_node_alias)
        self.adj = transition_adj

    def _transition(self):
        """Find the node the controller would move to right now.

        A move is only considered once the current node's termination
        condition holds. The neighbours of the current node are then scanned
        in the iteration order of ``self.adj[self.current_node]`` and the
        first one whose initiation condition holds is selected, so earlier
        neighbours have higher priority.

        This is an internal helper; it never modifies ``self.current_node``.
        NOTE(review): ``self.current_node`` is not assigned in ``__init__``
        here -- presumably ``ControllerBase`` sets it; confirm.

        Returns:
            The selected next node, or None when no transition can happen.
        """
        # The current policy has to be finished before leaving its node.
        if not self.low_level_policies[self.current_node].termination_condition:
            return None

        for candidate in self.adj[self.current_node]:
            if self.low_level_policies[candidate].initiation_condition:
                # First admissible neighbour wins (highest priority).
                return candidate

        return None

    def can_transition(self):
        """Report whether a transition is currently possible.

        Returns:
            True if ``_transition`` finds a next node, False otherwise.
        """
        candidate = self._transition()
        return candidate is not None

    def do_transition(self, observation):
        """Perform at most one transition along the specified edges.

        ``self.current_node`` is left untouched when no transition is
        possible.

        Args:
            observation: final observation from episodic step (not used).
        """
        target = self._transition()
        if target is None:
            return
        self.current_node = target