References
In this notebook we implement the methods from the paper "#Exploration: A Study of Count-Based Exploration for Deep Reinforcement Learning", which we refer to as the count-based intrinsic reward, and from the paper "Episodic Curiosity through Reachability", which we refer to as the episodic-curiosity intrinsic reward.
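Both methods add an intrinsic bonus to the environment reward at every step. As a rough summary (notation loosely follows the papers; the hyperparameter names match the code below):

$r^{+} = \beta / \sqrt{n(\phi(s))}$ for the count-based bonus, where $\phi(s)$ is a SimHash of the observation and $n(\phi(s))$ counts how often that hash has been seen so far;

$b = \alpha\,(\beta - C(M, o))$ for the episodic-curiosity bonus, where $C(M, o)$ is the predicted reachability of the current observation $o$ from the episodic memory $M$.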
# Run these commands from the terminal to install related libraries and set up the working environment
# pip install gymnasium # Install the gymnasium library with RL environments
# pip install minigrid # Install the Minigrid library, which contains simple, easily configurable grid-world environments for RL.
# pip install stable-baselines3 # Install the Stable Baselines3 library, which contains RL algorithms.
# Import the libraries and check if they are working
import random, copy
from collections import deque
import torch
import gymnasium as gym
import minigrid
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from stable_baselines3 import PPO
from gymnasium.core import Wrapper
from minigrid.wrappers import ImgObsWrapper # Convert the observation space into an image
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from utils import get_policy_kwargs
from callbacks.Eval_Callback import Eval_Callback
# Set seeds for reproducibility.
random.seed(1)
np.random.seed(1)
torch.manual_seed(1)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = gym.make("MiniGrid-Empty-16x16-v0", render_mode="rgb_array")
env = ImgObsWrapper(env)
ob, _ = env.reset()
img = env.render()
plt.imshow(img);
Action Space

| Num | Name    | Action       |
|-----|---------|--------------|
| 0   | left    | Turn left    |
| 1   | right   | Turn right   |
| 2   | forward | Move forward |
| 3   | pickup  | Unused       |
| 4   | drop    | Unused       |
| 5   | toggle  | Unused       |
| 6   | done    | Unused       |
print(env.action_space)
Observation Space
print(env.observation_space)
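As a small sanity check (illustration only), the image observation returned by ImgObsWrapper is a 7x7x3 view of the grid; flattened it has 7*7*3 = 147 entries, which is the input dimension used by the SimHash further below.

# Sanity check: shape of the observation and of its flattened form
obs, _ = env.reset()
print(obs.shape, obs.flatten().shape)  # expected: (7, 7, 3) (147,)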
Reward
MiniGrid's reward is sparse: the agent receives 1 - 0.9 * (step_count / max_steps) when it reaches the goal and 0 otherwise, which is exactly the setting where exploration bonuses are expected to help.
env = gym.make("MiniGrid-Empty-16x16-v0", render_mode="rgb_array")
env = ImgObsWrapper(env)
simple_ppo_reward = {}
for run in range(5):
eval_callback = Eval_Callback(eval_env=env, eval_freq=10000, n_eval_episodes=10)
policy = PPO(policy="CnnPolicy", env=env, verbose=1, policy_kwargs=get_policy_kwargs(), ent_coef=0.005)
policy.learn(total_timesteps=100000, callback=eval_callback)
simple_ppo_reward[f"run_{run}"] = eval_callback.record_reward
# policy.save("pretrained_models/simple_ppo")
# policy = PPO.load("pretrained_models/simple_ppo")
# # Reset the environment to its initial state
# obs, _ = env.reset()
# count = 1
# reward_list = []
# # Perform some actions in the environment
# while count <= 50:
# action, _ = policy.predict(obs) # Sample an action using the trained policy
# # print(observation, action)
# obs, reward, done, truncated, info = env.step(action) # Take a step in the environment
# # If the episode has finished (either done or truncated), record the reward & reset the environment
# if done or truncated:
# reward_list.append(reward)
# obs, _ = env.reset()
# count += 1
# print(f"The average reward over 50 episodes is: {np.mean(reward_list)}")
# env.close() # Close the environment
import math
class SimHash(object):
def __init__(self, state_emb, k):
self.A = np.random.normal(0, 1, (k, state_emb)) # Random projection matrix A with k hash bits and state_emb input features
def hash(self, state):
hash_key = str(np.sign(self.A @ np.array(state)).tolist()) # phi(s) = sign(A s), stringified so it can serve as a dictionary key
return hash_key
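A quick sanity check of the hashing (a minimal sketch; the _demo_* names are only for illustration and are not used by the training code): identical observations always map to the same key, which is what lets the count table below aggregate visits.

# Minimal sketch: SimHash is deterministic, so repeated visits to the same state share one bucket.
_demo_env = ImgObsWrapper(gym.make("MiniGrid-Empty-16x16-v0"))
_demo_obs, _ = _demo_env.reset()
_demo_hash = SimHash(147, 56)  # 147 = 7*7*3 flattened observation, 56 hash bits (as in the wrapper below)
_key_a = _demo_hash.hash(_demo_obs.flatten())
_key_b = _demo_hash.hash(_demo_obs.flatten())
print(_key_a == _key_b)  # True: same observation -> same hash key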
class CountBasedBonusWrapper(Wrapper):
def __init__(self, env):
super().__init__(env)
self.M = {} # -> Memory counting M
self.hash = SimHash(147, 56) # 147 = 7*7*3 flattened observation, 56 hash bits
self.beta = 0.001 # -> hyperparameter beta
def _update_count_dict_(self, hash):
"""Function to update the counting library M, if hash is in count then + 1 otherwise create a new entry of value 0."""
pre_count = 0
if hash in self.M:
pre_count = self.M[hash]
new_count = pre_count + 1
self.M[hash] = new_count
def get_count(self, hash):
return self.M[hash]
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
obs_flatten = obs.flatten()
hash = self.hash.hash(obs_flatten)
self._update_count_dict_(hash) # Update the visit count of this observation's hash at every step.
new_count = self.get_count(hash) # Get the count value after the update.
bonus = self.beta / math.sqrt(new_count) # Calculate the intrinsic reward.
reward += bonus
return obs, reward, terminated, truncated, info
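Before training, a minimal sketch of the wrapper in isolation (the _demo_* names are illustration only): after a few random actions, the count table M holds the visit count n per hash bucket, and the bonus added for a bucket visited n times is beta / sqrt(n).

# Minimal sketch: step a wrapped copy of the environment with random actions and inspect the counts.
_demo_env = CountBasedBonusWrapper(ImgObsWrapper(gym.make("MiniGrid-Empty-16x16-v0")))
_demo_env.reset()
for _ in range(20):
    _demo_env.step(_demo_env.action_space.sample())
_counts = sorted(_demo_env.M.values(), reverse=True)
print(_counts)  # visit counts n per hash bucket
print([round(_demo_env.beta / math.sqrt(n), 6) for n in _counts])  # the corresponding bonuses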
env = gym.make("MiniGrid-Empty-16x16-v0", render_mode="rgb_array")
env = ImgObsWrapper(env)
count_base_reward = {}
for run in range(5):
train_env = CountBasedBonusWrapper(env)
test_env = env
eval_callback = Eval_Callback(eval_env=env, eval_freq=10000, n_eval_episodes=10)
policy = PPO(policy="CnnPolicy", env=train_env, verbose=1, policy_kwargs=get_policy_kwargs(), ent_coef=0.005)
policy.learn(total_timesteps=100000, callback=eval_callback)
count_base_reward[f"run_{run}"] = eval_callback.record_reward
# policy.save("pretrained_models/ppo_observation_count_bonus")
# policy = PPO.load("pretrained_models/ppo_observation_count_bonus")
# # Reset the environment to its initial state
# obs, _ = test_env.reset()
# count = 1
# reward_list = []
# # Perform some actions in the environment
# while count <= 50:
# action, _ = policy.predict(obs) # Sample an action using the trained policy
# obs, reward, done, truncated, info = test_env.step(action) # Take a step in the environment
# # If the episode has finished (either done or truncated), record the reward & reset the environment
# if done or truncated:
# reward_list.append(reward)
# obs, _ = test_env.reset()
# count += 1
# print(f"The average reward over 50 episodes is: {np.mean(reward_list)}")
# test_env.close() # Close the environment
class R_Model(nn.Module):
def __init__(self):
super(R_Model, self).__init__()
# Define the number of output features after convolutional layers
feature_output = 64
# Embedding network -> In the original implementation, the authors use a ResNet-18-based embedding network.
self.embedding = nn.Sequential(
nn.Conv2d(
in_channels=3, # Adjusted to match the number of input channels
out_channels=32,
kernel_size=3,
stride=1),
nn.LeakyReLU(),
nn.Conv2d(
in_channels=32,
out_channels=64,
kernel_size=3,
stride=1),
nn.LeakyReLU(),
nn.Conv2d(
in_channels=64,
out_channels=64,
kernel_size=3,
stride=1),
nn.LeakyReLU(),
nn.Flatten(),
nn.Linear(feature_output, 512)
)
self.classification = nn.Sequential(
nn.Linear(512 * 2, 256), # Combine the embeddings of two observations
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 2),
nn.Softmax(dim=1)
)
def get_embedding(self, ob):
ob = torch.tensor(ob.reshape(1,3,7,7), dtype=torch.float32).to(device)
ob_emb = self.embedding(ob)
return ob_emb
def get_label(self, ob_1, ob_2):
ob_1_emb = self.get_embedding(ob_1)
ob_2_emb = self.get_embedding(ob_2)
combined_embedding = torch.cat((ob_1_emb, ob_2_emb), dim=1).to(device)
prob = self.classification(combined_embedding)
return prob
def get_reward(self, ob, M):
max_reward = 0
for ob_2 in M:
with torch.no_grad():
prob = self.get_label(ob, ob_2)
prob = prob.to("cpu")
value = prob[0][1]
if value > max_reward:
max_reward = value
return float(max_reward) # float() handles both the tensor case and the untouched initial value 0
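A quick shape check for the network above (a sketch only; the _demo_* names are illustrative): get_label maps two raw 7x7x3 observations to a pair of probabilities that sum to one; index 1 is the entry trained below to mean "reachable within k steps".

# Minimal sketch: the R-network outputs a (1, 2) probability vector for a pair of observations.
_demo_env = ImgObsWrapper(gym.make("MiniGrid-Empty-16x16-v0"))
_o1, _ = _demo_env.reset()
_o2, _, _, _, _ = _demo_env.step(_demo_env.action_space.sample())
_r_net = R_Model().to(device)
with torch.no_grad():
    _probs = _r_net.get_label(_o1, _o2)
print(_probs.shape, float(_probs.sum()))  # torch.Size([1, 2]) and ~1.0 from the Softmax head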
class EpisodicCuriosityBonusWrapper(Wrapper):
def __init__(self, env):
super().__init__(env)
self.based_bonus = 0.001
self.M = [] # Episodic memory: observations stored during the current episode, used to judge reachability of later steps
self.eps = []
self.max_length = 10
self.step_retrained_model = 0
self.r_model = R_Model().to(device)
self.optimizer = optim.Adam(self.r_model.parameters(), lr=1e-4)
self.criterion = nn.BCELoss()
self.model_trained = False
self.beta = 1
self.alpha = 0.001
self.history = deque(maxlen=10) # Replay buffer storing recent episodes for training the R-network
self.k = 5
self.gamma = 1.2 # Gap factor between reachable (within k steps) and non-reachable (at least gamma*k steps apart) pairs
def reset(self, **kwargs):
obs = self.env.reset(**kwargs)
self.eps.append(obs[0])
self.M.append(obs[0])
return obs
def step(self, action):
obs, reward, terminated, truncated, info = self.env.step(action)
self.eps.append(obs)
if terminated or truncated:
self.history.append(self.eps.copy())
self.eps = []
self.M = []
# Train r_model
self.step_retrained_model += 1
if self.step_retrained_model == 30000:
if len(self.history) != 0:
X, y = self.create_training_data() # -> labelling the training data
self.train_r_model(X, y)
self.model_trained = True
self.step_retrained_model = 0
else:
self.step_retrained_model = 0
if len(self.M) >= 2 and self.model_trained: # -> If network R is trained then start getting the reward
bonus = self.r_model.get_reward(obs, self.M)
bonus = self.alpha*(self.beta-bonus)
reward += bonus
# Storing observation in current memory M to determine reachability
if len(self.M) > self.max_length:
if not any(np.array_equal(obs, array) for array in self.M):
self.M.pop(random.randint(0, len(self.M) - 1))
self.M.append(obs)
else:
if not any(np.array_equal(obs, array) for array in self.M):
self.M.append(obs)
return obs, reward, terminated, truncated, info
def create_training_data(self):
"""Function to create the training dataset of neural network R"""
X = []
y = []
for episode in self.history:
for _ in range(30):
episode_with_indices = list(enumerate(episode))
index, _ = random.choice(episode_with_indices)
# Get random positive (reachable) example -> reachable 1
# Calculate the maximum allowable value for 'step' to stay within the range of indices
max_step = min(self.k, len(episode) - 1 - index)
if max_step == 1:
step = 1
elif max_step == 0:
continue
else:
step = random.randint(1, max_step)
X.append([episode[index], episode[index+step]])
y.append(1)
# Get random negative (unreachable) example -> non-reachable 0
# Skip if too few steps remain in the episode to form a negative pair
if self.k*self.gamma > len(episode) - 1 - index:
continue
else:
step = random.randint(math.ceil(self.k*self.gamma), len(episode) - 1 - index) # randint needs integer bounds
X.append([episode[index], episode[index+step]])
y.append(0)
return X, y
def train_r_model(self, X, y):
for _ in range(5):
indices = list(range(len(X)))
# Shuffle the indices
random.shuffle(indices)
# Reorder both X and y using the shuffled indices
X_shuffled = [X[i] for i in indices]
y_shuffled = [y[i] for i in indices]
prob_stack = []
for i in range(len(X)):
prob = self.r_model.get_label(X_shuffled[i][0], X_shuffled[i][1])
prob_stack.append(prob)
prob_stack = torch.cat(prob_stack, dim=0)
loss = self.criterion(prob_stack[:,1], torch.tensor(y_shuffled).float().to(device))
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
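To make the labelling above concrete, here is a minimal sketch (the _demo_* / _fake_* names are illustration only and are not used elsewhere): fill the replay buffer with one fake 20-step episode and count how many reachable (label 1, within k steps) and non-reachable (label 0, at least gamma*k steps apart) pairs create_training_data produces.

# Minimal sketch: label generation for one fake episode made of repeated observations.
_demo_wrapper = EpisodicCuriosityBonusWrapper(ImgObsWrapper(gym.make("MiniGrid-Empty-16x16-v0")))
_first_obs, _ = _demo_wrapper.env.reset()
_fake_episode = [_first_obs for _ in range(20)]  # stands in for a real trajectory of observations
_demo_wrapper.history.append(_fake_episode)
_X, _y = _demo_wrapper.create_training_data()
print(len(_X), sum(_y), len(_y) - sum(_y))  # total pairs, reachable pairs, non-reachable pairs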
env = gym.make("MiniGrid-Empty-16x16-v0", render_mode="rgb_array")
env = ImgObsWrapper(env)
episodic_curiousity = {}
for run in range(5):
train_env = EpisodicCuriosityBonusWrapper(env)
test_env = env
eval_callback = Eval_Callback(eval_env=env, eval_freq=10000, n_eval_episodes=10)
policy = PPO(policy="CnnPolicy", env=train_env, verbose=1, policy_kwargs=get_policy_kwargs(), ent_coef=0.005)
policy.learn(total_timesteps=100000, callback=eval_callback)
episodic_curiousity[f"run_{run}"] = eval_callback.record_reward
policy.save("pretrained_models/ppo_observation_episodic_curiousity")
# policy = PPO.load("pretrained_models/ppo_observation_episodic_curiousity")
# # Reset the environment to its initial state
# obs, _ = test_env.reset()
# count = 1
# reward_list = []
# # Perform some actions in the environment
# while count <= 50:
# action, _ = policy.predict(obs) # Sample an action using the trained policy
# # print(observation, action)
# obs, reward, done, truncated, info = test_env.step(action) # Take a step in the environment
# # If the episode has finished (either done or truncated), record the reward & reset the environment
# if done or truncated:
# reward_list.append(reward)
# obs, _ = test_env.reset()
# count += 1
# print(f"The average reward over 50 episodes is: {np.mean(reward_list)}")
# test_env.close() # Close the environment
eval_freq = 10000
total_timesteps = 100000
row_idx = [i for i in range(0, total_timesteps, eval_freq)]
df_simple_PPO = pd.DataFrame.from_dict(simple_ppo_reward, orient='index').T
df_simple_PPO.index = row_idx
df_count_base = pd.DataFrame.from_dict(count_base_reward, orient='index').T
df_count_base.index = row_idx
df_episodic_curiousity = pd.DataFrame.from_dict(episodic_curiousity, orient='index').T
df_episodic_curiousity.index = row_idx
# Save DataFrames to CSV files
df_simple_PPO.to_csv('data/simple_ppo_rewards.csv')
df_count_base.to_csv('data/count_base_rewards.csv')
df_episodic_curiousity.to_csv('data/episodic_curiousity.csv')
df_simple_PPO = pd.read_csv('data/simple_ppo_rewards.csv')
df_count_base = pd.read_csv('data/count_base_rewards.csv')
df_episodic_curiousity = pd.read_csv('data/episodic_curiousity.csv')
dfs = [df_simple_PPO, df_count_base, df_episodic_curiousity]
for df in dfs:
df["mean"] = df.iloc[:,1:].mean(axis=1)
df["mean_smoothed"] = df["mean"].ewm(alpha=1-0.9).mean()
# Plot simple_ppo_reward
plt.plot(df_simple_PPO.iloc[:,0], df_simple_PPO['mean_smoothed'], label='Simple PPO')
# Plot count_base_reward
plt.plot(df_count_base.iloc[:,0], df_count_base['mean_smoothed'], label='Count-Based')
# Plot episodic curiosity
plt.plot(df_episodic_curiousity.iloc[:,0], df_episodic_curiousity['mean_smoothed'], label='Episodic Curiosity')
plt.xlabel('Timestep')
plt.ylabel('Reward')
plt.title('Average return over 5 runs (evaluated over 10 episodes)')
plt.legend()
plt.show()