#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Jan 14 12:43:32 2021
@author: nick
"""
import numpy as np
import matplotlib.pyplot as plt
'''
Create a bandit problem to play with the explore-then-commit method. Rewards
are normally distributed with unit variance for each arm.
'''
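# Note on bookkeeping (added for clarity): each bandit below records the
# cumulative pseudo-regret
#     regret[t-1] = sum_{s=1}^{t} (mu_max - mu_{I_s}),
# where I_s is the arm played in round s, rather than regret based on the
# realized rewards. See Bandit.pull() for the running update.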
class Bandit():
    '''
    Create the bandit
    Parameters:
        arms: a numpy array containing the true mean for each arm
        horizon: the number of rounds in which we will make a decision
        m: determines how many times we pull each arm before committing
    '''
    def __init__(self,arms,horizon=10,m=1):

        #Store the true mean of each arm here.
        self.arms = arms
        #The largest mean reward of all arms
        self.mu_max = np.max(self.arms)

        self.horizon = horizon
        self.m = m

        #At a minimum, we should have a larger horizon than the number of arms.
        assert(self.horizon>self.numArms)

        #Store the number of times each arm has been played here
        self.N = np.zeros_like(self.arms)
        #Store our estimates of the mean of each arm here
        self.estimates = np.zeros_like(self.arms)
        self.estimate_error = np.zeros((self.horizon,self.numArms))

        #Record which arm is pulled in each round here
        self.history = np.zeros(shape=(self.horizon,))
        #Record regret at each round here
        self.regret = np.zeros(shape=(self.horizon,))
    '''
    Pull an arm of the bandit
    Parameters:
        arm: the index i in {1,...,N} of the arm to pull
    Returns: the reward generated from N(mu_i,1)
    '''
    def _pull(self,arm):

        outcome = np.random.normal(self.arms[arm-1],1)

        return outcome

    '''
    Update our estimate of the arm's mean and the number of plays
    '''
    def update(self,arm,outcome):

        self.estimates[arm-1] = (self.estimates[arm-1]*self.N[arm-1] + outcome)/(self.N[arm-1]+1)
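        # (Added note) This is the standard incremental form of the sample mean:
        # if x_bar_N is the mean of the first N rewards from this arm, then
        # x_bar_{N+1} = (N*x_bar_N + outcome)/(N+1), so no per-arm reward
        # history needs to be stored.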
        self.N[arm-1] += 1
    '''
    Choose an arm to play using the explore-then-commit method.
    '''
    def play(self):

        t = 1
        while t <= self.numArms*self.m:
            #Play each arm m times to get an initial estimate of its mean reward
            arm = (t % self.numArms) + 1
            self.pull(t,arm)
            t += 1

        #Choose I_t to be the arm with the largest sample mean
        arm = np.argmax(self.estimates)+1

        #Play the chosen arm until the time horizon
        while t <= self.horizon:
            self.pull(t,arm)
            t += 1
    '''
    Play the given arm, then update the relevant state
    '''
    def pull(self,t,arm):

        outcome = self._pull(arm)
        self.update(arm,outcome)

        self.estimate_error[t-1,:] = np.abs(self.estimates-self.arms)
        self.history[t-1] = arm

        if t > 1:
            self.regret[t-1] = self.regret[t-2] + self.mu_max - self.arms[arm-1]
        else:
            self.regret[t-1] = self.mu_max - self.arms[arm-1]

    @property
    def numArms(self):
        return self.arms.shape[0]

    @property
    def randomArmIndex(self):
        return np.random.choice(range(1,self.numArms+1))
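# Example usage of the base (explore-then-commit) bandit. This is a
# hypothetical quick check for illustration, not part of the original
# experiments further below:
#
#   means = np.array([0.5, 0.0, 0.0])
#   b = Bandit(means, horizon=1000, m=10)
#   b.play()
#   print(b.regret[-1])   # cumulative pseudo-regret after the final round
#   print(b.history[:5])  # arms (1-indexed) played in the first five rounds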
'''
Create a bandit problem to play with the UCB method
'''
class UCBBandit(Bandit):

    def __init__(self, arms, horizon=10):

        super(UCBBandit,self).__init__(arms, horizon)
    '''
    Choose an arm to play using the UCB method
    '''
    def play(self):

        t = 1
        #Play each arm once to get an initial estimate of its mean reward
        while t <= self.numArms:
            arm = t
            self.pull(t,arm)
            t += 1

        #After playing each arm once, play by choosing the largest upper bound each round
        while t <= self.horizon:

            #Compute the confidence interval half-width
            hw = np.sqrt(2*np.log(2*self.numArms*self.horizon**2)/self.N)
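            # (Added note) This half-width appears to come from a sub-Gaussian
            # (Hoeffding-style) confidence bound: with N_i plays of arm i and
            # per-arm, per-round failure probability delta = 1/(2*n*T^2),
            #     |estimate_i - mu_i| <= sqrt(2*log(1/delta)/N_i)
            # holds with probability at least 1 - 2*delta, and a union bound
            # over all n arms and T rounds keeps the total failure probability
            # on the order of 1/T.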
            #Choose the arm with the highest upper bound (sample mean + CI half-width)
            arm = np.argmax(self.estimates + hw) + 1

            #Play the chosen arm, then update the estimate and history
            self.pull(t,arm)
            t += 1
'''
Create a bandit problem to play with the Thompson sampling method
'''
class ThompsonBandit(Bandit):

    def __init__(self, arms, horizon=10):

        super().__init__(arms, horizon)

        #Take the prior to be N(0,1)
        #We will store the posterior mean and variance for each arm here
        self.params = np.zeros(shape=(self.numArms,2))
        self.params[:,1] = 1

        #Initial estimates are drawn from the prior (Normal) distribution.
        #np.random.normal expects the standard deviation, so pass the square
        #root of the stored variance (equal to 1 for the prior).
        self.estimates = np.zeros_like(self.arms)
        for armIndex in range(self.numArms):
            self.estimates[armIndex] = np.random.normal(self.params[armIndex,0],np.sqrt(self.params[armIndex,1]))
    '''
    Update the posterior and the number of plays
    '''
    def update(self,arm,outcome):

        self.N[arm-1] += 1

        #Compute the new posterior mean
        self.params[arm-1,0] = (self.params[arm-1,0]*self.N[arm-1] + outcome) / (self.N[arm-1]+1)
        # self.params[arm-1,0] = (self.params[arm-1,0] + outcome)/2
        #Compute the new posterior variance
        self.params[arm-1,1] = 1/(1+self.N[arm-1])
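        # (Added note) With a N(0,1) prior and unit-variance Gaussian rewards,
        # the posterior for this arm after n observations x_1,...,x_n is
        #     N( (x_1 + ... + x_n)/(n+1), 1/(n+1) ),
        # which is exactly what the two updates above maintain incrementally
        # (self.N[arm-1] == n after the increment at the top of update()).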
        #Draw new estimates from the posterior (np.random.normal expects the
        #standard deviation, hence the square root of the stored variance)
        for armIndex in range(self.numArms):
            self.estimates[armIndex] = np.random.normal(self.params[armIndex,0],np.sqrt(self.params[armIndex,1]))
    '''
    Always choose the arm with the largest estimate drawn from the posterior.
    '''
    def play(self):

        t = 1
        while t <= self.horizon:

            #Choose I_t to be the arm with the largest draw
            arm = np.argmax(self.estimates)+1
            self.pull(t,arm)
            t += 1
# Perform experiment for problem 4.1
def partA():

    np.random.seed(4321)

    T = 12500
    n = 10
    mu = np.array([.1,]+[0,]*(n-1))
    mVals = (5,25,100)

    #Run explore-then-commit for several values of m
    etcBandits = []
    for m in mVals:
        bandit = Bandit(mu,T,m)
        bandit.play()
        etcBandits.append(bandit)

    #Run UCB
    ucbBandit = UCBBandit(mu,T)
    ucbBandit.play()

    #Run Thompson sampling
    tsBandit = ThompsonBandit(mu,T)
    tsBandit.play()

    #Plot the regret of each bandit
    fig,ax = plt.subplots(1)
    ax.set_xlabel('Round')
    ax.set_ylabel('Cumulative Regret')
    # ax.set_ylim([0,.1])
    x = range(T)
    for b,m in zip(etcBandits,mVals):
        ax.plot(x,b.regret,label='ETC w/ m={}'.format(m))
    ax.plot(x,ucbBandit.regret,label='UCB')
    ax.plot(x,tsBandit.regret,label='Thompson')
    plt.legend()

    return etcBandits,ucbBandit,tsBandit
# Perform experiment for problem 4.2
def partB():

    np.random.seed(1234)

    T = 50000
    n = 40
    mu = np.array([1,]+[1-1/np.sqrt(i-1) for i in range(2,n+1)])
    mVals = (5,25,100)

    #Run explore-then-commit for several values of m
    etcBandits = []
    for m in mVals:
        bandit = Bandit(mu,T,m)
        bandit.play()
        etcBandits.append(bandit)

    #Run UCB
    ucbBandit = UCBBandit(mu,T)
    ucbBandit.play()

    #Run Thompson sampling
    tsBandit = ThompsonBandit(mu,T)
    tsBandit.play()

    #Plot the regret of each bandit
    fig,ax = plt.subplots(1)
    ax.set_xlabel('Round')
    ax.set_ylabel('Cumulative Regret')
    # ax.set_ylim([0,.1])
    x = range(T)
    for b,m in zip(etcBandits,mVals):
        ax.plot(x,b.regret,label='ETC w/ m={}'.format(m))
    ax.plot(x,ucbBandit.regret,label='UCB')
    ax.plot(x,tsBandit.regret,label='Thompson')
    plt.legend()

    return etcBandits,ucbBandit,tsBandit
if __name__ == '__main__':
    etcBandits,ucbBandit,tsBandit = partA()
    # etcBandits,ucbBandit,tsBandit = partB()
    plt.show()