"""
Functions you should use.
Please avoid importing any other torch functions or modules.
Your code will not pass if the gradescope autograder detects any changed imports.
"""

from torch.nn import Module
from torch.nn import Linear
from torch import tensor, double, optim
from torch.nn.functional import relu, mse_loss
import torch


class DeepQNetwork(Module):
    """
    A model that uses a Deep Q-value Network (DQN) to approximate Q(s,a) as part
    of reinforcement learning.
    """

    def __init__(self, state_dim, action_dim):
        self.num_actions = action_dim
        self.state_size = state_dim
        super(DeepQNetwork, self).__init__()
        # Remember to set self.learning_rate, self.numTrainingGames,
        # and self.batch_size!
        "*** YOUR CODE HERE ***"
        # Initialize layers on the GPU when one is available, otherwise on the CPU.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Fully connected layers: state_dim -> 512 -> 128 -> 64 -> action_dim.
        layer1_size = 512
        layer2_size = 128
        layer3_size = 64
        self.fc1 = Linear(state_dim, layer1_size).to(self.device)
        self.fc2 = Linear(layer1_size, layer2_size).to(self.device)
        self.fc3 = Linear(layer2_size, layer3_size).to(self.device)
        self.fc_out = Linear(layer3_size, action_dim).to(self.device)

        # Set learning parameters
        self.learning_rate = 0.01
        self.numTrainingGames = 5000
        self.batch_size = 128

        # Optimizer
        self.optimizer = optim.SGD(self.parameters(), lr=self.learning_rate)
        # Optional learning-rate schedulers (left disabled):
        # self.scheduler1 = torch.optim.lr_scheduler.CosineAnnealingLR(self.optimizer, T_max=27500, eta_min=self.learning_rate * 0.25)
        # self.scheduler2 = torch.optim.lr_scheduler.ExponentialLR(self.optimizer, gamma=0.9999)

        # Log the current learning rate every `output_step` gradient updates.
        self.output_step = 500
        self.output_cnt = 0
        "*** END CODE ***"
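        # Cast all parameters to float64 so they match the double-precision
        # tensors created in get_loss (dtype=double).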
        self.double()

    def get_loss(self, states, Q_target):
        """
        Returns the Squared Loss between Q values currently predicted
        by the network, and Q_target.
        Inputs:
            states: a (batch_size x state_dim) numpy array
            Q_target: a (batch_size x num_actions) numpy array, or None
        Output:
            loss node between Q predictions and Q_target
        """
        "*** YOUR CODE HERE ***"
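        # Convert the targets to a double tensor on the model's device, then
        # measure mean squared error against the network's current predictions.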
        Q_target_tensor = tensor(Q_target, dtype=double, device=self.device)
        loss = mse_loss(self.forward(states), Q_target_tensor)
        return loss

    def forward(self, states):
        """
        Runs the DQN for a batch of states.
        The DQN takes the state and returns the Q-values for all possible actions
        that can be taken. That is, if there are two actions, the network takes
        as input the state s and computes the vector [Q(s, a_1), Q(s, a_2)]
        Inputs:
            states: a (batch_size x state_dim) numpy array
        Output:
            result: (batch_size x num_actions) numpy array of Q-value
                scores, for each of the actions
        """
        "*** YOUR CODE HERE ***"
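        # Move the input batch onto the same device as the network's layers,
        # then apply three ReLU-activated hidden layers and a linear output layer.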
        if states.device.type != self.device.type:
            states = states.to(self.device)
        x = relu(self.fc1(states))
        x = relu(self.fc2(x))
        x = relu(self.fc3(x))
        Q_values = self.fc_out(x)
        return Q_values

    def run(self, states):
        return self.forward(states)

    def gradient_update(self, states, Q_target):
        """
        Update your parameters by one gradient step with the .update(...) function.
        You can look at the ML project for an idea of how to do this, but note that rather
        than iterating through a dataset, you should only be applying a single gradient step
        to the given datapoints.

        Inputs:
            states: a (batch_size x state_dim) numpy array
            Q_target: a (batch_size x num_actions) numpy array, or None
        Output:
            None
        """
        "*** YOUR CODE HERE ***"
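        # Single gradient step: clear stale gradients, compute the MSE loss
        # against Q_target, backpropagate, and let the SGD optimizer update
        # the weights once for this batch.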
        self.optimizer.zero_grad()
        loss = self.get_loss(states, Q_target)
        loss.backward()
        self.optimizer.step()
        # self.scheduler1.step()
        # self.scheduler2.step()
        self.output_cnt += 1
        if self.output_cnt % self.output_step == 0:
            print("now lr is: ", self.optimizer.param_groups[0]['lr'], "update count", self.output_cnt)