diff --git a/docs/release_notes.rst b/docs/release_notes.rst index b7a981d29..2018eba6c 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -87,8 +87,11 @@ releases are available on `Anaconda.org - :gh:`361` adds standard deviations of parameters for example models (:ghuser:`timmens`). - :gh:`363` enables msm function to return simulated moments or comparison plot data for - use with `estimagic `_ (:ghuser:`amageh`). + use with `estimagic `_ + (:ghuser:`amageh`). - :gh:`369` adds second set of parameters for kw_97 models (:ghuser:`amageh`). +- :gh:`373` refactors the law of motion and simplifies the collection of child indices + (:ghuser:`tobiasraabe`). - :gh:`371` changes the names of the criterion functions for maximum likelihood and msm estimation. Makes replacement functions optional for estimation with msm and sets identity matrix as default weighting matrix (:ghuser:`amageh`). diff --git a/respy/_numba.py b/respy/_numba.py index 16c28a55e..93b9d50fd 100644 --- a/respy/_numba.py +++ b/respy/_numba.py @@ -1,6 +1,8 @@ """Special functions for using numba.""" import warnings +import numba as nb +import numpy as np from numba import NumbaDeprecationWarning from numba import types from numba.extending import intrinsic @@ -74,3 +76,19 @@ def array_indexer(a, i): return tup return function_signature, codegen + + +@nb.njit +def sum_over_numba_boolean_unituple(tuple_): + """Compute the sum over a boolean :class:`numba.types.UniTuple`. + + Parameters + ---------- + tuple_ : numba.types.UniTuple[bool] + + Returns + ------- + sum_ : Union[float, int] + + """ + return np.sum(np.array([1 for i in tuple_ if i])) diff --git a/respy/shared.py b/respy/shared.py index 5d191a7f0..fb9e3aaae 100644 --- a/respy/shared.py +++ b/respy/shared.py @@ -590,7 +590,7 @@ def map_observations_to_states(states, state_space, optim_paras): core_columns = ["period"] + create_core_state_space_columns(optim_paras) core = states.reset_index(level="period")[core_columns].to_numpy(dtype="int64") - core_key, core_index = _map_observations_to_core_states_numba( + core_key, core_index = map_states_to_core_key_and_core_index( core, state_space.indexer ) @@ -611,14 +611,30 @@ def map_observations_to_states(states, state_space, optim_paras): @nb.njit -def _map_observations_to_core_states_numba(core, indexer): - """Map observations to states in Numba.""" - n_observations = core.shape[0] - core_key = np.zeros(n_observations, dtype=np.int64) - core_index = np.zeros(n_observations, dtype=np.int64) +def map_states_to_core_key_and_core_index(states, indexer): + """Map states to the core key and core index. - for i in range(n_observations): - core_key_, core_index_ = indexer[array_to_tuple(indexer, core[i])] + Parameters + ---------- + states : numpy.ndarray + Multidimensional array containing only core dimensions of states. + indexer : numba.typed.Dict + A dictionary with core states as keys and the core key and core index as values. + + Returns + ------- + core_key : numpy.ndarray + An array containing the core key. See :ref:`core_key`. + core_index : numpy.ndarray + An array containing the core index. See :ref:`core_indices`. + + """ + n_states = states.shape[0] + core_key = np.zeros(n_states, dtype=np.int64) + core_index = np.zeros(n_states, dtype=np.int64) + + for i in range(n_states): + core_key_, core_index_ = indexer[array_to_tuple(indexer, states[i])] core_key[i] = core_key_ core_index[i] = core_index_ @@ -700,3 +716,55 @@ def select_valid_choices(choices, choice_set): """ return [x for i, x in enumerate(choices) if choice_set[i]] + + +def apply_law_of_motion_for_core(df, optim_paras): + """Apply the law of motion for the core dimensions. + + This function only applies the law of motion for core dimensions which are the + period, experiences, and previous choices. Depending on the integer-encoded choice + in ``df["choice"]``, the new state is computed. + + Parameters + ---------- + df : pandas.DataFrame + The DataFrame contains states with information on the period, experiences, + previous choices. The current choice is encoded as an integer in a column named + ``"choice"``. + optim_paras : dict + Contains model parameters. + + Returns + ------- + df : pandas.DataFrame + The DataFrame contains the states in the next period. + + """ + n_lagged_choices = optim_paras["n_lagged_choices"] + + # Update work experiences. + for i, choice in enumerate(optim_paras["choices_w_exp"]): + df[f"exp_{choice}"] += df["choice"] == i + + # Update lagged choices by deleting oldest lagged, renaming other lags and inserting + # choice in the first position. + if n_lagged_choices: + # Save position of first lagged choice. + position = df.columns.tolist().index("lagged_choice_1") + + # Drop oldest lag. + df = df.drop(columns=f"lagged_choice_{n_lagged_choices}") + + # Rename newer lags + rename_lagged_choices = { + f"lagged_choice_{i}": f"lagged_choice_{i + 1}" + for i in range(1, n_lagged_choices) + } + df = df.rename(columns=rename_lagged_choices) + + # Add current choice as new lag. + df.insert(position, "lagged_choice_1", df["choice"]) + + df["period"] = df["period"] + 1 + + return df diff --git a/respy/simulate.py b/respy/simulate.py index 7333b346c..7a02cfd33 100644 --- a/respy/simulate.py +++ b/respy/simulate.py @@ -10,6 +10,7 @@ from respy.parallelization import parallelize_across_dense_dimensions from respy.parallelization import split_and_combine_df from respy.pre_processing.model_processing import process_params_and_options +from respy.shared import apply_law_of_motion_for_core from respy.shared import calculate_value_functions_and_flow_utilities from respy.shared import compute_covariates from respy.shared import create_base_draws @@ -218,10 +219,13 @@ def simulate( optim_paras=optim_paras, ) - data.append(current_df_extended) + data.append(current_df_extended.copy(deep=True)) if is_n_step_ahead and period != n_simulation_periods - 1: - df = _apply_law_of_motion(current_df_extended, optim_paras) + current_df_extended = current_df_extended.reset_index() + df = apply_law_of_motion_for_core(current_df_extended, optim_paras) + state_space_columns = create_state_space_columns(optim_paras) + df = df.set_index(["identifier", "period"])[state_space_columns] simulated_data = _process_simulation_output(data, optim_paras) @@ -554,65 +558,6 @@ def _random_choice(choices, probabilities=None, decimals=5): return out -def _apply_law_of_motion(df, optim_paras): - """Apply the law of motion to get the states in the next period. - - For n-step-ahead simulations, the states of the next period are generated from the - current states and the current decision. This function changes experiences and - previous choices according to the choice in the current period, to get the states of - the next period. - - We implicitly assume that observed variables are constant. - - Parameters - ---------- - df : pandas.DataFrame - The DataFrame contains the simulated information of individuals in one period. - optim_paras : dict - - Returns - ------- - df : pandas.DataFrame - The DataFrame contains the states of individuals in the next period. - - """ - df = df.copy() - n_lagged_choices = optim_paras["n_lagged_choices"] - - # Update work experiences. - for i, choice in enumerate(optim_paras["choices_w_exp"]): - df[f"exp_{choice}"] += df["choice"] == i - - # Update lagged choices by deleting oldest lagged, renaming other lags and inserting - # choice in the first position. - if n_lagged_choices: - # Save position of first lagged choice. - position = df.columns.tolist().index("lagged_choice_1") - - # Drop oldest lag. - df = df.drop(columns=f"lagged_choice_{n_lagged_choices}") - - # Rename newer lags - rename_lagged_choices = { - f"lagged_choice_{i}": f"lagged_choice_{i + 1}" - for i in range(1, n_lagged_choices) - } - df = df.rename(columns=rename_lagged_choices) - - # Add current choice as new lag. - df.insert(position, "lagged_choice_1", df["choice"]) - - # Increment period in MultiIndex by one. - df.index = df.index.set_levels( - df.index.get_level_values("period") + 1, level="period", verify_integrity=False - ) - - state_space_columns = create_state_space_columns(optim_paras) - df = df[state_space_columns] - - return df - - def _harmonize_simulation_arguments(method, df, n_simulation_periods, options): """Harmonize the arguments of the simulation. diff --git a/respy/state_space.py b/respy/state_space.py index 7e4ae694f..bae21c8b8 100644 --- a/respy/state_space.py +++ b/respy/state_space.py @@ -6,8 +6,9 @@ import pandas as pd from numba.typed import Dict -from respy._numba import array_to_tuple +from respy._numba import sum_over_numba_boolean_unituple from respy.parallelization import parallelize_across_dense_dimensions +from respy.shared import apply_law_of_motion_for_core from respy.shared import compute_covariates from respy.shared import convert_dictionary_keys_to_dense_indices from respy.shared import create_base_draws @@ -15,6 +16,8 @@ from respy.shared import create_dense_state_space_columns from respy.shared import downcast_to_smallest_dtype from respy.shared import dump_states +from respy.shared import load_states +from respy.shared import map_states_to_core_key_and_core_index from respy.shared import prepare_cache_directory from respy.shared import return_core_dense_key @@ -67,6 +70,8 @@ class StateSpace: core dimensions. A core dimension is a dimension whose value is uniquely determined by past choices and time. Core dimensions include choices, experiences, lagged choices and periods. + dense_key_to_core_indices : Dict[int, Array[int]] + A mapping from dense keys to ``.loc`` locations in the ``core``. """ @@ -111,6 +116,7 @@ def __init__( self.core_key_to_complex = core_key_to_complex self.core_key_to_core_indices = core_key_to_core_indices self.optim_paras = optim_paras + self.options = options self.n_periods = options["n_periods"] self._create_conversion_dictionaries() self.child_indices = self.collect_child_indices() @@ -235,6 +241,10 @@ def get_continuation_values(self, period): def collect_child_indices(self): """Collect for each state the indices of its child states. + To collect continuation values, one needs to find the child state. This function + searches for all states except for the last period their possible successors by + taking every possible combination defined by the law of motion. + See also -------- _collect_child_indices @@ -246,23 +256,21 @@ def collect_child_indices(self): child_indices = None else: - limited_index_to_indices = { + dense_key_to_complex_except_last_period = { k: v - for k, v in self.dense_key_to_core_indices.items() - if self.dense_key_to_complex[k][0] < self.n_periods - 1 + for k, v in self.dense_key_to_complex.items() + if v[0] < self.n_periods - 1 } - - limited_index_to_choice_set = { - k: v - for k, v in self.dense_key_to_choice_set.items() - if self.dense_key_to_complex[k][0] < self.n_periods - 1 + dense_key_to_choice_set_except_last_period = { + k: self.dense_key_to_choice_set[k] + for k in dense_key_to_complex_except_last_period } child_indices = _collect_child_indices( - self.core, - limited_index_to_indices, - limited_index_to_choice_set, + dense_key_to_complex_except_last_period, + dense_key_to_choice_set_except_last_period, self.indexer, self.optim_paras, + self.options, ) return child_indices @@ -742,64 +750,6 @@ def _create_dense_period_choice( return dense_period_choice -@nb.njit -def _insert_indices_of_child_states( - states, indexer, choice_set, n_choices, n_choices_w_exp, n_lagged_choices, -): - """Collect indices of child states for each parent state. - - Parameters - ---------- - states : pandas.DataFrame - Subset of :ref:`core state space ` containing all core - dimensions that arise within a particular dense period choice core. - n_choices : int - Number of admissible choices within a particular dense period choice core. - n_choices_w_exp : int - Number of total choices with experience accumulation. - n_lagged_choices : int - Number of lagged choices to be kept accounted for in the core. - - Returns - ------- - indices: numpy.ndarray - Array with shape ``(n_states, n_choices * 2)``. Represents the mapping - (core_index, choice) -> (dense_key, core_index). - - """ - indices = np.full((states.shape[0], n_choices, 2), -1, dtype=np.int64) - - for i in range(states.shape[0]): - j = 0 - for choice, is_available in enumerate(choice_set): - if is_available: - child = states[i].copy() - - # Adjust period. - child[0] += 1 - - # Increment experience if it is a choice with experience - # accumulation. - if choice < n_choices_w_exp: - child[choice + 1] += 1 - - # Change lagged choice by shifting all existing lagged choices by - # one period and inserting the current choice in first position. - if n_lagged_choices: - child[ - n_choices_w_exp + 2 : n_choices_w_exp + n_lagged_choices + 1 - ] = child[n_choices_w_exp + 1 : n_choices_w_exp + n_lagged_choices] - child[n_choices_w_exp + 1] = choice - - # Get the position of the continuation value. - idx_future = indexer[array_to_tuple(indexer, child)] - indices[i, j] = idx_future - - j += 1 - - return indices - - @parallelize_across_dense_dimensions @nb.njit def _get_continuation_values( @@ -830,8 +780,7 @@ def _get_continuation_values( period, choice_set = dense_complex_index dense_idx = 0 - # Sum over UniTuple. - n_choices = np.sum(np.array([1 for i in choice_set if i])) + n_choices = sum_over_numba_boolean_unituple(choice_set) n_states = core_indices.shape[0] @@ -848,23 +797,24 @@ def _get_continuation_values( @parallelize_across_dense_dimensions -def _collect_child_indices(core, core_indices, choice_set, indexer, optim_paras): - """Collect child indices for one particular dense choice core. +def _collect_child_indices(complex_, choice_set, indexer, optim_paras, options): + """Collect child indices for states. - Particularly creates some auxiliary objects to call _insert_indices_of_child_state - thereafter. + The function takes the states of one dense key, applies the law of motion for each + available choice and maps the resulting states to core keys and core indices. Parameters ---------- - core : pandas.DataFrame - :ref:`core state space ` - - core_indices : numpy.ndarray - Indices of core positions belonging to a particular - dense period choice core. - + complex_ : tuple + See :ref:`complex`. choice_set : tuple Tuple representing admissible choices + indexer : numba.typed.Dict + A dictionary with core states as keys and the core key and core index as values. + optim_paras : dict + Contains model parameters. + options : dict + Contains model options. Returns ------- @@ -873,18 +823,23 @@ def _collect_child_indices(core, core_indices, choice_set, indexer, optim_paras) (core_index, choice) -> (dense_key, core_index). """ + core_columns = create_core_state_space_columns(optim_paras) + states = load_states(complex_, options) + n_choices = sum(choice_set) + indices = np.full((states.shape[0], n_choices, 2), -1, dtype=np.int64) - core_columns = create_core_state_space_columns(optim_paras) - states = core.loc[core_indices, ["period"] + core_columns].to_numpy() + indices_valid_choices = [i for i, is_valid in enumerate(choice_set) if is_valid] + for i, choice in enumerate(indices_valid_choices): + states_ = states.copy(deep=True) - indices = _insert_indices_of_child_states( - states, - indexer, - choice_set, - n_choices, - len(optim_paras["choices_w_exp"]), - optim_paras["n_lagged_choices"], - ) + states_["choice"] = choice + states_ = apply_law_of_motion_for_core(states_, optim_paras) + + states_ = states_[["period"] + core_columns] + + indices[:, i, 0], indices[:, i, 1] = map_states_to_core_key_and_core_index( + states_.to_numpy(), indexer + ) return indices diff --git a/respy/tests/test_simulate.py b/respy/tests/test_simulate.py index cbb17460c..34e807e13 100644 --- a/respy/tests/test_simulate.py +++ b/respy/tests/test_simulate.py @@ -11,7 +11,7 @@ from respy.pre_processing.data_checking import check_simulated_data from respy.pre_processing.model_processing import process_params_and_options from respy.pre_processing.specification_helpers import generate_obs_labels -from respy.simulate import _apply_law_of_motion +from respy.shared import apply_law_of_motion_for_core from respy.tests.random_model import generate_random_model from respy.tests.utils import process_model_or_seed @@ -154,7 +154,6 @@ def test_distribution_of_observables(): def test_apply_law_of_motion(i): df = pd.read_csv( TEST_DIR / "test_simulate" / f"test_apply_law_of_motion_{i}_in.csv", - index_col=["identifier", "period"], ) optim_paras = yaml.safe_load( TEST_DIR.joinpath( @@ -162,11 +161,10 @@ def test_apply_law_of_motion(i): ).read_text() ) - new_df = _apply_law_of_motion(df, optim_paras) + new_df = apply_law_of_motion_for_core(df, optim_paras).drop(columns="choice") expected = pd.read_csv( TEST_DIR / "test_simulate" / f"test_apply_law_of_motion_{i}_out.csv", - index_col=["identifier", "period"], ) assert new_df.equals(expected) diff --git a/tox.ini b/tox.ini index 731342998..7924fe0cb 100644 --- a/tox.ini +++ b/tox.ini @@ -73,7 +73,7 @@ ignore = D002,D004 [flake8] exclude = docs/how_to_guides/_numerical_integration.py -max-line-length = 89 +max-line-length = 88 ignore = E203 ; ignores whitespace around : which is enforced by Black. W503 ; ignores linebreak before binary operator which is enforced by Black.