diff --git a/ReinforcementLearning/MAB/README.md b/ReinforcementLearning/MAB/README.md new file mode 100644 index 00000000..dd6c2bfe --- /dev/null +++ b/ReinforcementLearning/MAB/README.md @@ -0,0 +1,17 @@ +# Multi-Armed Bandit + +The multi-armed bandit problem is a classic problem that well demonstrates the exploration vs exploitation dilemma. +A fundamental aspect of bandit problems is that choosing an arm does not affect the properties of the arm or other arms. + +Imagine you are in a casino facing multiple slot machines and each is configured with an unknown probability of how likely you can get a reward at one play. +The question is: What is the best strategy to achieve highest long-term rewards? + +Here, we will only discuss the setting of having an infinite number of trials. +The restriction on a finite number of trials introduces a new type of exploration problem. +For instance, if the number of trials is smaller than the number of slot machines, we cannot even try every machine to estimate the reward probability and hence we have to behave smartly with respect to a limited set of knowledge and resources (i.e. time). + +A naive approach can be that you continue to playing with one machine for many many rounds so as to eventually estimate the “true” reward probability according to the law of large numbers. +However, this is quite wasteful and surely does not guarantee the best long-term reward. + +Instances of the multi-armed bandit problem include the task of iteratively allocating a fixed, limited set of resources between competing (alternative) choices in a way that minimizes the regret. +A notable alternative setup for the multi-armed bandit problem include the "best arm identification" problem where the goal is instead to identify the best choice by the end of a finite number of rounds. diff --git a/ReinforcementLearning/MAB/bernoulli_mab/bandits.py b/ReinforcementLearning/MAB/bernoulli_mab/bandits.py new file mode 100644 index 00000000..5b70b8cc --- /dev/null +++ b/ReinforcementLearning/MAB/bernoulli_mab/bandits.py @@ -0,0 +1,29 @@ +import time +import numpy as np + + +class Bandit(object): + + def generate_reward(self, i): + raise NotImplementedError + + +class BernoulliBandit(Bandit): + + def __init__(self, n, probas=None): + assert probas is None or len(probas) == n + self.n = n + if probas is None: + np.random.seed(int(time.time())) + self.probas = [np.random.random() for _ in range(self.n)] + else: + self.probas = probas + + self.best_proba = max(self.probas) + + def generate_reward(self, i): + # The player selected the i-th machine. + if np.random.random() < self.probas[i]: + return 1 + else: + return 0 diff --git a/ReinforcementLearning/MAB/bernoulli_mab/main.py b/ReinforcementLearning/MAB/bernoulli_mab/main.py new file mode 100644 index 00000000..6252d98c --- /dev/null +++ b/ReinforcementLearning/MAB/bernoulli_mab/main.py @@ -0,0 +1,100 @@ +import matplotlib +matplotlib.use('Agg') + +import matplotlib.pyplot as plt +import numpy as np + +# custom modules +from bandits import BernoulliBandit +from solvers import Solver, EpsilonGreedy, UCB1, BayesianUCB, ThompsonSampling + + +def plot_results(solvers, solver_names, figname): + """ + Plot the results by multi-armed bandit solvers. + + Args: + solvers (list): All of them should have been fitted. + solver_names (list 0, solvers)) + + b = solvers[0].bandit + + fig = plt.figure(figsize=(14, 4)) + fig.subplots_adjust(bottom=0.3, wspace=0.3) + + ax1 = fig.add_subplot(131) + ax2 = fig.add_subplot(132) + ax3 = fig.add_subplot(133) + + # Sub.fig. 1: Regrets in time. + for i, s in enumerate(solvers): + ax1.plot(range(len(s.regrets)), s.regrets, label=solver_names[i]) + + ax1.set_xlabel('Time step') + ax1.set_ylabel('Cumulative regret') + ax1.legend(loc=9, bbox_to_anchor=(1.82, -0.25), ncol=5) + ax1.grid('k', ls='--', alpha=0.3) + + # Sub.fig. 2: Probabilities estimated by solvers. + sorted_indices = sorted(range(b.n), key=lambda x: b.probas[x]) + ax2.plot(range(b.n), [b.probas[x] for x in sorted_indices], 'k--', markersize=12) + for s in solvers: + ax2.plot(range(b.n), [s.estimated_probas[x] for x in sorted_indices], 'x', markeredgewidth=2) + ax2.set_xlabel('Actions sorted by ' + r'$\theta$') + ax2.set_ylabel('Estimated') + ax2.grid('k', ls='--', alpha=0.3) + + # Sub.fig. 3: Action counts + for s in solvers: + ax3.plot(range(b.n), np.array(s.counts) / float(len(solvers[0].regrets)), ls='steps', lw=2) + ax3.set_xlabel('Actions') + ax3.set_ylabel('Frac. # trials') + ax3.grid('k', ls='--', alpha=0.3) + + plt.savefig(figname) + + +def experiment(K, N): + """ + Run a small experiment on solving a Bernoulli bandit with K slot machines, + each with a randomly initialized reward probability. + + Args: + K (int): number of slot machiens. + N (int): number of time steps to try. + """ + + b = BernoulliBandit(K) + print("Randomly generated Bernoulli bandit has reward probabilities:\n", b.probas) + print("The best machine has index: {} and proba: {}".format(max(range(K), key=lambda i: b.probas[i]), max(b.probas))) + + test_solvers = [ + # EpsilonGreedy(b, 0), + # EpsilonGreedy(b, 1), + EpsilonGreedy(b, 0.01), + UCB1(b), + BayesianUCB(b, 3, 1, 1), + ThompsonSampling(b, 1, 1) + ] + names = [ + # 'Full-exploitation', + # 'Full-exploration', + r'$\epsilon$' + '-Greedy', + 'UCB1', + 'Bayesian UCB', + 'Thompson Sampling' + ] + + for s in test_solvers: + s.run(N) + + plot_results(test_solvers, names, "results_K{}_N{}.png".format(K, N)) + + +if __name__ == '__main__': + experiment(10, 5000) diff --git a/ReinforcementLearning/MAB/bernoulli_mab/solvers.py b/ReinforcementLearning/MAB/bernoulli_mab/solvers.py new file mode 100644 index 00000000..272c756b --- /dev/null +++ b/ReinforcementLearning/MAB/bernoulli_mab/solvers.py @@ -0,0 +1,158 @@ +import numpy as np +import time +from scipy.stats import beta + +# custom modules +from bandits import BernoulliBandit + + +class Solver(object): + def __init__(self, bandit): + """ + bandit (Bandit): the target bandit to solve. + """ + assert isinstance(bandit, BernoulliBandit) + np.random.seed(int(time.time())) + + self.bandit = bandit + + self.counts = [0] * self.bandit.n + self.actions = [] # A list of machine ids, 0 to bandit.n-1. + self.regret = 0. # Cumulative regret. + self.regrets = [0.] # History of cumulative regret. + + def update_regret(self, i): + # i (int): index of the selected machine. + self.regret += self.bandit.best_proba - self.bandit.probas[i] + self.regrets.append(self.regret) + + @property + def estimated_probas(self): + raise NotImplementedError + + def run_one_step(self): + """Return the machine index to take action on.""" + raise NotImplementedError + + def run(self, num_steps): + assert self.bandit is not None + for _ in range(num_steps): + i = self.run_one_step() + + self.counts[i] += 1 + self.actions.append(i) + self.update_regret(i) + + +class EpsilonGreedy(Solver): + def __init__(self, bandit, eps, init_proba=1.0): + """ + eps (float): the probability to explore at each time step. + init_proba (float): default to be 1.0; optimistic initialization + """ + super(EpsilonGreedy, self).__init__(bandit) + + assert 0. <= eps <= 1.0 + self.eps = eps + + self.estimates = [init_proba] * self.bandit.n # Optimistic initialization + + @property + def estimated_probas(self): + return self.estimates + + def run_one_step(self): + if np.random.random() < self.eps: + # Let's do random exploration! + i = np.random.randint(0, self.bandit.n) + else: + # Pick the best one. + i = max(range(self.bandit.n), key=lambda x: self.estimates[x]) + + r = self.bandit.generate_reward(i) + self.estimates[i] += 1. / (self.counts[i] + 1) * (r - self.estimates[i]) + + return i + + +class UCB1(Solver): + def __init__(self, bandit, init_proba=1.0): + super(UCB1, self).__init__(bandit) + self.t = 0 + self.estimates = [init_proba] * self.bandit.n + + @property + def estimated_probas(self): + return self.estimates + + def run_one_step(self): + self.t += 1 + + # Pick the best one with consideration of upper confidence bounds. + i = max(range(self.bandit.n), key=lambda x: self.estimates[x] + np.sqrt( + 2 * np.log(self.t) / (1 + self.counts[x]))) + r = self.bandit.generate_reward(i) + + self.estimates[i] += 1. / (self.counts[i] + 1) * (r - self.estimates[i]) + + return i + + +class BayesianUCB(Solver): + """Assuming Beta prior.""" + + def __init__(self, bandit, c=3, init_a=1, init_b=1): + """ + c (float): how many standard dev to consider as upper confidence bound. + init_a (int): initial value of a in Beta(a, b). + init_b (int): initial value of b in Beta(a, b). + """ + super(BayesianUCB, self).__init__(bandit) + self.c = c + self._as = [init_a] * self.bandit.n + self._bs = [init_b] * self.bandit.n + + @property + def estimated_probas(self): + return [self._as[i] / float(self._as[i] + self._bs[i]) for i in range(self.bandit.n)] + + def run_one_step(self): + # Pick the best one with consideration of upper confidence bounds. + i = max( + range(self.bandit.n), + key=lambda x: self._as[x] / float(self._as[x] + self._bs[x]) + beta.std( + self._as[x], self._bs[x]) * self.c + ) + r = self.bandit.generate_reward(i) + + # Update Gaussian posterior + self._as[i] += r + self._bs[i] += (1 - r) + + return i + + +class ThompsonSampling(Solver): + def __init__(self, bandit, init_a=1, init_b=1): + """ + init_a (int): initial value of a in Beta(a, b). + init_b (int): initial value of b in Beta(a, b). + """ + super(ThompsonSampling, self).__init__(bandit) + + self._as = [init_a] * self.bandit.n + self._bs = [init_b] * self.bandit.n + + @property + def estimated_probas(self): + return [self._as[i] / (self._as[i] + self._bs[i]) for i in range(self.bandit.n)] + + def run_one_step(self): + samples = [np.random.beta(self._as[x], self._bs[x]) for x in range(self.bandit.n)] + i = max(range(self.bandit.n), key=lambda x: samples[x]) + r = self.bandit.generate_reward(i) + + self._as[i] += r + self._bs[i] += (1 - r) + + return i