정성훈
Chapter 9) Continuous actor-critic hands-on (Reinforcement Learning with Python and Keras, Chapter 6)
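The three listings below are the full code for this exercise: train.py (the training script), test.py (an evaluation script that loads the saved weights), and env.py (a CartPole variant with a continuous action space, registered as CartPoleContinuous-v0 with actions in [-3.0, 3.0]). Compared with the discrete A2C, the actor here outputs the mean (mu) and standard deviation (sigma) of a Gaussian; an action is sampled from that distribution and clipped to the action bounds.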



train.py

import sys
import gym
import pylab
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomUniform
from tensorflow_probability import distributions as tfd
 
 
# Create the policy network and the value network
class ContinuousA2C(tf.keras.Model):
    def __init__(self, action_size):
        super(ContinuousA2C, self).__init__()
        self.actor_fc1 = Dense(24, activation='tanh')
        self.actor_mu = Dense(action_size,
                              kernel_initializer=RandomUniform(-1e-3, 1e-3))
        self.actor_sigma = Dense(action_size, activation='sigmoid',
                                 kernel_initializer=RandomUniform(-1e-3, 1e-3))
 
        self.critic_fc1 = Dense(24, activation='tanh')
        self.critic_fc2 = Dense(24, activation='tanh')
        self.critic_out = Dense(1,
                                kernel_initializer=RandomUniform(-1e-3, 1e-3))
 
    def call(self, x):
        actor_x = self.actor_fc1(x)
        mu = self.actor_mu(actor_x)
        sigma = self.actor_sigma(actor_x)
        sigma = sigma + 1e-5
 
        critic_x = self.critic_fc1(x)
        critic_x = self.critic_fc2(critic_x)
        value = self.critic_out(critic_x)
        return mu, sigma, value
 
 
# Continuous actor-critic (A2C) agent for the CartPole example
class ContinuousA2CAgent:
    def __init__(self, action_size, max_action):
        self.render = False
 
        # Size of the action space
        self.action_size = action_size
        self.max_action = max_action
 
        # Actor-critic hyperparameters
        self.discount_factor = 0.99
        self.learning_rate = 0.001
 
        # Create the policy and value networks
        self.model = ContinuousA2C(self.action_size)
        # Set up the optimizer; clipnorm keeps gradients from growing too large
        self.optimizer = Adam(learning_rate=self.learning_rate, clipnorm=1.0)
 
    # Sample an action stochastically from the policy network's output
    def get_action(self, state):
        mu, sigma, _ = self.model(state)
        dist = tfd.Normal(loc=mu[0], scale=sigma[0])
        action = dist.sample([1])[0]
        action = np.clip(action, -self.max_action, self.max_action)
        return action
 
    # Update the policy and value networks at every timestep
    def train_model(self, state, action, reward, next_state, done):
        model_params = self.model.trainable_variables
        with tf.GradientTape() as tape:
            mu, sigma, value = self.model(state)
            _, _, next_value = self.model(next_state)
            target = reward + (1 - done) * self.discount_factor * next_value[0]
 
            # Policy network loss
            advantage = tf.stop_gradient(target - value[0])
            dist = tfd.Normal(loc=mu, scale=sigma)
            action_prob = dist.prob([action])[0]
            cross_entropy = - tf.math.log(action_prob + 1e-5)
            actor_loss = tf.reduce_mean(cross_entropy * advantage)
 
            # Value network loss
            critic_loss = 0.5 * tf.square(tf.stop_gradient(target) - value[0])
            critic_loss = tf.reduce_mean(critic_loss)
 
            # Combine into a single loss
            loss = 0.1 * actor_loss + critic_loss
 
        # Update the model in the direction that reduces the loss
        grads = tape.gradient(loss, model_params)
        self.optimizer.apply_gradients(zip(grads, model_params))
        return loss, sigma
 
 
if __name__ == "__main__":
    # Register the continuous-action CartPole environment, max of 500 timesteps per episode
    gym.envs.register(
        id='CartPoleContinuous-v0',
        entry_point='env:ContinuousCartPoleEnv',
        max_episode_steps=500,
        reward_threshold=475.0)
 
    env = gym.make('CartPoleContinuous-v0')
    # Get the state and action sizes from the environment
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.shape[0]
    max_action = env.action_space.high[0]
 
    # Create the actor-critic (A2C) agent
    agent = ContinuousA2CAgent(action_size, max_action)
    scores, episodes = [], []
    score_avg = 0
 
    num_episode = 1000
    for e in range(num_episode):
        done = False
        score = 0
        loss_list, sigma_list = [], []
        state = env.reset()
        state = np.reshape(state, [1, state_size])
 
        while not done:
            if agent.render:
                env.render()
 
            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
 
            # Reward of 0.1 per timestep; -1 if the episode ends early (before step 500)
            score += reward
            reward = 0.1 if not done or score == 500 else -1
 
            # Train at every timestep
            loss, sigma = agent.train_model(state, action, reward, next_state, done)
            loss_list.append(loss)
            sigma_list.append(sigma)
            state = next_state
 
            if done:
                # Print training results at the end of each episode
                score_avg = 0.9 * score_avg + 0.1 * score if score_avg != 0 else score
                print("episode: {:3d} | score avg: {:3.2f} | loss: {:.3f} | sigma: {:.3f}".format(
                      e, score_avg, np.mean(loss_list), np.mean(sigma_list)))
 
                scores.append(score_avg)
                episodes.append(e)
                pylab.plot(episodes, scores, 'b')
                pylab.xlabel("episode")
                pylab.ylabel("average score")
                pylab.savefig("./save_graph/graph.png")
 
                # Stop once the moving-average score exceeds 400
                if score_avg > 400:
                    agent.model.save_weights("./save_model/model", save_format="tf")
                    sys.exit()
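For reference, the update inside train_model boils down to the negative log-likelihood of the Gaussian policy weighted by the advantage, plus a squared TD error for the critic. Below is a minimal, self-contained numerical sketch of that computation (my own illustration with made-up numbers, not part of the book code; it uses dist.log_prob instead of the -tf.math.log(prob + 1e-5) form above, which is essentially equivalent):

import tensorflow as tf
from tensorflow_probability import distributions as tfd

# Made-up values for a single transition (s, a, r, s')
mu = tf.constant([[0.2]])           # policy mean for state s
sigma = tf.constant([[0.5]])        # policy standard deviation for state s
action = tf.constant([[0.1]])       # action that was actually taken
value = tf.constant([[0.3]])        # critic estimate V(s)
next_value = tf.constant([[0.4]])   # critic estimate V(s')
reward, done, gamma = 0.1, 0.0, 0.99

target = reward + (1 - done) * gamma * next_value     # TD target
advantage = tf.stop_gradient(target - value)          # treated as a constant in the actor loss
dist = tfd.Normal(loc=mu, scale=sigma)
actor_loss = tf.reduce_mean(-dist.log_prob(action) * advantage)
critic_loss = tf.reduce_mean(0.5 * tf.square(tf.stop_gradient(target) - value))
loss = 0.1 * actor_loss + critic_loss                 # same weighting as in train_model
print(float(actor_loss), float(critic_loss), float(loss))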

 

test.py 

import sys
import gym
import pylab
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.initializers import RandomUniform
from tensorflow_probability import distributions as tfd
 
 
# Create the policy network and the value network
class ContinuousA2C(tf.keras.Model):
    def __init__(self, action_size):
        super(ContinuousA2C, self).__init__()
        self.actor_fc1 = Dense(24, activation='tanh')
        self.actor_mu = Dense(action_size,
                              kernel_initializer=RandomUniform(-1e-3, 1e-3))
        self.actor_sigma = Dense(action_size, activation='softplus',
                                 kernel_initializer=RandomUniform(-1e-3, 1e-3))
 
        self.critic_fc1 = Dense(24, activation='tanh')
        self.critic_fc2 = Dense(24, activation='tanh')
        self.critic_out = Dense(1,
                                kernel_initializer=RandomUniform(-1e-3, 1e-3))
 
    def call(self, x):
        actor_x = self.actor_fc1(x)
        mu = self.actor_mu(actor_x)
        sigma = self.actor_sigma(actor_x)
        sigma = sigma + 1e-5
 
        critic_x = self.critic_fc1(x)
        critic_x = self.critic_fc2(critic_x)
        value = self.critic_out(critic_x)
        return mu, sigma, value
 
 
# Actor-critic (A2C) agent for the CartPole example
class ContinuousA2CAgent:
    def __init__(self, action_size, max_action):
        # Size of the action space
        self.action_size = action_size
        self.max_action = max_action
 
        # Create the policy and value networks and load the trained weights
        self.model = ContinuousA2C(self.action_size)
        self.model.load_weights("./save_model/trained/model")
 
    # Sample an action stochastically from the policy network's output
    def get_action(self, state):
        mu, sigma, _ = self.model(state)
        dist = tfd.Normal(loc=mu[0], scale=sigma[0])
        action = dist.sample([1])[0]
        action = np.clip(action, -self.max_action, self.max_action)
        return action
 
 
if __name__ == "__main__":
    # Register the continuous-action CartPole environment, max of 500 timesteps per episode
    gym.envs.register(
        id='CartPoleContinuous-v0',
        entry_point='env:ContinuousCartPoleEnv',
        max_episode_steps=500,
        reward_threshold=475.0)
 
    env = gym.make('CartPoleContinuous-v0')
    # Get the state and action sizes from the environment
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.shape[0]
    max_action = env.action_space.high[0]
 
    # Create the actor-critic (A2C) agent
    agent = ContinuousA2CAgent(action_size, max_action)
 
    scores, episodes = [], []
 
    num_episode = 10
    for e in range(num_episode):
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])
 
        while not done:
            env.render()
 
            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
 
            score += reward
            state = next_state
 
            if done:
                print("episode: {:3d} | score: {:3d}".format(e, int(score)))
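Two things to watch before running test.py: train.py saves the weights to ./save_model/model, while test.py loads them from ./save_model/trained/model, so the checkpoint files have to be copied there (or the path adjusted). Also, the sigma head uses activation='sigmoid' in train.py but activation='softplus' in test.py; the weights load either way because the layer shapes are identical, but the two scripts should use the same activation if you want identical sampling behavior. A quick way to confirm the expected checkpoint files are in place (paths exactly as in the listings; save_weights with save_format="tf" writes an .index file plus data shards):

import os

print(os.path.exists("./save_model/model.index"))          # written by train.py
print(os.path.exists("./save_model/trained/model.index"))  # read by test.py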


env.py

"""
Classic cart-pole system implemented by Rich Sutton et al.
Copied from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
"""
 
import math
import gym
from gym import spaces, logger
from gym.utils import seeding
import numpy as np
 
 
class ContinuousCartPoleEnv(gym.Env):
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50
    }
 
    def __init__(self):
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = (self.masspole + self.masscart)
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = (self.masspole * self.length)
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.max_action = 3.0
        self.kinematics_integrator = 'euler'
 
        # Angle at which to fail the episode
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4
 
        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds
        high = np.array([
            self.x_threshold * 2,
            np.finfo(np.float32).max,
            self.theta_threshold_radians * 2,
            np.finfo(np.float32).max])
 
        self.action_space = spaces.Box(
            low=-self.max_action,
            high=self.max_action,
            shape=(1,)
        )
        self.observation_space = spaces.Box(-high, high)
 
        self.seed()
        self.viewer = None
        self.state = None
 
        self.steps_beyond_done = None
 
    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]
 
    def step(self, action):
        assert self.action_space.contains(action), "%r (%s) invalid"%(action, type(action))
        state = self.state
        x, x_dot, theta, theta_dot = state
        force = self.force_mag * float(action)
        costheta = math.cos(theta)
        sintheta = math.sin(theta)
        temp = (force + self.polemass_length * theta_dot * theta_dot * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta* temp) / (self.length * (4.0/3.0 - self.masspole * costheta * costheta / self.total_mass))
        xacc  = temp - self.polemass_length * thetaacc * costheta / self.total_mass
        if self.kinematics_integrator == 'euler':
            x  = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x  = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot
        self.state = (x,x_dot,theta,theta_dot)
        done = x < -self.x_threshold \
               or x > self.x_threshold \
               or theta < -self.theta_threshold_radians \
               or theta > self.theta_threshold_radians
        done = bool(done)
 
        if not done:
            reward = 1.0
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn("You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True' -- any further steps are undefined behavior.")
            self.steps_beyond_done += 1
            reward = 0.0
 
        return np.array(self.state, dtype=np.float32), reward, done, {}
 
    def reset(self):
        self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_done = None
        return np.array(self.state, dtype=np.float32)
 
    def render(self, mode='human'):
        screen_width = 600
        screen_height = 400
 
        world_width = self.x_threshold * 2
        scale = screen_width /world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * 1.0
        cartwidth = 50.0
        cartheight = 30.0
 
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth / 2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)
 
            self._pole_geom = pole
 
        if self.state is None: return None
 
        # Edit the pole polygon vertex
        pole = self._pole_geom
        l,r,t,b = -polewidth/2,polewidth/2,polelen-polewidth/2,-polewidth/2
        pole.v = [(l,b), (l,t), (r,t), (r,b)]
 
        x = self.state
        cartx = x[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-x[2])
 
        return self.viewer.render(return_rgb_array=(mode == 'rgb_array'))
 
    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None
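env.py can be sanity-checked on its own with a random policy before any training. A short sketch, assuming env.py sits in the working directory so the 'env:ContinuousCartPoleEnv' entry point resolves:

import gym

# Same registration call as in train.py / test.py
gym.envs.register(
    id='CartPoleContinuous-v0',
    entry_point='env:ContinuousCartPoleEnv',
    max_episode_steps=500,
    reward_threshold=475.0)

env = gym.make('CartPoleContinuous-v0')
state = env.reset()
done, steps, score = False, 0, 0.0
while not done:
    action = env.action_space.sample()        # random action in [-3.0, 3.0]
    state, reward, done, _ = env.step(action)
    steps += 1
    score += reward
print("random policy: {} steps, return {:.1f}".format(steps, score))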

 

 