diff --git a/Bitcoin-Factory/Forecast-Client/notebooks/Bitcoin_Factory_RL.py b/Bitcoin-Factory/Forecast-Client/notebooks/Bitcoin_Factory_RL.py new file mode 100644 index 0000000000..330fa6b634 --- /dev/null +++ b/Bitcoin-Factory/Forecast-Client/notebooks/Bitcoin_Factory_RL.py @@ -0,0 +1,801 @@ +#!/usr/bin/env python +# coding: utf-8 + +import random +import gym +from gym import spaces +import pandas as pd +import numpy as np +import matplotlib +import matplotlib.pyplot as plt +import time +import ray +import os +import sys +import math +import json +from typing import Dict, List, Optional, Union +from ray import tune +from ray.rllib.agents import ppo +from ray.tune import CLIReporter +from ray.tune import ProgressReporter +from ray.tune.registry import register_env +from ray.rllib.env.vector_env import VectorEnv #unused +from sklearn.model_selection import train_test_split +from sklearn import preprocessing +from sklearn.preprocessing import MinMaxScaler +from tabulate import tabulate + +location: str = "/tf/notebooks/" +instructions_file: str = "instructions.csv" +run_forcast: bool = False +res_dir: str = location + "/ray_results/" + + +if os.path.isfile(location+instructions_file): #Forecaster + run_forecast = True + + # Load the Instructions Dataset + instructions_dataset = pd.read_csv( + '/tf/notebooks/instructions.csv', + header=0, + sep=' ', + skipinitialspace=True + ) + + # what are we going to do in the current run? + ACTION_TO_TAKE = instructions_dataset.values[0][1] + + # name of the model file to load or save. + MODEL_FILE_NAME = instructions_dataset.values[1][1] + + FILENAME_parameters_dataset = instructions_dataset.values[2][1] + FILENAME_timeseries_dataset = instructions_dataset.values[3][1] + + res_dir = location + "/models/" + MODEL_FILE_NAME + "/" + + parameters = pd.read_csv( + '/tf/notebooks/'+FILENAME_parameters_dataset, + header=0, + sep=' ', + skipinitialspace=True + ) + df = pd.read_csv( + location+FILENAME_timeseries_dataset, + header=0, + index_col=None, + sep=' ', + skipinitialspace=True + ) + +else: #Testclient + parameters = pd.read_csv( + location+'parameters.csv', + sep=' ', + skipinitialspace=True, + ) + df = pd.read_csv( + location+'time-series.csv', + header=0, + index_col=None, + sep=' ', + skipinitialspace=True + ) + +if {'TIMESTEPS_TO_TRAIN'}.issubset(parameters.columns): + EXPERIMENT_NAME = "Trading_Signal_Predictor_RL_V01" + PERCENTAGE_OF_DATASET_FOR_TRAINING = 80 + TIMESTEPS_TO_TRAIN = parameters['TIMESTEPS_TO_TRAIN'][0] + OBSERVATION_WINDOW_SIZE = parameters['OBSERVATION_WINDOW_SIZE'][0] + INITIAL_QUOTE_ASSET = parameters['INITIAL_QUOTE_ASSET'][0] + INITIAL_BASE_ASSET = parameters['INITIAL_BASE_ASSET'][0] + TRADING_FEE = parameters['TRADING_FEE'][0] + ENV_VERSION = parameters['ENV_VERSION'][0] + ENV_NAME = parameters['ENV_NAME'][0] + EXPLORE_ON_EVAL = parameters['EXPLORE_ON_EVAL'][0] + + # Hyper-parameters, in case we want to really control them from the test server not from ray + ALGORITHM = parameters['ALGORITHM'][0] + ROLLOUT_FRAGMENT_LENGTH = parameters['ROLLOUT_FRAGMENT_LENGTH'][0] + TRAIN_BATCH_SIZE = parameters['TRAIN_BATCH_SIZE'][0] + SGD_MINIBATCH_SIZE = parameters['SGD_MINIBATCH_SIZE'][0] + BATCH_MODE = parameters['BATCH_MODE'][0] + #VF_CLIP_PARAM = parameters['VF_CLIP_PARAM'][0] + FC_SIZE = [parameters['FC_SIZE'][0]] + LEARNING_RATE = parameters['LEARNING_RATE'][0] + GAMMA = parameters['GAMMA'][0] + +else: + EXPERIMENT_NAME = "Trading_Signal_Predictor_RL_V01" + PERCENTAGE_OF_DATASET_FOR_TRAINING = 80 + TIMESTEPS_TO_TRAIN = 
int(parameters.values[2][4]) + OBSERVATION_WINDOW_SIZE = int(parameters.values[2][5]) + INITIAL_QUOTE_ASSET = int(parameters.values[2][6]) + INITIAL_BASE_ASSET = int(parameters.values[2][7]) + TRADING_FEE = float(parameters.values[2][8]) + ENV_NAME = str(parameters.values[2][9]) + ENV_VERSION = int(parameters.values[2][10]) + EXPLORE_ON_EVAL = str(parameters.values[2][12]) + + # Hyper-parameters, in case we want to really control them from the test server not from ray + ALGORITHM = str(parameters.values[2][13]) + ROLLOUT_FRAGMENT_LENGTH = int(parameters.values[2][14]) + TRAIN_BATCH_SIZE = int(parameters.values[2][15]) + SGD_MINIBATCH_SIZE = int(parameters.values[2][16]) + BATCH_MODE = str(parameters.values[2][17]) + #VF_CLIP_PARAM = parameters['VF_CLIP_PARAM'][0] + FC_SIZE = int(parameters.values[2][18]) + LEARNING_RATE = float(parameters.values[2][19]) + GAMMA = float(parameters.values[2][20]) + + +print(f'TIMESTEPS_TO_TRAIN: {TIMESTEPS_TO_TRAIN}\n') +print(f'OBSERVATION_WINDOW_SIZE: {OBSERVATION_WINDOW_SIZE}\n') +print(f'INITIAL_QUOTE_ASSET: {INITIAL_QUOTE_ASSET}\n') +print(f'INITIAL_BASE_ASSET: {INITIAL_BASE_ASSET}\n') +print(f'TRADING_FEE: {TRADING_FEE}\n') + +def prepare_data(df): + # renaming column labels as we wish, regardless what test server sends, hopefully he will maintain position + df.rename(columns={df.columns[0]: "date"}, inplace=True) + df.rename(columns={df.columns[1]: "high"}, inplace=True) + df.rename(columns={df.columns[2]: "low"}, inplace=True) + df.rename(columns={df.columns[3]: "close"}, inplace=True) + df.rename(columns={df.columns[4]: "open"}, inplace=True) + df.rename(columns={df.columns[5]: "volume"}, inplace=True) + + df['volume'] = np.int64(df['volume']) + df['date'] = pd.to_datetime(df['date'], unit='ms') + df.sort_values(by='date', ascending=True, inplace=True) + df.reset_index(drop=True, inplace=True) + df['date'] = df['date'].dt.strftime('%Y-%m-%d %I:%M %p') + + return df + +data = prepare_data(df) + +# Setup which data to use for training and which data to use for evaluation of RL Model +def split_data(data): + + X_train_test, X_valid = train_test_split(data, train_size=0.67, test_size=0.33, shuffle=False) + X_train, X_test = train_test_split(X_train_test, train_size=0.50, test_size=0.50, shuffle=False) + + return X_train, X_test, X_valid + +X_train, X_test, X_valid = split_data(data) + + +# Normalize the dataset subsets to make the model converge faster +scaler_type = MinMaxScaler + +def get_feature_scalers(X, scaler_type=scaler_type): + scalers = [] + for name in list(X.columns[X.columns != 'date']): + scalers.append(scaler_type().fit(X[name].values.reshape(-1, 1))) + return scalers + +def get_scaler_transforms(X, scalers): + X_scaled = [] + for name, scaler in zip(list(X.columns[X.columns != 'date']), scalers): + X_scaled.append(scaler.transform(X[name].values.reshape(-1, 1))) + X_scaled = pd.concat([pd.DataFrame(column, columns=[name]) for name, column in zip(list(X.columns[X.columns != 'date']), X_scaled)], axis='columns') + return X_scaled + +def scale_numpy_array(np_arr, scaler_type = scaler_type): + return scaler_type().fit_transform(np_arr, (-1,1)) + +def normalize_data(X_train, X_test, X_valid): + X_train_test = pd.concat([X_train, X_test], axis='index') + X_train_test_valid = pd.concat([X_train_test, X_valid], axis='index') + + X_train_test_dates = X_train_test[['date']] + X_train_test_valid_dates = X_train_test_valid[['date']] + + X_train_test = X_train_test.drop(columns=['date']) + X_train_test_valid = 
X_train_test_valid.drop(columns=['date']) + + train_test_scalers = get_feature_scalers(X_train_test, + scaler_type=scaler_type) + train_test_valid_scalers = get_feature_scalers(X_train_test_valid, + scaler_type=scaler_type) + + X_train_test_scaled = get_scaler_transforms(X_train_test, + train_test_scalers) + X_train_test_valid_scaled = get_scaler_transforms(X_train_test_valid, + train_test_scalers) + X_train_test_valid_scaled_leaking = get_scaler_transforms(X_train_test_valid, + train_test_valid_scalers) + + X_train_test_scaled = pd.concat([X_train_test_dates, + X_train_test_scaled], + axis='columns') + X_train_test_valid_scaled = pd.concat([X_train_test_valid_dates, + X_train_test_valid_scaled], + axis='columns') + X_train_test_valid_scaled_leaking = pd.concat([X_train_test_valid_dates, + X_train_test_valid_scaled_leaking], + axis='columns') + + X_train_scaled = X_train_test_scaled.iloc[:X_train.shape[0]] + X_test_scaled = X_train_test_scaled.iloc[X_train.shape[0]:] + X_valid_scaled = X_train_test_valid_scaled.iloc[X_train_test.shape[0]:] + X_valid_scaled_leaking = X_train_test_valid_scaled_leaking.iloc[X_train_test.shape[0]:] + + return (train_test_scalers, + train_test_valid_scalers, + X_train_scaled, + X_test_scaled, + X_valid_scaled, + X_valid_scaled_leaking) + +train_test_scalers, train_test_valid_scalers, X_train_scaled, X_test_scaled, X_valid_scaled, X_valid_scaled_leaking = normalize_data(X_train, X_test, X_valid) + +class SimpleTradingEnv(gym.Env): + + metadata = {'render.modes': ['live', 'human', 'portfolio', 'none']} + visualization = None + + def __init__(self, config=None): + + self.df_scaled = config.get("df_scaled").reset_index(drop=True) + self.df_normal = config.get("df_normal").reset_index(drop=True) + self.window_size = OBSERVATION_WINDOW_SIZE + self.prices, self.features = self._process_data(self.df_scaled) + # The shape of the observation is (window_size * features + environment_features) the environment_features are: quote_asset, base_asset, net_worth. The entire observation is flattened in a 1D np array. + # NOT USED ANYMORE, KEPT FOR REFERENCE + # self.obs_shape = ((OBSERVATION_WINDOW_SIZE * self.features.shape[1] + 3),) + + # The shape of the observation is number of candles to look back, and the number of features (candle_features) + 3 (quote_asset, base_asset, net_worth) + self.obs_shape = (OBSERVATION_WINDOW_SIZE, self.features.shape[1] + 3) + + # Action space + #self.action_space = spaces.Box(low=np.array([0, 0]), high=np.array([3.0, 1.0]), dtype=np.float32) + self.action_space = spaces.MultiDiscrete([3, 100]) + # Observation space + self.observation_space = spaces.Box(low=-1, high=1, shape=self.obs_shape, dtype=np.float32) + + # Initialize the episode environment + + self._start_candle = OBSERVATION_WINDOW_SIZE # We assume that the first observation is not the first row of the dataframe, in order to avoid the case where there are no calculated indicators. 
+ self._end_candle = len(self.features) - 1 + self._trading_fee = config.get("trading_fee") + + self._quote_asset = None + self._base_asset = None + self._done = None + self._current_candle = None + self._net_worth = None + self._previous_net_worth = None + + # Array that will contain observation history needed for appending it to the observation space + # It will contain observations consisting of the net_worth, base_asset and quote_asset as list of floats + # Other features (OHLC + Indicators) will be appended to the current observation in the _get_observation method that takes the data directly from the available dataframe + self._obs_env_history = None + + # Render and analysis data + self._total_reward_accumulated = None + self.portfolio_history = None + self.trade_history = None + self.positions = None + self._first_rendering = None + + + def reset(self): + self._done = False + self._current_candle = self._start_candle + self._quote_asset = INITIAL_QUOTE_ASSET + self._base_asset = INITIAL_BASE_ASSET + self._net_worth = INITIAL_QUOTE_ASSET # at the begining our net worth is the initial quote asset + self._previous_net_worth = INITIAL_QUOTE_ASSET # at the begining our previous net worth is the initial quote asset + self._total_reward_accumulated = 0. + self._first_rendering = True + self.portfolio_history = [] + self.trade_history = [] + self.positions = [] + self._obs_env_history = [] + + self._initial_obs_data() + + return self._get_observation() + + def _take_action(self, action): + self._done = False + current_price = random.uniform( + self.df_normal.loc[self._current_candle, "low"], self.df_normal.loc[self._current_candle, "high"]) + + + action_type = action[0] + amount = action[1] / 100 + + if action_type == 0: # Buy + # Buy % assets + # Determine the maximum amount of quote asset that can be bought + available_amount_to_buy_with = self._quote_asset / current_price + # Buy only the amount that agent chose + assets_bought = available_amount_to_buy_with * amount + # Update the quote asset balance + self._quote_asset -= assets_bought * current_price + # Update the base asset + self._base_asset += assets_bought + # substract trading fee from base asset based on the amount bought + self._base_asset -= self._trading_fee * assets_bought + + # Add to trade history the amount bought if greater than 0 + if assets_bought > 0: + self.trade_history.append({'step': self._current_candle, 'type': 'BuyLong', 'amount': assets_bought, 'price': current_price, 'total' : assets_bought * current_price, 'percent_amount': action[1]}) + + + elif action_type == 1: # Sell + # Sell % assets + # Determine the amount of base asset that can be sold + amount_to_sell = self._base_asset * amount + received_quote_asset = amount_to_sell * current_price + # Update the quote asset + self._quote_asset += received_quote_asset + # Update the base asset + self._base_asset -= amount_to_sell + + # substract trading fee from quote asset based on the amount sold + self._quote_asset -= self._trading_fee * received_quote_asset + + # Add to trade history the amount sold if greater than 0 + if amount_to_sell > 0: + self.trade_history.append({'step': self._current_candle, 'type': 'SellLong', 'amount': amount_to_sell, 'price': current_price, 'total' : received_quote_asset, 'percent_amount': action[1]}) + + else: + # Hold + self.trade_history.append({'step': self._current_candle, 'type': 'Hold', 'amount': '0', 'price': current_price, 'total' : 0, 'percent_amount': action[1]}) + + + self.portfolio_history.append({'step': 
self._current_candle, 'base_asset': self._base_asset, 'quote_asset': self._quote_asset, 'current_price': current_price, 'net_worth' : self._net_worth}) + + # Update the current net worth + self._net_worth = self._base_asset * current_price + self._quote_asset + + + def step(self, action): + """ + Returns the next observation, reward, done and info. + """ + + self._take_action(action) + + # Calculate reward comparing the current net worth with the previous net worth + reward = self._net_worth - self._previous_net_worth + + self._total_reward_accumulated += reward + + # Update the previous net worth to be the current net worth after the reward has been applied + self._previous_net_worth = self._net_worth + + obs = self._get_observation() + # Update the info and add it to history data + info = dict ( + total_reward_accumulated = self._total_reward_accumulated, + net_worth = self._net_worth, + quote_asset = self._quote_asset, + base_asset = self._base_asset, + last_action_type = self.trade_history[-1]['type'] if len(self.trade_history) > 0 else None, + last_action_amount = self.trade_history[-1]['amount'] if len(self.trade_history) > 0 else None, + current_step = self._current_candle, + current_action = action + ) + + self._current_candle += 1 + + # Update observation history + self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset]) + + self._done = self._net_worth <= 0 or self._current_candle >= (len( + self.df_normal.loc[:, 'open'].values) - 30)# We assume that the last observation is not the last row of the dataframe, in order to avoid the case where there are no calculated indicators. + + if self._done: + print('I have finished the episode (',self._current_candle,'Candles )') + + return obs, reward, self._done, info + + + def _get_observation(self): + """ + Returns the current observation. + """ + data_frame = self.features[(self._current_candle - self.window_size):self._current_candle] + + obs_env_history = np.array(self._obs_env_history).astype(np.float32) + + #TODO We definetely need to scale the observation history in a better way, this might influence training results + # Doing it ad-hoc might change the scale of the min and max, thus changing the results + obs_env_history = preprocessing.minmax_scale(obs_env_history, (-0.9,0.9)) + + obs = np.hstack((data_frame, obs_env_history[(self._current_candle - self.window_size):self._current_candle])) + + return obs + + + def render(self, mode='human', **kwargs): + """ + Renders a plot with trades made by the agent. 
+ """ + + if mode == 'human': + print(f'Accumulated Reward: {self._total_reward_accumulated} ---- Current Net Worth: {self._net_worth}') + print(f'Current Quote asset: {self._quote_asset} ---- Current Base asset: {self._base_asset}') + print(f'Number of trades: {len(self.trade_history)}') + + if(len(self.trade_history) > 0): + print(f'Last Action: {self.trade_history[-1]["type"]} {self.trade_history[-1]["amount"]} assets ({self.trade_history[-1]["percent_amount"]} %) at price {self.trade_history[-1]["price"]}, total: {self.trade_history[-1]["total"]}') + print(f'--------------------------------------------------------------------------------------') + elif mode == 'live': + pass + # if self.visualization == None: + # self.visualization = LiveTradingGraph(self.df_normal, kwargs.get('title', None)) + + # if self._current_candle > OBSERVATION_WINDOW_SIZE: + # self.visualization.render(self._current_candle, self._net_worth, self.trade_history, window_size=OBSERVATION_WINDOW_SIZE) + + elif mode == 'portfolio': + return self.positions, self.trade_history, self.portfolio_history, self.df_normal + + def close(self): + if self.visualization != None: + self.visualization.close() + self.visualization = None + + + def _process_data(self, df_scaled): + """ + Processes the dataframe into features. + """ + + prices = self.df_scaled.loc[:, 'close'].to_numpy(dtype=np.float32) + + data_frame = df_scaled.iloc[:, 1:] # drop first column which is date TODO: Should be probably fixed outside of this class + # Convert df to numpy array + return prices, data_frame.to_numpy(dtype=np.float32) + + def _initial_obs_data(self): + for i in range(self.window_size - len(self._obs_env_history)): + self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset]) + +# Initialize Ray +if ray.is_initialized(): + ray.shutdown() # let's shutdown first any running instances of ray (don't confuse it with the cluster) +os.environ['RAY_record_ref_creation_sites'] = '1' # Needed for debugging when things go wrong +ray.init() + +try: + available_gpu_in_cluster = ray.available_resources()['GPU'] +except KeyError as e: + available_gpu_in_cluster = 0 + +available_cpu_in_cluster = ray.available_resources()['CPU'] if ray.available_resources()['CPU'] else 0 + +# In the first version we assume that we have only one node cluster, so the allocation logic is based on that +# So the resources are maximized for one ray tune trial at a time +def find_optimal_resource_allocation(available_cpu, available_gpu): + """ + Finds the optimal resource allocation for the agent based on the available resources in the cluster + """ + # If we have GPU available, we allocate it all for the training, while creating as much workers as CPU cores we have minus one for the driver which holds the trainer + if available_gpu > 0: + return { + 'num_workers': available_cpu - 1, + 'num_cpus_per_worker': 1, + 'num_envs_per_worker': 1, + 'num_gpus_per_worker': 0, + 'num_cpus_for_driver': 1, + 'num_gpus' : available_gpu + } + # If we don't have GPU available, we allocate enough CPU cores for stepping the env (workers) while having enough for training maintaing a ratio of around 3 workers with 1 CPU to 1 driver CPU + else: + # according to the benchmark, we should allocate more workers, each with 1 cpu, letting the rest for the driver + num_workers = int(math.floor((available_cpu * 75) / 100)) + num_cpu_for_driver = available_cpu - num_workers + return { + 'num_workers': num_workers, + 'num_cpus_per_worker': 1, # this should be enough for stepping an env 
at once + 'num_envs_per_worker': 1, # it doesn't seem to add any benefits to have more than one env per worker + 'num_gpus_per_worker': 0, # the inference is done pretty fast, so there is no need to use GPU, at least not when we run one trial at once + 'num_cpus_for_driver': num_cpu_for_driver, + 'num_gpus' : 0 + } + +parallel_config = find_optimal_resource_allocation(available_cpu_in_cluster, 0) # Currently we are going to disable GPU ussage due to it's poor performance on a single instance cluster + +training_config = { + "trading_fee": TRADING_FEE, + "df_normal": X_train, + "df_scaled": X_train_scaled, +} + +test_config = { + "trading_fee": TRADING_FEE, + "df_normal": X_test, + "df_scaled": X_test_scaled, +} + +eval_config = { + "trading_fee": TRADING_FEE, + "df_normal": X_valid, + "df_scaled": X_valid_scaled, +} + +if ENV_NAME == 'SimpleTrading': + training_env = SimpleTradingEnv(training_config) + test_env = SimpleTradingEnv(test_config) + eval_env = SimpleTradingEnv(eval_config) + + training_env_key = "SimpleTradingEnv-training-V01" + test_env_key = "SimpleTradingEnv-testing-V01" + eval_env_key = "SimpleTradingEnv-evaluating-V01" + +tune.register_env(training_env_key, lambda _: training_env) +tune.register_env(test_env_key, lambda _: test_env) +tune.register_env(eval_env_key, lambda _: eval_env) + + +# Create the ppo trainer configuration +ppo_trainer_config = { + "env": training_env_key, # Ray will automatically create multiple environments and vectorize them if needed + "horizon": len(X_train_scaled) - 30, + "log_level": "WARN", #or INFO + "framework": "tf", + #"eager_tracing": True, + "ignore_worker_failures": True, + "num_workers": parallel_config.get("num_workers"), # Number of workers is per trial run, so the more we put the less parallelism we have + "num_envs_per_worker": parallel_config.get("num_envs_per_worker"), # This influences also the length of the episode. the environment length will be split by the number of environments per worker + "num_gpus": parallel_config.get("num_gpus"), # Number of GPUs to use in training (0 means CPU only). After a few experiments, it seems that using GPU is not helping + "num_cpus_per_worker": parallel_config.get("num_cpus_per_worker"), # After some testing, seems the fastest way for this kind of enviroment. It's better to run more trials in parallel than to finish a trial with a couple of minutes faster. Because we can end trial earlier if we see that our model eventuall converge + "num_cpus_for_driver": parallel_config.get("num_cpus_for_driver"), # Number of CPUs to use for the driver. This is the number of CPUs used for the training process. + "num_gpus_per_worker": parallel_config.get("num_gpus_per_worker"), + "rollout_fragment_length": ROLLOUT_FRAGMENT_LENGTH, # Size of batches collected from each worker. If num_envs_per_worker is > 1 the rollout value will be multiplied by num_envs_per_worker + "train_batch_size": TRAIN_BATCH_SIZE, # Number of timesteps collected for each SGD round. This defines the size of each SGD epoch. the batch size is composed of fragments defined above + "sgd_minibatch_size": SGD_MINIBATCH_SIZE, + "batch_mode": BATCH_MODE, + "vf_clip_param": 100, # Default is 10, but we increase it to 100 to adapt it to our rewards scale. 
It helps our value function to converge faster + "lr": LEARNING_RATE, # Hyperparameter grid search defined above + "gamma": GAMMA, # This can have a big impact on the result and needs to be properly tuned + #"observation_filter": "MeanStdFilter", + "model": { + # "fcnet_hiddens": FC_SIZE, # Hyperparameter grid search defined above + # "use_lstm": True, + # "lstm_cell_size": 256, + # "lstm_use_prev_action_reward": True, + # "lstm_use_prev_action": True, + + }, + "evaluation_interval": 5, # Run one evaluation step on every x `Trainer.train()` call. + "evaluation_duration": 1, # How many episodes to run evaluations for each time we evaluate. + "evaluation_config": { + "explore": True, # We usually don't want to explore during evaluation. All actions have to be repeatable. Similar to deterministic = True, but on-policy algorithms can get better results with exploration. + "env": test_env_key, # We need to define a new environment for evaluation with different parameters + }, + "logger_config": { + "logdir": res_dir, + "type": "ray.tune.logger.UnifiedLogger", + } + } + + +# ### Custom reporter to get progress in Superalgos +class CustomReporter(ProgressReporter): + + def __init__( + self, + max_report_frequency: int = 10, # in seconds + location: str = "/tf/notebooks/", + ): + self._max_report_freqency = max_report_frequency + self._last_report_time = 0 + self._location = location + + def should_report(self, trials, done=False): + if time.time() - self._last_report_time > self._max_report_freqency: + self._last_report_time = time.time() + return True + return done + + def report(self, trials, *sys_info): + + trial_status_dict = {} + for trial in trials: + trial_status_dict['status'] = trial.status + trial_status_dict['name'] = trial.trial_id + trial_status_dict['episodeRewardMax'] = int(trial.last_result['episode_reward_max']) if trial.last_result.get("episode_reward_max") else 0 + trial_status_dict['episodeRewardMean'] = int(trial.last_result['episode_reward_mean']) if trial.last_result.get("episode_reward_mean") else 0 + trial_status_dict['episodeRewardMin'] = int(trial.last_result['episode_reward_min']) if trial.last_result.get("episode_reward_min") else 0 + trial_status_dict['timestepsExecuted'] = int(trial.last_result['timesteps_total']) if trial.last_result.get("timesteps_total") else 0 + trial_status_dict['timestepsTotal'] = int(TIMESTEPS_TO_TRAIN) + + + sys.stdout.write(json.dumps(trial_status_dict)) + sys.stdout.write('\n') + + # Write the results to JSON file + with open(self._location + "training_results.json", "w+") as f: + json.dump(trial_status_dict, f) + f.close() + + def set_start_time(self, timestamp: Optional[float] = None): + if timestamp is not None: + self._start_time = time.time() + else: + self._start_time = timestamp + +# Printing a custom text to let Superalgos know that we are in a RL scenario +sys.stdout.write('RL_SCENARIO') +sys.stdout.write('\n') + +# Run ray tune +analysis = tune.run( + run_or_experiment=ALGORITHM, + name=EXPERIMENT_NAME, + metric='episode_reward_mean', + mode='max', + stop={ + # An iteration is equal with one SGD round which in our case is equal to train_batch_size. If after X iterations we still don't have a good result, we stop the trial + "timesteps_total": TIMESTEPS_TO_TRAIN + }, + config=ppo_trainer_config, + num_samples=1, # Have one sample for each hyperparameter combination. You can have more to average out randomness. 
+ keep_checkpoints_num=30, # Keep the last X checkpoints + checkpoint_freq=5, # Checkpoint every X iterations (save the model) + checkpoint_at_end=True, # Whether to checkpoint at the end of the experiment regardless of the checkpoint_freq + verbose=1, + local_dir=res_dir, # Local directory to store checkpoints and results, we are using tmp folder until we move the notebook to a docker instance and we can use the same directory across all instances, no matter the underlying OS + progress_reporter=CustomReporter(max_report_frequency=10,location=location), + fail_fast=True, + resume="AUTO" # Resume training from the last checkpoint if any exists [True, 'LOCAL', 'REMOTE', 'PROMPT', 'ERRORED_ONLY', 'AUTO'] +) + +# Evaluate trained model restoring it from checkpoint +best_trial = analysis.get_best_trial(metric="episode_reward_mean", mode="max", scope="all") +best_checkpoint = analysis.get_best_checkpoint(best_trial, metric="episode_reward_mean") + +print("best_checkpoint path: " + best_checkpoint.local_path) + +agent = ppo.PPOTrainer(config=ppo_trainer_config) +agent.restore(best_checkpoint) + +json_dict = {} +episodes_to_run = 2 + +envs = [training_env, test_env, eval_env] + +positions = [] +trade_history = [] +portfolio_history = [] +df_normal = [] + +for iter, env in enumerate(envs): + net_worths = [] + q_assets = [] + b_assets = [] + net_worths_at_end = [] + q_assets_at_end = [] + b_assets_at_end = [] + last_actions = [] + + for i in range(episodes_to_run): + episode_reward = 0 + done = False + obs = env.reset() # we are using the evaluation environment for evaluation + last_info = None + while not done: + action = agent.compute_single_action(obs, explore=True) # stochastic evaluation + obs, reward, done, info = env.step(action) + net_worths.append(info['net_worth']) # Add all historical net worths to a list to print statistics at the end of the episode + q_assets.append(info['quote_asset']) # Add all historical quote assets to a list to print statistics at the end of the episode + b_assets.append(info['base_asset']) # Add all historical base assets to a list to print statistics at the end of the episode + episode_reward += reward + last_info = info + + net_worths_at_begin = net_worths[0] + net_worths_at_end.append(last_info['net_worth']) # Add all historical net worths to a list to print statistics at the end of the episode + q_assets_at_end.append(last_info['quote_asset']) # Add all historical quote assets to a list to print statistics at the end of the episode + b_assets_at_end.append(last_info['base_asset']) # Add all historical base assets to a list to print statistics at the end of the episode + last_actions.append(last_info['current_action']) + + r1, r2, r3, r4 = env.render(mode='portfolio') + positions.append(r1) + trade_history.append(r2) + portfolio_history.append(r3) + df_normal.append(r4) + + json_dict_env = {} + json_dict_env['meanNetWorth'] = np.mean(net_worths) + json_dict_env['stdNetWorth'] = np.std(net_worths) + json_dict_env['minNetWorth'] = np.min(net_worths) + json_dict_env['maxNetWorth'] = np.max(net_worths) + json_dict_env['stdQuoteAsset'] = np.std(q_assets) + json_dict_env['minQuoteAsset'] = np.min(q_assets) + json_dict_env['maxQuoteAsset'] = np.max(q_assets) + json_dict_env['stdBaseAsset'] = np.std(b_assets) + json_dict_env['minBaseAsset'] = np.min(b_assets) + json_dict_env['maxBaseAsset'] = np.max(b_assets) + json_dict_env['NetWorthAtBegin'] = net_worths_at_begin + json_dict_env['meanNetWorthAtEnd'] = np.mean(net_worths_at_end) + json_dict_env['stdNetWorthAtEnd'] = 
np.std(net_worths_at_end) + json_dict_env['minNetWorthAtEnd'] = np.min(net_worths_at_end) + json_dict_env['maxNetWorthAtEnd'] = np.max(net_worths_at_end) + json_dict_env['current_action'] = {"type": int(last_actions[-1][0]), "amount":int(last_actions[-1][1])} + + print(f"NetWorthAtBegin / meanNetWorthAtEnd : {json_dict_env['NetWorthAtBegin']} / {json_dict_env['meanNetWorthAtEnd']}") + json_dict[iter] = json_dict_env + + +# Write the results to JSON file to be picked up by Superalgos +with open(location + "evaluation_results.json", "w+") as f: + json.dump(json_dict, f) + f.close() + +pd_positions_0 = pd.DataFrame(positions[0]) +pd_positions_1 = pd.DataFrame(positions[1]) +pd_positions_2 = pd.DataFrame(positions[2]) + +pd_trade_history_0 = pd.DataFrame(trade_history[0]) +pd_trade_history_1 = pd.DataFrame(trade_history[1]) +pd_trade_history_2 = pd.DataFrame(trade_history[2]) + +pd_portfolio_history_0 = pd.DataFrame(portfolio_history[0]) +pd_portfolio_history_1 = pd.DataFrame(portfolio_history[1]) +pd_portfolio_history_2 = pd.DataFrame(portfolio_history[2]) + +pd_join_0 = pd.merge(pd_trade_history_0, pd_portfolio_history_0, how='right', left_on = 'step', right_on = 'step') +pd_join_1 = pd.merge(pd_trade_history_1, pd_portfolio_history_1, how='right', left_on = 'step', right_on = 'step') +pd_join_2 = pd.merge(pd_trade_history_2, pd_portfolio_history_2, how='right', left_on = 'step', right_on = 'step') +pd_join_2[:].tail(20) + +tic = "BTC train" +plt.figure(figsize=[15,9]); +plt.title(tic) +plt.plot(pd_join_0["step"],pd_join_0["net_worth"]/pd_join_0["net_worth"][0],label = "net_worth", color='black'); +plt.plot(pd_join_0["step"],pd_join_0["current_price"]/pd_join_0["current_price"][0],label = "current_price", color='yellow'); +plt.scatter(pd_trade_history_0[pd_trade_history_0['type'] == 'BuyLong']["step"], pd_trade_history_0[pd_trade_history_0['type'] == 'BuyLong']["price"]/pd_join_0["current_price"][0]*0.9, label='BuyLong', marker='^', color='lightgreen', alpha=1) +plt.scatter(pd_trade_history_0[pd_trade_history_0['type'] == 'SellLong']["step"], pd_trade_history_0[pd_trade_history_0['type'] == 'SellLong']["price"]/pd_join_0["current_price"][0]*1.1, label='SellLong', marker='v', color='magenta', alpha=1) +plt.scatter(pd_trade_history_0[pd_trade_history_0['type'] == 'BuyShort']["step"], pd_trade_history_0[pd_trade_history_0['type'] == 'BuyShort']["price"]/pd_join_0["current_price"][0]*1.1, label='BuyShort', marker='*', color='red', alpha=1) +plt.scatter(pd_trade_history_0[pd_trade_history_0['type'] == 'SellShort']["step"], pd_trade_history_0[pd_trade_history_0['type'] == 'SellShort']["price"]/pd_join_0["current_price"][0]*0.9, label='SellShort', marker='x', color='blue', alpha=1) +plt.xlabel('Steps') +plt.ylabel('Value normed to first step'); +plt.legend() +plt.savefig(res_dir+tic.replace(" ", "_")+".png") +plt.show() + + +tic = "BTC test" +plt.figure(figsize=[15,9]); +plt.title(tic) +plt.plot(pd_join_1["step"],pd_join_1["net_worth"]/pd_join_1["net_worth"][0],label = "net_worth", color='black'); +plt.plot(pd_join_1["step"],pd_join_1["current_price"]/pd_join_1["current_price"][0],label = "current_price", color='yellow'); +plt.scatter(pd_trade_history_1[pd_trade_history_1['type'] == 'BuyLong']["step"], pd_trade_history_1[pd_trade_history_1['type'] == 'BuyLong']["price"]/pd_join_1["current_price"][0]*0.9, label='BuyLong', marker='^', color='lightgreen', alpha=1) +plt.scatter(pd_trade_history_1[pd_trade_history_1['type'] == 'SellLong']["step"], pd_trade_history_1[pd_trade_history_1['type'] == 
'SellLong']["price"]/pd_join_1["current_price"][0]*1.1, label='SellLong', marker='v', color='magenta', alpha=1) +plt.scatter(pd_trade_history_1[pd_trade_history_1['type'] == 'BuyShort']["step"], pd_trade_history_1[pd_trade_history_1['type'] == 'BuyShort']["price"]/pd_join_1["current_price"][0]*1.1, label='BuyShort', marker='*', color='red', alpha=1) +plt.scatter(pd_trade_history_1[pd_trade_history_1['type'] == 'SellShort']["step"], pd_trade_history_1[pd_trade_history_1['type'] == 'SellShort']["price"]/pd_join_1["current_price"][0]*0.9, label='SellShort', marker='x', color='blue', alpha=1) +plt.xlabel('Steps') +plt.ylabel('Value normed to first step'); +plt.legend() +plt.savefig(res_dir+tic.replace(" ", "_")+".png") +plt.show() + +tic = "BTC validate" +plt.figure(figsize=[15,9]); +plt.title(tic) +plt.plot(pd_join_2["step"],pd_join_2["net_worth"]/pd_join_2["net_worth"][0],label = "net_worth", color='black'); +plt.plot(pd_join_2["step"],pd_join_2["current_price"]/pd_join_2["current_price"][0],label = "current_price", color='yellow'); +plt.scatter(pd_trade_history_2[pd_trade_history_2['type'] == 'BuyLong']["step"], pd_trade_history_2[pd_trade_history_2['type'] == 'BuyLong']["price"]/pd_join_2["current_price"][0]*0.9, label='BuyLong', marker='^', color='lightgreen', alpha=1) +plt.scatter(pd_trade_history_2[pd_trade_history_2['type'] == 'SellLong']["step"], pd_trade_history_2[pd_trade_history_2['type'] == 'SellLong']["price"]/pd_join_2["current_price"][0]*1.1, label='SellLong', marker='v', color='magenta', alpha=1) +plt.scatter(pd_trade_history_2[pd_trade_history_2['type'] == 'BuyShort']["step"], pd_trade_history_2[pd_trade_history_2['type'] == 'BuyShort']["price"]/pd_join_2["current_price"][0]*1.1, label='BuyShort', marker='*', color='red', alpha=1) +plt.scatter(pd_trade_history_2[pd_trade_history_2['type'] == 'SellShort']["step"], pd_trade_history_2[pd_trade_history_2['type'] == 'SellShort']["price"]/pd_join_2["current_price"][0]*0.9, label='SellShort', marker='x', color='blue', alpha=1) +plt.xlabel('Steps') +plt.ylabel('Value normed to first step'); +plt.legend() +plt.savefig(res_dir+tic.replace(" ", "_")+".png") +plt.show() + +# Cleanup +if ray.is_initialized(): + ray.shutdown() + +# Tell Superalgos we finished +sys.stdout.write('RL_SCENARIO_END') +sys.stdout.write('\n') \ No newline at end of file diff --git a/Bitcoin-Factory/ReadMeReinforcementLearning.md b/Bitcoin-Factory/ReadMeReinforcementLearning.md new file mode 100644 index 0000000000..b8f431be08 --- /dev/null +++ b/Bitcoin-Factory/ReadMeReinforcementLearning.md @@ -0,0 +1,126 @@ +# Reinforcement Learning +## 💫 1. Introduction +[Reinforcement Learning](https://en.wikipedia.org/wiki/Reinforcement_learning) is a term used to describe a special machine learning process. The typical framing of a Reinforcement Learning (RL) scenario: An agent takes actions in an environment, which is interpreted into a reward and a representation of the state, which are fed back into the agent. + +![Learning framework](https://upload.wikimedia.org/wikipedia/commons/1/1b/Reinforcement_learning_diagram.svg "RL Framework") + +In our usage case, the environment is a stock trading one and the possible actions are buy,sell or hold. The reward will be our gain or loss. Based on this reward the agent will learn how to trade better. The process of learning is done with a so called [Proximal Policy Optimization (PPO)](https://en.wikipedia.org/wiki/Proximal_Policy_Optimization). + +At the end the agent will provide us an action for the current candle. 
The possible actions at the moment are:
+* 0 -> buy long
+* 1 -> sell
+* 2 -> hold
+
+For buy and sell signals an additional percentage is provided (see the sketch in the appendix below).
+
+## 📒 2. Configuration
+The basic config has to be done as pointed out in the [Bitcoin Factory ReadMe](./README.md). Hereafter the differences for RL are shown.
+### 2.1 Testserver config
+To run a Testserver for RL you need to change the configuration of the testserver node in SA. First you need to define the python script to be used for the docker sessions on the clients.
+Second you need to define the range of parameters to be tested, for example the learning rate and so on.
+```js
+{
+    ...
+    "pythonScriptName": "Bitcoin_Factory_RL.py",
+    ...
+    "parametersRanges": {
+        "LIST_OF_ASSETS": [
+            [
+                "BTC"
+            ]
+        ],
+        "LIST_OF_TIMEFRAMES": [
+            [
+                "01-hs"
+            ],
+            [
+                "02-hs"
+            ]
+        ],
+        "NUMBER_OF_LAG_TIMESTEPS": [
+            10
+        ],
+        "PERCENTAGE_OF_DATASET_FOR_TRAINING": [
+            80
+        ],
+        "NUMBER_OF_EPOCHS": [
+            750
+        ],
+        "NUMBER_OF_LSTM_NEURONS": [
+            50
+        ],
+        "TIMESTEPS_TO_TRAIN": [
+            1e7
+        ],
+        "OBSERVATION_WINDOW_SIZE": [
+            24,
+            48
+        ],
+        "INITIAL_QUOTE_ASSET": [
+            1000
+        ],
+        "INITIAL_BASE_ASSET": [
+            0
+        ],
+        "TRADING_FEE": [
+            0.01
+        ],
+        "ENV_NAME": [
+            "SimpleTrading"
+        ],
+        "ENV_VERSION": [
+            1
+        ],
+        "REWARD_FUNCTION": [
+            "unused"
+        ],
+        "EXPLORE_ON_EVAL": [
+            "unused"
+        ],
+        "ALGORITHM": [
+            "PPO"
+        ],
+        "ROLLOUT_FRAGMENT_LENGTH": [
+            200
+        ],
+        "TRAIN_BATCH_SIZE": [
+            2048
+        ],
+        "SGD_MINIBATCH_SIZE": [
+            64
+        ],
+        "BATCH_MODE": [
+            "complete_episodes"
+        ],
+        "FC_SIZE": [
+            256
+        ],
+        "LEARNING_RATE": [
+            0.00001
+        ],
+        "GAMMA": [
+            0.95
+        ]
+    }
+}
+```
+### 2.2 Testclient config
+No special config is needed.
+But run only one client per machine (the python script takes care of parallel execution on its own).
+
+## 💡 3. Results
+> __Note__
+> The processing of one test case on the client takes roughly 2h-6h on a recent system.
+
+The provided timeseries values are divided into 3 parts (Train, Test, Validate). The first one (train) is used to train the network. The second one (test) is used by the PPO agent to evaluate the current model during the learning process. The third part is never seen by the agent; it is used to validate whether the trained model is able to trade profitably on unseen data.
+
+The python script produces 3 charts to visualize the results. The following 3 examples are preliminary, made by a poorly trained agent.
+![Example Train Results](docs/BTC_train.png "BTC train")
+![Example Test Results](docs/BTC_test.png "BTC test")
+![Example Validate Results](docs/BTC_validate.png "BTC validate")
+
+## 🤝 4. Support
+
+Contributions, issues, and feature requests are welcome!
+
+Give a ⭐️ if you like this project or even better become a part of the Superalgos community!
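+
+## 📎 5. Appendix: Interpreting an agent action
+
+As an illustration only, the following minimal Python sketch shows how an action from the `MultiDiscrete([3, 100])` action space is turned into a trade, mirroring `SimpleTradingEnv._take_action` in `Bitcoin_Factory_RL.py`. The helper name `interpret_action` is hypothetical, and it omits the bookkeeping (trade history, portfolio history, randomized fill price) done by the real environment:
+
+```python
+# Minimal sketch mirroring SimpleTradingEnv._take_action (bookkeeping omitted).
+def interpret_action(action, quote_asset, base_asset, price, fee=0.01):
+    action_type, percent = action[0], action[1] / 100.0
+    if action_type == 0:    # buy long with `percent` of the quote asset
+        bought = (quote_asset / price) * percent
+        quote_asset -= bought * price
+        base_asset += bought * (1 - fee)          # trading fee taken from the base asset
+    elif action_type == 1:  # sell `percent` of the base asset
+        sold = base_asset * percent
+        base_asset -= sold
+        quote_asset += sold * price * (1 - fee)   # trading fee taken from the quote asset
+    # action_type == 2 -> hold, balances unchanged
+    net_worth = base_asset * price + quote_asset
+    return quote_asset, base_asset, net_worth
+
+# e.g. [0, 25] buys with 25 % of the available quote asset
+print(interpret_action([0, 25], quote_asset=1000, base_asset=0, price=20000))
+```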
\ No newline at end of file diff --git a/Bitcoin-Factory/Test-Client/notebooks/Bitcoin_Factory_RL.py b/Bitcoin-Factory/Test-Client/notebooks/Bitcoin_Factory_RL.py index dd0f860e29..330fa6b634 100644 --- a/Bitcoin-Factory/Test-Client/notebooks/Bitcoin_Factory_RL.py +++ b/Bitcoin-Factory/Test-Client/notebooks/Bitcoin_Factory_RL.py @@ -1,17 +1,12 @@ #!/usr/bin/env python # coding: utf-8 -# ## Import needed deps - -# In[15]: - - import random import gym from gym import spaces -from sklearn import preprocessing import pandas as pd import numpy as np +import matplotlib import matplotlib.pyplot as plt import time import ray @@ -19,77 +14,126 @@ import sys import math import json +from typing import Dict, List, Optional, Union from ray import tune from ray.rllib.agents import ppo from ray.tune import CLIReporter +from ray.tune import ProgressReporter +from ray.tune.registry import register_env +from ray.rllib.env.vector_env import VectorEnv #unused from sklearn.model_selection import train_test_split +from sklearn import preprocessing from sklearn.preprocessing import MinMaxScaler from tabulate import tabulate - -# ## Run tensorboard for data visualisation - -# In[16]: - - -# %load_ext tensorboard -# %tensorboard --logdir "/tf/notebooks/ray_results/" --host 0.0.0.0 - - -# ### Set of parameters received from Test Server - -# In[17]: - - -parameters = pd.read_csv( - '/tf/notebooks/parameters.csv', - sep=' ', -) - - -parameters - - -# In[18]: - - -EXPERIMENT_NAME = "Trading_Signal_Predictor_RL_V01" -PERCENTAGE_OF_DATASET_FOR_TRAINING = 80 -TIMESTEPS_TO_TRAIN = parameters['TIMESTEPS_TO_TRAIN'][0] -OBSERVATION_WINDOW_SIZE = parameters['OBSERVATION_WINDOW_SIZE'][0] -INITIAL_QUOTE_ASSET = parameters['INITIAL_QUOTE_ASSET'][0] -INITIAL_BASE_ASSET = parameters['INITIAL_BASE_ASSET'][0] -TRADING_FEE = parameters['TRADING_FEE'][0] -ENV_VERSION = parameters['ENV_VERSION'][0] -ENV_NAME = parameters['ENV_NAME'][0] -EXPLORE_ON_EVAL = parameters['EXPLORE_ON_EVAL'][0] - -# Hyper-parameters, in case we want to really control them from the test server not from ray -ALGORITHM = parameters['ALGORITHM'][0] -ROLLOUT_FRAGMENT_LENGTH = parameters['ROLLOUT_FRAGMENT_LENGTH'][0] -TRAIN_BATCH_SIZE = parameters['TRAIN_BATCH_SIZE'][0] -SGD_MINIBATCH_SIZE = parameters['SGD_MINIBATCH_SIZE'][0] -BATCH_MODE = parameters['BATCH_MODE'][0] -#VF_CLIP_PARAM = parameters['VF_CLIP_PARAM'][0] -FC_SIZE = [parameters['FC_SIZE'][0]] -LEARNING_RATE = parameters['LEARNING_RATE'][0] -GAMMA = parameters['GAMMA'][0] - - -# In[19]: - - -df = pd.read_csv( - '/tf/notebooks/time-series.csv', - header=0, - index_col=None, - sep=' ', - skipinitialspace=True -) - - -# In[20]: - +location: str = "/tf/notebooks/" +instructions_file: str = "instructions.csv" +run_forcast: bool = False +res_dir: str = location + "/ray_results/" + + +if os.path.isfile(location+instructions_file): #Forecaster + run_forecast = True + + # Load the Instructions Dataset + instructions_dataset = pd.read_csv( + '/tf/notebooks/instructions.csv', + header=0, + sep=' ', + skipinitialspace=True + ) + + # what are we going to do in the current run? + ACTION_TO_TAKE = instructions_dataset.values[0][1] + + # name of the model file to load or save. 
+ MODEL_FILE_NAME = instructions_dataset.values[1][1] + + FILENAME_parameters_dataset = instructions_dataset.values[2][1] + FILENAME_timeseries_dataset = instructions_dataset.values[3][1] + + res_dir = location + "/models/" + MODEL_FILE_NAME + "/" + + parameters = pd.read_csv( + '/tf/notebooks/'+FILENAME_parameters_dataset, + header=0, + sep=' ', + skipinitialspace=True + ) + df = pd.read_csv( + location+FILENAME_timeseries_dataset, + header=0, + index_col=None, + sep=' ', + skipinitialspace=True + ) + +else: #Testclient + parameters = pd.read_csv( + location+'parameters.csv', + sep=' ', + skipinitialspace=True, + ) + df = pd.read_csv( + location+'time-series.csv', + header=0, + index_col=None, + sep=' ', + skipinitialspace=True + ) + +if {'TIMESTEPS_TO_TRAIN'}.issubset(parameters.columns): + EXPERIMENT_NAME = "Trading_Signal_Predictor_RL_V01" + PERCENTAGE_OF_DATASET_FOR_TRAINING = 80 + TIMESTEPS_TO_TRAIN = parameters['TIMESTEPS_TO_TRAIN'][0] + OBSERVATION_WINDOW_SIZE = parameters['OBSERVATION_WINDOW_SIZE'][0] + INITIAL_QUOTE_ASSET = parameters['INITIAL_QUOTE_ASSET'][0] + INITIAL_BASE_ASSET = parameters['INITIAL_BASE_ASSET'][0] + TRADING_FEE = parameters['TRADING_FEE'][0] + ENV_VERSION = parameters['ENV_VERSION'][0] + ENV_NAME = parameters['ENV_NAME'][0] + EXPLORE_ON_EVAL = parameters['EXPLORE_ON_EVAL'][0] + + # Hyper-parameters, in case we want to really control them from the test server not from ray + ALGORITHM = parameters['ALGORITHM'][0] + ROLLOUT_FRAGMENT_LENGTH = parameters['ROLLOUT_FRAGMENT_LENGTH'][0] + TRAIN_BATCH_SIZE = parameters['TRAIN_BATCH_SIZE'][0] + SGD_MINIBATCH_SIZE = parameters['SGD_MINIBATCH_SIZE'][0] + BATCH_MODE = parameters['BATCH_MODE'][0] + #VF_CLIP_PARAM = parameters['VF_CLIP_PARAM'][0] + FC_SIZE = [parameters['FC_SIZE'][0]] + LEARNING_RATE = parameters['LEARNING_RATE'][0] + GAMMA = parameters['GAMMA'][0] + +else: + EXPERIMENT_NAME = "Trading_Signal_Predictor_RL_V01" + PERCENTAGE_OF_DATASET_FOR_TRAINING = 80 + TIMESTEPS_TO_TRAIN = int(parameters.values[2][4]) + OBSERVATION_WINDOW_SIZE = int(parameters.values[2][5]) + INITIAL_QUOTE_ASSET = int(parameters.values[2][6]) + INITIAL_BASE_ASSET = int(parameters.values[2][7]) + TRADING_FEE = float(parameters.values[2][8]) + ENV_NAME = str(parameters.values[2][9]) + ENV_VERSION = int(parameters.values[2][10]) + EXPLORE_ON_EVAL = str(parameters.values[2][12]) + + # Hyper-parameters, in case we want to really control them from the test server not from ray + ALGORITHM = str(parameters.values[2][13]) + ROLLOUT_FRAGMENT_LENGTH = int(parameters.values[2][14]) + TRAIN_BATCH_SIZE = int(parameters.values[2][15]) + SGD_MINIBATCH_SIZE = int(parameters.values[2][16]) + BATCH_MODE = str(parameters.values[2][17]) + #VF_CLIP_PARAM = parameters['VF_CLIP_PARAM'][0] + FC_SIZE = int(parameters.values[2][18]) + LEARNING_RATE = float(parameters.values[2][19]) + GAMMA = float(parameters.values[2][20]) + + +print(f'TIMESTEPS_TO_TRAIN: {TIMESTEPS_TO_TRAIN}\n') +print(f'OBSERVATION_WINDOW_SIZE: {OBSERVATION_WINDOW_SIZE}\n') +print(f'INITIAL_QUOTE_ASSET: {INITIAL_QUOTE_ASSET}\n') +print(f'INITIAL_BASE_ASSET: {INITIAL_BASE_ASSET}\n') +print(f'TRADING_FEE: {TRADING_FEE}\n') def prepare_data(df): # renaming column labels as we wish, regardless what test server sends, hopefully he will maintain position @@ -108,37 +152,20 @@ def prepare_data(df): return df - data = prepare_data(df) -data - - -# # Setup which data to use for training and which data to use for evaluation of RL Model - -# In[21]: - +# Setup which data to use for training and which data to 
use for evaluation of RL Model def split_data(data): - X_train_test, X_valid = train_test_split(data, train_size=0.67, test_size=0.33, shuffle=False) - + X_train_test, X_valid = train_test_split(data, train_size=0.67, test_size=0.33, shuffle=False) X_train, X_test = train_test_split(X_train_test, train_size=0.50, test_size=0.50, shuffle=False) - return X_train, X_test, X_valid - -# In[22]: - - X_train, X_test, X_valid = split_data(data) -# ## Normalize the dataset subsets to make the model converge faster - -# In[23]: - - +# Normalize the dataset subsets to make the model converge faster scaler_type = MinMaxScaler def get_feature_scalers(X, scaler_type=scaler_type): @@ -151,7 +178,7 @@ def get_scaler_transforms(X, scalers): X_scaled = [] for name, scaler in zip(list(X.columns[X.columns != 'date']), scalers): X_scaled.append(scaler.transform(X[name].values.reshape(-1, 1))) - X_scaled = pd.concat([pd.DataFrame(column, columns=[name]) for name, column in zip(list(X.columns[X.columns != 'date']), X_scaled)], axis='columns') + X_scaled = pd.concat([pd.DataFrame(column, columns=[name]) for name, column in zip(list(X.columns[X.columns != 'date']), X_scaled)], axis='columns') return X_scaled def scale_numpy_array(np_arr, scaler_type = scaler_type): @@ -203,21 +230,9 @@ def normalize_data(X_train, X_test, X_valid): train_test_scalers, train_test_valid_scalers, X_train_scaled, X_test_scaled, X_valid_scaled, X_valid_scaled_leaking = normalize_data(X_train, X_test, X_valid) - -# In[24]: - - -X_train_scaled.tail() - - -# # Defining the environment - -# In[25]: - - class SimpleTradingEnv(gym.Env): - metadata = {'render.modes': ['live', 'human', 'none']} + metadata = {'render.modes': ['live', 'human', 'portfolio', 'none']} visualization = None def __init__(self, config=None): @@ -259,7 +274,9 @@ def __init__(self, config=None): # Render and analysis data self._total_reward_accumulated = None + self.portfolio_history = None self.trade_history = None + self.positions = None self._first_rendering = None @@ -272,7 +289,9 @@ def reset(self): self._previous_net_worth = INITIAL_QUOTE_ASSET # at the begining our previous net worth is the initial quote asset self._total_reward_accumulated = 0. 
self._first_rendering = True + self.portfolio_history = [] self.trade_history = [] + self.positions = [] self._obs_env_history = [] self._initial_obs_data() @@ -303,7 +322,7 @@ def _take_action(self, action): # Add to trade history the amount bought if greater than 0 if assets_bought > 0: - self.trade_history.append({'step': self._current_candle, 'type': 'Buy', 'amount': assets_bought, 'price': current_price, 'total' : assets_bought * current_price, 'percent_amount': action[1]}) + self.trade_history.append({'step': self._current_candle, 'type': 'BuyLong', 'amount': assets_bought, 'price': current_price, 'total' : assets_bought * current_price, 'percent_amount': action[1]}) elif action_type == 1: # Sell @@ -321,13 +340,15 @@ def _take_action(self, action): # Add to trade history the amount sold if greater than 0 if amount_to_sell > 0: - self.trade_history.append({'step': self._current_candle, 'type': 'Sell', 'amount': amount_to_sell, 'price': current_price, 'total' : received_quote_asset, 'percent_amount': action[1]}) + self.trade_history.append({'step': self._current_candle, 'type': 'SellLong', 'amount': amount_to_sell, 'price': current_price, 'total' : received_quote_asset, 'percent_amount': action[1]}) else: # Hold self.trade_history.append({'step': self._current_candle, 'type': 'Hold', 'amount': '0', 'price': current_price, 'total' : 0, 'percent_amount': action[1]}) + self.portfolio_history.append({'step': self._current_candle, 'base_asset': self._base_asset, 'quote_asset': self._quote_asset, 'current_price': current_price, 'net_worth' : self._net_worth}) + # Update the current net worth self._net_worth = self._base_asset * current_price + self._quote_asset @@ -352,9 +373,12 @@ def step(self, action): info = dict ( total_reward_accumulated = self._total_reward_accumulated, net_worth = self._net_worth, + quote_asset = self._quote_asset, + base_asset = self._base_asset, last_action_type = self.trade_history[-1]['type'] if len(self.trade_history) > 0 else None, last_action_amount = self.trade_history[-1]['amount'] if len(self.trade_history) > 0 else None, - current_step = self._current_candle + current_step = self._current_candle, + current_action = action ) self._current_candle += 1 @@ -366,7 +390,7 @@ def step(self, action): self.df_normal.loc[:, 'open'].values) - 30)# We assume that the last observation is not the last row of the dataframe, in order to avoid the case where there are no calculated indicators. if self._done: - print('I have finished the episode') + print('I have finished the episode (',self._current_candle,'Candles )') return obs, reward, self._done, info @@ -408,249 +432,9 @@ def render(self, mode='human', **kwargs): # if self._current_candle > OBSERVATION_WINDOW_SIZE: # self.visualization.render(self._current_candle, self._net_worth, self.trade_history, window_size=OBSERVATION_WINDOW_SIZE) - - def close(self): - if self.visualization != None: - self.visualization.close() - self.visualization = None - - - def _process_data(self, df_scaled): - """ - Processes the dataframe into features. 
- """ - - prices = self.df_scaled.loc[:, 'close'].to_numpy(dtype=np.float32) - - data_frame = df_scaled.iloc[:, 1:] # drop first column which is date TODO: Should be probably fixed outside of this class - # Convert df to numpy array - return prices, data_frame.to_numpy(dtype=np.float32) - - def _initial_obs_data(self): - for i in range(self.window_size - len(self._obs_env_history)): - self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset]) - - -# In[26]: - - -import random -import gym -from gym import spaces -from sklearn import preprocessing -import pandas as pd -import numpy as np -import matplotlib.pyplot as plt - -# infinite number in python -MAX_NET_WORTH = 2147483647 -MAX_NUM_QUOTE_OR_BASE_ASSET = 2147483647 - -INITIAL_QUOTE_ASSET = 0 -INITIAL_BASE_ASSET = 1 -OBSERVATION_WINDOW_SIZE = 24 # Probably we should put it as param ? - -class BTCAccumulationEnv(gym.Env): - - metadata = {'render.modes': ['live', 'human', 'none']} - visualization = None - - def __init__(self, config=None): - - self.df_scaled = config.get("df_scaled").reset_index(drop=True) - self.df_normal = config.get("df_normal").reset_index(drop=True) - self.window_size = OBSERVATION_WINDOW_SIZE - self.prices, self.features = self._process_data(self.df_scaled) - # The shape of the observation is (window_size * features + environment_features) the environment_features are: quote_asset, base_asset, net_worth. The entire observation is flattened in a 1D np array. - # NOT USED ANYMORE, KEPT FOR REFERENCE - # self.obs_shape = ((OBSERVATION_WINDOW_SIZE * self.features.shape[1] + 3),) - - # The shape of the observation is number of candles to look back, and the number of features (candle_features) + 3 (quote_asset, base_asset, net_worth) - self.obs_shape = (OBSERVATION_WINDOW_SIZE, self.features.shape[1] + 3) - - # Action space - #self.action_space = spaces.Box(low=np.array([0, 0]), high=np.array([3.0, 1.0]), dtype=np.float32) - self.action_space = spaces.MultiDiscrete([3, 100]) - # Observation space - self.observation_space = spaces.Box(low=-1, high=1, shape=self.obs_shape, dtype=np.float32) - - # Initialize the episode environment - - self._start_candle = OBSERVATION_WINDOW_SIZE # We assume that the first observation is not the first row of the dataframe, in order to avoid the case where there are no calculated indicators. 
- self._end_candle = len(self.features) - 1 - self._trading_fee = config.get("trading_fee") - - self._quote_asset = None - self._base_asset = None - self._done = None - self._current_candle = None - self._net_worth = None - self._previous_net_worth = None - self._previous_base_asset = None - self._previous_quote_asset = None - - # Array that will contain observation history needed for appending it to the observation space - # It will contain observations consisting of the net_worth, base_asset and quote_asset as list of floats - # Other features (OHLC + Indicators) will be appended to the current observation in the _get_observation method that takes the data directly from the available dataframe - self._obs_env_history = None - - # Render and analysis data - self._total_reward_accumulated = None - self.trade_history = None - self._first_rendering = None - - - def reset(self): - self._done = False - self._current_candle = self._start_candle - self._quote_asset = INITIAL_QUOTE_ASSET - self._base_asset = INITIAL_BASE_ASSET - self._net_worth = INITIAL_QUOTE_ASSET # at the begining our net worth is the initial quote asset - self._previous_net_worth = INITIAL_QUOTE_ASSET # at the begining our previous net worth is the initial quote asset - self._previous_base_asset = INITIAL_BASE_ASSET - self._previous_quote_asset = INITIAL_QUOTE_ASSET - self._total_reward_accumulated = 0 - self._first_rendering = True - self.trade_history = [] - self._obs_env_history = [] - - self._initial_obs_data() - - return self._get_observation() - - def _take_action(self, action): - self._done = False - current_price = random.uniform( - self.df_normal.loc[self._current_candle, "low"], self.df_normal.loc[self._current_candle, "high"]) - - - action_type = action[0] - amount = action[1] / 100 - - if action_type == 0: # Buy - # Buy % assets - # Determine the maximum amount of quote asset that can be bought - available_amount_to_buy_with = self._quote_asset / current_price - # Buy only the amount that agent chose - assets_bought = available_amount_to_buy_with * amount - # Update the quote asset balance - self._quote_asset -= assets_bought * current_price - # Update the base asset - self._base_asset += assets_bought - # substract trading fee from base asset based on the amount bought - self._base_asset -= self._trading_fee * assets_bought - - # Add to trade history the amount bought if greater than 0 - if assets_bought > 0: - self.trade_history.append({'step': self._current_candle, 'type': 'Buy', 'amount': assets_bought, 'price': current_price, 'total' : assets_bought * current_price, 'percent_amount': action[1]}) - - - elif action_type == 1: # Sell - # Sell % assets - # Determine the amount of base asset that can be sold - amount_to_sell = self._base_asset * amount - received_quote_asset = amount_to_sell * current_price - # Update the quote asset - self._quote_asset += received_quote_asset - # Update the base asset - self._base_asset -= amount_to_sell - - # substract trading fee from quote asset based on the amount sold - self._quote_asset -= self._trading_fee * received_quote_asset - - # Add to trade history the amount sold if greater than 0 - if amount_to_sell > 0: - self.trade_history.append({'step': self._current_candle, 'type': 'Sell', 'amount': amount_to_sell, 'price': current_price, 'total' : received_quote_asset, 'percent_amount': action[1]}) - - else: - # Hold - self.trade_history.append({'step': self._current_candle, 'type': 'Hold', 'amount': '0', 'price': current_price, 'total' : 0, 'percent_amount': action[1]}) - - 
- # Update the current net worth - self._net_worth = self._base_asset * current_price + self._quote_asset - - - def step(self, action): - """ - Returns the next observation, reward, done and info. - """ - self._take_action(action) - - # Calculate reward comparing the current base asset with the previous base asset - reward = self._base_asset - self._previous_base_asset - - self._total_reward_accumulated += reward - - # Update the previous net worth to be the current net worth after the reward has been applied - self._previous_net_worth = self._net_worth - self._previous_base_asset = self._base_asset - self._previous_quote_asset = self._quote_asset - - obs = self._get_observation() - # Update the info and add it to history data - info = dict ( - total_reward_accumulated = self._total_reward_accumulated, - net_worth = self._net_worth, - last_action_type = self.trade_history[-1]['type'] if len(self.trade_history) > 0 else None, - last_action_amount = self.trade_history[-1]['amount'] if len(self.trade_history) > 0 else None, - quote_asset = self._quote_asset, - base_asset = self._base_asset, - current_step = self._current_candle - ) - - self._current_candle += 1 - - # Update observation history - self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset]) - - self._done = self._net_worth <= 0 or self._current_candle >= (len( - self.df_normal.loc[:, 'open'].values) - 30)# We assume that the last observation is not the last row of the dataframe, in order to avoid the case where there are no calculated indicators. - - if self._done: - print('The episode has finished') - - return obs, reward, self._done, info - - - def _get_observation(self): - """ - Returns the current observation. - """ - data_frame = self.features[(self._current_candle - self.window_size):self._current_candle] - - obs_env_history = np.array(self._obs_env_history).astype(np.float32) - - #TODO We definetely need to scale the observation history in a better way, this might influence training results - # Doing it ad-hoc might change the scale of the min and max, thus changing the results - obs_env_history = preprocessing.minmax_scale(obs_env_history, (-0.9,0.9)) - - obs = np.hstack((data_frame, obs_env_history[(self._current_candle - self.window_size):self._current_candle])) - - return obs - - - def render(self, mode='human', **kwargs): - """ - Renders a plot with trades made by the agent. 
- """ - - if mode == 'human': - print(f'Accumulated Reward: {self._total_reward_accumulated} ---- Current Net Worth: {self._net_worth}') - print(f'Current Quote asset: {self._quote_asset} ---- Current Base asset: {self._base_asset}') - print(f'Number of trades: {len(self.trade_history)}') - - if(len(self.trade_history) > 0): - print(f'Last Action: {self.trade_history[-1]["type"]} {self.trade_history[-1]["amount"]} assets ({self.trade_history[-1]["percent_amount"]} %) at price {self.trade_history[-1]["price"]}, total: {self.trade_history[-1]["total"]}') - print(f'--------------------------------------------------------------------------------------') - elif mode == 'live': - pass - # if self.visualization == None: - # self.visualization = LiveTradingGraph(self.df_normal, kwargs.get('title', None)) - - # if self._current_candle > OBSERVATION_WINDOW_SIZE: - # self.visualization.render(self._current_candle, self._net_worth, self.trade_history, window_size=OBSERVATION_WINDOW_SIZE) + elif mode == 'portfolio': + return self.positions, self.trade_history, self.portfolio_history, self.df_normal def close(self): if self.visualization != None: @@ -673,12 +457,21 @@ def _initial_obs_data(self): for i in range(self.window_size - len(self._obs_env_history)): self._obs_env_history.append([self._net_worth, self._base_asset, self._quote_asset]) +# Initialize Ray +if ray.is_initialized(): + ray.shutdown() # let's shutdown first any running instances of ray (don't confuse it with the cluster) +os.environ['RAY_record_ref_creation_sites'] = '1' # Needed for debugging when things go wrong +ray.init() -# ### Allocate optimal resources method - -# In[27]: +try: + available_gpu_in_cluster = ray.available_resources()['GPU'] +except KeyError as e: + available_gpu_in_cluster = 0 +available_cpu_in_cluster = ray.available_resources()['CPU'] if ray.available_resources()['CPU'] else 0 +# In the first version we assume that we have only one node cluster, so the allocation logic is based on that +# So the resources are maximized for one ray tune trial at a time def find_optimal_resource_allocation(available_cpu, available_gpu): """ Finds the optimal resource allocation for the agent based on the available resources in the cluster @@ -707,70 +500,37 @@ def find_optimal_resource_allocation(available_cpu, available_gpu): 'num_gpus' : 0 } - -# ### Init ray and trainer config - -# In[28]: - - -import os -import time -import ray -import os -from ray import tune -from ray.rllib.env.vector_env import VectorEnv -from ray.tune.registry import register_env - - - -# Initialize Ray -ray.shutdown() # let's shutdown first any running instances of ray (don't confuse it with the cluster) -os.environ['RAY_record_ref_creation_sites'] = '1' # Needed for debugging when things go wrong -ray.init() - -try: - available_gpu_in_cluster = ray.available_resources()['GPU'] -except KeyError as e: - available_gpu_in_cluster = 0 - -available_cpu_in_cluster = ray.available_resources()['CPU'] if ray.available_resources()['CPU'] else 0 - -# In the first version we assume that we have only one node cluster, so the allocation logic is based on that -# So the resources are maximized for one ray tune trial at a time parallel_config = find_optimal_resource_allocation(available_cpu_in_cluster, 0) # Currently we are going to disable GPU ussage due to it's poor performance on a single instance cluster -trading_fee = 0.0075 training_config = { - "trading_fee": trading_fee, + "trading_fee": TRADING_FEE, "df_normal": X_train, "df_scaled": X_train_scaled, } 
-eval_config = { - "trading_fee": trading_fee, +test_config = { + "trading_fee": TRADING_FEE, "df_normal": X_test, "df_scaled": X_test_scaled, } +eval_config = { + "trading_fee": TRADING_FEE, + "df_normal": X_valid, + "df_scaled": X_valid_scaled, +} + if ENV_NAME == 'SimpleTrading': training_env = SimpleTradingEnv(training_config) + test_env = SimpleTradingEnv(test_config) eval_env = SimpleTradingEnv(eval_config) training_env_key = "SimpleTradingEnv-training-V01" + test_env_key = "SimpleTradingEnv-testing-V01" eval_env_key = "SimpleTradingEnv-evaluating-V01" -elif ENV_NAME == 'BTCAccumulationEnv': - training_env = BTCAccumulationEnv(training_config) - eval_env = BTCAccumulationEnv(eval_config) - - training_env_key = "BTCAccumulationEnv-training-V01" - eval_env_key = "BTCAccumulationEnv-evaluating-V01" - - - - - tune.register_env(training_env_key, lambda _: training_env) +tune.register_env(test_env_key, lambda _: test_env) tune.register_env(eval_env_key, lambda _: eval_env) @@ -778,7 +538,7 @@ def find_optimal_resource_allocation(available_cpu, available_gpu): ppo_trainer_config = { "env": training_env_key, # Ray will automatically create multiple environments and vectorize them if needed "horizon": len(X_train_scaled) - 30, - "log_level": "INFO", + "log_level": "WARN", #or INFO "framework": "tf", #"eager_tracing": True, "ignore_worker_failures": True, @@ -788,13 +548,13 @@ def find_optimal_resource_allocation(available_cpu, available_gpu): "num_cpus_per_worker": parallel_config.get("num_cpus_per_worker"), # After some testing, seems the fastest way for this kind of enviroment. It's better to run more trials in parallel than to finish a trial with a couple of minutes faster. Because we can end trial earlier if we see that our model eventuall converge "num_cpus_for_driver": parallel_config.get("num_cpus_for_driver"), # Number of CPUs to use for the driver. This is the number of CPUs used for the training process. "num_gpus_per_worker": parallel_config.get("num_gpus_per_worker"), - "rollout_fragment_length": 200, # Size of batches collected from each worker. If num_envs_per_worker is > 1 the rollout value will be multiplied by num_envs_per_worker - "train_batch_size": 2048, # Number of timesteps collected for each SGD round. This defines the size of each SGD epoch. the batch size is composed of fragments defined above - "sgd_minibatch_size": 64, - "batch_mode": "complete_episodes", + "rollout_fragment_length": ROLLOUT_FRAGMENT_LENGTH, # Size of batches collected from each worker. If num_envs_per_worker is > 1 the rollout value will be multiplied by num_envs_per_worker + "train_batch_size": TRAIN_BATCH_SIZE, # Number of timesteps collected for each SGD round. This defines the size of each SGD epoch. the batch size is composed of fragments defined above + "sgd_minibatch_size": SGD_MINIBATCH_SIZE, + "batch_mode": BATCH_MODE, "vf_clip_param": 100, # Default is 10, but we increase it to 100 to adapt it to our rewards scale. 
It helps our value function to converge faster - "lr": 0.00001, # Hyperparameter grid search defined above - "gamma": 0.95, # This can have a big impact on the result and needs to be properly tuned + "lr": LEARNING_RATE, # Hyperparameter grid search defined above + "gamma": GAMMA, # This can have a big impact on the result and needs to be properly tuned #"observation_filter": "MeanStdFilter", "model": { # "fcnet_hiddens": FC_SIZE, # Hyperparameter grid search defined above @@ -804,29 +564,20 @@ def find_optimal_resource_allocation(available_cpu, available_gpu): # "lstm_use_prev_action": True, }, - #"sgd_minibatch_size": MINIBATCH_SIZE, # Hyperparameter grid search defined above "evaluation_interval": 5, # Run one evaluation step on every x `Trainer.train()` call. "evaluation_duration": 1, # How many episodes to run evaluations for each time we evaluate. "evaluation_config": { "explore": True, # We usually don't want to explore during evaluation. All actions have to be repeatable. Similar to deterministic = True, but on-policy algorithms can get better results with exploration. - "env": eval_env_key, # We need to define a new environment for evaluation with different parameters + "env": test_env_key, # We need to define a new environment for evaluation with different parameters }, "logger_config": { - "logdir": "/tmp/ray_logging/", + "logdir": res_dir, "type": "ray.tune.logger.UnifiedLogger", } } # ### Custom reporter to get progress in Superalgos - -# In[36]: - - -from ray.tune import ProgressReporter -from typing import Dict, List, Optional, Union -import json - class CustomReporter(ProgressReporter): def __init__( @@ -871,22 +622,11 @@ def set_start_time(self, timestamp: Optional[float] = None): else: self._start_time = timestamp -custom_reporter = CustomReporter(max_report_frequency=10) - - -# In[ ]: - - -# Before starting printing a custom text to let Superalgos know that we are in a RL scenario +# Printing a custom text to let Superalgos know that we are in a RL scenario sys.stdout.write('RL_SCENARIO') sys.stdout.write('\n') - -# ### Run ray tune - -# In[32]: - - +# Run ray tune analysis = tune.run( run_or_experiment=ALGORITHM, name=EXPERIMENT_NAME, @@ -900,80 +640,162 @@ def set_start_time(self, timestamp: Optional[float] = None): num_samples=1, # Have one sample for each hyperparameter combination. You can have more to average out randomness. 
keep_checkpoints_num=30, # Keep the last X checkpoints checkpoint_freq=5, # Checkpoint every X iterations (save the model) - local_dir="/tf/notebooks/ray_results/", # Local directory to store checkpoints and results, we are using tmp folder until we move the notebook to a docker instance and we can use the same directory across all instances, no matter the underlying OS - progress_reporter=custom_reporter, - fail_fast="raise", - resume=False # Resume training from the last checkpoint if any exists + checkpoint_at_end=True, # Whether to checkpoint at the end of the experiment regardless of the checkpoint_freq + verbose=1, + local_dir=res_dir, # Local directory to store checkpoints and results, we are using tmp folder until we move the notebook to a docker instance and we can use the same directory across all instances, no matter the underlying OS + progress_reporter=CustomReporter(max_report_frequency=10,location=location), + fail_fast=True, + resume="AUTO" # Resume training from the last checkpoint if any exists [True, 'LOCAL', 'REMOTE', 'PROMPT', 'ERRORED_ONLY', 'AUTO'] ) - -# ### Evaluate trained model restoring it from checkpoint -# -# #### Store the results in a file to be picked up by Superalgos - -# In[33]: - - +# Evaluate trained model restoring it from checkpoint best_trial = analysis.get_best_trial(metric="episode_reward_mean", mode="max", scope="all") best_checkpoint = analysis.get_best_checkpoint(best_trial, metric="episode_reward_mean") +print("best_checkpoint path: " + best_checkpoint.local_path) agent = ppo.PPOTrainer(config=ppo_trainer_config) agent.restore(best_checkpoint) json_dict = {} -net_worths = [] -q_assets = [] -b_assets = [] -net_worths_at_end = [] -q_assets_at_end = [] -b_assets_at_end = [] -episodes_to_run = 1 - -for i in range(episodes_to_run): - episode_reward = 0 - done = False - obs = eval_env.reset() # we are using the evaluation environment for evaluation - last_info = None - while not done: - action = agent.compute_single_action(obs, explore=True) # stochastic evaluation - obs, reward, done, info = eval_env.step(action) - net_worths.append(info['net_worth']) # Add all historical net worths to a list to print statistics at the end of the episode - q_assets.append(info['quote_asset']) # Add all historical quote assets to a list to print statistics at the end of the episode - b_assets.append(info['base_asset']) # Add all historical base assets to a list to print statistics at the end of the episode - episode_reward += reward - last_info = info - - net_worths_at_end.append(last_info['net_worth']) # Add all historical net worths to a list to print statistics at the end of the episode - q_assets_at_end.append(last_info['quote_asset']) # Add all historical quote assets to a list to print statistics at the end of the episode - b_assets_at_end.append(last_info['base_asset']) # Add all historical base assets to a list to print statistics at the end of the episode - -json_dict['meanNetWorth'] = np.mean(net_worths) -json_dict['stdNetWorth'] = np.std(net_worths) -json_dict['minNetWorth'] = np.min(net_worths) -json_dict['maxNetWorth'] = np.max(net_worths) -json_dict['stdQuoteAsset'] = np.std(q_assets) -json_dict['minQuoteAsset'] = np.min(q_assets) -json_dict['maxQuoteAsset'] = np.max(q_assets) -json_dict['stdBaseAsset'] = np.std(b_assets) -json_dict['minBaseAsset'] = np.min(b_assets) -json_dict['maxBaseAsset'] = np.max(b_assets) -json_dict['meanNetWorthAtEnd'] = np.mean(net_worths_at_end) -json_dict['stdNetWorthAtEnd'] = np.std(net_worths_at_end) 
-json_dict['minNetWorthAtEnd'] = np.min(net_worths_at_end)
-json_dict['maxNetWorthAtEnd'] = np.max(net_worths_at_end)
-
-
-# Write the results to JSON file
-with open("evaluation_results.json", "w+") as f:
+episodes_to_run = 2
+
+envs = [training_env, test_env, eval_env]
+
+positions = []
+trade_history = []
+portfolio_history = []
+df_normal = []
+
+for iter, env in enumerate(envs):
+    net_worths = []
+    q_assets = []
+    b_assets = []
+    net_worths_at_end = []
+    q_assets_at_end = []
+    b_assets_at_end = []
+    last_actions = []
+
+    for i in range(episodes_to_run):
+        episode_reward = 0
+        done = False
+        obs = env.reset() # reset the current environment (training, test or validation)
+        last_info = None
+        while not done:
+            action = agent.compute_single_action(obs, explore=True) # stochastic evaluation
+            obs, reward, done, info = env.step(action)
+            net_worths.append(info['net_worth']) # Add all historical net worths to a list to print statistics at the end of the episode
+            q_assets.append(info['quote_asset']) # Add all historical quote assets to a list to print statistics at the end of the episode
+            b_assets.append(info['base_asset']) # Add all historical base assets to a list to print statistics at the end of the episode
+            episode_reward += reward
+            last_info = info
+
+        net_worths_at_begin = net_worths[0]
+        net_worths_at_end.append(last_info['net_worth']) # Record the final net worth of the episode
+        q_assets_at_end.append(last_info['quote_asset']) # Record the final quote asset of the episode
+        b_assets_at_end.append(last_info['base_asset']) # Record the final base asset of the episode
+        last_actions.append(last_info['current_action'])
+
+    r1, r2, r3, r4 = env.render(mode='portfolio')
+    positions.append(r1)
+    trade_history.append(r2)
+    portfolio_history.append(r3)
+    df_normal.append(r4)
+
+    json_dict_env = {}
+    json_dict_env['meanNetWorth'] = np.mean(net_worths)
+    json_dict_env['stdNetWorth'] = np.std(net_worths)
+    json_dict_env['minNetWorth'] = np.min(net_worths)
+    json_dict_env['maxNetWorth'] = np.max(net_worths)
+    json_dict_env['stdQuoteAsset'] = np.std(q_assets)
+    json_dict_env['minQuoteAsset'] = np.min(q_assets)
+    json_dict_env['maxQuoteAsset'] = np.max(q_assets)
+    json_dict_env['stdBaseAsset'] = np.std(b_assets)
+    json_dict_env['minBaseAsset'] = np.min(b_assets)
+    json_dict_env['maxBaseAsset'] = np.max(b_assets)
+    json_dict_env['NetWorthAtBegin'] = net_worths_at_begin
+    json_dict_env['meanNetWorthAtEnd'] = np.mean(net_worths_at_end)
+    json_dict_env['stdNetWorthAtEnd'] = np.std(net_worths_at_end)
+    json_dict_env['minNetWorthAtEnd'] = np.min(net_worths_at_end)
+    json_dict_env['maxNetWorthAtEnd'] = np.max(net_worths_at_end)
+    json_dict_env['current_action'] = {"type": int(last_actions[-1][0]), "amount": int(last_actions[-1][1])}
+
+    print(f"NetWorthAtBegin / meanNetWorthAtEnd : {json_dict_env['NetWorthAtBegin']} / {json_dict_env['meanNetWorthAtEnd']}")
+    json_dict[iter] = json_dict_env
+
+
+# Write the results to JSON file to be picked up by Superalgos
+with open(location + "evaluation_results.json", "w+") as f:
     json.dump(json_dict, f)
-
-
-# In[35]:
-
+    f.close()
+
+pd_positions_0 = pd.DataFrame(positions[0])
+pd_positions_1 = pd.DataFrame(positions[1])
+pd_positions_2 = pd.DataFrame(positions[2])
+
+pd_trade_history_0 = pd.DataFrame(trade_history[0])
+pd_trade_history_1 = pd.DataFrame(trade_history[1])
+pd_trade_history_2 = pd.DataFrame(trade_history[2])
+
+pd_portfolio_history_0 = pd.DataFrame(portfolio_history[0]) +pd_portfolio_history_1 = pd.DataFrame(portfolio_history[1]) +pd_portfolio_history_2 = pd.DataFrame(portfolio_history[2]) + +pd_join_0 = pd.merge(pd_trade_history_0, pd_portfolio_history_0, how='right', left_on = 'step', right_on = 'step') +pd_join_1 = pd.merge(pd_trade_history_1, pd_portfolio_history_1, how='right', left_on = 'step', right_on = 'step') +pd_join_2 = pd.merge(pd_trade_history_2, pd_portfolio_history_2, how='right', left_on = 'step', right_on = 'step') +pd_join_2[:].tail(20) + +tic = "BTC train" +plt.figure(figsize=[15,9]); +plt.title(tic) +plt.plot(pd_join_0["step"],pd_join_0["net_worth"]/pd_join_0["net_worth"][0],label = "net_worth", color='black'); +plt.plot(pd_join_0["step"],pd_join_0["current_price"]/pd_join_0["current_price"][0],label = "current_price", color='yellow'); +plt.scatter(pd_trade_history_0[pd_trade_history_0['type'] == 'BuyLong']["step"], pd_trade_history_0[pd_trade_history_0['type'] == 'BuyLong']["price"]/pd_join_0["current_price"][0]*0.9, label='BuyLong', marker='^', color='lightgreen', alpha=1) +plt.scatter(pd_trade_history_0[pd_trade_history_0['type'] == 'SellLong']["step"], pd_trade_history_0[pd_trade_history_0['type'] == 'SellLong']["price"]/pd_join_0["current_price"][0]*1.1, label='SellLong', marker='v', color='magenta', alpha=1) +plt.scatter(pd_trade_history_0[pd_trade_history_0['type'] == 'BuyShort']["step"], pd_trade_history_0[pd_trade_history_0['type'] == 'BuyShort']["price"]/pd_join_0["current_price"][0]*1.1, label='BuyShort', marker='*', color='red', alpha=1) +plt.scatter(pd_trade_history_0[pd_trade_history_0['type'] == 'SellShort']["step"], pd_trade_history_0[pd_trade_history_0['type'] == 'SellShort']["price"]/pd_join_0["current_price"][0]*0.9, label='SellShort', marker='x', color='blue', alpha=1) +plt.xlabel('Steps') +plt.ylabel('Value normed to first step'); +plt.legend() +plt.savefig(res_dir+tic.replace(" ", "_")+".png") +plt.show() + + +tic = "BTC test" +plt.figure(figsize=[15,9]); +plt.title(tic) +plt.plot(pd_join_1["step"],pd_join_1["net_worth"]/pd_join_1["net_worth"][0],label = "net_worth", color='black'); +plt.plot(pd_join_1["step"],pd_join_1["current_price"]/pd_join_1["current_price"][0],label = "current_price", color='yellow'); +plt.scatter(pd_trade_history_1[pd_trade_history_1['type'] == 'BuyLong']["step"], pd_trade_history_1[pd_trade_history_1['type'] == 'BuyLong']["price"]/pd_join_1["current_price"][0]*0.9, label='BuyLong', marker='^', color='lightgreen', alpha=1) +plt.scatter(pd_trade_history_1[pd_trade_history_1['type'] == 'SellLong']["step"], pd_trade_history_1[pd_trade_history_1['type'] == 'SellLong']["price"]/pd_join_1["current_price"][0]*1.1, label='SellLong', marker='v', color='magenta', alpha=1) +plt.scatter(pd_trade_history_1[pd_trade_history_1['type'] == 'BuyShort']["step"], pd_trade_history_1[pd_trade_history_1['type'] == 'BuyShort']["price"]/pd_join_1["current_price"][0]*1.1, label='BuyShort', marker='*', color='red', alpha=1) +plt.scatter(pd_trade_history_1[pd_trade_history_1['type'] == 'SellShort']["step"], pd_trade_history_1[pd_trade_history_1['type'] == 'SellShort']["price"]/pd_join_1["current_price"][0]*0.9, label='SellShort', marker='x', color='blue', alpha=1) +plt.xlabel('Steps') +plt.ylabel('Value normed to first step'); +plt.legend() +plt.savefig(res_dir+tic.replace(" ", "_")+".png") +plt.show() + +tic = "BTC validate" +plt.figure(figsize=[15,9]); +plt.title(tic) +plt.plot(pd_join_2["step"],pd_join_2["net_worth"]/pd_join_2["net_worth"][0],label = 
"net_worth", color='black'); +plt.plot(pd_join_2["step"],pd_join_2["current_price"]/pd_join_2["current_price"][0],label = "current_price", color='yellow'); +plt.scatter(pd_trade_history_2[pd_trade_history_2['type'] == 'BuyLong']["step"], pd_trade_history_2[pd_trade_history_2['type'] == 'BuyLong']["price"]/pd_join_2["current_price"][0]*0.9, label='BuyLong', marker='^', color='lightgreen', alpha=1) +plt.scatter(pd_trade_history_2[pd_trade_history_2['type'] == 'SellLong']["step"], pd_trade_history_2[pd_trade_history_2['type'] == 'SellLong']["price"]/pd_join_2["current_price"][0]*1.1, label='SellLong', marker='v', color='magenta', alpha=1) +plt.scatter(pd_trade_history_2[pd_trade_history_2['type'] == 'BuyShort']["step"], pd_trade_history_2[pd_trade_history_2['type'] == 'BuyShort']["price"]/pd_join_2["current_price"][0]*1.1, label='BuyShort', marker='*', color='red', alpha=1) +plt.scatter(pd_trade_history_2[pd_trade_history_2['type'] == 'SellShort']["step"], pd_trade_history_2[pd_trade_history_2['type'] == 'SellShort']["price"]/pd_join_2["current_price"][0]*0.9, label='SellShort', marker='x', color='blue', alpha=1) +plt.xlabel('Steps') +plt.ylabel('Value normed to first step'); +plt.legend() +plt.savefig(res_dir+tic.replace(" ", "_")+".png") +plt.show() # Cleanup -os.remove('/tf/notebooks/training_results.json') -os.remove('/tf/notebooks/evaluation_results.json') -ray.shutdown() +if ray.is_initialized(): + ray.shutdown() +# Tell Superalgos we finished +sys.stdout.write('RL_SCENARIO_END') +sys.stdout.write('\n') \ No newline at end of file diff --git a/Bitcoin-Factory/docs/BTC_test.png b/Bitcoin-Factory/docs/BTC_test.png new file mode 100644 index 0000000000..dd324bf83b Binary files /dev/null and b/Bitcoin-Factory/docs/BTC_test.png differ diff --git a/Bitcoin-Factory/docs/BTC_train.png b/Bitcoin-Factory/docs/BTC_train.png new file mode 100644 index 0000000000..96afbfab6f Binary files /dev/null and b/Bitcoin-Factory/docs/BTC_train.png differ diff --git a/Bitcoin-Factory/docs/BTC_validate.png b/Bitcoin-Factory/docs/BTC_validate.png new file mode 100644 index 0000000000..4a1acd5f12 Binary files /dev/null and b/Bitcoin-Factory/docs/BTC_validate.png differ diff --git a/Projects/Bitcoin-Factory/TS/Bot-Modules/Forecast-Client/ForecastClient.js b/Projects/Bitcoin-Factory/TS/Bot-Modules/Forecast-Client/ForecastClient.js index a3106e51c5..a0433af2b4 100644 --- a/Projects/Bitcoin-Factory/TS/Bot-Modules/Forecast-Client/ForecastClient.js +++ b/Projects/Bitcoin-Factory/TS/Bot-Modules/Forecast-Client/ForecastClient.js @@ -175,8 +175,6 @@ .catch(onError) async function onSuccess(thisForecastCase) { await onSuccessgetNextForecastCase(thisForecastCase) - forecasting = false - callBackFunction(TS.projects.foundations.globals.standardResponses.DEFAULT_OK_RESPONSE) } async function onError(err) { forecasting = false @@ -249,6 +247,9 @@ userProfile: response.data.serverData.userProfile, instance: response.data.serverData.instance } + } + if ((bestPredictions[i].testServer.userProfile == undefined) || (bestPredictions[i].testServer.userProfile === '')) { + bestPredictions[i].testServer.userProfile = response.data.serverData.userProfile } } console.log((new Date()).toISOString(), '[INFO] Size of local forecast array did change by ', checkSetForecastCaseResultsResponse(bestPredictions)) @@ -391,7 +392,7 @@ for (let i = 0; i < thisObject.forecastCasesArray.length; i++) { if (thisObject.forecastCasesArray[i].id == nextForecastCase.id) { console.log((new Date()).toISOString(), '[ERROR] Test Server ' + 
nextForecastCase.testServer.instance + ' did send me forecast case ' + nextForecastCase.id + ', which is allready in local database') - reject('DUPLICATE FORECAST CASE') + // reject('DUPLICATE FORECAST CASE') } } resolve(nextForecastCase) @@ -410,6 +411,9 @@ async function promiseWork(resolve, reject) { + //debug + console.log("DEBUG requested testserver: " + JSON.stringify(forecastCase.testServer)) + let message = { type: 'Get This Forecast Case', testServer: forecastCase.testServer, @@ -561,8 +565,10 @@ /* Set default script name. */ - if (nextForecastCase.pythonScriptName === undefined) { nextForecastCase.pythonScriptName = "Bitcoin_Factory_LSTM_Forecasting.py" } - + if (nextForecastCase.pythonScriptName === undefined) { + console.log("pythonScriptName undefined ... set default name") + nextForecastCase.pythonScriptName = "Bitcoin_Factory_LSTM_Forecasting.py" + } console.log('') if (buildnewModel) { console.log('------------------------------------------------------- Forecasting Case # ' + nextForecastCase.id + ' ------------------------------------------------------------') @@ -587,10 +593,6 @@ return new Promise(executeThePythonScript) async function executeThePythonScript(resolve, reject) { - /* - Set default script name. - */ - if (nextForecastCase.pythonScriptName === undefined) { nextForecastCase.pythonScriptName = "Bitcoin_Factory_LSTM_Forecasting.py" } let processExecutionResult let startingTimestamp = (new Date()).valueOf() @@ -601,27 +603,44 @@ dockerPID = dockerProc.pid dockerProc.stdout.on('data', (data) => { data = data.toString() - /* - Removing Carriedge Return from string. - */ - try { - let percentage = '' - let heartbeatText = '' - let statusText = 'Forecast Case ' + nextForecastCase.id + ' from ' + nextForecastCase.testServer.instance - if (data.substring(0, 5) === 'Epoch') { - let regEx = new RegExp('Epoch (\\d+)/(\\d+)', 'gim') - let match = regEx.exec(data) - heartbeatText = match[0] - - percentage = Math.round(match[1] / match[2] * 100) - TS.projects.foundations.functionLibraries.processFunctions.processHeartBeat(processIndex, heartbeatText, percentage, statusText) - } - } catch (err) { - console.log('Error pricessing heartbeat: ' + err) - } + if (data.includes('RL_SCENARIO_START') || data.includes('episodeRewardMean')) { + try { + let fileContent = SA.nodeModules.fs.readFileSync(global.env.PATH_TO_BITCOIN_FACTORY + "/Forecast-Client/notebooks/training_results.json") + + if (fileContent !== undefined) { + let percentage = 0 + let statusText = 'Forecast Case: ' + nextForecastCase.id + ' from ' + nextForecastCase.testServer.instance - for (let i = 0; i < 1000; i++) { - data = data.replace(/\n/, "") + data = JSON.parse(fileContent) + + percentage = Math.round(data.timestepsExecuted / data.timestepsTotal * 100) + let heartbeatText = 'Episode reward mean / max / min: ' + data.episodeRewardMean + ' | ' + data.episodeRewardMax + ' | ' + data.episodeRewardMin + + TS.projects.foundations.functionLibraries.processFunctions.processHeartBeat(processIndex, heartbeatText, percentage, statusText) + } + } catch (err) { } + } else { + /* + Removing Carriedge Return from string. 
+ */ + try { + let percentage = '' + let heartbeatText = '' + let statusText = 'Forecast Case ' + nextForecastCase.id + ' from ' + nextForecastCase.testServer.instance + if (data.substring(0, 5) === 'Epoch') { + let regEx = new RegExp('Epoch (\\d+)/(\\d+)', 'gim') + let match = regEx.exec(data) + heartbeatText = match[0] + + percentage = Math.round(match[1] / match[2] * 100) + TS.projects.foundations.functionLibraries.processFunctions.processHeartBeat(processIndex, heartbeatText, percentage, statusText) + } + } catch (err) { + console.log('Error pricessing heartbeat: ' + err) + } + for (let i = 0; i < 1000; i++) { + data = data.replace(/\n/, "") + } } dataReceived = dataReceived + data.toString() @@ -644,7 +663,7 @@ console.log((new Date()).toISOString(), '[ERROR] Forecaster: Docker Python Script exited with code ' + code); console.log((new Date()).toISOString(), '[ERROR] Unexpected error trying to execute a Python script inside the Docker container. ') console.log((new Date()).toISOString(), '[ERROR] Check at a console if you can run this command: ') - console.log((new Date()).toISOString(), '[ERROR] docker exec -it Bitcoin-Factory-ML-Forecasting python /tf/notebooks/Bitcoin_Factory_LSTM_Forecasting.py') + console.log((new Date()).toISOString(), '[ERROR] docker exec -it Bitcoin-Factory-ML-Forecasting python -u /tf/notebooks/' + nextForecastCase.pythonScriptName) console.log((new Date()).toISOString(), '[ERROR] Once you can sucessfully run it at the console you might want to try to run this App again. ') reject('Unexpected Error.') } @@ -656,34 +675,59 @@ } function onFinished(dataReceived) { - try { - - let index = dataReceived.indexOf('{') - dataReceived = dataReceived.substring(index) - //just for debug: console.log(dataReceived) - processExecutionResult = JSON.parse(fixJSON(dataReceived)) - - console.log('Prediction RMSE Error: ' + processExecutionResult.errorRMSE) - console.log('Predictions [candle.max, candle.min, candle.close]: ' + processExecutionResult.predictions) - - let endingTimestamp = (new Date()).valueOf() - processExecutionResult.enlapsedTime = (endingTimestamp - startingTimestamp) / 1000 - console.log('Enlapsed Time (HH:MM:SS): ' + (new Date(processExecutionResult.enlapsedTime * 1000).toISOString().substr(14, 5)) + ' ') - - processExecutionResult.testServer = nextForecastCase.testServer - processExecutionResult.id = nextForecastCase.id - processExecutionResult.caseIndex = nextForecastCase.caseIndex + if (dataReceived.includes('RL_SCENARIO_END')) { //RL + let fileContent = SA.nodeModules.fs.readFileSync(global.env.PATH_TO_BITCOIN_FACTORY + "/Forecast-Client/notebooks/evaluation_results.json") + if (fileContent !== undefined) { + try { + processExecutionResult = JSON.parse(fileContent) + console.log(processExecutionResult) + let endingTimestamp = (new Date()).valueOf() + processExecutionResult.enlapsedTime = (endingTimestamp - startingTimestamp) / 1000 + processExecutionResult.pythonScriptName = nextForecastCase.pythonScriptName + processExecutionResult.testServer = nextForecastCase.testServer + processExecutionResult.id = nextForecastCase.id + processExecutionResult.caseIndex = nextForecastCase.caseIndex + console.log((new Date()).toISOString(), '[INFO] {Forecastclient} Enlapsed Time: ' + timeUnits(processExecutionResult.enlapsedTime * 1000) + ' ') + console.log((new Date()).toISOString(), '[INFO] {Forecastclient} Mean Networth at End of Train: ' + processExecutionResult["0"].meanNetWorthAtEnd) + console.log((new Date()).toISOString(), '[INFO] {Forecastclient} Mean 
Networth at End of Test: ' + processExecutionResult["1"].meanNetWorthAtEnd) + console.log((new Date()).toISOString(), '[INFO] {Forecastclient} Mean Networth at End of Validation: ' + processExecutionResult["2"].meanNetWorthAtEnd) + console.log((new Date()).toISOString(), '[INFO] {Forecastclient} Next Action: ' + processExecutionResult["2"].current_action.type + ' / ' + processExecutionResult["2"].current_action.amount) + + } catch (err) { + console.log('Error parsing the information generated at the Docker Container executing the Python script. err.stack = ' + err.stack) + console.log('The data that can not be parsed is = ' + fileContent) + } + } else { + console.log('Can not read result file: ' + global.env.PATH_TO_BITCOIN_FACTORY + "/Forecast-Client/notebooks/evaluation_results.json") + } + } else { //LSTM + try { + + let index = dataReceived.indexOf('{') + dataReceived = dataReceived.substring(index) + //just for debug: console.log(dataReceived) + processExecutionResult = JSON.parse(fixJSON(dataReceived)) + + console.log('Prediction RMSE Error: ' + processExecutionResult.errorRMSE) + console.log('Predictions [candle.max, candle.min, candle.close]: ' + processExecutionResult.predictions) + + let endingTimestamp = (new Date()).valueOf() + processExecutionResult.enlapsedTime = (endingTimestamp - startingTimestamp) / 1000 + console.log('Enlapsed Time (HH:MM:SS): ' + (new Date(processExecutionResult.enlapsedTime * 1000).toISOString().substr(14, 5)) + ' ') + + processExecutionResult.testServer = nextForecastCase.testServer + processExecutionResult.id = nextForecastCase.id + processExecutionResult.caseIndex = nextForecastCase.caseIndex - } catch (err) { + } catch (err) { - if (processExecutionResult !== undefined && processExecutionResult.predictions !== undefined) { - console.log('processExecutionResult.predictions:' + processExecutionResult.predictions) - } - - console.log(err.stack) - console.error(err) + if (processExecutionResult !== undefined && processExecutionResult.predictions !== undefined) { + console.log('processExecutionResult.predictions:' + processExecutionResult.predictions) + } + console.log(err.stack) + console.error(err) + } } - resolve(processExecutionResult) } } @@ -751,10 +795,10 @@ caseIndex: 0 } console.log() - console.log((new Date()).toISOString(), 'Current Forecast table') + console.log((new Date()).toISOString(), '[INFO] {Forecastclient} Current Forecast table') } else { console.log() - console.log((new Date()).toISOString(), 'A new Forecast for the Case Id ' + forecastCase.id + ' was produced / attemped.') + console.log((new Date()).toISOString(), '[INFO] {Forecastclient} A new Forecast for the Case Id ' + forecastCase.id + ' was produced / attemped.') } let logQueue = [] for (let i = Math.max(0, forecastCase.caseIndex - 5); i < Math.min(thisObject.forecastCasesArray.length, forecastCase.caseIndex + 5); i++) { @@ -988,4 +1032,35 @@ } return counter } + + /** + * Converts milliseconds into greater time units as possible + * @param {int} ms - Amount of time measured in milliseconds + * @return {?Object} Reallocated time units. NULL on failure. 
+ */ + function timeUnits( ms ) { + if ( !Number.isInteger(ms) ) { + return null + } + /** + * Takes as many whole units from the time pool (ms) as possible + * @param {int} msUnit - Size of a single unit in milliseconds + * @return {int} Number of units taken from the time pool + */ + const allocate = msUnit => { + const units = Math.trunc(ms / msUnit) + ms -= units * msUnit + return units + } + // Property order is important here. + // These arguments are the respective units in ms. + return ""+ + // weeks: allocate(604800000), // Uncomment for weeks + // days: allocate(86400000), + allocate(3600000) + "h:" + + allocate(60000)+"m:" + + allocate(1000)+"s:" + //ms: ms // remainder + + } } diff --git a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Client/TestClient.js b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Client/TestClient.js index 058af03b85..808d0cf0a7 100644 --- a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Client/TestClient.js +++ b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Client/TestClient.js @@ -226,6 +226,7 @@ Set default script name. */ if (nextTestCase.pythonScriptName === undefined) { nextTestCase.pythonScriptName = "Bitcoin_Factory_LSTM.py" } + if (BOT_CONFIG.dockerContainerName === undefined) { BOT_CONFIG.dockerContainerName = "Bitcoin-Factory-ML" } /* Remove from Parameters the properties that are in OFF */ @@ -259,7 +260,7 @@ async function promiseWork(resolve, reject) { const { spawn } = require('child_process'); - const ls = spawn('docker', ['exec', 'Bitcoin-Factory-ML', 'python', '-u', '/tf/notebooks/' + nextTestCase.pythonScriptName]); + const ls = spawn('docker', ['exec', BOT_CONFIG.dockerContainerName, 'python', '-u', '/tf/notebooks/' + nextTestCase.pythonScriptName]); let dataReceived = '' ls.stdout.on('data', (data) => { data = data.toString() @@ -280,7 +281,7 @@ data = JSON.parse(fileContent) percentage = Math.round(data.timestepsExecuted / data.timestepsTotal * 100) - let heartbeatText = 'Episode reward mean: ' + data.episodeRewardMean + ' | Episode reward max: ' + data.episodeRewardMax + ' | Episode reward min: ' + data.episodeRewardMin + let heartbeatText = 'Episode reward mean / max / min: ' + data.episodeRewardMean + ' | ' + data.episodeRewardMax + ' | ' + data.episodeRewardMin TS.projects.foundations.functionLibraries.processFunctions.processHeartBeat(processIndex, heartbeatText, percentage, statusText) @@ -325,7 +326,7 @@ } else { console.log((new Date()).toISOString(), '[ERROR] Unexpected error trying to execute a Python script inside the Docker container. ') console.log((new Date()).toISOString(), '[ERROR] Check at a console if you can run this command: ') - console.log((new Date()).toISOString(), '[ERROR] docker exec -it Bitcoin-Factory-ML python /tf/notebooks/' + nextTestCase.pythonScriptName) + console.log((new Date()).toISOString(), '[ERROR] docker exec ' + BOT_CONFIG.dockerContainerName + ' python -u /tf/notebooks/' + nextTestCase.pythonScriptName) console.log((new Date()).toISOString(), '[ERROR] Once you can sucessfully run it at the console you might want to try to run this App again. 
') reject('Unexpected Error.') } @@ -339,7 +340,29 @@ function onFinished(dataReceived) { try { if (dataReceived.includes('RL_SCENARIO_END')) { - //TODO: read from the evaluation_results.json file + let fileContent = SA.nodeModules.fs.readFileSync(global.env.PATH_TO_BITCOIN_FACTORY + "/Test-Client/notebooks/evaluation_results.json") + if (fileContent !== undefined) { + try { + processExecutionResult = JSON.parse(fileContent) + console.log(processExecutionResult) +/* example of fileContent: + {"meanNetWorth": 721.2464292834837, "stdNetWorth": 271.8523338823371, "minNetWorth": 248.60280285744497, "maxNetWorth": 1264.2342365877673, "stdQuoteAsset": 193.18343367092214, "minQuoteAsset": 4.0065377572671893e-10, "maxQuoteAsset": 1161.3969522280302, "stdBaseAsset": 0.005149471273320009, "minBaseAsset": 0.0, "maxBaseAsset": 0.0284320291802237, "meanNetWorthAtEnd": 260.82332674553476, "stdNetWorthAtEnd": 0.0, "minNetWorthAtEnd": 260.82332674553476, "maxNetWorthAtEnd": 260.82332674553476} +*/ + let endingTimestamp = (new Date()).valueOf() + processExecutionResult.enlapsedTime = (endingTimestamp - startingTimestamp) / 1000 + processExecutionResult.pythonScriptName = nextTestCase.pythonScriptName + console.log((new Date()).toISOString(), '[INFO] {Testclient} Enlapsed Time: ' + timeUnits(processExecutionResult.enlapsedTime * 1000) + ' ') + console.log((new Date()).toISOString(), '[INFO] {Testclient} Mean Networth at End of Train: ' + processExecutionResult["0"].meanNetWorthAtEnd) + console.log((new Date()).toISOString(), '[INFO] {Testclient} Mean Networth at End of Test: ' + processExecutionResult["1"].meanNetWorthAtEnd) + console.log((new Date()).toISOString(), '[INFO] {Testclient} Mean Networth at End of Validation: ' + processExecutionResult["2"].meanNetWorthAtEnd) + console.log((new Date()).toISOString(), '[INFO] {Testclient} Next Action: ' + processExecutionResult["2"].current_action.type + ' / ' + processExecutionResult["2"].current_action.amount) + } catch (err) { + console.log('Error parsing the information generated at the Docker Container executing the Python script. err.stack = ' + err.stack) + console.log('The data that can not be parsed is = ' + fileContent) + } + } else { + console.log('Can not read result file: ' + global.env.PATH_TO_BITCOIN_FACTORY + "/Test-Client/notebooks/evaluation_results.json") + } } else { try { let cleanedData = filterOutput(dataReceived) @@ -422,4 +445,36 @@ } return cleanText } + + /** + * Converts milliseconds into greater time units as possible + * @param {int} ms - Amount of time measured in milliseconds + * @return {?Object} Reallocated time units. NULL on failure. + */ + function timeUnits( ms ) { + if ( !Number.isInteger(ms) ) { + return null + } + /** + * Takes as many whole units from the time pool (ms) as possible + * @param {int} msUnit - Size of a single unit in milliseconds + * @return {int} Number of units taken from the time pool + */ + const allocate = msUnit => { + const units = Math.trunc(ms / msUnit) + ms -= units * msUnit + return units + } + // Property order is important here. + // These arguments are the respective units in ms. 
+ return ""+ + // weeks: allocate(604800000), // Uncomment for weeks + // days: allocate(86400000), + allocate(3600000) + "h:" + + allocate(60000)+"m:" + + allocate(1000)+"s:" + //ms: ms // remainder + + } + } diff --git a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/ForecastCasesManager.js b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/ForecastCasesManager.js index daabcd757c..2f984b627e 100644 --- a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/ForecastCasesManager.js +++ b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/ForecastCasesManager.js @@ -64,32 +64,68 @@ exports.newForecastCasesManager = function newForecastCasesManager(processIndex, for (let i = 0; i < thisObject.forecastCasesArray.length; i++) { let forecastCase = thisObject.forecastCasesArray[i] if (forecastCase.mainAsset === testCase.mainAsset && forecastCase.mainTimeFrame === testCase.mainTimeFrame) { - if (Number(testCase.percentageErrorRMSE) < Number(forecastCase.percentageErrorRMSE) && Number(testCase.percentageErrorRMSE) >= 0) { - thisObject.forecastCasesArray.splice(i, 1) - thisObject.forecastCasesMap.delete(testCase.id) - addForcastCase(testCase) - return + //LSTM + if (forecastCase.percentageErrorRMSE !== undefined) { + if (Number(testCase.percentageErrorRMSE) < Number(forecastCase.percentageErrorRMSE) && Number(testCase.percentageErrorRMSE) >= 0) { + thisObject.forecastCasesArray.splice(i, 1) + thisObject.forecastCasesMap.delete(testCase.id) + addForcastCase(testCase) + // return + } else { + // continue + } + //RL + } else if (forecastCase.ratio !== undefined) { + if (Number(testCase.ratio.validate) > Number(forecastCase.ratio.validate) ) { + thisObject.forecastCasesArray.splice(i, 1) + thisObject.forecastCasesMap.delete(testCase.id) + addForcastCase(testCase) + // return + } else { + // continue + } } else { - return + // continue } + } else { + addForcastCase(testCase) + // return } } - addForcastCase(testCase) + if (thisObject.forecastCasesArray.length == 0) addForcastCase(testCase) saveForecastCasesFile() + console.log("Testserver: Current Forecast table:") + console.table(thisObject.forecastCasesArray) + function addForcastCase(testCase) { - if (testCase.forcastedCandle === undefined) { testCase.forcastedCandle = {} } + let testServer + let parameters + let predictions + let forcastedCandle + try { + testServer = JSON.parse(JSON.stringify(testCase.testServer)) + } catch (err) {} + try { + parameters = JSON.parse(JSON.stringify(testCase.parameters)) + } catch (err) {} + try { + predictions = JSON.parse(JSON.stringify(testCase.predictions)) + } catch (err) {} + try { + forcastedCandle = JSON.parse(JSON.stringify(testCase.forcastedCandle)) + } catch (err) {} let forecastCase = { id: testCase.id, caseIndex: thisObject.forecastCasesArray.length, - testServer: JSON.parse(JSON.stringify(testCase.testServer)), + testServer: testServer, mainAsset: testCase.mainAsset, mainTimeFrame: testCase.mainTimeFrame, percentageErrorRMSE: testCase.percentageErrorRMSE, - parameters: JSON.parse(JSON.stringify(testCase.parameters)), + parameters: parameters, parametersHash: testCase.parametersHash, - predictions: JSON.parse(JSON.stringify(testCase.predictions)), - forcastedCandle: JSON.parse(JSON.stringify(testCase.forcastedCandle)), + predictions: predictions, + forcastedCandle: forcastedCandle, timeSeriesFileName: testCase.timeSeriesFileName, timestamp: testCase.timestamp, when: testCase.when, @@ -150,6 +186,7 @@ exports.newForecastCasesManager = function newForecastCasesManager(processIndex, caseIndex: 
forecastCase.caseIndex, totalCases: thisObject.forecastCasesArray.length, parameters: forecastCase.parameters, + pythonScriptName: forecastCase.pythonScriptName, testServer: { userProfile: ((forecastCase.testServer != undefined) && (forecastCase.testServer.userProfile != undefined) ? forecastCase.testServer.userProfile : ''), instance: ((forecastCase.testServer != undefined) && (forecastCase.testServer.instance != undefined) ? forecastCase.testServer.instance : TS.projects.foundations.globals.taskConstants.TASK_NODE.bot.config.serverInstanceName) @@ -172,6 +209,7 @@ exports.newForecastCasesManager = function newForecastCasesManager(processIndex, id: forecastCase.id, caseIndex: forecastCase.caseIndex, parameters: forecastCase.parameters, + pythonScriptName: forecastCase.pythonScriptName, testServer: { userProfile: ((forecastCase.testServer != undefined) && (forecastCase.testServer.userProfile != undefined) ? forecastCase.testServer.userProfile : ''), instance: ((forecastCase.testServer != undefined) && (forecastCase.testServer.instance != undefined) ? forecastCase.testServer.instance : TS.projects.foundations.globals.taskConstants.TASK_NODE.bot.config.serverInstanceName) @@ -193,16 +231,32 @@ exports.newForecastCasesManager = function newForecastCasesManager(processIndex, } if (forecastCase != undefined) { forecastCase.status = 'Forecasted' - forecastCase.predictions = forecastResult.predictions - forecastCase.errorRMSE = forecastResult.errorRMSE - forecastCase.percentageErrorRMSE = calculatePercentageErrorRMSE(forecastResult) forecastCase.enlapsedSeconds = forecastResult.enlapsedTime.toFixed(0) forecastCase.enlapsedMinutes = (forecastResult.enlapsedTime / 60).toFixed(2) forecastCase.enlapsedHours = (forecastResult.enlapsedTime / 3600).toFixed(2) forecastCase.forecastedBy = forecastedBy forecastCase.testServer = forecastResult.testServer + forecastCase.pythonScriptName = forecastResult.pythonScriptName forecastCase.timestamp = (new Date()).valueOf() - + //LSTM + if (forecastResult.errorRMSE != undefined) { + forecastCase.predictions = forecastResult.predictions + forecastCase.errorRMSE = forecastResult.errorRMSE + forecastCase.percentageErrorRMSE = calculatePercentageErrorRMSE(forecastResult) + //RL + } else if (forecastResult["0"] != undefined) { + forecastCase.predictions = forecastResult["2"].current_action + forecastCase.ratio = { + train: forecastResult["0"].meanNetWorthAtEnd / forecastResult["0"].NetWorthAtBegin, + test: forecastResult["1"].meanNetWorthAtEnd / forecastResult["1"].NetWorthAtBegin, + validate: forecastResult["2"].meanNetWorthAtEnd / forecastResult["2"].NetWorthAtBegin + } + forecastCase.std = { + train: forecastResult["0"].stdNetWorthAtEnd , + test: forecastResult["1"].stdNetWorthAtEnd , + validate: forecastResult["2"].stdNetWorthAtEnd + } + } let logQueue = [] for (let i = Math.max(0, forecastResult.caseIndex - 5); i < Math.min(thisObject.forecastCasesArray.length, forecastResult.caseIndex + 5); i++) { let forecastCase = thisObject.forecastCasesArray[i] diff --git a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/ForecastClientsManager.js b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/ForecastClientsManager.js index 80a1060842..f0414407bb 100644 --- a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/ForecastClientsManager.js +++ b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/ForecastClientsManager.js @@ -78,6 +78,7 @@ exports.newForecastClientsManager = function newForecastClientsManager(processIn console.log((new Date()).toISOString(), 'Forecast 
Case Id ' + nextForecastCase.id + ' delivered to', currentClientInstance) nextForecastCase.files.parameters = nextForecastCase.files.parameters.toString() nextForecastCase.files.timeSeries = nextForecastCase.files.timeSeries.toString() + if (nextForecastCase.pythonScriptName == undefined) nextForecastCase.pythonScriptName = TS.projects.foundations.globals.taskConstants.TASK_NODE.bot.config.pythonScriptName return nextForecastCase } else { console.log((new Date()).toISOString(), 'No more Forecast Cases to Build. Could not deliver one to ' + currentClientInstance) diff --git a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/TestCasesManager.js b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/TestCasesManager.js index f8583405a8..baed910325 100644 --- a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/TestCasesManager.js +++ b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/TestCasesManager.js @@ -14,7 +14,8 @@ exports.newTestCasesManager = function newTestCasesManager(processIndex, network const REPORT_NAME = networkCodeName + '-' + (new Date()).toISOString().substring(0, 16).replace("T", "-").replace(":", "-").replace(":", "-") + '-00' const MUST_BE_ON_PARAMS = [ 'CANDLES_CANDLES-VOLUMES_CANDLES_CANDLE_MAX', 'CANDLES_CANDLES-VOLUMES_CANDLES_CANDLE_MIN', - 'CANDLES_CANDLES-VOLUMES_CANDLES_CANDLE_CLOSE', 'CANDLES_CANDLES-VOLUMES_CANDLES_CANDLE_OPEN' + 'CANDLES_CANDLES-VOLUMES_CANDLES_CANDLE_CLOSE', 'CANDLES_CANDLES-VOLUMES_CANDLES_CANDLE_OPEN', + 'CANDLES_CANDLES-VOLUMES_VOLUMES_VOLUME_BUY' ] let parametersRanges = TS.projects.foundations.globals.taskConstants.TASK_NODE.bot.config.parametersRanges @@ -41,6 +42,12 @@ exports.newTestCasesManager = function newTestCasesManager(processIndex, network for (let i = 0; i < thisObject.testCasesArray.length; i++) { let testCase = thisObject.testCasesArray[i] thisObject.testCasesMap.set(testCase.parametersHash, testCase) + if (TS.projects.foundations.globals.taskConstants.TEST_SERVER.forecastCasesManager.forecastCasesArray == undefined) { + TS.projects.foundations.globals.taskConstants.TEST_SERVER.forecastCasesManager.initialize() + } + if (testCase.status === "Tested") { + TS.projects.foundations.globals.taskConstants.TEST_SERVER.forecastCasesManager.addToforecastCases(testCase) + } } } generateTestCases() @@ -371,19 +378,36 @@ exports.newTestCasesManager = function newTestCasesManager(processIndex, network return } testCase.status = 'Tested' - testCase.predictions = testResult.predictions - testCase.errorRMSE = testResult.errorRMSE - testCase.percentageErrorRMSE = calculatePercentageErrorRMSE(testResult) testCase.enlapsedSeconds = testResult.enlapsedTime.toFixed(0) testCase.enlapsedMinutes = (testResult.enlapsedTime / 60).toFixed(2) testCase.enlapsedHours = (testResult.enlapsedTime / 3600).toFixed(2) testCase.testedByInstance = currentClientInstance + testCase.pythonScriptName = testResult.pythonScriptName testCase.testedByProfile = userProfile testCase.timestamp = (new Date()).valueOf() testCase.testServer = { userProfile: ((testResult.testServer != undefined) && (testResult.testServer.userProfile != undefined) ? 
testResult.testServer.userProfile : ''), instance: TS.projects.foundations.globals.taskConstants.TASK_NODE.bot.config.serverInstanceName } + //LSTM + if (testResult.errorRMSE != undefined) { + testCase.predictions = testResult.predictions + testCase.errorRMSE = testResult.errorRMSE + testCase.percentageErrorRMSE = calculatePercentageErrorRMSE(testResult) + //RL + } else if (testResult["0"] != undefined) { + testCase.predictions = testResult["2"].current_action + testCase.ratio = { + train: testResult["0"].meanNetWorthAtEnd / testResult["0"].NetWorthAtBegin, + test: testResult["1"].meanNetWorthAtEnd / testResult["1"].NetWorthAtBegin, + validate: testResult["2"].meanNetWorthAtEnd / testResult["2"].NetWorthAtBegin + } + testCase.std = { + train: testResult["0"].stdNetWorthAtEnd , + test: testResult["1"].stdNetWorthAtEnd , + validate: testResult["2"].stdNetWorthAtEnd + } + } let logQueue = [] for (let i = Math.max(0, testResult.id - 5); i < Math.min(thisObject.testCasesArray.length, testResult.id + 5); i++) { diff --git a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/TestServer.js b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/TestServer.js index 83c4b8f562..7afcc6a863 100644 --- a/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/TestServer.js +++ b/Projects/Bitcoin-Factory/TS/Bot-Modules/Test-Server/TestServer.js @@ -7,7 +7,8 @@ dataBridge: undefined, testCasesManager: undefined, testClientsManager: undefined, - forecastsManager: undefined, + forecastCasesManager: undefined, + forecastClientsManager: undefined, initialize: initialize, finalize: finalize, start: start @@ -33,7 +34,9 @@ thisObject.dataBridge.initialize() await thisObject.testCasesManager.initialize() await thisObject.testClientsManager.initialize() - thisObject.forecastCasesManager.initialize() + if (thisObject.forecastCasesManager.forecastCasesArray == undefined) { + thisObject.forecastCasesManager.initialize() + } await thisObject.forecastClientsManager.initialize() console.log((new Date()).toISOString(), 'Running Test Server v.' + TEST_SERVER_VERSION) callBackFunction(TS.projects.foundations.globals.standardResponses.DEFAULT_OK_RESPONSE)
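The RL branches added to TestCasesManager.js and ForecastCasesManager.js above rank cases by net-worth ratios instead of RMSE: for each of the three evaluation environments (keys "0"/"1"/"2" in evaluation_results.json, i.e. train/test/validation) the ratio is meanNetWorthAtEnd / NetWorthAtBegin, and a new case replaces an existing one when its validation ratio is higher. A small Python rendering of that rule, for illustration only (the function names here are made up; the authoritative logic is the JavaScript shown above):

def to_ratios(result: dict) -> dict:
    # result mirrors evaluation_results.json: one entry per environment
    splits = {'train': '0', 'test': '1', 'validate': '2'}
    return {
        name: result[key]['meanNetWorthAtEnd'] / result[key]['NetWorthAtBegin']
        for name, key in splits.items()
    }

def new_case_wins(existing_ratio: dict, candidate_ratio: dict) -> bool:
    # Mirrors `Number(testCase.ratio.validate) > Number(forecastCase.ratio.validate)`
    # in ForecastCasesManager.js: the higher validation-split ratio is kept.
    return candidate_ratio['validate'] > existing_ratio['validate']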