import model
from qlearningAgents import PacmanQAgent
from backend import ReplayMemory
import layout
import copy
import torch
import numpy as np
import os
from util import *
import util
from pacman import GameState
from game import Directions, Actions


class Agent:
    """
    An agent must define a getAction method, but may also define the
    following methods which will be called if they exist:

    def registerInitialState(self, state): # inspects the starting state
    """

    def __init__(self, index=0):
        self.index = index

    def getAction(self, state):
        """
        The Agent will receive a GameState (from either {pacman, capture, sonar}.py) and
        must return an action from Directions.{North, South, East, West, Stop}
        """
        raiseNotDefined()


def betterEvaluationFunction(currentGameState: GameState):
    """
    Your extreme ghost-hunting, pellet-nabbing, food-gobbling, unstoppable
    evaluation function (question 5).

    DESCRIPTION: <write something here so we know what you did>
    """
    # Useful information you can extract from a GameState (pacman.py)
    if currentGameState.isLose():
        return -500
    if currentGameState.isWin():
        return 500

    kInf = 1e100
    capsules_position = currentGameState.getCapsules()
    current_pos = currentGameState.getPacmanPosition()
    food_positions = currentGameState.getFood().asList()
    current_ghost_positions = currentGameState.getGhostPositions()
    current_ghost_scared_times = [ghostState.scaredTimer for ghostState in currentGameState.getGhostStates()]
    # Split ghosts into three groups: active (not scared), scared but too far to
    # reach before the timer runs out, and scared and close enough to chase down.
    not_scared_ghosts_positions = [current_ghost_positions[i] for i in range(len(current_ghost_positions))
                                   if current_ghost_scared_times[i] == 0]
    scared_ghosts_positions = [current_ghost_positions[i] for i in range(len(current_ghost_positions))
                               if 0 < current_ghost_scared_times[i] <= 1.2 * util.manhattanDistance(current_ghost_positions[i], current_pos) + 2]
    edible_ghosts_positions = [current_ghost_positions[i] for i in range(len(current_ghost_positions))
                               if current_ghost_scared_times[i] > 1.2 * util.manhattanDistance(current_ghost_positions[i], current_pos) + 2]
    current_self_position = currentGameState.getPacmanPosition()

    def DotProduct(a, b):
        return a[0] * b[0] + a[1] * b[1]

    def CrossProduct(a, b):
        return a[0] * b[1] - a[1] * b[0]

    def EuclideanDistance(a, b):
        return ((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2) ** 0.5

    def DistanceAnalysis(current_self_position, object_position_list, flag="None"):
        # Reciprocal of the Manhattan distance to the closest object
        # (kInf if Pacman is on top of one); active ghosts 6 or more
        # steps away are ignored.
        if len(object_position_list) == 0:
            return 0
        if current_self_position in object_position_list:
            return kInf
        res = 0
        for obj_pos in object_position_list:
            if flag == "Ghost" and util.manhattanDistance(current_self_position, obj_pos) >= 6:
                continue
            distance_to_obj = util.manhattanDistance(current_self_position, obj_pos)
            res = max(res, 1 / distance_to_obj)
        return res

    da_for_foods = DistanceAnalysis(current_self_position, food_positions)
    da_for_unscared_ghosts = DistanceAnalysis(current_self_position, not_scared_ghosts_positions, "Ghost")
    da_for_scared_ghosts = DistanceAnalysis(current_self_position, scared_ghosts_positions)
    da_for_capsules = DistanceAnalysis(current_self_position, capsules_position)
    da_for_edible_ghosts = DistanceAnalysis(current_self_position, edible_ghosts_positions)

    res = da_for_capsules * 2 - da_for_unscared_ghosts * 2 - da_for_scared_ghosts * 0.2 + da_for_foods * 0.2 + da_for_edible_ghosts * 1
    if da_for_unscared_ghosts < 1 / 6:
        # No active ghost nearby: weight food and edible ghosts more aggressively.
        res += (da_for_foods * 0.2 + da_for_edible_ghosts * 1) * 5
    res *= 100
    global last_score
    res += (currentGameState.getScore() - last_score) * 10
    return res

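
# Worked example of the weighting above (hypothetical board, not taken from any
# specific layout): with the closest food 2 steps away (1/2), the closest active
# ghost 4 steps away (1/4, inside the 6-step radius) and one capsule 5 steps
# away (1/5), the base value is
#     2 * (1/5) - 2 * (1/4) - 0.2 * 0 + 0.2 * (1/2) + 1 * 0 = 0.0
# and, since 1/4 >= 1/6, the "no ghost nearby" bonus is not added; the result is
# then scaled by 100 and shifted by 10 * (current score - last_score).
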

class MultiAgentSearchAgent(Agent):
    """
    This class provides some common elements to all of your
    multi-agent searchers. Any methods defined here will be available
    to the MinimaxPacmanAgent, AlphaBetaPacmanAgent & ExpectimaxPacmanAgent.

    You *do not* need to make any changes here, but you can if you want to
    add functionality to all your adversarial search agents. Please do not
    remove anything, however.

    Note: this is an abstract class: one that should not be instantiated. It's
    only partially specified, and designed to be extended. Agent (game.py)
    is another abstract class.
    """

    def __init__(self, evalFn='scoreEvaluationFunction', depth='2'):
        self.index = 0  # Pacman is always agent index 0
        self.evaluationFunction = betterEvaluationFunction
        self.depth = int(depth)


# Score of the game state at the start of the current move; set by
# ExpectimaxAgent.getAction and read by betterEvaluationFunction.
last_score = 0


class ExpectimaxAgent(MultiAgentSearchAgent):
    """
    Your expectimax agent (question 4)
    """

    def ExpectMaxSearch(self, gameState: GameState, depth_remain: int, agentIndex: int) -> tuple[float, list[str]]:
        if depth_remain == 0:
            return self.evaluationFunction(gameState), []
        legal_actions = gameState.getLegalActions(agentIndex)
        if len(legal_actions) == 0:
            return self.evaluationFunction(gameState), []
        kInf = 1e100
        res_action = []
        res_val = 0
        if agentIndex == 0:
            # Max node: Pacman takes the action with the highest expected value.
            res_val = -kInf
            for action in legal_actions:
                successorGameState = gameState.generateSuccessor(agentIndex, action)
                nxt_depth = depth_remain - 1 if agentIndex == gameState.getNumAgents() - 1 else depth_remain
                val, action_list = self.ExpectMaxSearch(successorGameState, nxt_depth, (agentIndex + 1) % gameState.getNumAgents())
                if action == "Stop":
                    # Discourage standing still.
                    val -= 100
                if val > res_val:
                    res_val = val
                    res_action = [action] + action_list
        else:
            # Chance node: ghosts move uniformly at random, so the backed-up value
            # is the average over all successors (the tracked minimum is only used
            # to pick the action list that is returned).
            res_val = kInf
            val_list = []
            for action in legal_actions:
                successorGameState = gameState.generateSuccessor(agentIndex, action)
                nxt_depth = depth_remain - 1 if agentIndex == gameState.getNumAgents() - 1 else depth_remain
                val, action_list = self.ExpectMaxSearch(successorGameState, nxt_depth, (agentIndex + 1) % gameState.getNumAgents())
                val_list.append(val)
                if val < res_val:
                    res_val = val
                    res_action = [action] + action_list
            res_val = sum(val_list) / len(val_list)
        return res_val, res_action

    def getAction(self, gameState: GameState):
        """
        Returns the expectimax action using self.depth and self.evaluationFunction

        All ghosts should be modeled as choosing uniformly at random from their
        legal moves.
        """
        global last_score
        last_score = gameState.getScore()
        stat = self.ExpectMaxSearch(gameState, self.depth, 0)
        return stat[1][0]

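
# A minimal usage sketch, assuming the standard pacman.py command-line flags
# from this codebase (-p agent class, -l layout, -a agent arguments); the
# layout name and depth below are only examples:
#
#   python pacman.py -p ExpectimaxAgent -l smallClassic -a depth=3
#
# Each ghost turn is a chance node, so the value backed up from a ghost is the
# mean over its legal moves rather than the worst case used by minimax.
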

class PacmanDeepQAgent(PacmanQAgent):
    def __init__(self, layout_input="smallGrid", target_update_rate=300, doubleQ=True, **args):
        PacmanQAgent.__init__(self, **args)
        self.model = None
        self.target_model = None
        self.target_update_rate = target_update_rate
        self.update_amount = 0
        self.epsilon_explore = 1.0
        self.epsilon0 = 0.4
        self.minimal_epsilon = 0.01
        if model.kProductionMode:
            self.epsilon_explore = 0.01
            self.epsilon0 = 0.01
            self.minimal_epsilon = 0.01
            print("in production mode, epsilon set to 0.01")
        self.epsilon = self.epsilon0
        self.discount = 0.95
        self.update_frequency = 3
        self.counts = None
        self.replay_memory = ReplayMemory(50000)
        self.min_transitions_before_training = 10000
        self.td_error_clipping = 10

        # Initialize Q networks:
        if isinstance(layout_input, str):
            layout_instantiated = layout.getLayout(layout_input)
        else:
            layout_instantiated = layout_input
        self.state_dim = self.get_state_dim(layout_instantiated)
        self.initialize_q_networks(self.state_dim)

        self.doubleQ = doubleQ
        if self.doubleQ:
            self.target_update_rate = -1
        self.guiding_agent = ExpectimaxAgent()
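
    # A minimal launch sketch, assuming pacman.py's standard training flags
    # (-x training episodes, -n total games); the episode counts here are
    # illustrative only:
    #
    #   python pacman.py -p PacmanDeepQAgent -l smallGrid -x 4000 -n 4010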

    def get_state_dim(self, layout):
        pac_ft_size = 2
        ghost_ft_size = 2 * layout.getNumGhosts()
        food_capsule_ft_size = layout.width * layout.height
        return pac_ft_size + ghost_ft_size + food_capsule_ft_size
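
    # Example: for a hypothetical 7x7 layout with a single ghost the state
    # vector has 2 + 2*1 + 7*7 = 53 entries (Pacman's (x, y), one (x, y) per
    # ghost, and one cell per board position for food/capsules).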

    def get_features(self, state):
        pacman_state = np.array(state.getPacmanPosition())
        ghost_state = np.array(state.getGhostPositions())
        capsules = state.getCapsules()
        food_locations = np.array(state.getFood().data).astype(np.float32)
        for x, y in capsules:
            food_locations[x][y] = 2
        return np.concatenate((pacman_state, ghost_state.flatten(), food_locations.flatten()))
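
    # The resulting feature vector is laid out as
    #   [pac_x, pac_y, ghost1_x, ghost1_y, ..., flattened food grid]
    # where each grid cell is 0 (empty), 1 (food) or 2 (capsule), matching the
    # dimension computed in get_state_dim above.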

    def initialize_q_networks(self, state_dim, action_dim=5):
        import model
        self.model = model.DeepQNetwork(state_dim, action_dim)
        self.target_model = model.DeepQNetwork(state_dim, action_dim)
        if os.path.exists('para.bin'):
            print("Loading model parameters from para.bin")
            checkpoint = torch.load('para.bin')
            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.target_model.load_state_dict(checkpoint['target_model_state_dict'])
            self.model.optimizer.load_state_dict(checkpoint['model_optimizer_state_dict'])
            self.target_model.optimizer.load_state_dict(checkpoint['target_model_optimizer_state_dict'])
            self.replay_memory = checkpoint['memory']
            print(self.model.state_dict())
        else:
            print("Initializing new model parameters")

    def save_model(self, filename="para.bin"):
        if model.kProductionMode:
            print("in production mode, not saving model")
            return
        print(f"Saving model parameters to {filename}")
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'target_model_state_dict': self.target_model.state_dict(),
            'model_optimizer_state_dict': self.model.optimizer.state_dict(),
            'target_model_optimizer_state_dict': self.target_model.optimizer.state_dict(),
            'memory': self.replay_memory
        }, filename)
        print(self.model.state_dict())
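
    # Note: the checkpoint above pickles the ReplayMemory object alongside the
    # network and optimizer state dicts, so para.bin can only be reloaded in an
    # environment where the backend module (and its transition type) is
    # importable; initialize_q_networks expects exactly these dictionary keys.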

    def getQValue(self, state, action):
        """
        Should return Q(state,action) as predicted by self.model
        """
        feats = self.get_features(state)
        legalActions = self.getLegalActions(state)
        action_index = legalActions.index(action)
        state = torch.tensor(np.array([feats]).astype("float64"), dtype=torch.double)
        return self.model.run(state).data[0][action_index]
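
    # Note: action_index is the position of `action` within the current legal
    # action list, which matches how update() below indexes the Q-value targets
    # during training (the network's action_dim of 5 matches the maximum number
    # of legal Pacman moves).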

    def shape_reward(self, reward):
        if reward > 100:
            reward = 10
        elif 0 < reward < 10:
            reward = 2
        elif reward == -1:
            reward = 0
        elif reward < -100:
            reward = -10
        return reward
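
    # Illustrative mapping for a few raw rewards (hypothetical values based on
    # the usual Pacman scoring of +10 food, +200 ghost, +/-500 win/loss, -1 per
    # step):
    #   shape_reward(499)  -> 10   (win)
    #   shape_reward(199)  -> 10   (eating a scared ghost)
    #   shape_reward(9)    -> 2    (eating food)
    #   shape_reward(-1)   -> 0    (plain time step)
    #   shape_reward(-501) -> -10  (losing)
    # Rewards outside these bands are passed through unchanged.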

    def compute_q_targets(self, minibatch, network=None, target_network=None, doubleQ=False):
        """Compute Q-value targets for a minibatch.

        Args:
            minibatch (List[Transition]): Minibatch of `Transition`

        Returns:
            np.ndarray: Q-value targets, one row per transition
        """
        if network is None:
            network = self.model
        if target_network is None:
            target_network = self.target_model
        states = np.vstack([x.state for x in minibatch])
        states = torch.tensor(states, dtype=torch.double)
        actions = np.array([x.action for x in minibatch])
        rewards = np.array([x.reward for x in minibatch])
        next_states = np.vstack([x.next_state for x in minibatch])
        next_states = torch.tensor(next_states, dtype=torch.double)
        done = np.array([x.done for x in minibatch])

        Q_predict = network.run(states).data.detach().cpu().numpy()
        Q_target = np.copy(Q_predict)

        # Count-based exploration bonus, indexed by Pacman's (x, y) position
        # (the first two features of the state vector).
        state_indices = states.int().detach().numpy()
        state_indices = (state_indices[:, 0], state_indices[:, 1])
        exploration_bonus = 1 / (2 * np.sqrt(self.counts[state_indices] / 100))

        # Double-Q-style target: the greedy next action is chosen by `network`,
        # but its value is taken from `target_network`.
        replace_indices = np.arange(actions.shape[0])
        action_indices = np.argmax(network.run(next_states).data.cpu(), axis=1)
        target = rewards + exploration_bonus + (1 - done) * self.discount * target_network.run(next_states).data[replace_indices, action_indices].detach().cpu().numpy()

        Q_target[replace_indices, actions] = target

        if self.td_error_clipping is not None:
            Q_target = Q_predict + np.clip(
                Q_target - Q_predict, -self.td_error_clipping, self.td_error_clipping)

        return Q_target
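
    # Sketch of a single target with hypothetical numbers: shaped reward 2,
    # visit count 400 (bonus 1 / (2 * sqrt(400 / 100)) = 0.25), discount 0.95
    # and a target-network value of 10 for the greedy next action gives
    #   2 + 0.25 + 0.95 * 10 = 11.75
    # before TD-error clipping limits how far the target may move from the
    # current prediction (here, at most +/-10).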

    def update(self, state, action, nextState, reward):
        legalActions = self.getLegalActions(state)
        action_index = legalActions.index(action)
        done = nextState.isLose() or nextState.isWin()
        reward = self.shape_reward(reward)

        if self.counts is None:
            x, y = np.array(state.getFood().data).shape
            self.counts = np.ones((x, y))

        state = self.get_features(state)
        nextState = self.get_features(nextState)
        self.counts[int(state[0])][int(state[1])] += 1

        transition = (state, action_index, reward, nextState, done)
        self.replay_memory.push(*transition)

        if len(self.replay_memory) < self.min_transitions_before_training:
            self.epsilon = self.epsilon_explore
        else:
            self.epsilon = max(self.epsilon0 * (1 - self.update_amount / 20000), self.minimal_epsilon)

        if len(self.replay_memory) > self.min_transitions_before_training and self.update_amount % self.update_frequency == 0:
            minibatch = self.replay_memory.pop(self.model.batch_size)
            states = np.vstack([x.state for x in minibatch])
            states = torch.tensor(states.astype("float64"), dtype=torch.double)
            Q_target1 = self.compute_q_targets(minibatch, self.model, self.target_model, doubleQ=self.doubleQ)
            Q_target1 = torch.tensor(Q_target1.astype("float64"), dtype=torch.double)

            if self.doubleQ:
                Q_target2 = self.compute_q_targets(minibatch, self.target_model, self.model, doubleQ=self.doubleQ)
                Q_target2 = torch.tensor(Q_target2.astype("float64"), dtype=torch.double)

            self.model.gradient_update(states, Q_target1)
            if self.doubleQ:
                self.target_model.gradient_update(states, Q_target2)

        if self.target_update_rate > 0 and self.update_amount % self.target_update_rate == 0:
            self.target_model.set_weights(copy.deepcopy(self.model.parameters))

        self.update_amount += 1
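
    # Resulting epsilon schedule (with the default epsilon0 = 0.4): epsilon
    # stays at epsilon_explore (1.0) until the replay memory holds
    # min_transitions_before_training transitions, then decays linearly; for
    # example, at update_amount = 10000 it is max(0.4 * (1 - 10000 / 20000),
    # 0.01) = 0.2, reaching the 0.01 floor near update_amount = 19500.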

    def final(self, state):
        """Called at the end of each game."""
        PacmanQAgent.final(self, state)