# Maze World - Assignment 2
Assignment code for course ECE 493 T25 at the University of Waterloo in Spring 2019.
(*Code designed and created by Sriram Ganapathi Subramanian and Mark Crowley, 2019*)
**Due Date:** July 30, 11:59 pm, submitted as a PDF and code to the LEARN dropbox.
**Collaboration:** You may discuss solutions and help each other work out the code, but each person *must do their own work*. All code and writing will be cross-checked against each other and against internet databases for cheating.
Updates to the provided code that are useful for everyone, and fixes for bugs in it, will be pushed to GitLab and announced.
## Domain Description - GridWorld
The domain consists of a 10x10 grid of cells. The agent being controlled is represented as a red square. The goal is a yellow oval; reaching it yields a reward of 1 and ends and resets the episode.
Blue squares are **pits**, which yield a penalty of -10 and end the episode.
Black squares are **walls**, which cannot be passed through. If the agent tries to walk into a wall, it remains in its current position and receives a penalty of -0.3.
There are **three tasks** defined in `run_main.py`, which can be commented in or out to try each one. They include combinations of pillars, rooms, pits, and obstacles. The aim is to learn a policy that maximizes expected reward and reaches the goal as quickly as possible. The reward structure is summarized in the sketch after the figures below.
# <img src="task1.png" width="300"/><img src="task2.png" width="300"/><img src="task3.png" width="300"/>
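For reference, the reward structure described above, together with the small per-step cost used in `maze_env.py`, can be summarized as follows (this is only a summary for your report, not part of the provided code):

```python
# Reward received after each step, as defined in maze_env.py's computeReward():
REWARD_GOAL = 1      # reaching the yellow goal oval; episode ends and resets
REWARD_PIT = -10     # stepping onto a blue pit; episode ends
REWARD_WALL = -0.3   # bumping into a black wall; agent stays in place
REWARD_STEP = -0.1   # every other move (encourages reaching the goal quickly)
```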
## Assignment Requirements
This assignment will have a written component and a programming component.
Clone the mazeworld environment locally and run the code, looking at the implementation of the sample algorithm.
Your task is to implement three other algorithms on this domain.
- **(20%)** Implement SARSA
- **(20%)** Implement QLearning
- **(20%)** At least one other algorithm of your choice or of your own design. Suggestions to try:
  - Policy Iteration (easy)
  - Expected SARSA (less easy)
  - Double Q-Learning (less easy)
  - n-step TD or TD(Lambda) with eligibility traces (harder)
  - Policy Gradients (harder still)
- **(10%) bonus** Implement four algorithms in total (you can do more, but we will only look at four; tell us which ones).
- **(40%)** Report: Write a short report on the problem and the results of your three algorithms. The report should be submitted on LEARN as a PDF.
  - Describe each algorithm you used: define the states, actions, and dynamics, give the mathematical formulation of your algorithm, and show the Bellman updates you use (a minimal sketch of the standard tabular updates follows this list).
  - Include some quantitative analysis of the results; a default plot for comparing all algorithms is provided, but you can do more than that.
  - Include some qualitative analysis of why one algorithm works well in each case and what you noticed along the way.
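As a reference for the written component, the standard tabular SARSA and Q-learning backups have the following form. This is only a minimal sketch using a plain dict-of-dicts Q table; the function names and the dict representation are illustrative, not the required `RL_brain.py` class structure:

```python
def sarsa_update(q, s, a, r, s_, a_, lr=0.1, gamma=0.9):
    """One SARSA (on-policy) backup: Q(s,a) += lr * (r + gamma*Q(s',a') - Q(s,a))."""
    target = r if s_ == 'terminal' else r + gamma * q[s_][a_]
    q[s][a] += lr * (target - q[s][a])


def q_learning_update(q, s, a, r, s_, lr=0.1, gamma=0.9):
    """One Q-learning (off-policy) backup: Q(s,a) += lr * (r + gamma*max_a' Q(s',a') - Q(s,a))."""
    target = r if s_ == 'terminal' else r + gamma * max(q[s_].values())
    q[s][a] += lr * (target - q[s][a])
```

In your own classes these updates would live inside `learn()`, with `check_state_exist()` adding unseen states and `choose_action()` supplying the epsilon-greedy behaviour, as in `RL_brainsample_PI.py`.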
### Evaluation
You will also submit your code to LEARN, and grading will be carried out using a combination of automated and manual grading.
Your algorithms should follow the pattern of the `RL_brain.py` and `RL_brainsample_PI.py` files.
We will look at your definitions and implementations, which should match the descriptions in your report.
We will also automatically run your code on the given domain, on the three tasks defined in `run_main.py` as well as on other maps you have not seen, in order to evaluate it.
Part of your grade will come from the overall performance of your algorithm on each domain.
So make sure your code runs with the given, unmodified `run_main.py` and `maze_env.py` code when we import your class names.
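For example, an evaluation script is likely to drive your class the same way `run_main.py` does. In the sketch below, the module name `RL_brain_QLearning` is a hypothetical placeholder for one of your files, and the surrounding variables (`agentXY`, `goalXY`, `wall_shape`, `pits`, `episodes`, `update`) are the ones already defined in `run_main.py`:

```python
from maze_env import Maze
from RL_brain_QLearning import rlalgorithm as rlalg2  # hypothetical: your file and class name

env2 = Maze(agentXY, goalXY, wall_shape, pits)         # possibly an unseen map
RL2 = rlalg2(actions=list(range(env2.n_actions)))
data2 = {}
env2.after(10, update(env2, RL2, data2, episodes))
env2.mainloop()
```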
### Code Suggestions
- When the run of episodes ends, a plot of each algorithm's performance is displayed. If multiple algorithms are run at once, they will all be plotted together for comparison. You may modify the plotting code and add any other analysis you need; this is only a starting point.
- There are a number of parameters defined in `run_main` that can be used to speed up the simulations. Once you have debugged an algorithm and can see that it is running, you can alter the `sim_speed` and `*EveryNth` variables to control the delay of each step and how often data is printed or rendered, which speeds up training (see the illustrative settings after the plots below).
- For the default algorithms we have implemented on these domains, it seems to take at least 1500 episodes to converge, so don't read too much into how an algorithm looks after a few hundred episodes.
<img src="plot.png" width="400"/><img src="plotzoom.png" width="400"/>
RL_brain.py 0 → 100644

import numpy as np
import pandas as pd

class rlalgorithm:
    def __init__(self, actions, *args, **kwargs):
        # add your own hyperparameters here (e.g. learning rate, discount, epsilon)
        self.actions = actions
        self.display_name = "My Algorithm"  # run_main.py uses this for printing and plotting

    def choose_action(self, observation):
        # implement this: return one of self.actions for the observed state
        return action

    def learn(self, s, a, r, s_):
        # implement this: learn something from the transition (s, a, r, s_)
        # run_main.py expects learn() to return the next state and the next action
        return s_, a_
RL_brainsample_PI.py 0 → 100644

import numpy as np
import pandas as pd

class rlalgorithm:
    def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.1):
        self.actions = actions
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon = e_greedy
        self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64)
        self.display_name = "Asynch PI Hack"

    def choose_action(self, observation):
        '''Choose the next action to take given the observed state using an epsilon-greedy policy'''
        self.check_state_exist(observation)
        # epsilon (0.1) is the small probability of NOT choosing the max action
        if np.random.uniform() >= self.epsilon:
            # exploit: pick a greedy action, breaking ties between equal values at random
            state_action = self.q_table.loc[observation, :]
            action = np.random.choice(state_action[state_action == np.max(state_action)].index)
        else:
            # explore: pick a uniformly random action
            action = np.random.choice(self.actions)
        return action

    def learn(self, s, a, r, s_):
        '''Update the Q(S,A) state-action value table using the latest experience.
        This is not a very good learning update; it is only a placeholder example.
        '''
        self.check_state_exist(s_)
        if s_ != 'terminal':
            a_ = self.choose_action(str(s_))
            q_target = r + self.gamma * self.q_table.loc[s_, a_]
        else:
            a_ = None  # episode is over; run_main breaks out before using this action
            q_target = r  # next state is terminal
        self.q_table.loc[s, a] = q_target  # update
        return s_, a_

    def check_state_exist(self, state):
        '''States are dynamically added to the Q(S,A) table as they are encountered'''
        if state not in self.q_table.index:
            # append new state to q table
            self.q_table = self.q_table.append(
                pd.Series(
                    [0]*len(self.actions),
                    index=self.q_table.columns,
                    name=state,
                )
            )
maze_env.py 0 → 100644

import random
import numpy as np
import time
import sys
if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

UNIT = 40    # pixels per cell (width and height)
MAZE_H = 10  # height of the entire grid in cells
MAZE_W = 10  # width of the entire grid in cells
origin = np.array([UNIT/2, UNIT/2])

class Maze(tk.Tk, object):
    def __init__(self, agentXY, goalXY, walls=[], pits=[]):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']
        self.n_actions = len(self.action_space)
        self.wallblocks = []
        self.pitblocks = []
        self.UNIT = 40    # pixels per cell (width and height)
        self.MAZE_H = 10  # height of the entire grid in cells
        self.MAZE_W = 10  # width of the entire grid in cells
        self.title('maze')
        self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_W * UNIT))
        self.build_shape_maze(agentXY, goalXY, walls, pits)
        #self.build_maze()

    def build_shape_maze(self, agentXY, goalXY, walls, pits):
        self.canvas = tk.Canvas(self, bg='white',
                                height=MAZE_H * UNIT,
                                width=MAZE_W * UNIT)
        # create grid lines
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)
        for x, y in walls:
            self.add_wall(x, y)
        for x, y in pits:
            self.add_pit(x, y)
        self.add_goal(goalXY[0], goalXY[1])
        self.add_agent(agentXY[0], agentXY[1])
        self.canvas.pack()

    def add_wall(self, x, y):
        '''Add a solid wall block centred at the given cell coordinate'''
        wall_center = origin + np.array([UNIT * x, UNIT * y])
        self.wallblocks.append(self.canvas.create_rectangle(
            wall_center[0] - 15, wall_center[1] - 15,
            wall_center[0] + 15, wall_center[1] + 15,
            fill='black'))

    def add_pit(self, x, y):
        '''Add a solid pit block centred at the given cell coordinate'''
        pit_center = origin + np.array([UNIT * x, UNIT * y])
        self.pitblocks.append(self.canvas.create_rectangle(
            pit_center[0] - 15, pit_center[1] - 15,
            pit_center[0] + 15, pit_center[1] + 15,
            fill='blue'))

    def add_goal(self, x=4, y=4):
        '''Add a solid goal oval centred at the given cell coordinate'''
        goal_center = origin + np.array([UNIT * x, UNIT * y])
        self.goal = self.canvas.create_oval(
            goal_center[0] - 15, goal_center[1] - 15,
            goal_center[0] + 15, goal_center[1] + 15,
            fill='yellow')

    def add_agent(self, x=0, y=0):
        '''Add a solid red block for the agent centred at the given cell coordinate'''
        agent_center = origin + np.array([UNIT * x, UNIT * y])
        self.agent = self.canvas.create_rectangle(
            agent_center[0] - 15, agent_center[1] - 15,
            agent_center[0] + 15, agent_center[1] + 15,
            fill='red')

    def reset(self, value=1, resetAgent=True):
        self.update()
        time.sleep(0.2)
        if(value == 0):
            return self.canvas.coords(self.agent)
        else:
            #Reset Agent
            if(resetAgent):
                self.canvas.delete(self.agent)
                self.agent = self.canvas.create_rectangle(origin[0] - 15, origin[1] - 15,
                                                          origin[0] + 15, origin[1] + 15,
                                                          fill='red')
            return self.canvas.coords(self.agent)

    def computeReward(self, currstate, action, nextstate):
        '''computeReward - definition of the reward function'''
        reverse = False
        if nextstate == self.canvas.coords(self.goal):
            reward = 1
            done = True
            nextstate = 'terminal'
        #elif nextstate in [self.canvas.coords(self.pit1), self.canvas.coords(self.pit2)]:
        elif nextstate in [self.canvas.coords(w) for w in self.wallblocks]:
            reward = -0.3
            done = False
            nextstate = currstate
            reverse = True
            #print("Wall penalty:{}".format(reward))
        elif nextstate in [self.canvas.coords(w) for w in self.pitblocks]:
            reward = -10
            done = True
            nextstate = 'terminal'
            reverse = False
            #print("Pit penalty:{}".format(reward))
        else:
            reward = -0.1
            done = False
        return reward, done, reverse

    def step(self, action):
        '''step - definition of the one-step dynamics function'''
        s = self.canvas.coords(self.agent)
        base_action = np.array([0, 0])
        if action == 0:    # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:  # down
            if s[1] < (MAZE_H - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 2:  # right
            if s[0] < (MAZE_W - 1) * UNIT:
                base_action[0] += UNIT
        elif action == 3:  # left
            if s[0] > UNIT:
                base_action[0] -= UNIT

        self.canvas.move(self.agent, base_action[0], base_action[1])  # move agent
        s_ = self.canvas.coords(self.agent)  # next state
        #print("s_.coords:{}({})".format(self.canvas.coords(self.agent), type(self.canvas.coords(self.agent))))
        #print("s_:{}({})".format(s_, type(s_)))

        # call the reward function
        reward, done, reverse = self.computeReward(s, action, s_)
        if(reverse):
            self.canvas.move(self.agent, -base_action[0], -base_action[1])  # move agent back
            s_ = self.canvas.coords(self.agent)

        return s_, reward, done

    def render(self, sim_speed=.01):
        time.sleep(sim_speed)
        self.update()

def update():
    '''Small standalone demo loop: repeatedly step the environment with a fixed action'''
    for t in range(10):
        print("The value of t is", t)
        s = env.reset()
        while True:
            env.render()
            a = 1
            s, r, done = env.step(a)
            if done:
                break

if __name__ == '__main__':
    # Maze requires an agent start and a goal position; these defaults match run_main.py
    env = Maze([0, 0], [4, 4])
    env.after(100, update)
    env.mainloop()
plot.png 0 → 100644 (266 KiB)
plotzoom.png 0 → 100644 (262 KiB)
run_main.py 0 → 100644

from maze_env import Maze
from RL_brainsample_PI import rlalgorithm as rlalg1
import numpy as np
import sys
import matplotlib.pyplot as plt
import pickle
import time

DEBUG = 1
def debug(debuglevel, msg, **kwargs):
    if debuglevel <= DEBUG:
        if 'printNow' in kwargs:
            if kwargs['printNow']:
                print(msg)
        else:
            print(msg)

def plot_rewards(experiments):
    color_list = ['blue', 'green', 'red', 'black', 'magenta']
    label_list = []
    for i, (env, RL, data) in enumerate(experiments):
        x_values = range(len(data['global_reward']))
        label_list.append(RL.display_name)
        y_values = data['global_reward']
        plt.plot(x_values, y_values, c=color_list[i], label=label_list[-1])
    plt.legend(label_list)
    plt.title("Reward Progress", fontsize=24)
    plt.xlabel("Episode", fontsize=18)
    plt.ylabel("Return", fontsize=18)
    plt.tick_params(axis='both', which='major', labelsize=14)
    # plt.axis([0, 1100, 0, 1100000])
    plt.show()

def update(env, RL, data, episodes=50):
    global_reward = np.zeros(episodes)
    data['global_reward'] = global_reward
    for episode in range(episodes):
        t = 0
        # initial state
        if episode == 0:
            state = env.reset(value=0)
        else:
            state = env.reset()
        debug(2, 'state(ep:{},t:{})={}'.format(episode, t, state))

        # RL chooses an action based on the state
        action = RL.choose_action(str(state))

        while True:
            # fresh env
            #if(t < 5000 and (showRender or (episode % renderEveryNth) == 0)):
            if(showRender or (episode % renderEveryNth) == 0):
                env.render(sim_speed)

            # RL takes the action and gets the next state and reward
            state_, reward, done = env.step(action)
            global_reward[episode] += reward
            debug(2, 'state(ep:{},t:{})={}'.format(episode, t, state))
            debug(2, 'reward_{}= total return_t ={} Mean50={}'.format(reward, global_reward[episode], np.mean(global_reward[-50:])))

            # RL learns from this transition
            # and determines the next state and action
            state, action = RL.learn(str(state), action, reward, str(state_))

            # break the while loop at the end of this episode
            if done:
                break
            else:
                t = t + 1

        debug(1, "({}) Episode {}: Length={} Total return = {} ".format(RL.display_name, episode, t, global_reward[episode]), printNow=(episode % printEveryNth == 0))
        if(episode >= 100):
            debug(1, "    Median100={} Variance100={}".format(np.median(global_reward[episode-100:episode]), np.var(global_reward[episode-100:episode])), printNow=(episode % printEveryNth == 0))

    # end of game
    print('game over -- Algorithm {} completed'.format(RL.display_name))
    env.destroy()

if __name__ == "__main__":
    sim_speed = 0.05

    #Example short, fast run for debugging
    showRender = True
    episodes = 30
    renderEveryNth = 5
    printEveryNth = 1
    do_plot_rewards = True

    #Example full run, you may need to run longer
    #showRender = False
    #episodes = 2000
    #renderEveryNth = 10000
    #printEveryNth = 100
    #do_plot_rewards = True

    if(len(sys.argv) > 1):
        episodes = int(sys.argv[1])
    if(len(sys.argv) > 2):
        showRender = sys.argv[2] in ['true', 'True', 'T', 't']
    if(len(sys.argv) > 3):
        datafile = sys.argv[3]

    #All Tasks
    agentXY = [0, 0]
    goalXY = [4, 4]

    #Task 1
    wall_shape = np.array([[7,7],[4,6]])
    pits = np.array([[6,3],[2,6]])

    #Task 2
    #wall_shape = np.array([[5,2],[4,2],[3,2],[3,3],[3,4],[3,5],[3,6],[4,6],[5,6]])
    #pits = []

    #Task 3
    #wall_shape = np.array([[7,4],[7,3],[6,3],[6,2],[5,2],[4,2],[3,2],[3,3],[3,4],[3,5],[3,6],[4,6],[5,6]])
    #pits = np.array([[1,3],[0,5],[7,7]])

    env1 = Maze(agentXY, goalXY, wall_shape, pits)
    RL1 = rlalg1(actions=list(range(env1.n_actions)))
    data1 = {}
    env1.after(10, update(env1, RL1, data1, episodes))
    env1.mainloop()
    experiments = [(env1, RL1, data1)]

    #Create another RL_brain_ALGNAME.py class, import it as rlalg2, then run it here.
    #env2 = Maze(agentXY, goalXY, wall_shape, pits)
    #RL2 = rlalg2(actions=list(range(env2.n_actions)))
    #data2 = {}
    #env2.after(10, update(env2, RL2, data2, episodes))
    #env2.mainloop()
    #experiments.append((env2, RL2, data2))

    print("All experiments complete")

    for env, RL, data in experiments:
        print("{} : max reward = {} medLast100={} varLast100={}".format(RL.display_name, np.max(data['global_reward']), np.median(data['global_reward'][-100:]), np.var(data['global_reward'][-100:])))

    if(do_plot_rewards):
        #Simple plot of the return for each episode and algorithm; you can make more informative plots
        plot_rewards(experiments)

    #Not implemented yet
    #if(do_save_data):
    #    for env, RL, data in experiments:
    #        saveData(env, RL, data)
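    # A minimal sketch of the unimplemented saveData helper referenced above.
    # This is an assumption, not part of the provided code: it would pickle the
    # per-episode returns so they can be re-plotted or analysed later.
    #def saveData(env, RL, data, filename=None):
    #    filename = filename or "{}_rewards.pkl".format(RL.display_name.replace(' ', '_'))
    #    with open(filename, 'wb') as f:
    #        pickle.dump({'algorithm': RL.display_name,
    #                     'global_reward': list(data['global_reward'])}, f)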
task1.png 0 → 100644 (256 KiB)
task2.png 0 → 100644 (257 KiB)
task3.png 0 → 100644 (262 KiB)