import model
from qlearningAgents import PacmanQAgent
from backend import ReplayMemory

import layout
import copy
import torch
import numpy as np
import os

from util import *
import util
from pacman import GameState
from game import Directions, Actions


class Agent:
    """
    An agent must define a getAction method, but may also define the
    following methods which will be called if they exist:

    def registerInitialState(self, state): # inspects the starting state
    """
    def __init__(self, index=0):
        self.index = index

    def getAction(self, state):
        """
        The Agent will receive a GameState (from either {pacman, capture, sonar}.py) and
        must return an action from Directions.{North, South, East, West, Stop}
        """
        raiseNotDefined()


def betterEvaluationFunction(currentGameState: GameState):
    """
    Your extreme ghost-hunting, pellet-nabbing, food-gobbling, unstoppable
    evaluation function (question 5).

    DESCRIPTION: wins and losses are valued at +500 / -500 immediately. Otherwise,
    each object class (food, capsules, active ghosts, scared-but-unreachable ghosts,
    edible ghosts) is scored by the reciprocal of its nearest Manhattan distance,
    the scores are combined with hand-tuned weights, food and edible ghosts are
    weighted more heavily when no active ghost is within 6 steps, and the total is
    scaled by 100 plus 10x the score gained since the last recorded score
    (tracked in the module-level `last_score`).
    """
    # Useful information you can extract from a GameState (pacman.py)
    if currentGameState.isLose():
        return -500
    if currentGameState.isWin():
        return 500

    kInf = 1e100
    capsules_position = currentGameState.getCapsules()
    current_pos = currentGameState.getPacmanPosition()
    food_positions = currentGameState.getFood().asList()
    current_ghost_positions = currentGameState.getGhostPositions()
    current_ghost_scared_times = [ghostState.scaredTimer for ghostState in currentGameState.getGhostStates()]
    # Ghosts that are dangerous right now.
    not_scared_ghosts_positions = [
        current_ghost_positions[i] for i in range(len(current_ghost_positions))
        if current_ghost_scared_times[i] == 0]
    # Ghosts that are scared, but whose timer is too short to reach them in time.
    scared_ghosts_positions = [
        current_ghost_positions[i] for i in range(len(current_ghost_positions))
        if 0 < current_ghost_scared_times[i] <= 1.2 * util.manhattanDistance(current_ghost_positions[i], current_pos) + 2]
    # Ghosts that are scared long enough to be hunted down.
    edible_ghosts_positions = [
        current_ghost_positions[i] for i in range(len(current_ghost_positions))
        if current_ghost_scared_times[i] > 1.2 * util.manhattanDistance(current_ghost_positions[i], current_pos) + 2]
    current_self_position = currentGameState.getPacmanPosition()

    # Geometry helpers (currently unused).
    def DotProduct(a, b):
        return a[0] * b[0] + a[1] * b[1]

    def CrossProduct(a, b):
        return a[0] * b[1] - a[1] * b[0]

    def EuclideanDistance(a, b):
        return ((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2) ** 0.5

    def DistanceAnalysis(current_self_position, object_postion_list, flag="None"):
        """Score a class of objects by the reciprocal of the nearest Manhattan distance."""
        if len(object_postion_list) == 0:
            return 0
        if current_self_position in object_postion_list:
            return kInf
        res = 0
        for obj_pos in object_postion_list:
            # Active ghosts farther than 5 steps away are ignored.
            if flag == "Ghost" and util.manhattanDistance(current_self_position, obj_pos) >= 6:
                continue
            distance_to_obj = util.manhattanDistance(current_self_position, obj_pos)
            res = max(res, 1 / distance_to_obj)
        return res

    da_for_foods = DistanceAnalysis(current_self_position, food_positions)
    da_for_unscared_ghosts = DistanceAnalysis(current_self_position, not_scared_ghosts_positions, "Ghost")
    da_for_scared_ghosts = DistanceAnalysis(current_self_position, scared_ghosts_positions)
    da_for_capsules = DistanceAnalysis(current_self_position, capsules_position)
    da_for_edible_ghosts = DistanceAnalysis(current_self_position, edible_ghosts_positions)

    res = (da_for_capsules * 2
           - da_for_unscared_ghosts * 2
           - da_for_scared_ghosts * 0.2
           + da_for_foods * 0.2
           + da_for_edible_ghosts * 1)
    # With no active ghost within 6 steps, chase food and edible ghosts more aggressively.
    if da_for_unscared_ghosts < 1 / 6:
        res += (da_for_foods * 0.2 + da_for_edible_ghosts * 1) * 5
    res *= 100

    global last_score
    res += (currentGameState.getScore() - last_score) * 10
    return res
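
# Illustrative sketch (a hypothetical helper, not used by any agent in this file):
# betterEvaluationFunction scores each object class by the reciprocal of the nearest
# Manhattan distance, so nearby objects dominate. For example, food pellets at
# distances 2 and 5 give a food feature of max(1/2, 1/5) = 0.5, contributing
# 0.5 * 0.2 = 0.1 to the weighted sum, while an active ghost two steps away
# contributes -0.5 * 2 = -1.0. Unlike DistanceAnalysis above, this sketch floors the
# distance at 1 instead of returning a huge constant when standing on an object.
def _reciprocal_distance_feature(pos, targets):
    """Return max over targets of 1 / manhattanDistance(pos, target); 0 if empty."""
    if not targets:
        return 0
    return max(1.0 / max(util.manhattanDistance(pos, t), 1) for t in targets)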
class MultiAgentSearchAgent(Agent):
    """
    This class provides some common elements to all of your
    multi-agent searchers.  Any methods defined here will be available
    to the MinimaxPacmanAgent, AlphaBetaPacmanAgent & ExpectimaxPacmanAgent.

    You *do not* need to make any changes here, but you can if you want to
    add functionality to all your adversarial search agents.  Please do not
    remove anything, however.

    Note: this is an abstract class: one that should not be instantiated.  It's
    only partially specified, and designed to be extended.  Agent (game.py)
    is another abstract class.
    """

    def __init__(self, evalFn='scoreEvaluationFunction', depth='2'):
        self.index = 0  # Pacman is always agent index 0
        self.evaluationFunction = betterEvaluationFunction
        self.depth = int(depth)


# Score at the start of the current move, used by betterEvaluationFunction
# to reward recent score gains.
last_score = 0


class ExpectimaxAgent(MultiAgentSearchAgent):
    """
    Your expectimax agent (question 4)
    """

    def ExpectMaxSearch(self, gameState: GameState, depth_remain: int,
                        agentIndex: int) -> tuple[float, list[Actions]]:
        # Depth exhausted: evaluate the state.
        if depth_remain == 0:
            return self.evaluationFunction(gameState), []
        legal_actions = gameState.getLegalActions(agentIndex)
        # Terminal state with no legal actions: evaluate the state.
        if len(legal_actions) == 0:
            return self.evaluationFunction(gameState), []

        kInf = 1e100
        res_action = []
        res_val = 0
        if agentIndex == 0:  # Max node: Pacman takes the best successor value.
            res_val = -kInf
            for action in legal_actions:
                successorGameState = gameState.generateSuccessor(agentIndex, action)
                nxt_depth = depth_remain - 1 if agentIndex == gameState.getNumAgents() - 1 else depth_remain
                val, action_list = self.ExpectMaxSearch(
                    successorGameState, nxt_depth, (agentIndex + 1) % gameState.getNumAgents())
                if action == "Stop":
                    val -= 100  # discourage stopping
                if val > res_val:
                    res_val = val
                    res_action = [action] + action_list
        else:  # Chance node: ghosts are modelled as moving uniformly at random.
            val_list = []
            for action in legal_actions:
                successorGameState = gameState.generateSuccessor(agentIndex, action)
                nxt_depth = depth_remain - 1 if agentIndex == gameState.getNumAgents() - 1 else depth_remain
                val, action_list = self.ExpectMaxSearch(
                    successorGameState, nxt_depth, (agentIndex + 1) % gameState.getNumAgents())
                val_list.append(val)
            res_val = sum(val_list) / len(val_list)
        return res_val, res_action

    def getAction(self, gameState: GameState):
        # Entry point: remember the current score for betterEvaluationFunction,
        # run expectimax from the root, and play the first action of the best line.
        global last_score
        last_score = gameState.getScore()
        _, actions = self.ExpectMaxSearch(gameState, self.depth, 0)
        return actions[0]
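
# Illustrative sketch (a hypothetical standalone helper, not used by ExpectimaxAgent):
# the expectimax backup maximizes over Pacman's moves and averages over ghost moves,
# i.e. V(s) = max_a V(succ(s, a)) at Pacman nodes and V(s) = mean_a V(succ(s, a)) at
# ghost nodes, under the assumption that ghosts choose uniformly at random.
def _expectimax_backup(successor_values, is_max_node):
    """Combine successor values: max for Pacman nodes, mean for chance (ghost) nodes."""
    if is_max_node:
        return max(successor_values)
    return sum(successor_values) / len(successor_values)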
class PacmanDeepQAgent(PacmanQAgent):
    """
    Deep Q-learning Pacman agent: approximates Q-values with a neural network
    (`self.model`), bootstraps from a target network (`self.target_model`),
    trains from a `ReplayMemory` of transitions, and adds a count-based
    exploration bonus over visited board positions.
    """

    def shape_reward(self, reward):
        # Squash the raw game-score delta into a small, bounded learning signal.
        if reward > 100:
            reward = 10
        elif 0 < reward < 10:
            reward = 2
        elif reward == -1:
            reward = 0
        elif reward < -100:
            reward = -10
        return reward

    def compute_q_targets(self, minibatch, network=None, target_network=None, doubleQ=False):
        """Compute Q-value targets for a minibatch.

        Args:
            minibatch (List[Transition]): Minibatch of `Transition`

        Returns:
            np.ndarray: Q-value targets, one row per transition
        """
        if network is None:
            network = self.model
        if target_network is None:
            target_network = self.target_model

        states = np.vstack([x.state for x in minibatch])
        states = torch.tensor(states, dtype=torch.double)
        actions = np.array([x.action for x in minibatch])
        rewards = np.array([x.reward for x in minibatch])
        next_states = np.vstack([x.next_state for x in minibatch])
        next_states = torch.tensor(next_states)
        done = np.array([x.done for x in minibatch])

        Q_predict = network.run(states).data.detach().cpu().numpy()
        Q_target = np.copy(Q_predict)

        # Count-based exploration bonus for the (x, y) position encoded in the state.
        state_indices = states.int().detach().numpy()
        state_indices = (state_indices[:, 0], state_indices[:, 1])
        exploration_bonus = 1 / (2 * np.sqrt(self.counts[state_indices] / 100))

        # Double-DQN style target: `network` picks the next action, `target_network`
        # evaluates it; terminal transitions (done == 1) drop the bootstrap term.
        replace_indices = np.arange(actions.shape[0])
        action_indices = np.argmax(network.run(next_states).data.cpu(), axis=1)
        target = rewards + exploration_bonus + (1 - done) * self.discount * \
            target_network.run(next_states).data[replace_indices, action_indices].detach().cpu().numpy()
        Q_target[replace_indices, actions] = target

        # Optionally clip the TD error to stabilize training.
        if self.td_error_clipping is not None:
            Q_target = Q_predict + np.clip(
                Q_target - Q_predict, -self.td_error_clipping, self.td_error_clipping)

        return Q_target

    def update(self, state, action, nextState, reward):
        legalActions = self.getLegalActions(state)
        action_index = legalActions.index(action)
        done = nextState.isLose() or nextState.isWin()
        reward = self.shape_reward(reward)

        # Lazily initialize visit counts to the board dimensions.
        if self.counts is None:
            x, y = np.array(state.getFood().data).shape
            self.counts = np.ones((x, y))

        state = self.get_features(state)
        nextState = self.get_features(nextState)
        self.counts[int(state[0])][int(state[1])] += 1

        transition = (state, action_index, reward, nextState, done)
        self.replay_memory.push(*transition)

        # Explore aggressively until the replay memory is warm, then anneal epsilon.
        if len(self.replay_memory) < self.min_transitions_before_training:
            self.epsilon = self.epsilon_explore
        else:
            self.epsilon = max(self.epsilon0 * (1 - self.update_amount / 20000), self.minimal_epsilon)

        if len(self.replay_memory) > self.min_transitions_before_training and self.update_amount % self.update_frequency == 0:
            minibatch = self.replay_memory.pop(self.model.batch_size)
            states = np.vstack([x.state for x in minibatch])
            states = torch.tensor(states.astype("float64"), dtype=torch.double)
            Q_target1 = self.compute_q_targets(minibatch, self.model, self.target_model, doubleQ=self.doubleQ)
            Q_target1 = torch.tensor(Q_target1.astype("float64"), dtype=torch.double)

            if self.doubleQ:
                Q_target2 = self.compute_q_targets(minibatch, self.target_model, self.model, doubleQ=self.doubleQ)
                Q_target2 = torch.tensor(Q_target2.astype("float64"), dtype=torch.double)

            self.model.gradient_update(states, Q_target1)
            if self.doubleQ:
                self.target_model.gradient_update(states, Q_target2)

        # Periodically copy the online network's weights into the target network.
        if self.target_update_rate > 0 and self.update_amount % self.target_update_rate == 0:
            self.target_model.set_weights(copy.deepcopy(self.model.parameters))

        self.update_amount += 1

    def final(self, state):
        """Called at the end of each game."""
        PacmanQAgent.final(self, state)
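
# Illustrative sketch (a hypothetical helper mirroring the vectorized computation in
# PacmanDeepQAgent.compute_q_targets above): for a single transition, the online
# network chooses the next action and the target network evaluates it, i.e.
#     target = r + bonus + (1 - done) * gamma * Q_target(s', argmax_a Q_online(s', a)).
# With r = 2, bonus = 0.5, gamma = 0.9, done = 0, and a bootstrapped value of 10,
# the target is 2 + 0.5 + 0.9 * 10 = 11.5; only the taken action's entry in the
# target row is replaced, so the other actions keep the online network's predictions.
def _single_transition_target(reward, bonus, done, discount, next_q_online, next_q_target):
    """Double-DQN style target for one transition."""
    best_next_action = int(np.argmax(next_q_online))
    return reward + bonus + (1 - done) * discount * next_q_target[best_next_action]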