train.py
```python
import sys
import gym
import pylab
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomUniform


# Policy network and value network in a single model
class A2C(tf.keras.Model):
    def __init__(self, action_size):
        super(A2C, self).__init__()
        self.actor_fc = Dense(24, activation='tanh')
        self.actor_out = Dense(action_size, activation='softmax',
                               kernel_initializer=RandomUniform(-1e-3, 1e-3))
        self.critic_fc1 = Dense(24, activation='tanh')
        self.critic_fc2 = Dense(24, activation='tanh')
        self.critic_out = Dense(1,
                                kernel_initializer=RandomUniform(-1e-3, 1e-3))

    def call(self, x):
        actor_x = self.actor_fc(x)
        policy = self.actor_out(actor_x)

        critic_x = self.critic_fc1(x)
        critic_x = self.critic_fc2(critic_x)
        value = self.critic_out(critic_x)
        return policy, value


# Actor-critic (A2C) agent for the CartPole example
class A2CAgent:
    def __init__(self, action_size):
        self.render = False

        # Size of the action space
        self.action_size = action_size

        # Actor-critic hyperparameters
        self.discount_factor = 0.99
        self.learning_rate = 0.001

        # Create the policy and value networks
        self.model = A2C(self.action_size)
        # Optimizer setup; clipnorm keeps gradients from growing too large
        self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=5.0)

    # Sample an action stochastically from the policy network's output
    def get_action(self, state):
        policy, _ = self.model(state)
        policy = np.array(policy[0])
        return np.random.choice(self.action_size, 1, p=policy)[0]

    # Update the policy and value networks at every timestep
    def train_model(self, state, action, reward, next_state, done):
        model_params = self.model.trainable_variables
        with tf.GradientTape() as tape:
            policy, value = self.model(state)
            _, next_value = self.model(next_state)
            target = reward + (1 - done) * self.discount_factor * next_value[0]

            # Policy network (actor) loss
            one_hot_action = tf.one_hot([action], self.action_size)
            action_prob = tf.reduce_sum(one_hot_action * policy, axis=1)
            cross_entropy = - tf.math.log(action_prob + 1e-5)
            advantage = tf.stop_gradient(target - value[0])
            actor_loss = tf.reduce_mean(cross_entropy * advantage)

            # Value network (critic) loss
            critic_loss = 0.5 * tf.square(tf.stop_gradient(target) - value[0])
            critic_loss = tf.reduce_mean(critic_loss)

            # Combine the two into a single loss
            loss = 0.2 * actor_loss + critic_loss

        # Update the model in the direction that reduces the loss
        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))
        return np.array(loss)


if __name__ == "__main__":
    # CartPole-v1 environment; episodes are capped at 500 timesteps
    env = gym.make('CartPole-v1')
    # Get the state and action sizes from the environment
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    # Create the actor-critic (A2C) agent
    agent = A2CAgent(action_size)

    scores, episodes = [], []
    score_avg = 0

    num_episode = 1000
    for e in range(num_episode):
        done = False
        score = 0
        loss_list = []
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            if agent.render:
                env.render()

            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            # Reward of 0.1 per timestep; -1 if the episode ends early
            score += reward
            reward = 0.1 if not done or score == 500 else -1

            # Train on every timestep
            loss = agent.train_model(state, action, reward, next_state, done)
            loss_list.append(loss)
            state = next_state

            if done:
                # Print the training results for each episode
                score_avg = 0.9 * score_avg + 0.1 * score if score_avg != 0 else score
                print("episode: {:3d} | score avg: {:3.2f} | loss: {:.3f}".format(
                      e, score_avg, np.mean(loss_list)))

                # Save a graph of the training results after each episode
                scores.append(score_avg)
                episodes.append(e)
                pylab.plot(episodes, scores, 'b')
                pylab.xlabel("episode")
                pylab.ylabel("average score")
                pylab.savefig("./save_graph/graph.png")

                # Stop once the moving-average score exceeds 400
                if score_avg > 400:
                    agent.model.save_weights("./save_model/model",
                                             save_format="tf")
                    sys.exit()
```
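For reference, the losses computed inside `train_model` are the one-step advantage actor-critic update: the critic regresses toward the TD target, and the actor scales the log-probability of the taken action by the advantage. Below is a minimal NumPy sketch of the TD target and advantage, using made-up critic values rather than real network outputs:

```python
import numpy as np

discount_factor = 0.99
reward = 0.1
done = False        # the episode did not terminate on this step

# Hypothetical critic outputs V(s) and V(s'); real values come from the model
value = 1.20
next_value = 1.35

# Matches: target = reward + (1 - done) * discount_factor * next_value[0]
target = reward + (1 - done) * discount_factor * next_value
advantage = target - value

print(f"target={target:.4f}  advantage={advantage:.4f}")
# target=1.4365  advantage=0.2365
```

Because `tf.stop_gradient` wraps the target in the critic loss and the advantage in the actor loss, gradients flow only through `policy` for the actor term and only through `value` for the critic term.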
test.py
```python
import gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.initializers import RandomUniform


# Policy network and value network in a single model
class A2C(tf.keras.Model):
    def __init__(self, action_size):
        super(A2C, self).__init__()
        self.actor_fc = Dense(24, activation='tanh')
        self.actor_out = Dense(action_size, activation='softmax',
                               kernel_initializer=RandomUniform(-1e-3, 1e-3))
        self.critic_fc1 = Dense(24, activation='tanh')
        self.critic_fc2 = Dense(24, activation='tanh')
        self.critic_out = Dense(1,
                                kernel_initializer=RandomUniform(-1e-3, 1e-3))

    def call(self, x):
        actor_x = self.actor_fc(x)
        policy = self.actor_out(actor_x)

        critic_x = self.critic_fc1(x)
        critic_x = self.critic_fc2(critic_x)
        value = self.critic_out(critic_x)
        return policy, value


# Actor-critic (A2C) agent for the CartPole example
class A2CAgent:
    def __init__(self, action_size):
        # Size of the action space
        self.action_size = action_size

        # Create the policy and value networks and load the trained weights
        self.model = A2C(self.action_size)
        self.model.load_weights("./save_model/trained/model")

    # Sample an action stochastically from the policy network's output
    def get_action(self, state):
        policy, _ = self.model(state)
        policy = np.array(policy[0])
        return np.random.choice(self.action_size, 1, p=policy)[0]


if __name__ == "__main__":
    # CartPole-v1 environment; episodes are capped at 500 timesteps
    env = gym.make('CartPole-v1')
    # Get the state and action sizes from the environment
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    # Create the actor-critic (A2C) agent
    agent = A2CAgent(action_size)

    num_episode = 10
    for e in range(num_episode):
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            env.render()

            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            score += reward
            state = next_state

            if done:
                print("episode: {:3d} | score: {:3d}".format(e, int(score)))
```
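Both scripts assume the classic `gym` (<0.26) API, where `reset()` returns only the observation and `step()` returns four values. Under `gym>=0.26` or `gymnasium` these signatures changed, so the loops above will raise unpacking errors on a modern installation. A short sketch of the required adjustments, assuming `gymnasium` is installed:

```python
import gymnasium as gym
import numpy as np

# render_mode replaces the per-step env.render() call used in test.py
env = gym.make('CartPole-v1', render_mode='human')
state_size = env.observation_space.shape[0]

state, info = env.reset()                  # reset() now returns (obs, info)
state = np.reshape(state, [1, state_size])

action = env.action_space.sample()         # stand-in for agent.get_action(state)
# step() now returns five values instead of four
next_state, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated             # recombine into the old 'done' flag
env.close()
```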