diff --git a/qlib/contrib/strategy/cost_control.py b/qlib/contrib/strategy/cost_control.py index ff51f484f54..a0be77c9755 100644 --- a/qlib/contrib/strategy/cost_control.py +++ b/qlib/contrib/strategy/cost_control.py @@ -1,101 +1,122 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -""" -This strategy is not well maintained -""" - from .order_generator import OrderGenWInteract from .signal_strategy import WeightStrategyBase -import copy class SoftTopkStrategy(WeightStrategyBase): def __init__( self, - model, - dataset, - topk, + model=None, + dataset=None, + topk=None, order_generator_cls_or_obj=OrderGenWInteract, max_sold_weight=1.0, + trade_impact_limit=None, + priority="IMPACT_FIRST", risk_degree=0.95, buy_method="first_fill", - trade_exchange=None, - level_infra=None, - common_infra=None, **kwargs, ): """ + Refactored SoftTopkStrategy with a budget-constrained rebalancing engine. + Parameters ---------- topk : int - top-N stocks to buy + The number of top-N stocks to be held in the portfolio. + trade_impact_limit : float + Maximum weight change for each stock in one trade. + priority : str + "COMPLIANCE_FIRST" or "IMPACT_FIRST". risk_degree : float - position percentage of total value buy_method: - - rank_fill: assign the weight stocks that rank high first(1/topk max) - average_fill: assign the weight to the stocks rank high averagely. + The target percentage of total value to be invested. """ super(SoftTopkStrategy, self).__init__( - model, dataset, order_generator_cls_or_obj, trade_exchange, level_infra, common_infra, **kwargs + model=model, dataset=dataset, order_generator_cls_or_obj=order_generator_cls_or_obj, **kwargs ) + self.topk = topk - self.max_sold_weight = max_sold_weight + self.trade_impact_limit = trade_impact_limit if trade_impact_limit is not None else max_sold_weight + self.priority = priority.upper() self.risk_degree = risk_degree self.buy_method = buy_method def get_risk_degree(self, trade_step=None): - """get_risk_degree - Return the proportion of your total value you will used in investment. - Dynamically risk_degree will result in Market timing - """ - # It will use 95% amount of your total value by default return self.risk_degree - def generate_target_weight_position(self, score, current, trade_start_time, trade_end_time): + def generate_target_weight_position(self, score, current, trade_start_time, trade_end_time, **kwargs): + """ + Generates target position using Proportional Budget Allocation. + Ensures deterministic sells and synchronized buys under impact limits. """ - Parameters - ---------- - score: - pred score for this trade date, pd.Series, index is stock_id, contain 'score' column - current: - current position, use Position() class - trade_date: - trade date - generate target position from score for this date and the current position + if self.topk is None or self.topk <= 0: + return {} - The cache is not considered in the position - """ - # TODO: - # If the current stock list is more than topk(eg. The weights are modified - # by risk control), the weight will not be handled correctly. - buy_signal_stocks = set(score.sort_values(ascending=False).iloc[: self.topk].index) - cur_stock_weight = current.get_stock_weight_dict(only_stock=True) - - if len(cur_stock_weight) == 0: - final_stock_weight = {code: 1 / self.topk for code in buy_signal_stocks} - else: - final_stock_weight = copy.deepcopy(cur_stock_weight) - sold_stock_weight = 0.0 - for stock_id in final_stock_weight: - if stock_id not in buy_signal_stocks: - sw = min(self.max_sold_weight, final_stock_weight[stock_id]) - sold_stock_weight += sw - final_stock_weight[stock_id] -= sw - if self.buy_method == "first_fill": - for stock_id in buy_signal_stocks: - add_weight = min( - max(1 / self.topk - final_stock_weight.get(stock_id, 0), 0.0), - sold_stock_weight, - ) - final_stock_weight[stock_id] = final_stock_weight.get(stock_id, 0.0) + add_weight - sold_stock_weight -= add_weight - elif self.buy_method == "average_fill": - for stock_id in buy_signal_stocks: - final_stock_weight[stock_id] = final_stock_weight.get(stock_id, 0.0) + sold_stock_weight / len( - buy_signal_stocks + ideal_per_stock = self.risk_degree / self.topk + ideal_list = score.sort_values(ascending=False).iloc[: self.topk].index.tolist() + + cur_weights = current.get_stock_weight_dict(only_stock=True) + initial_total_weight = sum(cur_weights.values()) + + # --- Case A: Cold Start --- + if not cur_weights: + fill = ( + ideal_per_stock + if self.priority == "COMPLIANCE_FIRST" + else min(ideal_per_stock, self.trade_impact_limit) + ) + return {code: fill for code in ideal_list} + + # --- Case B: Rebalancing --- + all_tickers = set(cur_weights.keys()) | set(ideal_list) + next_weights = {t: cur_weights.get(t, 0.0) for t in all_tickers} + + # Phase 1: Deterministic Sell Phase + released_cash = 0.0 + for t in list(next_weights.keys()): + cur = next_weights[t] + if cur <= 1e-8: + continue + + if t not in ideal_list: + sell = cur if self.priority == "COMPLIANCE_FIRST" else min(cur, self.trade_impact_limit) + next_weights[t] -= sell + released_cash += sell + elif cur > ideal_per_stock + 1e-8: + excess = cur - ideal_per_stock + sell = excess if self.priority == "COMPLIANCE_FIRST" else min(excess, self.trade_impact_limit) + next_weights[t] -= sell + released_cash += sell + + # Phase 2: Budget Calculation + # Budget = Cash from sells + Available space from target risk degree + total_budget = released_cash + (self.risk_degree - initial_total_weight) + + # Phase 3: Proportional Buy Allocation + if total_budget > 1e-8: + shortfalls = { + t: (ideal_per_stock - next_weights.get(t, 0.0)) + for t in ideal_list + if next_weights.get(t, 0.0) < ideal_per_stock - 1e-8 + } + + if shortfalls: + total_shortfall = sum(shortfalls.values()) + # Normalize total_budget to not exceed total_shortfall + available_to_spend = min(total_budget, total_shortfall) + + for t, shortfall in shortfalls.items(): + # Every stock gets its fair share based on its distance to target + share_of_budget = (shortfall / total_shortfall) * available_to_spend + + # Capped by impact limit or compliance priority + max_buy_cap = ( + shortfall if self.priority == "COMPLIANCE_FIRST" else min(shortfall, self.trade_impact_limit) ) - else: - raise ValueError("Buy method not found") - return final_stock_weight + + next_weights[t] += min(share_of_budget, max_buy_cap) + + return {k: v for k, v in next_weights.items() if v > 1e-8} diff --git a/tests/test_soft_topk_strategy.py b/tests/test_soft_topk_strategy.py new file mode 100644 index 00000000000..34a4bbd6dad --- /dev/null +++ b/tests/test_soft_topk_strategy.py @@ -0,0 +1,57 @@ +import pandas as pd +import pytest +from qlib.contrib.strategy.cost_control import SoftTopkStrategy + + +class MockPosition: + def __init__(self, weights): + self.weights = weights + + def get_stock_weight_dict(self, only_stock=True): + return self.weights + + +def test_soft_topk_logic(): + # Initial: A=0.8, B=0.2 (Total=1.0). Target Risk=0.95. + # Scores: A and B are low, C and D are topk. + scores = pd.Series({"C": 0.9, "D": 0.8, "A": 0.1, "B": 0.1}) + current_pos = MockPosition({"A": 0.8, "B": 0.2}) + + topk = 2 + risk_degree = 0.95 + impact_limit = 0.1 # Max change per step + + def create_test_strategy(priority): + strat = SoftTopkStrategy.__new__(SoftTopkStrategy) + strat.topk = topk + strat.risk_degree = risk_degree + strat.trade_impact_limit = impact_limit + strat.priority = priority.upper() + return strat + + # 1. Test IMPACT_FIRST: Expect deterministic sell and limited buy + strat_i = create_test_strategy("IMPACT_FIRST") + res_i = strat_i.generate_target_weight_position(scores, current_pos, None, None) + + # A should be exactly 0.8 - 0.1 = 0.7 + assert abs(res_i["A"] - 0.7) < 1e-8 + # B should be exactly 0.2 - 0.1 = 0.1 + assert abs(res_i["B"] - 0.1) < 1e-8 + # Total sells = 0.2 released. New budget = 0.2 + (0.95 - 1.0) = 0.15. + # C and D share 0.15 -> 0.075 each. + assert abs(res_i["C"] - 0.075) < 1e-8 + assert abs(res_i["D"] - 0.075) < 1e-8 + + # 2. Test COMPLIANCE_FIRST: Expect full liquidation and full target fill + strat_c = create_test_strategy("COMPLIANCE_FIRST") + res_c = strat_c.generate_target_weight_position(scores, current_pos, None, None) + + # A, B not in topk -> Liquidated + assert "A" not in res_c and "B" not in res_c + # C, D should reach ideal_per_stock (0.95/2 = 0.475) + assert abs(res_c["C"] - 0.475) < 1e-8 + assert abs(res_c["D"] - 0.475) < 1e-8 + + +if __name__ == "__main__": + pytest.main([__file__])