agents.py
import numpy as np
import copy


def binary_cross_entropy(P, Q):
    '''
    Cross-entropy for a binary variable with support {0, 1}.
    Input:
        P: index of the action actually performed (one-hot index)
        Q: soft score (probability vector) over actions
    Output:
        H: negative log-probability of the performed action
    '''
    return -np.log(Q)[P]
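
# Illustrative sketch (not part of the original module): how binary_cross_entropy
# is expected to be called, assuming Q is a softmax score vector produced by an
# agent and P is the index of the sampled action. Values below are hypothetical.
def _demo_binary_cross_entropy():
    Q = np.array([0.3, 0.7])  # hypothetical soft scores over two actions
    P = 1                     # action actually performed
    return binary_cross_entropy(P, Q)  # equals -np.log(0.7)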
class model_free_with_eligibility_trace_agent():
    def __init__(self, num_states, num_actions, alpha, beta, gamma, lamda):
        # alpha, beta, gamma and lamda are expected to be callables
        # (they are evaluated below as alpha(0), beta(0), ...)
        print("model free agent with eligibility trace has been initiated")
        self.num_states = num_states
        self.num_actions = num_actions
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.lamda = lamda
        self.eligibility_trace = np.zeros((num_states, num_actions))
        self.Q = np.zeros((num_states, num_actions))
        self.score = np.zeros((num_actions))
        self.td_error = 0.

    def softmax(self, q_s):
        # softmax over the action values of the current state
        exp_q_state = np.exp(q_s)
        score = exp_q_state / np.sum(exp_q_state)
        return score

    def act(self, state):
        # softmax policy over actions 0 and 1 in the first-stage state;
        # all other states only allow the dummy action 2
        if state == 0:
            q_s = self.Q[state, 0:2] * self.beta(0)
            score = self.softmax(q_s)
            action = np.random.multinomial(1, score)
            action = np.nonzero(action)[0][0]
        else:
            action = 2
        return int(action)

    def update(self, state, action, reward, new_state):
        # accumulate the eligibility trace for the visited state-action pair
        self.eligibility_trace[state, action] += 1
        if new_state > 0:  # new state is not terminal
            V_new_state = self.Q[new_state, 2]
        else:
            V_new_state = 0
        # TD(lambda) update applied to all traced state-action pairs
        self.td_error = reward + self.gamma(0)*V_new_state - self.Q[state, action]
        self.Q += self.alpha(0)*self.td_error*self.eligibility_trace
        self.eligibility_trace *= self.gamma(0)*self.lamda(0)

    def reset(self):
        # clear the eligibility trace at the end of an episode
        self.eligibility_trace = np.zeros((self.num_states, self.num_actions))
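
# Illustrative sketch (not part of the original module): one way to drive the
# model-free agent, assuming a hypothetical two-stage task in which state 0
# offers actions 0/1, states 1/2 only allow action 2, and the rate/temperature
# parameters are passed as constant-valued callables. Transition and reward
# probabilities below are assumptions for illustration only.
def _demo_model_free_agent(num_episodes=10):
    agent = model_free_with_eligibility_trace_agent(
        num_states=3, num_actions=3,
        alpha=lambda t: 0.1, beta=lambda t: 5.0,
        gamma=lambda t: 1.0, lamda=lambda t: 0.9)
    for _ in range(num_episodes):
        action = agent.act(0)
        new_state = 1 if np.random.rand() < 0.7 else 2   # hypothetical transition
        agent.update(0, action, 0., new_state)
        reward = float(np.random.rand() < (0.8 if new_state == 1 else 0.2))
        agent.update(new_state, 2, reward, 0)            # 0 marks the terminal state
        agent.reset()
    return agent.Q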
class model_based_agent_change_point():
    def __init__(self, num_states, num_actions, beta, H_lamda):
        print("model based agent with change point detection and eligibility trace has been initiated")
        self.Q = np.ones((num_states, num_actions))*0.5
        self.theta_mus = np.zeros((num_states, num_actions))
        self.alpha = np.zeros((num_states, num_actions))
        self.beta = beta
        # Bernoulli observation model with hyper-parameters alpha, beta;
        # hazard function of a geometric distribution;
        # initial run length is 0 with probability 1
        self.H = lambda tau: 1./H_lamda
        self.r_distribution = [[{0: 1} for j in range(num_actions)] for i in range(num_states)]
        self.statistics = [[[] for j in range(num_actions)] for i in range(num_states)]
        self.action_last = 2

    def softmax(self, q_s):
        # softmax over the action values of the current state
        exp_q_state = np.exp(q_s)
        score = exp_q_state / np.sum(exp_q_state)
        return score

    def act(self, state):
        # softmax policy over actions 0 and 1 in the first-stage state;
        # all other states only allow the dummy action 2
        if state == 0:
            q_s = self.Q[state, 0:2] * self.beta(0)
            score = self.softmax(q_s)
            action = np.random.multinomial(1, score)
            action = np.nonzero(action)[0][0]
        else:
            action = 2
        return int(action)

    def calculate_model_params(self):
        # marginal Bernoulli parameters for the valid state-action pairs
        for state, action in [(0, 0), (0, 1), (1, 2), (2, 2)]:
            self.theta_mus[state, action] = self._get_model_param(state, action)

    def _get_model_param(self, state, action):
        # expectation of the Bernoulli parameter under the run-length distribution
        theta_mu_given_r = 0.
        for r in self._run_length_list(state, action):
            beta = 1 if r == 0 else sum(self.statistics[state][action][-r:]) + 1
            alpha = 1 if r == 0 else r - beta + 2
            theta_mu = alpha/(alpha + beta)
            theta_mu_given_r += self.r_distribution[state][action][r]*theta_mu
        return theta_mu_given_r

    def _run_length_list(self, state, action):
        return sorted(list(self.r_distribution[state][action].keys()))

    def _update(self, state, action, reward, new_state, num_r):
        # binary outcome observed for this state-action pair
        transited_state = new_state - 1 if state == 0 else 1 - reward
        # growth/change-point prior over the run length
        r_distribution_new = {0: 0.}
        for r in self._run_length_list(state, action):
            prob_reset = self.H(r)
            prob_grow = 1. - prob_reset
            r_distribution_new[0] += self.r_distribution[state][action][r]*prob_reset
            r_distribution_new[r+1] = self.r_distribution[state][action][r]*prob_grow
        V_g = self.Q[1, 2] if state == 0 else 1.
        V_b = self.Q[2, 2] if state == 0 else 0.
        # posterior distribution of the run length
        r_list_new = sorted(list(r_distribution_new.keys()))
        r_x_distribution = np.zeros((num_r+1, 2))
        for i, r in enumerate(r_list_new):
            beta = 1 if r == 0 else sum(self.statistics[state][action][-r:]) + 1
            alpha = 1 if r == 0 else len(self.statistics[state][action][-r:]) - beta + 1
            theta_mu = (alpha*V_g + beta*V_b)/(alpha + beta)
            r_x_distribution[i, 0] = r_distribution_new[r]*theta_mu
            r_x_distribution[i, 1] = r_distribution_new[r]*(1 - theta_mu)
        r_given_x_distribution = r_x_distribution/np.sum(r_x_distribution, 0)
        r_posterior = {}
        for i, r in enumerate(r_list_new):
            r_prob = r_given_x_distribution[i, int(transited_state)]
            if r_prob > 0.01:
                r_posterior[r] = r_prob
        self.r_distribution[state][action] = r_posterior
        # prediction of the next observation (new action value)
        Q = 0.
        for r in self._run_length_list(state, action):
            beta = 1 if r == 0 else sum(self.statistics[state][action][-r:]) + 1
            alpha = 1 if r == 0 else r - beta + 2
            theta_mu = (alpha*V_g + beta*V_b)/(alpha + beta)
            Q += self.r_distribution[state][action][r]*theta_mu
        # effective learning rate implied by the model-based value change
        Q_old = self.Q[state, action]
        Q_next = self.Q[new_state, 2] if state == 0 else reward
        td_error = Q_next - Q_old
        alpha = (Q - Q_old)/td_error if abs(td_error) > 0 else 0
        self.Q_old = copy.deepcopy(self.Q)
        self.Q[state, action] = Q
        self.alpha[state, action] = alpha

    def update(self, state, action, reward, new_state):
        # update statistics
        transited_state = new_state - 1 if state == 0 else 1 - reward
        self.statistics[state][action].append(transited_state)
        # prior distribution of the run length
        num_r = len(self.r_distribution[state][action].keys())
        # the update runs twice when the state is a second-stage (final) state:
        # once for this pair and once for the preceding first-stage choice
        self._update(state, action, reward, new_state, num_r)
        if state != 0:
            # prior distribution of the run length for the preceding first-stage choice
            num_r = len(self.r_distribution[0][self.action_last].keys())
            self._update(0, self.action_last, 0, state, num_r)
        self.action_last = action

    def reset(self):
        pass
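
# Illustrative sketch (not part of the original module): running the demo
# helpers above and driving the change-point agent on the same hypothetical
# two-stage task. The inverse temperature is passed as a callable because the
# agent evaluates beta(0); H_lamda=20 (expected run length of the geometric
# hazard) and the transition/reward probabilities are assumptions.
if __name__ == "__main__":
    np.random.seed(0)
    print(_demo_binary_cross_entropy())
    print(_demo_model_free_agent())
    agent = model_based_agent_change_point(num_states=3, num_actions=3,
                                           beta=lambda t: 5.0, H_lamda=20.)
    for _ in range(10):
        action = agent.act(0)
        new_state = 1 if np.random.rand() < 0.7 else 2   # hypothetical transition
        agent.update(0, action, 0, new_state)
        reward = int(np.random.rand() < (0.8 if new_state == 1 else 0.2))
        agent.update(new_state, 2, reward, 0)            # 0 marks the terminal state
        agent.calculate_model_params()
    print(agent.Q)
    print(agent.theta_mus)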