diff --git a/lib/RLTrader.py b/lib/RLTrader.py
index 3687744..eeeb4ea 100644
--- a/lib/RLTrader.py
+++ b/lib/RLTrader.py
@@ -6,15 +6,25 @@ from typing import Dict
 
 from stable_baselines.common.base_class import BaseRLModel
 from stable_baselines.common.policies import BasePolicy, MlpPolicy
-from stable_baselines.common.vec_env import DummyVecEnv
+from stable_baselines.common.vec_env import DummyVecEnv, SubprocVecEnv
+from stable_baselines.common import set_global_seeds
 from stable_baselines import PPO2
 
 from lib.env.TradingEnv import TradingEnv
 from lib.data.providers.dates import ProviderDateFormat
-from lib.data.providers import StaticDataProvider, ExchangeDataProvider
+from lib.data.providers import BaseDataProvider, StaticDataProvider, ExchangeDataProvider
 from lib.util.logger import init_logger
 
 
+def make_env(data_provider: BaseDataProvider, rank: int = 0, seed: int = 0):
+    def _init():
+        env = TradingEnv(data_provider)
+        env.seed(seed + rank)
+        return env
+    set_global_seeds(seed)
+    return _init
+
+
 class RLTrader:
     data_provider = None
     study_name = None
@@ -32,7 +42,8 @@ def __init__(self, modelClass: BaseRLModel = PPO2, policyClass: BasePolicy = Mlp
         self.date_format = kwargs.get('date_format', ProviderDateFormat.DATETIME_HOUR_24)
 
         self.model_verbose = kwargs.get('model_verbose', 1)
-        self.nminibatches = kwargs.get('nminibatches', 1)
+        self.n_envs = kwargs.get('n_envs', os.cpu_count())
+        self.n_minibatches = kwargs.get('n_minibatches', self.n_envs)
         self.train_split_percentage = kwargs.get('train_split_percentage', 0.8)
 
         self.data_provider = kwargs.get('data_provider', 'static')
@@ -109,11 +120,14 @@ def optimize_params(self, trial, n_prune_evals_per_trial: int = 2, n_tests_per_e
 
         del test_provider
 
-        train_env = DummyVecEnv([lambda: TradingEnv(train_provider)])
-        validation_env = DummyVecEnv([lambda: TradingEnv(validation_provider)])
+        n_envs = min(self.n_envs, 4)
+        n_minibatches = self.n_minibatches if n_envs % self.n_minibatches == 0 else n_envs
+
+        train_env = SubprocVecEnv([make_env(train_provider, i) for i in range(n_envs)])
+        validation_env = SubprocVecEnv([make_env(validation_provider, i) for i in range(n_envs)])
 
         model_params = self.optimize_agent_params(trial)
-        model = self.Model(self.Policy, train_env, verbose=self.model_verbose, nminibatches=self.nminibatches,
+        model = self.Model(self.Policy, train_env, verbose=self.model_verbose, nminibatches=n_minibatches,
                            tensorboard_log=self.tensorboard_path, **model_params)
 
         last_reward = -np.finfo(np.float16).max
@@ -135,7 +149,7 @@ def optimize_params(self, trial, n_prune_evals_per_trial: int = 2, n_tests_per_e
                 obs, reward, done, _ = validation_env.step(action)
                 reward_sum += reward
 
-                if done:
+                if all(done):
                     rewards.append(reward_sum)
                     reward_sum = 0.0
                     n_episodes += 1
@@ -149,7 +163,7 @@ def optimize_params(self, trial, n_prune_evals_per_trial: int = 2, n_tests_per_e
 
         return -1 * last_reward
 
-    def optimize(self, n_trials: int = 20, n_parallel_jobs: int = 1, *optimize_params):
+    def optimize(self, n_trials: int = 100, n_parallel_jobs: int = 1, *optimize_params):
         try:
             self.optuna_study.optimize(
                 self.optimize_params, n_trials=n_trials, n_jobs=n_parallel_jobs, *optimize_params)
@@ -166,16 +180,16 @@ def optimize(self, n_trials: int = 20, n_parallel_jobs: int = 1, *optimize_param
 
         return self.optuna_study.trials_dataframe()
 
-    def train(self, n_epochs: int = 10, test_trained_model: bool = False, render_trained_model: bool = False):
+    def train(self, n_epochs: int = 100, save_every: int = 10, test_trained_model: bool = False, render_trained_model: bool = False):
         train_provider, test_provider = self.data_provider.split_data_train_test(self.train_split_percentage)
 
         del test_provider
 
-        train_env = DummyVecEnv([lambda: TradingEnv(train_provider)])
+        train_env = SubprocVecEnv([make_env(train_provider, i) for i in range(self.n_envs)])
 
         model_params = self.get_model_params()
 
-        model = self.Model(self.Policy, train_env, verbose=self.model_verbose, nminibatches=self.nminibatches,
+        model = self.Model(self.Policy, train_env, verbose=self.model_verbose, nminibatches=self.n_minibatches,
                            tensorboard_log=self.tensorboard_path, **model_params)
 
         self.logger.info(f'Training for {n_epochs} epochs')
@@ -200,7 +214,7 @@ def test(self, model_epoch: int = 0, should_render: bool = True):
 
         del train_provider
 
-        test_env = DummyVecEnv([lambda: TradingEnv(test_provider)])
+        test_env = SubprocVecEnv([make_env(test_provider, i) for i in range(self.n_envs)])
 
         model_path = path.join('data', 'agents', f'{self.study_name}__{model_epoch}.pkl')
         model = self.Model.load(model_path, env=test_env)
@@ -208,15 +222,15 @@ def test(self, model_epoch: int = 0, should_render: bool = True):
         self.logger.info(f'Testing model ({self.study_name}__{model_epoch})')
 
         state = None
-        obs, done, rewards = test_env.reset(), False, []
-        while not done:
+        obs, done, rewards = test_env.reset(), [False], []
+        while not all(done):
             action, state = model.predict(obs, state=state)
             obs, reward, done, _ = test_env.step(action)
 
             rewards.append(reward)
 
-            if should_render:
+            if should_render and self.n_envs == 1:
                 test_env.render(mode='human')
 
         self.logger.info(
-            f'Finished testing model ({self.study_name}__{model_epoch}): ${"{:.2f}".format(str(np.mean(rewards)))}')
+            f'Finished testing model ({self.study_name}__{model_epoch}): ${"{:.2f}".format(np.sum(rewards))}')
diff --git a/lib/__init__.pyc b/lib/__init__.pyc
index 2581be6..a6af580 100644
Binary files a/lib/__init__.pyc and b/lib/__init__.pyc differ
diff --git a/optimize.py b/optimize.py
index 88e06e5..0641b27 100644
--- a/optimize.py
+++ b/optimize.py
@@ -7,5 +7,5 @@
 if __name__ == '__main__':
     trader = RLTrader()
 
-    trader.optimize()
+    # trader.optimize()
     trader.train(test_trained_model=True, render_trained_model=True)
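For reference only, not part of the patch: a minimal sketch of how the new make_env factory pairs with SubprocVecEnv in stable-baselines 2.x. The StaticDataProvider keyword arguments and the CSV path below are assumptions for illustration, not taken from this diff.

    # sketch.py -- illustrative only, assuming StaticDataProvider accepts
    # date_format/csv_data_path keyword arguments and the CSV file exists.
    from stable_baselines import PPO2
    from stable_baselines.common.vec_env import SubprocVecEnv

    from lib.RLTrader import make_env
    from lib.data.providers import StaticDataProvider
    from lib.data.providers.dates import ProviderDateFormat

    if __name__ == '__main__':  # SubprocVecEnv launches worker processes, so guard the entry point
        provider = StaticDataProvider(date_format=ProviderDateFormat.DATETIME_HOUR_24,
                                      csv_data_path='data/input/coinbase_hourly.csv')

        n_envs = 4
        # Each callable builds one TradingEnv in its own subprocess, seeded by its rank.
        train_env = SubprocVecEnv([make_env(provider, rank=i) for i in range(n_envs)])

        # The patch keeps nminibatches a divisor of n_envs so PPO2's batch splits evenly.
        model = PPO2('MlpPolicy', train_env, nminibatches=n_envs, verbose=1)
        model.learn(total_timesteps=10000)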