manual_policy.py 1.92 KB
Newer Older
Aravind Bk's avatar
Aravind Bk committed
1 2
from .controller_base import ControllerBase

Ashish Gaurav's avatar
Ashish Gaurav committed
3

Aravind Bk's avatar
Aravind Bk committed
4 5 6
class ManualPolicy(ControllerBase):
    """Manual policy execution using nodes and edges."""

Ashish Gaurav's avatar
Ashish Gaurav committed
7 8
    def __init__(self, env, low_level_policies, transition_adj,
                 start_node_alias):
Aravind Bk's avatar
Aravind Bk committed
9 10 11 12 13 14 15 16 17
        """Constructor for manual policy execution.

        Args:
            env: env instance
            low_level_policies: low level policies dictionary
            transition_adj: adjacent edges dictionary that contains transitions
            start_node: starting node
        """

Ashish Gaurav's avatar
Ashish Gaurav committed
18 19
        super(ManualPolicy, self).__init__(env, low_level_policies,
                                           start_node_alias)
Aravind Bk's avatar
Aravind Bk committed
20 21 22
        self.adj = transition_adj

    def _transition(self):
Ashish Gaurav's avatar
Ashish Gaurav committed
23 24
        """Check if the current node's termination condition is met and if it
        is possible to transition to another node, i.e. its initiation
Aravind Bk's avatar
Aravind Bk committed
25 26 27 28 29 30 31 32
        condition is met. This is an internal function.

        Returns the new node if a transition can happen, None otherwise
        """

        new_node = None
        if self.low_level_policies[self.current_node].termination_condition:
            for next_node in self.adj[self.current_node]:
33
                self.low_level_policies[next_node].reset_maneuver()
Aravind Bk's avatar
Aravind Bk committed
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
                if self.low_level_policies[next_node].initiation_condition:
                    new_node = next_node
                    break  # change current_node to the highest priority next node

        return new_node

    def can_transition(self):
        """Check if we can transition.

        Returns True if we can, false if we cannot.
        """

        return self._transition() is not None

    def do_transition(self, observation):
        """Do a singular transition using the specified edges.

        Args:
            observation: final observation from episodic step (not used)
        """

        new_node = self._transition()
        if new_node is not None:
Ashish Gaurav's avatar
Ashish Gaurav committed
57
            self.current_node = new_node