import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Hyperparameters
learning_rate = 0.0002
gamma = 0.98

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.data = []                      # stores (reward, action probability) pairs for one episode

        self.fc1 = nn.Linear(4, 128)        # CartPole observation has 4 dimensions
        self.fc2 = nn.Linear(128, 2)        # two discrete actions: push left / push right
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=0)   # action probabilities for a single (unbatched) state
        return x

    def put_data(self, item):
        self.data.append(item)

    def train_net(self):
        R = 0
        self.optimizer.zero_grad()
        for r, prob in self.data[::-1]:     # iterate backwards to accumulate the discounted return
            R = r + gamma * R
            loss = -torch.log(prob) * R     # REINFORCE loss: -log pi(a|s) * G_t
            loss.backward()                 # gradients accumulate over the whole episode
        self.optimizer.step()
        self.data = []

def main():
    env = gym.make('CartPole-v1')
    pi = Policy()
    score = 0.0
    print_interval = 20

    for n_epi in range(10000):
        s = env.reset()
        done = False

        while not done:  # CartPole-v1 is forced to terminate after 500 steps.
            prob = pi(torch.from_numpy(s).float())
            m = Categorical(prob)
            a = m.sample()
            s_prime, r, done, info = env.step(a.item())
            pi.put_data((r, prob[a]))
            s = s_prime
            score += r

        pi.train_net()

        if n_epi % print_interval == 0 and n_epi != 0:
            print("# of episode :{}, avg score : {}".format(n_epi, score / print_interval))
            score = 0.0
    env.close()

if __name__ == '__main__':
    main()
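
The listing above targets the classic Gym API (gym < 0.26), where env.reset() returns only the observation and env.step() returns a 4-tuple. If you run it under Gymnasium or a recent Gym release, the interaction loop needs small adjustments. The sketch below is one way to adapt main() under that assumption; it reuses the Policy class defined above and leaves the rest of the script unchanged.

import gymnasium as gym  # assumption: the gymnasium package is installed
import torch
from torch.distributions import Categorical

def main():
    env = gym.make('CartPole-v1')
    pi = Policy()
    score = 0.0
    print_interval = 20

    for n_epi in range(10000):
        s, _ = env.reset()                  # reset() now returns (observation, info)
        done = False

        while not done:
            prob = pi(torch.from_numpy(s).float())
            m = Categorical(prob)
            a = m.sample()
            # step() now returns a 5-tuple; the episode ends when either flag is set
            s_prime, r, terminated, truncated, info = env.step(a.item())
            done = terminated or truncated
            pi.put_data((r, prob[a]))
            s = s_prime
            score += r

        pi.train_net()

        if n_epi % print_interval == 0 and n_epi != 0:
            print("# of episode :{}, avg score : {}".format(n_epi, score / print_interval))
            score = 0.0
    env.close()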