-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathutils.py
More file actions
169 lines (130 loc) · 5.21 KB
/
utils.py
File metadata and controls
169 lines (130 loc) · 5.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import torch
import numpy as np
import random
import gym
import math
from torch.optim.lr_scheduler import _LRScheduler
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from PIL import Image
from numbers import Number
import cv2
def independent_multivariate_normal(loc, scale):
    """Build a batched MultivariateNormal with independent (diagonal) covariance.

    :param loc: (batch, dim) tensor of means.
    :param scale: per-dimension covariance diagonal, either a (batch, dim)
        tensor or a scalar Number applied uniformly to every dimension.
    :return: torch.distributions.MultivariateNormal with diagonal covariance.
    """
    if isinstance(scale, Number):
        # A scalar scale applies uniformly; expand to loc's shape, keeping
        # loc's dtype/device (the original's torch.ones(loc.shape) always
        # allocated on CPU with the default dtype).
        scale = torch.full_like(loc, float(scale))
    # diag_embed turns each <dim>-length row into a diagonal matrix in one
    # vectorized call, replacing the Python-level torch.stack/torch.diag loop.
    covariance = torch.diag_embed(scale)
    return torch.distributions.MultivariateNormal(loc=loc, covariance_matrix=covariance)
def add_noise(state, noise_std_dev):
    """Return `state` plus zero-mean Gaussian noise with std `noise_std_dev`.

    Generalized to draw noise matching state's full shape: the original used
    state.shape[0], which only broadcasts correctly for 1-D states (identical
    behavior in the 1-D case).

    :param state: numpy array of any shape.
    :param noise_std_dev: standard deviation of the additive noise.
    :return: new array state + noise (input is not mutated).
    """
    noise = np.random.normal(0.0, noise_std_dev, state.shape)
    return state + noise
def show_image(image):
    """Open `image` (an RGB numpy array) in the system's default viewer."""
    Image.fromarray(image, 'RGB').show()
def gaussian_blur(img):
    """Return a copy of `img` smoothed with a 3x3 Gaussian kernel (sigma auto)."""
    return cv2.GaussianBlur(np.array(img), (3, 3), 0)
def show_plt_image(img):
    """Render `img` in a matplotlib window and block until it is closed."""
    plt.imshow(img)
    plt.show()
def plot(arr, env_id, gap=1):
    """Plot reward and scaled-loss curves and save the figure under ../output.

    :param arr: 2-row array-like; arr[0] is reward per episode, arr[1] is loss*40.
    :param env_id: environment name, used for the plot title and the file name.
    :param gap: number of episodes between consecutive data points.
    """
    fig = plt.figure()
    # Bug fix: the original called len(arr.shape[1]) — len() of an int, which
    # raises TypeError. We want one x tick per data point, spaced by `gap`.
    x = np.arange(len(arr[0])) * gap
    plt.plot(x, arr[0], marker='', color='steelblue', linewidth=0.8, alpha=0.9, label='Reward')
    plt.plot(x, arr[1], marker='', color='Green', linewidth=0.8, alpha=0.9, label='Lossx40')
    plt.legend(loc='lower right')
    plt.title(f"{env_id}", fontsize=14)
    plt.xlabel("episode", fontsize=12)
    plt.ylabel("score", fontsize=12)
    out_dir = os.path.abspath('../') + '/output'
    # Create the output directory on demand so savefig cannot fail on a
    # missing path (resolves the long-standing TODO).
    os.makedirs(out_dir, exist_ok=True)
    plt.savefig(out_dir + f'/[{time_now(datetime.now())}]{env_id}.png')
    plt.close(fig)
def plot_single(arr, label, msg):
    """Plot one curve over episodes and save it under ../output with a timestamp.

    :param arr: sequence of y-values, one per episode.
    :param label: legend label for the curve.
    :param msg: suffix used in the saved file name.
    """
    fig = plt.figure()
    x = np.arange(len(arr))
    plt.plot(x, arr, marker='', color='steelblue', linewidth=0.8, alpha=0.9, label=label)
    plt.legend(loc='upper right')
    plt.xlabel("episode", fontsize=12)
    out_dir = os.path.abspath('../') + '/output'
    # Create the output directory on demand so savefig cannot fail on a
    # missing path (same hardening as plot()).
    os.makedirs(out_dir, exist_ok=True)
    plt.savefig(out_dir + f'/[{time_now(datetime.now())}]{msg}.png')
    plt.close(fig)
def save_model(model, env_id, save_path):
    """Persist `model`'s state_dict to `save_path` as a timestamped .ckpt file."""
    os.makedirs(save_path, exist_ok=True)
    filename = f'[{time_now(datetime.now())}]{env_id}.ckpt'
    torch.save(model.state_dict(), os.path.join(save_path, filename))
def time_now(n):
    """Format datetime `n` as 'MM-DD-YYYY-HH-MM-SS' (used in log/file names)."""
    return n.strftime("%m-%d-%Y-%H-%M-%S")
class Logger:
    """Minimal logger that echoes messages to stdout and appends them to a file."""

    def __init__(self, log_dir):
        # All messages are appended to <log_dir>/train_log.txt.
        self.file = os.path.join(log_dir, 'train_log.txt')

    def log(self, msg):
        """Print `msg` with a timestamp prefix and append it to the log file.

        :param msg: message string to record.
        """
        message = f"[{time_now(datetime.now())}] {msg}"
        print(message)
        # Context manager guarantees the handle is closed even if the write
        # raises (the original's open/write/close could leak on failure).
        with open(self.file, "a+", buffering=1) as f:
            f.write(message + '\n')
class LinearWarmupCosine(_LRScheduler):
    """LR schedule: linear warmup for `warmup_epoch` epochs, then cosine
    annealing from the base LR toward `eta_min` over the remaining
    T_max - warmup_epoch epochs.

    NOTE(review): during warmup, epoch 0 yields lr == 0
    (base_lr / warmup_epoch * 0) — confirm that is intended.
    """

    def __init__(self, optimizer, warmup_epoch, T_max, eta_min=0, last_epoch=-1):
        # Floor learning rate for the cosine phase.
        self.eta_min = eta_min
        # Number of epochs spent ramping linearly up to base_lr.
        self.warmup_epoch = warmup_epoch
        # Length of the cosine-annealing phase after warmup.
        self.cosine_epochs = T_max - warmup_epoch
        super(LinearWarmupCosine, self).__init__(optimizer, last_epoch)

    def get_lr(self):
        # Warmup phase: lr = base_lr * (last_epoch / warmup_epoch),
        # applied per parameter group.
        if self.warmup_epoch > 0:
            if self.last_epoch <= self.warmup_epoch:
                return [base_lr / self.warmup_epoch * self.last_epoch for base_lr in self.base_lrs]
        # Cosine phase, computed incrementally from each group's current lr
        # (chained recurrence in the style of torch's CosineAnnealingLR).
        # At a restart boundary of the 2*T cosine cycle, re-add the step of
        # full amplitude instead of applying the ratio below (whose
        # denominator would be 0 there).
        if ((self.last_epoch - self.warmup_epoch) - 1 - self.cosine_epochs) % (2 * self.cosine_epochs) == 0:
            return [group['lr'] + (base_lr - self.eta_min) *
                    (1 - math.cos(math.pi / self.cosine_epochs)) / 2
                    for base_lr, group in
                    zip(self.base_lrs, self.optimizer.param_groups)]
        else:
            # lr_t = (1 + cos(pi*t/T)) / (1 + cos(pi*(t-1)/T)) * (lr_{t-1} - eta_min) + eta_min,
            # where t = last_epoch - warmup_epoch.
            return [(1 + math.cos(math.pi * (self.last_epoch - self.warmup_epoch) / self.cosine_epochs)) /
                    (1 + math.cos(math.pi * ((self.last_epoch - self.warmup_epoch) - 1) / self.cosine_epochs)) *
                    (group['lr'] - self.eta_min) + self.eta_min
                    for group in self.optimizer.param_groups]
def set_global_seeds(seed):
    """
    Seed the python `random` module, numpy, torch, and gym spaces.

    :param seed: (int) the seed
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # gym.spaces.prng was removed in newer gym releases; seed it only if present.
    prng = getattr(gym.spaces, 'prng', None)
    if prng is not None:
        prng.seed(seed)
def accuracy(output, target, topk=(1,)):
    """Compute top-k precision (as percentages) for the specified values of k.

    :param output: (batch, num_classes) tensor of class scores.
    :param target: (batch,) tensor of ground-truth class indices.
    :param topk: tuple of k values to evaluate.
    :return: list of 1-element tensors, one precision@k per requested k.
    """
    maxk = max(topk)
    batch_size = target.size(0)
    # Indices of the top-maxk predictions per sample, transposed to (maxk, batch).
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    # correct[i, j] is True iff sample j's i-th-ranked prediction is the target.
    correct = pred.eq(target.view(1, -1).expand_as(pred))
    res = []
    for k in topk:
        # Bug fix: correct[:k] is a non-contiguous slice, so .view(-1) raises
        # in modern torch; .reshape(-1) handles both layouts.
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res
class AverageMeter(object):
    """Tracks the most recent value and a running (weighted) average."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all accumulated statistics."""
        self.val = 0    # most recent value
        self.avg = 0    # running mean
        self.sum = 0    # weighted sum of observed values
        self.count = 0  # total weight observed

    def update(self, val, n=1):
        """Record `val` observed with weight `n` and refresh the mean."""
        self.val = val
        self.count = self.count + n
        self.sum = self.sum + val * n
        self.avg = self.sum / self.count