environment.py
# AI for Games - Beat the Snake game
# Building the Environment

# Importing the libraries
import numpy as np
import pygame as pg

# Initializing the Environment class
class Environment():
    """Snake game played on a small grid and rendered with pygame.

    The board is mirrored in ``screenMap``, an (nRows, nColumns) array in
    which 0 is an empty cell, 0.5 is a snake cell and 1 is the apple.
    ``snakePos`` lists the snake cells as (row, column) tuples, head first.
    """

    # (row offset, column offset) for each action:
    # 0 -> up, 1 -> down, 2 -> right, 3 -> left
    _MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, 1), 3: (0, -1)}

    # The action that would reverse each action
    _OPPOSITE = {0: 1, 1: 0, 2: 3, 3: 2}

    def __init__(self, waitTime):
        """Create the window, the board, the snake and the first apple.

        waitTime -- milliseconds to wait at the end of every step.
        """

        # Defining the parameters
        self.width = 880            # width of the game window
        self.height = 880           # height of the game window
        self.nRows = 10             # number of rows in our board
        self.nColumns = 10          # number of columns in our board
        self.initSnakeLen = 2       # initial length of the snake
        self.defReward = -0.03      # reward for taking an action - The Living Penalty
        self.negReward = -1.        # reward for dying
        self.posReward = 2.         # reward for collecting an apple
        self.waitTime = waitTime    # slowdown after taking an action

        # The snake spawns from the middle row downwards, so it cannot be
        # longer than half of the board
        if self.initSnakeLen > self.nRows / 2:
            self.initSnakeLen = int(self.nRows / 2)

        self.screen = pg.display.set_mode((self.width, self.height))

        # Creating the snake and the array that contains mathematical
        # representation of the game's board
        self.snakePos = list()
        self.screenMap = np.zeros((self.nRows, self.nColumns))
        self._spawnSnake()

        self.applePos = self.placeApple()

        self.drawScreen()

        self.collected = False
        self.lastMove = 0

    # Clearing the board and putting the snake back at its starting position
    def _spawnSnake(self):
        """Reset ``screenMap`` and ``snakePos`` to the initial snake."""
        self.screenMap = np.zeros((self.nRows, self.nColumns))
        headRow = int(self.nRows / 2)
        column = int(self.nColumns / 2)
        self.snakePos = [(headRow + i, column) for i in range(self.initSnakeLen)]
        for r, c in self.snakePos:
            self.screenMap[r][c] = 0.5

    # Building a method that gets new, random position of an apple
    def placeApple(self):
        """Mark a random non-snake cell as the apple.

        Returns the chosen (row, column), or None when the snake covers the
        whole board and no cell is left (the original loop never terminated
        in that situation).
        """
        freeCells = np.argwhere(self.screenMap != 0.5)
        if len(freeCells) == 0:
            return None

        posy, posx = freeCells[np.random.randint(0, len(freeCells))]
        posy, posx = int(posy), int(posx)
        self.screenMap[posy][posx] = 1

        return (posy, posx)

    # Making a function that draws everything for us to see
    def drawScreen(self):
        """Redraw the window: white snake cells, a red apple, black elsewhere."""
        self.screen.fill((0, 0, 0))

        cellWidth = self.width / self.nColumns
        cellHeight = self.height / self.nRows

        for i in range(self.nRows):
            for j in range(self.nColumns):
                value = self.screenMap[i][j]
                if value == 0.5:
                    colour = (255, 255, 255)
                elif value == 1:
                    colour = (255, 0, 0)
                else:
                    continue
                # One pixel of margin on each side keeps the cells visually apart
                pg.draw.rect(self.screen, colour, (j*cellWidth + 1, i*cellHeight + 1, cellWidth - 2, cellHeight - 2))

        pg.display.flip()

    # A method that updates the snake's position
    def moveSnake(self, nextPos, col):
        """Move the head to ``nextPos`` and rebuild ``screenMap``.

        nextPos -- (row, column) of the new head cell.
        col     -- True when an apple is collected by this move: the snake
                   then keeps its tail (grows by one) and a new apple is
                   placed.
        """
        self.snakePos.insert(0, nextPos)

        if not col:
            self.snakePos.pop()

        self.screenMap = np.zeros((self.nRows, self.nColumns))

        for r, c in self.snakePos:
            self.screenMap[r][c] = 0.5

        if col:
            self.applePos = self.placeApple()
            self.collected = True

        # applePos is None only when the board is completely filled
        if self.applePos is not None:
            self.screenMap[self.applePos[0]][self.applePos[1]] = 1

    # The main method that updates the environment
    def step(self, action):
        """Play one action and return ``(screenMap, reward, gameOver)``.

        action = 0 -> up
        action = 1 -> down
        action = 2 -> right
        action = 3 -> left

        An action that would reverse the previous move is replaced by the
        previous move. Any other value moves nothing. Closing the window
        shuts pygame down and raises SystemExit; the original returned None
        here, which made every caller fail while unpacking the result.
        """
        # Resetting these parameters and setting the reward to the living penalty
        gameOver = False
        reward = self.defReward
        self.collected = False

        for event in pg.event.get():
            if event.type == pg.QUIT:
                pg.quit()
                raise SystemExit

        snakeY, snakeX = self.snakePos[0]

        # Checking if an action is playable and if not then it is changed to the playable one
        if self._OPPOSITE.get(action) == self.lastMove:
            action = self.lastMove

        # Checking what happens when we take this action
        move = self._MOVES.get(action)
        if move is not None:
            nextY = snakeY + move[0]
            nextX = snakeX + move[1]

            if not (0 <= nextY < self.nRows and 0 <= nextX < self.nColumns):
                # Hitting a wall
                gameOver = True
                reward = self.negReward
            else:
                cell = self.screenMap[nextY][nextX]
                if cell == 0.5:
                    # Hitting the snake's own body
                    gameOver = True
                    reward = self.negReward
                elif cell == 1:
                    reward = self.posReward
                    self.moveSnake((nextY, nextX), True)
                    if self.applePos is None:
                        # The snake fills the whole board: nothing left to play
                        gameOver = True
                elif cell == 0:
                    self.moveSnake((nextY, nextX), False)

        # Drawing the screen, updating last move and waiting the wait time specified
        self.drawScreen()

        self.lastMove = action

        pg.time.wait(self.waitTime)

        # Returning the new frame of the game, the reward obtained and whether the game has ended or not
        return self.screenMap, reward, gameOver

    # Making a function that resets the environment
    def reset(self):
        """Put the snake back at its starting position for a new game.

        The current apple is kept unless it lies on one of the respawned
        snake cells (or no apple exists); a fresh one is placed in that case
        so that the apple never overwrites a snake cell in ``screenMap``.
        """
        self._spawnSnake()

        if self.applePos is None or tuple(self.applePos) in self.snakePos:
            self.applePos = self.placeApple()
        else:
            self.screenMap[self.applePos[0]][self.applePos[1]] = 1

        self.lastMove = 0
        self.collected = False

# Additional code, actually not mentioned in the book, simply enables you to play the game on your own if you run this "environment.py" file.
# We don't really need it, that's why it was not mentioned.
if __name__ == '__main__':
    env = Environment(100)
    start = False
    direction = 0
    running = True
    while running:
        for event in pg.event.get():
            if event.type == pg.QUIT:
                # Closing the window ends the program
                running = False
            elif event.type == pg.KEYDOWN:
                if event.key == pg.K_SPACE and not start:
                    start = True
                # A key asking for a U-turn is ignored
                if event.key == pg.K_UP and direction != 1:
                    direction = 0
                elif event.key == pg.K_RIGHT and direction != 3:
                    direction = 2
                elif event.key == pg.K_LEFT and direction != 2:
                    direction = 3
                elif event.key == pg.K_DOWN and direction != 0:
                    direction = 1

        if not running:
            break

        if start:
            _, reward, gameOver = env.step(direction)
            if gameOver:
                env.reset()
                direction = 0
        else:
            # Waiting for the space bar without spinning the CPU
            pg.time.wait(10)

    pg.quit()
brain.py
# AI for Games - Beat the Snake game
# Building the Brain

# Importing the libraries
import keras
from keras.models import Sequential, load_model
from keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Flatten
from keras.optimizers import Adam

# Creating the Brain class
class Brain():
    """Small convolutional network with one linear output per action."""

    def __init__(self, iS = (100,100,3), lr = 0.0005):
        """Build and compile the model.

        iS -- input shape passed to the first convolution layer.
        lr -- learning rate of the Adam optimizer.
        """
        self.learningRate = lr
        self.inputShape = iS
        self.numOutputs = 4
        self.model = Sequential()

        # Adding layers to the model
        self.model.add(Conv2D(32, (3,3), activation = 'relu', input_shape = self.inputShape))

        self.model.add(MaxPooling2D((2,2)))

        self.model.add(Conv2D(64, (2,2), activation = 'relu'))

        self.model.add(Flatten())

        self.model.add(Dense(units = 256, activation = 'relu'))

        # No activation on the last layer: the outputs are used as raw values
        self.model.add(Dense(units = self.numOutputs))

        # Compiling the model
        self.model.compile(loss = 'mean_squared_error', optimizer = self._makeOptimizer())

    # Creating the optimizer in a way that works across Keras versions
    def _makeOptimizer(self):
        """Return an Adam optimizer using ``self.learningRate``.

        Current Keras releases name the argument ``learning_rate`` and
        reject the former ``lr`` spelling, while old releases only know
        ``lr``; the modern name is tried first.
        """
        try:
            return Adam(learning_rate = self.learningRate)
        except (TypeError, ValueError):
            return Adam(lr = self.learningRate)

    # Making a function that will load a model from a file
    def loadModel(self, filepath):
        """Replace ``self.model`` with the model stored at ``filepath`` and return it."""
        self.model = load_model(filepath)
        return self.model
DQN.py
# AI for Games - Beat the Snake game
# Implementing Deep Q-Learning with Experience Replay

# Importing the libraries
import numpy as np

# IMPLEMENTING DEEP Q-LEARNING WITH EXPERIENCE REPLAY
class Dqn(object):
    """Experience replay memory that builds training batches for a Q-network."""

    # INTRODUCING AND INITIALIZING ALL THE PARAMETERS AND VARIABLES OF THE DQN
    def __init__(self, max_memory = 100, discount = 0.9):
        """max_memory -- number of transitions kept; the oldest are dropped first.
        discount   -- factor applied to the best predicted value of the next state.
        """
        self.memory = list()
        self.max_memory = max_memory
        self.discount = discount

    # MAKING A METHOD THAT BUILDS THE MEMORY IN EXPERIENCE REPLAY
    def remember(self, transition, game_over):
        """Store ``[current_state, action, reward, next_state]`` with its terminal flag."""
        self.memory.append([transition, game_over])
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    # MAKING A METHOD THAT BUILDS TWO BATCHES OF INPUTS AND TARGETS BY EXTRACTING TRANSITIONS FROM THE MEMORY
    def get_batch(self, model, batch_size = 10):
        """Sample transitions (with replacement) and build ``(inputs, targets)``.

        model      -- object exposing ``output_shape`` and ``predict``; each
                      stored state must carry a leading axis of length 1 so
                      that states can be stacked into one batch.
        batch_size -- maximum number of transitions in the batch.

        ``targets`` holds the model's own predictions for the sampled current
        states, except at the played action, where it holds ``reward`` for a
        terminal transition and ``reward + discount * max(Q(next_state))``
        otherwise.

        Raises IndexError when the memory is empty.
        """
        len_memory = len(self.memory)
        if len_memory == 0:
            raise IndexError('cannot build a batch from an empty replay memory')

        num_outputs = model.output_shape[-1]
        batch_len = min(len_memory, batch_size)
        if batch_len == 0:
            state_shape = np.shape(self.memory[0][0][0])[1:]
            return np.zeros((0,) + tuple(state_shape)), np.zeros((0, num_outputs))

        picked = np.random.randint(0, len_memory, size = batch_len)
        transitions = [self.memory[idx][0] for idx in picked]
        terminal = np.array([bool(self.memory[idx][1]) for idx in picked])

        # Stacking the states so the network is queried twice per batch
        # instead of twice per transition
        inputs = np.concatenate([t[0] for t in transitions], axis = 0).astype(np.float64)
        next_states = np.concatenate([t[3] for t in transitions], axis = 0)
        actions = np.array([t[1] for t in transitions], dtype = int)
        rewards = np.array([t[2] for t in transitions], dtype = np.float64)

        targets = np.array(model.predict(inputs), dtype = np.float64)
        best_next = np.max(model.predict(next_states), axis = 1)

        # A terminal transition has no future value to bootstrap from
        future = np.where(terminal, 0.0, self.discount * best_next)
        targets[np.arange(batch_len), actions] = rewards + future

        return inputs, targets
train.py
# AI for Games - Beat the Snake game
# Training the AI

# Importing the libraries
from environment import Environment
from brain import Brain
from DQN import Dqn
import numpy as np
import matplotlib.pyplot as plt

# Defining the parameters
memSize = 60000             # capacity of the experience replay memory
batchSize = 32              # number of transitions per training batch
learningRate = 0.0001
gamma = 0.9                 # discount factor
nLastStates = 4             # number of stacked frames that form one state
epsilon = 1.                # probability of playing a random action
epsilonDecayRate = 0.0002   # amount subtracted from epsilon after every game
minEpsilon = 0.05
filepathToSave = 'model2.h5'

# Creating the Environment, the Brain and the Experience Replay Memory
env = Environment(0)
brain = Brain((env.nRows, env.nColumns, nLastStates), learningRate)
model = brain.model
dqn = Dqn(memSize, gamma)

# Making a function that will initialize game states
def resetStates():
    """Return the initial state twice: every stacked frame is the current board."""
    currentState = np.zeros((1, env.nRows, env.nColumns, nLastStates))

    for i in range(nLastStates):
        currentState[:,:,:,i] = env.screenMap

    return currentState, currentState

# Starting the main loop
epoch = 0
scores = list()
maxNCollected = 0
# Starting as an int, like the per-game reset below, so that the printed
# record does not switch between "3.0" and "3"
nCollected = 0
totNCollected = 0
while True:
    # Resetting the environment and game states
    env.reset()
    currentState, nextState = resetStates()
    epoch += 1
    gameOver = False

    # Starting the second loop in which we play the game and teach our AI
    while not gameOver:

        # Choosing an action to play
        if np.random.rand() < epsilon:
            action = np.random.randint(0, 4)
        else:
            qvalues = model.predict(currentState)[0]
            action = np.argmax(qvalues)

        # Updating the environment
        state, reward, gameOver = env.step(action)

        # Adding new game frame to the next state and deleting the oldest frame from next state
        state = np.reshape(state, (1, env.nRows, env.nColumns, 1))
        nextState = np.concatenate((nextState[:, :, :, 1:], state), axis = 3)

        # Remembering the transition and training our AI
        dqn.remember([currentState, action, reward, nextState], gameOver)
        inputs, targets = dqn.get_batch(model, batchSize)
        model.train_on_batch(inputs, targets)

        # Checking whether we have collected an apple and updating the current state
        if env.collected:
            nCollected += 1

        currentState = nextState

    # Checking if a record of apples eaten in a round was beaten and if yes then saving the model
    if nCollected > maxNCollected and nCollected > 2:
        maxNCollected = nCollected
        model.save(filepathToSave)

    totNCollected += nCollected
    nCollected = 0

    # Showing the results each 100 games
    if epoch % 100 == 0 and epoch != 0:
        scores.append(totNCollected / 100)
        totNCollected = 0
        plt.plot(scores)
        plt.xlabel('Epoch / 100')
        plt.ylabel('Average Score')
        plt.savefig('stats.png')
        plt.close()

    # Lowering the epsilon, never below its minimum (repeated float
    # subtraction could otherwise overshoot the floor by one decay step)
    epsilon = max(minEpsilon, epsilon - epsilonDecayRate)

    # Showing the results each game
    print('Epoch: ' + str(epoch) + ' Current Best: ' + str(maxNCollected) + ' Epsilon: {:.5f}'.format(epsilon))
test.py
# AI for Games - Beat the Snake game
# Testing the AI

# Importing the libraries
from environment import Environment
from brain import Brain
import numpy as np

# Defining the parameters
nLastStates = 4             # number of stacked frames that form one state
filepathToOpen = 'model.h5'
slowdown = 75               # wait time handed to the environment

# Creating the Environment and the Brain
env = Environment(slowdown)
brain = Brain((env.nRows, env.nColumns, nLastStates))
model = brain.loadModel(filepathToOpen)

# Making a function that will reset game states
def resetStates():
    """Return the initial state twice: every stacked frame is the current board."""
    stacked = np.zeros((1, env.nRows, env.nColumns, nLastStates))

    # Broadcasting the board over the last axis fills every frame at once
    stacked[0] = env.screenMap[:, :, np.newaxis]

    return stacked, stacked

# Starting the main loop
while True:
    # Resetting the game and the game states
    env.reset()
    currentState, nextState = resetStates()
    gameOver = False

    # Playing the game
    while not gameOver:

        # Choosing an action to play: always the one with the highest predicted value
        qvalues = model.predict(currentState)[0]
        action = np.argmax(qvalues)

        # Updating the environment
        state, _, gameOver = env.step(action)

        # Adding new game frame to next state and deleting the oldest one from next state
        newFrame = np.reshape(state, (1, env.nRows, env.nColumns, 1))
        nextState = np.concatenate((nextState[:, :, :, 1:], newFrame), axis = 3)

        # Updating current state
        currentState = nextState