train.py
import sys
import gym
import pylab
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomUniform
from tensorflow_probability import distributions as tfd


# Policy network and value network
class ContinuousA2C(tf.keras.Model):
    def __init__(self, action_size):
        super(ContinuousA2C, self).__init__()
        self.actor_fc1 = Dense(24, activation='tanh')
        self.actor_mu = Dense(action_size,
                              kernel_initializer=RandomUniform(-1e-3, 1e-3))
        self.actor_sigma = Dense(action_size, activation='sigmoid',
                                 kernel_initializer=RandomUniform(-1e-3, 1e-3))

        self.critic_fc1 = Dense(24, activation='tanh')
        self.critic_fc2 = Dense(24, activation='tanh')
        self.critic_out = Dense(1,
                                kernel_initializer=RandomUniform(-1e-3, 1e-3))

    def call(self, x):
        actor_x = self.actor_fc1(x)
        mu = self.actor_mu(actor_x)
        sigma = self.actor_sigma(actor_x)
        sigma = sigma + 1e-5

        critic_x = self.critic_fc1(x)
        critic_x = self.critic_fc2(critic_x)
        value = self.critic_out(critic_x)
        return mu, sigma, value


# Continuous actor-critic (A2C) agent for the CartPole example
class ContinuousA2CAgent:
    def __init__(self, action_size, max_action):
        self.render = False

        # Size of the action space
        self.action_size = action_size
        self.max_action = max_action

        # Actor-critic hyperparameters
        self.discount_factor = 0.99
        self.learning_rate = 0.001

        # Create the policy and value networks
        self.model = ContinuousA2C(self.action_size)
        # Optimizer; clipnorm keeps gradients from growing too large
        self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=1.0)

    # Sample an action stochastically from the policy network's output
    def get_action(self, state):
        mu, sigma, _ = self.model(state)
        dist = tfd.Normal(loc=mu[0], scale=sigma[0])
        action = dist.sample([1])[0]
        action = np.clip(action, -self.max_action, self.max_action)
        return action

    # Update the policy and value networks at every timestep
    def train_model(self, state, action, reward, next_state, done):
        model_params = self.model.trainable_variables
        with tf.GradientTape() as tape:
            mu, sigma, value = self.model(state)
            _, _, next_value = self.model(next_state)
            target = reward + (1 - done) * self.discount_factor * next_value[0]

            # Policy network (actor) loss
            advantage = tf.stop_gradient(target - value[0])
            dist = tfd.Normal(loc=mu, scale=sigma)
            action_prob = dist.prob([action])[0]
            cross_entropy = - tf.math.log(action_prob + 1e-5)
            actor_loss = tf.reduce_mean(cross_entropy * advantage)

            # Value network (critic) loss
            critic_loss = 0.5 * tf.square(tf.stop_gradient(target) - value[0])
            critic_loss = tf.reduce_mean(critic_loss)

            # Combine into a single loss
            loss = 0.1 * actor_loss + critic_loss

        # Update the model in the direction that reduces the loss
        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))
        return loss, sigma


if __name__ == "__main__":
    # Continuous CartPole environment, maximum of 500 timesteps per episode
    gym.envs.register(
        id='CartPoleContinuous-v0',
        entry_point='env:ContinuousCartPoleEnv',
        max_episode_steps=500,
        reward_threshold=475.0)

    env = gym.make('CartPoleContinuous-v0')
    # Get the state and action sizes from the environment
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.shape[0]
    max_action = env.action_space.high[0]

    # Create the actor-critic (A2C) agent
    agent = ContinuousA2CAgent(action_size, max_action)
    scores, episodes = [], []
    score_avg = 0

    num_episode = 1000
    for e in range(num_episode):
        done = False
        score = 0
        loss_list, sigma_list = [], []
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            if agent.render:
                env.render()

            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            # Reward of 0.1 per timestep, -1 if the episode ends early
            score += reward
            reward = 0.1 if not done or score == 500 else -1

            # Train at every timestep
            loss, sigma = agent.train_model(state, action, reward, next_state, done)
            loss_list.append(loss)
            sigma_list.append(sigma)

            state = next_state

            if done:
                # Print the training results for each episode
                score_avg = 0.9 * score_avg + 0.1 * score if score_avg != 0 else score
                print("episode: {:3d} | score avg: {:3.2f} | loss: {:.3f} | sigma: {:.3f}".format(
                      e, score_avg, np.mean(loss_list), np.mean(sigma_list)))

                scores.append(score_avg)
                episodes.append(e)
                pylab.plot(episodes, scores, 'b')
                pylab.xlabel("episode")
                pylab.ylabel("average score")
                pylab.savefig("./save_graph/graph.png")

                # Stop once the moving-average score exceeds 400
                if score_avg > 400:
                    agent.model.save_weights("./save_model/model", save_format="tf")
                    sys.exit()
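For reference, the cross_entropy term in train_model is just the negative log-likelihood of the sampled action under the Gaussian policy, which is then weighted by the advantage. The following standalone sketch (mu, sigma, and the action are made-up values for illustration) shows that the prob()-based expression used above agrees with TensorFlow Probability's log_prob up to the 1e-5 stabilizer.

# Standalone sketch: the actor's -log pi(a|s) computed two equivalent ways.
import tensorflow as tf
from tensorflow_probability import distributions as tfd

mu = tf.constant([0.2])      # hypothetical policy mean
sigma = tf.constant([0.6])   # hypothetical policy stddev
action = tf.constant([0.5])  # hypothetical sampled action

dist = tfd.Normal(loc=mu, scale=sigma)
via_prob = -tf.math.log(dist.prob(action) + 1e-5)  # as written in train_model
via_log_prob = -dist.log_prob(action)              # numerically safer equivalent
print(float(via_prob[0]), float(via_log_prob[0]))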
test.py
import sys
import gym
import pylab
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.initializers import RandomUniform
from tensorflow_probability import distributions as tfd


# Policy network and value network
class ContinuousA2C(tf.keras.Model):
    def __init__(self, action_size):
        super(ContinuousA2C, self).__init__()
        self.actor_fc1 = Dense(24, activation='tanh')
        self.actor_mu = Dense(action_size,
                              kernel_initializer=RandomUniform(-1e-3, 1e-3))
        self.actor_sigma = Dense(action_size, activation='softplus',
                                 kernel_initializer=RandomUniform(-1e-3, 1e-3))

        self.critic_fc1 = Dense(24, activation='tanh')
        self.critic_fc2 = Dense(24, activation='tanh')
        self.critic_out = Dense(1,
                                kernel_initializer=RandomUniform(-1e-3, 1e-3))

    def call(self, x):
        actor_x = self.actor_fc1(x)
        mu = self.actor_mu(actor_x)
        sigma = self.actor_sigma(actor_x)
        sigma = sigma + 1e-5

        critic_x = self.critic_fc1(x)
        critic_x = self.critic_fc2(critic_x)
        value = self.critic_out(critic_x)
        return mu, sigma, value


# Actor-critic (A2C) agent for the CartPole example
class ContinuousA2CAgent:
    def __init__(self, action_size, max_action):
        # Size of the action space
        self.action_size = action_size
        self.max_action = max_action

        # Create the policy and value networks and load the trained weights
        self.model = ContinuousA2C(self.action_size)
        self.model.load_weights("./save_model/trained/model")

    # Sample an action stochastically from the policy network's output
    def get_action(self, state):
        mu, sigma, _ = self.model(state)
        dist = tfd.Normal(loc=mu[0], scale=sigma[0])
        action = dist.sample([1])[0]
        action = np.clip(action, -self.max_action, self.max_action)
        return action


if __name__ == "__main__":
    # Continuous CartPole environment, maximum of 500 timesteps per episode
    gym.envs.register(
        id='CartPoleContinuous-v0',
        entry_point='env:ContinuousCartPoleEnv',
        max_episode_steps=500,
        reward_threshold=475.0)

    env = gym.make('CartPoleContinuous-v0')
    # Get the state and action sizes from the environment
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.shape[0]
    max_action = env.action_space.high[0]

    # Create the actor-critic (A2C) agent
    agent = ContinuousA2CAgent(action_size, max_action)

    scores, episodes = [], []
    num_episode = 10
    for e in range(num_episode):
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            env.render()

            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])

            score += reward
            state = next_state

            if done:
                print("episode: {:3d} | score: {:3d}".format(e, int(score)))
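Note that test.py keeps sampling from the Gaussian policy during evaluation, just as in training. If a deterministic rollout is preferred, one option (not part of the original script; get_greedy_action is a hypothetical helper) is to act with the policy mean instead of a sample:

# Hypothetical helper: deterministic evaluation using the policy mean.
import numpy as np

def get_greedy_action(agent, state):
    mu, _, _ = agent.model(state)
    return np.clip(mu[0].numpy(), -agent.max_action, agent.max_action)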
env.py
"""
Classic cart-pole system implemented by Rich Sutton et al.
Copied from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
"""

import math
import gym
from gym import spaces, logger
from gym.utils import seeding
import numpy as np


class ContinuousCartPoleEnv(gym.Env):
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50
    }

    def __init__(self):
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = (self.masspole + self.masscart)
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = (self.masspole * self.length)
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.max_action = 3.0
        self.kinematics_integrator = 'euler'

        # Angle at which to fail the episode
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds
        high = np.array([
            self.x_threshold * 2,
            np.finfo(np.float32).max,
            self.theta_threshold_radians * 2,
            np.finfo(np.float32).max])

        self.action_space = spaces.Box(
            low=-self.max_action,
            high=self.max_action,
            shape=(1,)
        )
        self.observation_space = spaces.Box(-high, high)

        self.seed()
        self.viewer = None
        self.state = None

        self.steps_beyond_done = None

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        assert self.action_space.contains(action), \
            "%r (%s) invalid" % (action, type(action))
        state = self.state
        x, x_dot, theta, theta_dot = state
        force = self.force_mag * float(action)
        costheta = math.cos(theta)
        sintheta = math.sin(theta)
        temp = (force + self.polemass_length * theta_dot * theta_dot * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / \
            (self.length * (4.0 / 3.0 - self.masspole * costheta * costheta / self.total_mass))
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
        if self.kinematics_integrator == 'euler':
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot
        self.state = (x, x_dot, theta, theta_dot)
        done = x < -self.x_threshold \
            or x > self.x_threshold \
            or theta < -self.theta_threshold_radians \
            or theta > self.theta_threshold_radians
        done = bool(done)

        if not done:
            reward = 1.0
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn(
                    "You are calling 'step()' even though this environment has already "
                    "returned done = True. You should always call 'reset()' once you "
                    "receive 'done = True' -- any further steps are undefined behavior.")
            self.steps_beyond_done += 1
            reward = 0.0

        return np.array(self.state, dtype=np.float32), reward, done, {}

    def reset(self):
        self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_done = None
        return np.array(self.state, dtype=np.float32)

    def render(self, mode='human'):
        screen_width = 600
        screen_height = 400

        world_width = self.x_threshold * 2
        scale = screen_width / world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * 1.0
        cartwidth = 50.0
        cartheight = 30.0

        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth / 2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)

            self._pole_geom = pole

        if self.state is None:
            return None

        # Edit the pole polygon vertex
        pole = self._pole_geom
        l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
        pole.v = [(l, b), (l, t), (r, t), (r, b)]

        x = self.state
        cartx = x[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-x[2])

        return self.viewer.render(return_rgb_array=(mode == 'rgb_array'))

    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None
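Since env.py only defines the environment class, a quick way to sanity-check it on its own (assuming env.py is importable from the working directory) is to register it and drive the cart with random forces for one episode; this is a small sketch, not part of the original scripts.

# Standalone smoke test for ContinuousCartPoleEnv with random actions.
import gym

gym.envs.register(
    id='CartPoleContinuous-v0',
    entry_point='env:ContinuousCartPoleEnv',
    max_episode_steps=500)

env = gym.make('CartPoleContinuous-v0')
state = env.reset()
done, steps = False, 0
while not done:
    action = env.action_space.sample()            # random force within [-3, 3]
    state, reward, done, info = env.step(action)
    steps += 1
env.close()
print("random-policy episode length:", steps)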