
Q-Learning: Implementation

In this article, we'll walk through a practical implementation of Q-learning using Gymnasium's cliff walking environment.

Introduction

Note: If you're looking for an introduction to Q-Learning, the first piece in this series—What is Q-Learning?—is a great place to start. If you have a solid grasp of the fundamentals, read on!
Welcome to our next piece on Q-Learning! Today, we're going to walk through a hands-on demonstration of Q-Learning's effectiveness, complete with code you can run to reproduce the experiments yourself.

Here's what we will be covering: defining the problem, the Gymnasium toolkit, the cliff walking environment, implementing and training a Q-learning agent, evaluating it, and visualizing the results with Weights & Biases.


Define the problem statement

Real-world training poses challenges in terms of both difficulty and speed. Unlike in a virtual scenario, you can't simply click "undo" if the robot you're training takes a tumble off a cliff. Time can't be accelerated, and throwing more compute at the problem doesn't make the robot move faster. Moreover, training numerous robots simultaneously in the real world is often prohibitively expensive.
This is precisely where reinforcement learning in a simulated environment shines. Simulation lets us overcome the challenges described above, particularly during the initial stages of training (a sort of bootstrapping process): we can iterate far more rapidly and efficiently than the real world allows.

Gymnasium

Gymnasium serves as a toolkit, offering an API standard for reinforcement learning. This toolkit opens the door to a diverse array of simulated environments, ranging from classic Atari games and board games to 2D and 3D physical simulations. With Gymnasium, you can efficiently train agents, compare their performance, and even develop new reinforcement learning algorithms. Learn more about Gymnasium here.
To use gymnasium, let's first install and import:
!pip install -qq gymnasium

import gymnasium as gym
Additionally, we'll want Weights & Biases to track and visualize our experiments.
!pip install -qq --upgrade wandb

import wandb
wandb.login()
Running wandb.login() will prompt you to enter your API key. Just paste that in and we can get started with the fun stuff.
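One more piece of setup worth noting: the wandb.log() calls later in this article assume an active run, so we start one with wandb.init(). A minimal sketch (the project name here is just a placeholder, use whatever fits your workspace):
# Start a W&B run before logging anything; the project name is a placeholder
wandb.init(project="q-learning-cliffwalking")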


Q-Learning for Cliff Walking

Our experiment today will be in Gymnasium's cliff walking environment, a 4x12 grid you can see in the gif below. The challenge is simple: guide the agent (that little elf-looking guy) from the starting point at [3, 0] to the goal at [3, 11] without falling off the cliff. We want to do this in the minimum number of steps.
Source: Gymnasium Documentation (Cliff Walking)
If the agent moves onto one of the cliff locations ([3, 1] through [3, 10]), it's sent back to the starting point. Each episode concludes when the agent successfully reaches the goal at [3, 11].
To use this environment, we need to initialize it using gym.make().
env = gym.make('CliffWalking-v0', render_mode="rgb_array")
We are using render_mode as "rgb_array" because we need env.render() to return a single frame representing the current state of the environment. This will be helpful later for producing a video of the learned optimal path.
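As a quick, purely illustrative sanity check: with render_mode="rgb_array", env.render() returns the current frame as a NumPy array of shape (height, width, 3).
state, info = env.reset()
frame = env.render()              # a single RGB frame of the current grid
print(type(frame), frame.shape)   # numpy.ndarray, (height, width, 3)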

Observation Space

The observation space describes what the agent can perceive: here, its position on the 4x12 grid.
env.observation_space
It returns Discrete(48): one discrete state for each cell of the 4 rows * 12 cols grid.
Of these, 3 * 12 + 1 = 37 states are ones the agent can actually occupy: all positions in the first three rows, plus the bottom-left start cell. The agent can never rest on a cliff cell, and reaching the goal ends the episode.
The observation is an integer encoding the agent's current position, calculated as current_row * number_of_cols + current_col. For example, the start position [3, 0] is encoded as 3 * 12 + 0 = 36.
env.observation_space.sample()
It returns an integer value representing a randomly generated observation.
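To make the encoding concrete, here's a small illustrative helper (not part of the Gymnasium API) that converts between the integer observation and (row, col) coordinates:
N_COLS = 12  # the grid has 12 columns

def state_to_pos(state):
    # state = row * N_COLS + col
    return divmod(state, N_COLS)

def pos_to_state(row, col):
    return row * N_COLS + col

print(state_to_pos(36))      # (3, 0) -> the start position
print(pos_to_state(3, 11))   # 47     -> the goal position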

Action Space

The action space is the set of actions the agent can take to navigate the grid.
env.action_space
It returns Discrete(4), meaning there are 4 possible actions:
  • 0: Move up
  • 1: Move right
  • 2: Move down
  • 3: Move left

Reward Function

The reward depends on where the agent ends up after each move (see the short demo after this list):
  • -1 for each time step
  • -100 if the agent steps into the cliff
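Here's a short illustrative demo of both rewards from the start state [3, 0], using env.step() (covered in more detail in the next section). Moving up is an ordinary -1 step, while moving right walks straight into the cliff, costs -100, and sends the agent back to the start:
state, info = env.reset()
print(state)                           # 36, the start state [3, 0]

new_state, reward, *_ = env.step(0)    # 0 = move up -> an ordinary step
print(new_state, reward)               # 24, -1

env.reset()
new_state, reward, *_ = env.step(1)    # 1 = move right -> into the cliff
print(new_state, reward)               # 36, -100 (sent back to the start)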


Information

Before moving on to the implementation, there are two API calls worth understanding: env.reset() and env.step().

env.reset()

env.reset() resets the environment to its initial state and returns the agent's first observation for the episode, along with an info dictionary (metrics and debug info).
state, info = env.reset()
Calling env.reset() returns:
  • initial agent start state = 36
  • info = {'prob': 1} (the transition probability for the state; since cliff walking is not stochastic, this is always 1)

env.step()

Calling env.step(action) applies the chosen action to the environment and returns five values (a short random-rollout sketch follows the list below):
action = env.action_space.sample()

new_state, reward, terminated, truncated, info = env.step(action)
  • new_state - the new state reached after taking the action.
  • reward - the reward gained after taking the action and reaching the new state.
  • terminated - True when the agent reaches the terminal goal state at [3, 11], ending the episode; we then call env.reset() to start a new episode from the start state 36. (Stepping into the cliff does not terminate the episode; the agent is simply sent back to the start.)
  • truncated - True if the episode is cut short, for example by a time limit on the maximum number of steps the agent can take in an episode. As with termination, we call env.reset() to start a new episode.
  • info - {'prob': 1.0} (the transition probability for the state; since cliff walking is not stochastic, this is always 1)
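Putting env.reset() and env.step() together, here's a minimal random-rollout sketch (purely illustrative; our trained agent will later replace the random action with one chosen from the Q-table):
state, info = env.reset()
total_reward = 0

for _ in range(100):                    # cap the rollout length for this demo
    action = env.action_space.sample()  # pick a random action
    state, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    if terminated or truncated:
        break

print("Return of this random rollout:", total_reward)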

Q-learning Implementation

From the first article, we know the steps:
  • Initialize Q-table by setting all the Q-values to 0.
  • Choose an action using epsilon-greedy policy.
  • Perform a chosen action.
  • Update Q-value.
The above steps are repeated for n training episodes until we obtain an estimate of the optimal Q-function. Once we have an optimal Q-function, we have an optimal policy since we know the best action to take for each state.
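As a refresher from the first article, the last step uses the standard Q-learning update rule. Here's a tiny self-contained numeric example of a single update with toy values, just to show the arithmetic:
import numpy as np

learning_rate, discount_factor = 0.7, 0.95
Qtable = np.zeros((48, 4))                          # fresh table, all zeros

# One hypothetical transition: from state 36, action 0, reward -1, to state 24
state, action, reward, new_state = 36, 0, -1, 24

td_target = reward + discount_factor * np.max(Qtable[new_state])  # -1 + 0.95 * 0 = -1
td_error = td_target - Qtable[state][action]                      # -1 - 0 = -1
Qtable[state][action] += learning_rate * td_error                 # 0 + 0.7 * (-1) = -0.7
print(Qtable[state][action])                                      # -0.7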

Initialize Q-table

The Q-table is represented as [number of rows (state), number of columns (action)]. To get the state_space and action_space, we can use:
state_space = env.observation_space.n
action_space = env.action_space.n
This gives us the following:
  • state_space = 48
  • action_space = 4
To initialize the Q-table with all values set to 0, we can use numpy.zeros().
import numpy as np

def initialize_q_table(state_space, action_space):
    Qtable = np.zeros((state_space, action_space))
    return Qtable
This gives us an array of shape [48, 4] (state_space, action_space).
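For completeness, here's how the table we'll train can be created. The variable name Qtable_cliffwalking is the one used later when recording the video of the learned policy:
Qtable_cliffwalking = initialize_q_table(state_space, action_space)
print(Qtable_cliffwalking.shape)   # (48, 4)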

Build the Agent

This code block defines a CliffwalkingAgent class for an RL agent in the cliff walking environment.
  • Initialization: The class is initialized with parameters (args) and an empty list to track training errors.
  • Action Selection (get_action method): Given a Q-table, a state, and an exploration parameter (epsilon), it chooses an action. With probability epsilon, it explores by selecting a random action using env.action_space.sample(). Otherwise, it exploits by selecting the action with the maximum Q-value for the given state. This is the epsilon-greedy policy.
  • Q-Value Update (update method): Given a Q-table, the current state, the new state, the action taken, and the reward received, it updates the Q-value. It calculates the temporal difference (TD) target and error. The training error is recorded, and the Q-value is updated using the Q-learning update rule.
  • Epsilon Decay (decay_epsilon method): It adjusts the exploration parameter (epsilon) over episodes using exponential decay. The decayed epsilon value is returned to balance exploration and exploitation.
class CliffwalkingAgent:
    def __init__(self, args):
        self.args = args
        self.training_error = []

    def get_action(self, Qtable, state, epsilon):
        # Epsilon-greedy policy: explore with probability epsilon, exploit otherwise
        if np.random.random() > epsilon:
            return np.argmax(Qtable[state][:])
        else:
            return env.action_space.sample()

    def update(self, Qtable, state, new_state, action, reward):
        # Q-learning update: the TD target bootstraps from the best Q-value of the next state
        td_target = reward + self.args.discount_factor * np.max(Qtable[new_state])
        td_error = td_target - Qtable[state][action]
        self.training_error.append(td_error)
        return Qtable[state][action] + self.args.learning_rate * td_error

    def decay_epsilon(self, episode):
        # Exponentially decay epsilon from max_epsilon towards min_epsilon
        exp_decay = np.exp(-self.args.decay_rate * episode)
        return self.args.min_epsilon + (self.args.max_epsilon - self.args.min_epsilon) * exp_decay


Define Hyperparameters

This code block defines a configuration namespace (configs) with parameters for training, evaluation, environment settings, and exploration/exploitation.
from argparse import Namespace

configs = Namespace(
    # Training parameters
    n_training_episodes = 100000,  # Total number of training episodes
    learning_rate = 0.7,           # Learning rate

    # Evaluation parameters
    n_eval_episodes = 100,         # Total number of test episodes

    # Environment parameters
    env_id = "CliffWalking-v0",    # Environment name
    max_steps = 99,                # Max steps per episode
    discount_factor = 0.95,        # Discount factor
    eval_seed = [],                # Evaluation seeds for the environment

    # Exploration/Exploitation parameters
    max_epsilon = 1.0,             # Exploration probability at start
    min_epsilon = 0.05,            # Minimum exploration probability (start exploiting)
    decay_rate = 0.0005,           # Exponential decay rate for exploration prob
)
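To get a feel for how quickly exploration decays under these settings, here's a quick illustrative check that applies the same decay formula the agent uses at a few episode counts:
# Same exponential decay used in CliffwalkingAgent.decay_epsilon
for episode in [0, 1000, 10000, 100000]:
    eps = configs.min_epsilon + (configs.max_epsilon - configs.min_epsilon) * np.exp(-configs.decay_rate * episode)
    print(episode, round(eps, 3))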


Training Loop

This code block implements a training function for an RL agent using Q-learning in the cliff walking environment.
  • An instance of the CliffwalkingAgent class is created with specified configurations (configs).
  • The training environment is wrapped with episode recording using gym.wrappers.RecordEpisodeStatistics. This keeps track of cumulative rewards and episode lengths, which we'll use later for visualization.
  • The function iterates over a specified number of training episodes (n_training_episodes).
  • The agent's exploration probability (epsilon) is reduced as training progresses.
  • The environment is reset, and the agent interacts with it for a maximum number of steps per episode (max_steps).
  • For each step, the agent selects an action, observes the new state and reward, and updates the Q-table using the Q-learning formula.
  • The episode concludes if the agent reaches the terminal state (the goal) or is truncated (the maximum number of steps is reached).
This function facilitates the training of the RL agent, allowing it to learn a policy for navigating the cliff walking environment by updating Q-values based on observed states and rewards.
from tqdm import tqdm

agent = CliffwalkingAgent(configs)
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=configs.n_training_episodes)

def train(n_training_episodes, max_steps, Qtable):
    for episode in tqdm(range(n_training_episodes)):
        # Reduce epsilon as training progresses
        epsilon = agent.decay_epsilon(episode)
        # Reset the environment
        state, info = env.reset()
        step = 0
        terminated = False
        truncated = False

        # repeat
        for step in range(max_steps):
            # Choose an action with the epsilon-greedy policy
            action = agent.get_action(Qtable, state, epsilon)

            # Take action At and observe Rt+1 and St+1
            new_state, reward, terminated, truncated, info = env.step(action)

            # Update the Q-value
            Qtable[state][action] = agent.update(Qtable, state, new_state, action, reward)

            # If terminated or truncated, end the episode
            if terminated or truncated:
                break

            state = new_state

    return Qtable
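The training function isn't invoked in the block above, so here's a minimal call sketch, reusing the Qtable_cliffwalking we initialized earlier. The resulting table is the one used in the evaluation and video-recording steps below:
Qtable_cliffwalking = train(configs.n_training_episodes, configs.max_steps, Qtable_cliffwalking)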


Evaluation

This code block defines a function, evaluate_agent, designed to evaluate the performance of a trained RL agent over a specified number of episodes.
  • The function iterates through the evaluation episodes.
  • It resets the environment, and for each step in the episode, the agent selects the best action based on the Q-table (greedy policy) and receives a reward.
  • The total rewards obtained in each episode are recorded and used to calculate and return the mean and standard deviation of rewards.
This function serves as a valuable tool for quantifying and understanding the agent's performance in the specified environment after training.
def evaluate_agent(env, max_steps, n_eval_episodes, Qtable, seed):
    episode_rewards = []
    for episode in tqdm(range(n_eval_episodes)):
        if seed:
            state, info = env.reset(seed=seed[episode])
        else:
            state, info = env.reset()
        step = 0
        truncated = False
        terminated = False
        total_rewards_ep = 0

        for step in range(max_steps):
            # greedy policy
            action = np.argmax(Qtable[state][:])
            new_state, reward, terminated, truncated, info = env.step(action)
            total_rewards_ep += reward

            if terminated or truncated:
                break
            state = new_state
        episode_rewards.append(total_rewards_ep)
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    return mean_reward, std_reward
We should see a mean reward of -13, which corresponds to the optimal 13-step path (each step costs -1).
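Here's an illustrative call, using the names defined above, that runs the evaluation and logs the result to W&B:
mean_reward, std_reward = evaluate_agent(
    env, configs.max_steps, configs.n_eval_episodes, Qtable_cliffwalking, configs.eval_seed
)
print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
wandb.log({"eval/mean_reward": mean_reward, "eval/std_reward": std_reward})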


Visualization with wandb

Let's visualize the cumulative rewards, episode lengths, and training error recorded over n_training_episodes with wandb.
import matplotlib.pyplot as plt

rolling_length = 500

fig0, ax0 = plt.subplots()

# Plot Episode rewards
ax0.set_title("Episode Rewards over time")
reward_moving_average = (
    np.convolve(
        np.array(env.return_queue).flatten(), np.ones(rolling_length), mode="valid"
    )
    / rolling_length
)
ax0.plot(reward_moving_average, label='Rolling Average')
ax0.set_xlabel("Episode")
ax0.set_ylabel("Episode Reward")

# Adjust layout and show the plot
plt.tight_layout()
plt.show()

# Log visualization chart
wandb.log({'Episode Rewards over time': fig0})


rolling_length = 500

fig1, ax1 = plt.subplots()

# Plot Episode lengths
ax1.set_title("Episode Lengths over time")
length_moving_average = (
    np.convolve(
        np.array(env.length_queue).flatten(), np.ones(rolling_length), mode="same"
    )
    / rolling_length
)
ax1.plot(length_moving_average, label='Rolling Average')
ax1.set_xlabel("Episode")
ax1.set_ylabel("Episode Length")

# Adjust layout and show the plot
plt.tight_layout()
plt.show()

# Log visualization chart
wandb.log({'Episode Lengths over time': fig1})



rolling_length = 500

fig2, ax2 = plt.subplots()

# Plot Training Error
ax2.set_title("Training Error over time")
training_error_moving_average = (
    np.convolve(
        np.array(agent.training_error), np.ones(rolling_length), mode="same"
    )
    / rolling_length
)
ax2.plot(training_error_moving_average, label='Rolling Average')
ax2.set_xlabel("Training Step")
ax2.set_ylabel("Training Error")

# Adjust layout and show the plot
plt.tight_layout()
plt.show()

# Log visualization chart
wandb.log({'Training Error over time': fig2})



Finally, let's generate a video recording of the learned optimal path and log it to wandb.
import random

def record_video(env, Qtable, fps=1):
    images = []
    terminated = False
    truncated = False
    state, info = env.reset(seed=random.randint(0, 500))
    img = env.render()
    images.append(img)
    while not (terminated or truncated):
        # Greedy policy: take the action with the maximum expected future reward for this state
        action = np.argmax(Qtable[state][:])
        state, reward, terminated, truncated, info = env.step(action)
        img = env.render()
        images.append(img)
    frames = np.transpose(np.array(images, dtype=np.uint8), (0, 3, 1, 2))  # Reshape frames to (time, channel, height, width) for wandb
    return frames

# Log optimal policy path
optimal_policy_recorded_frames = record_video(env, Qtable_cliffwalking, fps=1)
wandb.log({"optimal policy path": wandb.Video(optimal_policy_recorded_frames, fps=1)})




Conclusion

In this article, we walked through an implementation of Q-learning using Gymnasium's cliff walking environment. Any of the toy text environments will work with this simple Q-learning implementation.
The observation spaces in these environments are small. In an environment with a much larger observation space, a tabular approach like this breaks down, and we will need a Deep Q-Network to handle such cases. More on this in the next article!