diff --git a/pyomo/contrib/parmest/parmest.py b/pyomo/contrib/parmest/parmest.py index ea9dfc00640..875ff6e7c42 100644 --- a/pyomo/contrib/parmest/parmest.py +++ b/pyomo/contrib/parmest/parmest.py @@ -37,6 +37,7 @@ import pyomo.contrib.parmest.utils.create_ef as local_ef import pyomo.contrib.parmest.utils.scenario_tree as scenario_tree +from enum import Enum import re import importlib as im import logging @@ -61,6 +62,8 @@ from pyomo.opt import SolverFactory from pyomo.environ import Block, ComponentUID +from pyomo.contrib.sensitivity_toolbox.sens import get_dsdp + import pyomo.contrib.parmest.utils as utils import pyomo.contrib.parmest.graphics as graphics from pyomo.dae import ContinuousSet @@ -229,12 +232,509 @@ def _experiment_instance_creation_callback( def SSE(model): """ - Sum of squared error between `experiment_output` model and data values + Returns an expression that is used to compute the sum of squared errors + ('SSE') objective, assuming Gaussian i.i.d. errors + + Argument: + model: annotated Pyomo model """ - expr = sum((y - y_hat) ** 2 for y, y_hat in model.experiment_outputs.items()) + # check if the model has all the required suffixes + _check_model_labels_helper(model) + + # SSE between the prediction and observation of the measured variables + expr = sum((y - y_hat) ** 2 for y_hat, y in model.experiment_outputs.items()) return expr +def SSE_weighted(model): + """ + Returns an expression that is used to compute the 'SSE_weighted' objective, + assuming Gaussian i.i.d. errors, with measurement error standard deviation + defined in the annotated Pyomo model + + Argument: + model: annotated Pyomo model + """ + # check if the model has all the required suffixes + _check_model_labels_helper(model) + + # Check that measurement errors exist + if hasattr(model, "measurement_error"): + pass + else: + raise AttributeError( + 'Experiment model does not have suffix "measurement_error". ' + '"measurement_error" is a required suffix for the "SSE_weighted" ' + 'objective.' + ) + + # check if all the values of the measurement error standard deviation + # have been supplied + if all( + model.measurement_error[y_hat] is not None for y_hat in model.experiment_outputs + ): + # calculate the weighted SSE between the prediction and observation of the + # measured variables + expr = (1 / 2) * sum( + ((y - y_hat) / model.measurement_error[y_hat]) ** 2 + for y_hat, y in model.experiment_outputs.items() + ) + return expr + else: + raise ValueError( + 'One or more values are missing from "measurement_error". All values of ' + 'the measurement errors are required for the "SSE_weighted" objective.' + ) + + +def _check_model_labels_helper(model): + """ + Checks if the annotated Pyomo model contains the necessary suffixes + + Argument: + model: annotated Pyomo model for suffix checking + """ + # check that experimental outputs exist + if hasattr(model, "experiment_outputs"): + pass + else: + raise AttributeError( + 'Experiment model does not have suffix "experiment_outputs".' + ) + + # Check that unknown parameters exist + if hasattr(model, "unknown_parameters"): + pass + else: + raise AttributeError( + 'Experiment model does not have suffix "unknown_parameters".' + ) + + logger.setLevel(level=logging.INFO) + logger.info("Model has expected labels.") + + +def _get_labeled_model_helper(experiment): + """ + Checks if the Experiment class object has a "get_labeled_model" function + + Argument: + experiment: Estimator class object that contains the model for a particular + experimental condition + + Returns: + model: Annotated Pyomo model + """ + try: + model = experiment.get_labeled_model().clone() + except Exception as e: + raise AttributeError( + f'The experiment object must have a "get_labeled_model" function. ' + f'The original error was {e}.' + ) + + return model + + +class CovarianceMethodLib(Enum): + finite_difference = "finite_difference" + automatic_differentiation_kaug = "automatic_differentiation_kaug" + reduced_hessian = "reduced_hessian" + + +class ObjectiveLib(Enum): + SSE = "SSE" + SSE_weighted = "SSE_weighted" + + +class UnsupportedArgsLib(Enum): + calc_cov = "calc_cov" + cov_n = "cov_n" + + +# Compute the Jacobian matrix of measured variables with respect to the parameters +def _compute_jacobian(experiment, theta_vals, step, solver, tee): + """ + Computes the Jacobian matrix of the measured variables with respect to the + parameters using the central finite difference scheme + + Arguments: + experiment: Estimator class object that contains the model for a particular + experimental condition + theta_vals: dictionary containing the estimates of the unknown parameters + step: float used for relative perturbation of the parameters, + e.g., step=0.02 is a 2% perturbation + solver: string ``solver`` object specified by the user, e.g., 'ipopt' + tee: boolean solver option to be passed for verbose output + + Returns: + J: Jacobian matrix + """ + # grab the model + model = _get_labeled_model_helper(experiment) + + # check if the model has all the required suffixes + _check_model_labels_helper(model) + + # fix the value of the unknown parameters to the estimated values + params = [k for k, v in model.unknown_parameters.items()] + for param in params: + param.fix(theta_vals[param.name]) + + # re-solve the model with the estimated parameters + try: + solver = pyo.SolverFactory(solver) + res = solver.solve(model, tee=tee) + pyo.assert_optimal_termination(res) + except Exception as e: + raise RuntimeError( + f"Model from experiment did not solve appropriately. Make sure the " + f"model is well-posed. The original error was {e}." + ) + + # get the measured variables + y_hat_list = [y_hat for y_hat, y in model.experiment_outputs.items()] + + # get the estimated parameter values + param_values = [p.value for p in params] + + # get the number of parameters and measured variables + n_params = len(param_values) + n_outputs = len(y_hat_list) + + # compute the sensitivity of measured variables to the parameters (Jacobian) + J = np.zeros((n_outputs, n_params)) + + for i, param in enumerate(params): + # store original value of the parameter + orig_value = param_values[i] + + # calculate the relative perturbation + relative_perturbation = step * orig_value + + # Forward perturbation + param.fix(orig_value + relative_perturbation) + + # solve the model + try: + res = solver.solve(model, tee=tee) + pyo.assert_optimal_termination(res) + except Exception as e: + raise RuntimeError( + f"Model from experiment did not solve appropriately. Make sure the " + f"model is well-posed. The original error was {e}." + ) + + # forward perturbation measured variables + y_hat_plus = [pyo.value(y_hat) for y_hat, y in model.experiment_outputs.items()] + + # Backward perturbation + param.fix(orig_value - relative_perturbation) + + # re-solve the model + try: + res = solver.solve(model, tee=tee) + pyo.assert_optimal_termination(res) + except Exception as e: + raise RuntimeError( + f"Model from experiment did not solve appropriately. Make sure the " + f"model is well-posed. The original error was {e}." + ) + + # backward perturbation measured variables + y_hat_minus = [ + pyo.value(y_hat) for y_hat, y in model.experiment_outputs.items() + ] + + # Restore the original parameter value + param.fix(orig_value) + + # Central difference approximation for the Jacobian + J[:, i] = [ + (y_hat_plus[w] - y_hat_minus[w]) / (2 * relative_perturbation) + for w in range(len(y_hat_plus)) + ] + + return J + + +# Compute the covariance matrix of the estimated parameters +def compute_covariance_matrix( + experiment_list, method, theta_vals, step, solver, tee, estimated_var=None +): + """ + Computes the covariance matrix of the estimated parameters using + 'finite_difference' and 'automatic_differentiation_kaug' methods + + Arguments: + experiment_list: list of Estimator class objects containing the model for + different experimental conditions + method: string ``method`` object specified by the user, + e.g., 'finite_difference' + theta_vals: dictionary containing the estimates of the unknown parameters + step: float used for relative perturbation of the parameters, + e.g., step=0.02 is a 2% perturbation + solver: string ``solver`` object specified by the user, e.g., 'ipopt' + tee: boolean solver option to be passed for verbose output + estimated_var: value of the estimated variance of the measurement error + in cases where the user does not supply the measurement + error standard deviation + + Returns: + cov: covariance matrix of the estimated parameters + """ + # check if the supplied method is supported + try: + cov_method = CovarianceMethodLib(method) + except ValueError: + raise ValueError( + f"Invalid method: '{method}'. " + f"Choose from {[e.value for e in CovarianceMethodLib]}." + ) + + if cov_method == CovarianceMethodLib.finite_difference: + # store the FIM of all experiments + FIM_all_exp = [] + for ( + experiment + ) in experiment_list: # loop through the experiments and compute the FIM + FIM_all_exp.append( + _finite_difference_FIM( + experiment, + theta_vals=theta_vals, + step=step, + solver=solver, + tee=tee, + estimated_var=estimated_var, + ) + ) + + FIM = np.sum(FIM_all_exp, axis=0) + + # covariance matrix + try: + cov = np.linalg.inv(FIM) + except np.linalg.LinAlgError: + cov = np.linalg.pinv(FIM) + print("The FIM is singular. Using pseudo-inverse instead.") + cov = pd.DataFrame(cov, index=theta_vals.keys(), columns=theta_vals.keys()) + elif cov_method == CovarianceMethodLib.automatic_differentiation_kaug: + # store the FIM of all experiments + FIM_all_exp = [] + for ( + experiment + ) in experiment_list: # loop through the experiments and compute the FIM + FIM_all_exp.append( + _kaug_FIM( + experiment, + theta_vals=theta_vals, + solver=solver, + tee=tee, + estimated_var=estimated_var, + ) + ) + + FIM = np.sum(FIM_all_exp, axis=0) + + # covariance matrix + try: + cov = np.linalg.inv(FIM) + except np.linalg.LinAlgError: + cov = np.linalg.pinv(FIM) + print("The FIM is singular. Using pseudo-inverse instead.") + cov = pd.DataFrame(cov, index=theta_vals.keys(), columns=theta_vals.keys()) + else: + raise ValueError( + f'The method provided, {method}, must be either "finite_difference" or ' + f'"automatic_differentiation_kaug".' + ) + + return cov + + +# compute the Fisher information matrix of the estimated parameters using +# 'finite_difference' +def _finite_difference_FIM( + experiment, theta_vals, step, solver, tee, estimated_var=None +): + """ + Computes the Fisher information matrix from 'finite_difference' Jacobian matrix + and measurement errors standard deviation defined in the annotated Pyomo model + + Arguments: + experiment: Estimator class object that contains the model for a particular + experimental condition + theta_vals: dictionary containing the estimates of the unknown parameters + step: float used for relative perturbation of the parameters, + e.g., step=0.02 is a 2% perturbation + solver: string ``solver`` object specified by the user, e.g., 'ipopt' + tee: boolean solver option to be passed for verbose output + estimated_var: value of the estimated variance of the measurement error in + cases where the user does not supply the measurement error + standard deviation + + Returns: + FIM: Fisher information matrix about the parameters + """ + # compute the Jacobian matrix using finite difference + J = _compute_jacobian(experiment, theta_vals, step, solver, tee) + + # computing the condition number of the Jacobian matrix + cond_number_jac = np.linalg.cond(J) + + # set up logging + logger.info(f"The condition number of the Jacobian matrix is {cond_number_jac}") + + # grab the model + model = _get_labeled_model_helper(experiment) + + # extract the measured variables and measurement errors + y_hat_list = [y_hat for y_hat, y in model.experiment_outputs.items()] + + # check if the model has a 'measurement_error' attribute and the measurement + # error standard deviation has been supplied + if hasattr(model, "measurement_error") and all( + model.measurement_error[y_hat] is not None for y_hat in model.experiment_outputs + ): + error_list = [ + model.measurement_error[y_hat] for y_hat in model.experiment_outputs + ] + + # compute the matrix of the inverse of the measurement variance + # the following assumes independent measurement errors + W = np.diag([1 / (err**2) for err in error_list]) + + # check if error list is consistent + if len(error_list) == 0 or len(y_hat_list) == 0: + raise ValueError( + "Experiment outputs and measurement errors cannot be empty." + ) + + # check if the dimension of error_list is same with that of y_hat_list + if len(error_list) != len(y_hat_list): + raise ValueError( + "Experiment outputs and measurement errors are not the same length." + ) + + # calculate the FIM using the formula in Lilonfe et al. (2025) + FIM = J.T @ W @ J + else: + FIM = (1 / estimated_var) * (J.T @ J) + + return FIM + + +# compute the Fisher information matrix of the estimated parameters using +# 'automatic_differentiation_kaug' +def _kaug_FIM(experiment, theta_vals, solver, tee, estimated_var=None): + """ + Computes the FIM using 'automatic_differentiation_kaug', a sensitivity-based + approach that uses the annotated Pyomo model optimality condition and + user-defined measurement errors standard deviation + + Disclaimer - code adopted from the kaug function implemented in Pyomo.DoE + + Arguments: + experiment: Estimator class object that contains the model for a particular + experimental condition + theta_vals: dictionary containing the estimates of the unknown parameters + solver: string ``solver`` object specified by the user, e.g., 'ipopt' + tee: boolean solver option to be passed for verbose output + estimated_var: value of the estimated variance of the measurement error in + cases where the user does not supply the measurement error + standard deviation + + Returns: + FIM: Fisher information matrix about the parameters + """ + # grab the model + model = _get_labeled_model_helper(experiment) + + # fix the parameter values to the estimated values + params = [k for k, v in model.unknown_parameters.items()] + for param in params: + param.fix(theta_vals[param.name]) + + # re-solve the model with the estimated parameters + try: + solver = pyo.SolverFactory(solver) + res = solver.solve(model, tee=tee) + pyo.assert_optimal_termination(res) + except Exception as e: + raise RuntimeError( + f"Model from experiment did not solve appropriately. Make sure the " + f"model is well-posed. The original error was {e}." + ) + + # add zero (dummy/placeholder) objective function + if not hasattr(model, "objective"): + model.objective = pyo.Objective(expr=0, sense=pyo.minimize) + + solver.solve(model, tee=tee) + + # Probe the solved model for dsdp results (sensitivities s.t. parameters) + params_dict = {k.name: v for k, v in model.unknown_parameters.items()} + params_names = list(params_dict.keys()) + + dsdp_re, col = get_dsdp(model, params_names, params_dict, tee=tee) + + # analyze result + dsdp_array = dsdp_re.toarray().T + + # store dsdp returned + dsdp_extract = [] + + # get right lines from results + measurement_index = [] + + # loop over measurement variables and their time points + for k, v in model.experiment_outputs.items(): + name = k.name + try: + kaug_no = col.index(name) + measurement_index.append(kaug_no) + # get right line of dsdp + dsdp_extract.append(dsdp_array[kaug_no]) + except: + # k_aug does not provide value for fixed variables + logging.getLogger(__name__).debug("The variable is fixed: %s", name) + # produce the sensitivity for fixed variables + zero_sens = np.zeros(len(params_names)) + # for fixed variables, the sensitivity are a zero vector + dsdp_extract.append(zero_sens) + + # Extract and calculate sensitivity if scaled by constants or parameters. + jac = [[] for _ in params_names] + + for d in range(len(dsdp_extract)): + for k, v in model.unknown_parameters.items(): + p = params_names.index(k.name) # Index of parameter in np array + sensi = dsdp_extract[d][p] + jac[p].append(sensi) + + # record kaug jacobian + kaug_jac = np.array(jac).T + + # compute FIM + # compute matrix of the inverse of the measurement variance + # The following assumes independent measurement error. + W = np.zeros((len(model.measurement_error), len(model.measurement_error))) + all_known_errors = all( + model.measurement_error[y_hat] is not None for y_hat in model.experiment_outputs + ) + count = 0 + for k, v in model.measurement_error.items(): + if all_known_errors: + W[count, count] = 1 / (v**2) + else: + W[count, count] = 1 / estimated_var + count += 1 + + FIM = kaug_jac.T @ W @ kaug_jac + + return FIM + + class Estimator(object): """ Parameter estimation class @@ -279,23 +779,31 @@ def __init__( assert isinstance(experiment_list, list) self.exp_list = experiment_list - # check that an experiment has experiment_outputs and unknown_parameters - model = self.exp_list[0].get_labeled_model() - try: - outputs = [k.name for k, v in model.experiment_outputs.items()] - except: - RuntimeError( - 'Experiment list model does not have suffix ' + '"experiment_outputs".' - ) - try: - params = [k.name for k, v in model.unknown_parameters.items()] - except: - RuntimeError( - 'Experiment list model does not have suffix ' + '"unknown_parameters".' - ) + # check if the experiment has a ``get_labeled_model`` function + model = _get_labeled_model_helper(self.exp_list[0]) + + # check if the model has all the required suffixes + _check_model_labels_helper(model) # populate keyword argument options - self.obj_function = obj_function + if isinstance(obj_function, str): + try: + self.obj_function = ObjectiveLib(obj_function) + except ValueError: + raise ValueError( + f"Invalid objective function: '{obj_function}'. " + f"Choose from {[e.value for e in ObjectiveLib]}." + ) + else: + deprecation_warning( + "You're using a deprecated input to the `obj_function` argument by " + "passing a custom function. This usage will be removed in a " + "future release. Please update to the new parmest interface using " + "the built-in 'SSE' and 'SSE_weighted' objectives.", + version="6.7.2", + ) + self.obj_function = obj_function + self.tee = tee self.diagnostic_mode = diagnostic_mode self.solver_options = solver_options @@ -307,7 +815,7 @@ def __init__( # We could collect the union (or intersect?) of thetas when the models are built theta_names = [] for experiment in self.exp_list: - model = experiment.get_labeled_model() + model = _get_labeled_model_helper(experiment) theta_names.extend([k.name for k, v in model.unknown_parameters.items()]) # Utilize list(dict.fromkeys(theta_names)) to preserve parameter # order compared with list(set(theta_names)), which had @@ -400,7 +908,7 @@ def _create_parmest_model(self, experiment_number): Modify the Pyomo model for parameter estimation """ - model = self.exp_list[experiment_number].get_labeled_model() + model = _get_labeled_model_helper(self.exp_list[experiment_number]) if len(model.unknown_parameters) == 0: model.parmest_dummy_var = pyo.Var(initialize=1.0) @@ -426,8 +934,16 @@ def _create_parmest_model(self, experiment_number): # TODO, this needs to be turned into an enum class of options that still support # custom functions - if self.obj_function == 'SSE': - second_stage_rule = SSE + if isinstance(self.obj_function, Enum): + if self.obj_function == ObjectiveLib.SSE: + second_stage_rule = SSE + elif self.obj_function == ObjectiveLib.SSE_weighted: + second_stage_rule = SSE_weighted + else: + raise ValueError( + f"Invalid objective function: '{self.obj_function.value}'. " + f"Choose from {[e.value for e in ObjectiveLib]}." + ) else: # A custom function uses model.experiment_outputs as data second_stage_rule = self.obj_function @@ -458,8 +974,7 @@ def _Q_opt( solver="ef_ipopt", return_values=[], bootlist=None, - calc_cov=False, - cov_n=None, + **kwargs, ): """ Set up all thetas as first stage Vars, return resulting theta @@ -508,26 +1023,17 @@ def _Q_opt( # Solve the extensive form with ipopt if solver == "ef_ipopt": - if not calc_cov: - # Do not calculate the reduced hessian - - solver = SolverFactory('ipopt') - if self.solver_options is not None: - for key in self.solver_options: - solver.options[key] = self.solver_options[key] - - solve_result = solver.solve(self.ef_instance, tee=self.tee) + if not kwargs: + # The import error will be raised when we attempt to use + # inv_reduced_hessian_barrier below. + # + # elif not asl_available: + # raise ImportError("parmest requires ASL to calculate the " + # "covariance matrix with solver 'ipopt'") - # The import error will be raised when we attempt to use - # inv_reduced_hessian_barrier below. - # - # elif not asl_available: - # raise ImportError("parmest requires ASL to calculate the " - # "covariance matrix with solver 'ipopt'") - else: # parmest makes the fitted parameters stage 1 variables ind_vars = [] - for ndname, Var, solval in ef_nonants(ef): + for nd_name, Var, sol_val in ef_nonants(ef): ind_vars.append(Var) # calculate the reduced hessian (solve_result, inv_red_hes) = ( @@ -539,6 +1045,52 @@ def _Q_opt( ) ) + self.inv_red_hes = inv_red_hes + elif kwargs and all(arg.value in kwargs for arg in UnsupportedArgsLib): + deprecation_warning( + "You're using a deprecated call to the `theta_est()` function " + "with the `calc_cov` and `cov_n` arguments. This usage will be " + "removed in a future release. Please update to the new parmest " + "interface using `cov_est()` function for covariance calculation.", + version="6.7.2", + ) + + calc_cov = kwargs[UnsupportedArgsLib.calc_cov.value] + cov_n = kwargs[UnsupportedArgsLib.cov_n.value] + if not isinstance(calc_cov, bool): + raise TypeError("Expected a boolean for 'calc_cov' argument.") + + if not calc_cov: + # Do not calculate the reduced hessian + + solver = SolverFactory('ipopt') + if self.solver_options is not None: + for key in self.solver_options: + solver.options[key] = self.solver_options[key] + + solve_result = solver.solve(self.ef_instance, tee=self.tee) + + # The import error will be raised when we attempt to use + # inv_reduced_hessian_barrier below. + # + # elif not asl_available: + # raise ImportError("parmest requires ASL to calculate the " + # "covariance matrix with solver 'ipopt'") + else: + # parmest makes the fitted parameters stage 1 variables + ind_vars = [] + for nd_name, Var, sol_val in ef_nonants(ef): + ind_vars.append(Var) + # calculate the reduced hessian + (solve_result, inv_red_hes) = ( + inverse_reduced_hessian.inv_reduced_hessian_barrier( + self.ef_instance, + independent_variables=ind_vars, + solver_options=self.solver_options, + tee=self.tee, + ) + ) + if self.diagnostic_mode: print( ' Solver termination condition = ', @@ -546,44 +1098,60 @@ def _Q_opt( ) # assume all first stage are thetas... - thetavals = {} - for ndname, Var, solval in ef_nonants(ef): + theta_vals = {} + for nd_name, Var, sol_val in ef_nonants(ef): # process the name # the scenarios are blocks, so strip the scenario name - vname = Var.name[Var.name.find(".") + 1 :] - thetavals[vname] = solval + var_name = Var.name[Var.name.find(".") + 1 :] + theta_vals[var_name] = sol_val - objval = pyo.value(ef.EF_Obj) + obj_val = pyo.value(ef.EF_Obj) - if calc_cov: - # Calculate the covariance matrix + # add the estimated theta to the class + self.estimated_theta = theta_vals - # Number of data points considered - n = cov_n + if kwargs and all(arg.value in kwargs for arg in UnsupportedArgsLib): + if calc_cov: + if not isinstance(cov_n, int): + raise TypeError("Expected an integer for 'cov_n' argument.") + num_unknowns = max( + [ + len(experiment.get_labeled_model().unknown_parameters) + for experiment in self.exp_list + ] + ) + assert cov_n > num_unknowns, ( + "The number of datapoints must be greater than the " + "number of parameters to estimate" + ) - # Extract number of fitted parameters - l = len(thetavals) + # Number of data points considered + n = cov_n - # Assumption: Objective value is sum of squared errors - sse = objval + # Extract number of fitted parameters + l = len(theta_vals) - '''Calculate covariance assuming experimental observation errors are - independent and follow a Gaussian - distribution with constant variance. + # Assumption: Objective value is sum of squared errors + sse = obj_val - The formula used in parmest was verified against equations (7-5-15) and - (7-5-16) in "Nonlinear Parameter Estimation", Y. Bard, 1974. + '''Calculate covariance assuming experimental observation errors + are independent and follow a Gaussian distribution + with constant variance. - This formula is also applicable if the objective is scaled by a constant; - the constant cancels out. (was scaled by 1/n because it computes an - expected value.) - ''' - cov = 2 * sse / (n - l) * inv_red_hes - cov = pd.DataFrame( - cov, index=thetavals.keys(), columns=thetavals.keys() - ) + The formula used in parmest was verified against equations + (7-5-15) and (7-5-16) in "Nonlinear Parameter Estimation", + Y. Bard, 1974. - thetavals = pd.Series(thetavals) + This formula is also applicable if the objective is scaled by a + constant; the constant cancels out. + (was scaled by 1/n because it computes an expected value.) + ''' + cov = 2 * sse / (n - l) * inv_red_hes + cov = pd.DataFrame( + cov, index=theta_vals.keys(), columns=theta_vals.keys() + ) + + theta_vals = pd.Series(theta_vals) if len(return_values) > 0: var_values = [] @@ -613,18 +1181,234 @@ def _Q_opt( if len(vals) > 0: var_values.append(vals) var_values = pd.DataFrame(var_values) + + if not kwargs: + return obj_val, theta_vals, var_values + elif kwargs and all(arg.value in kwargs for arg in UnsupportedArgsLib): + if calc_cov: + return obj_val, theta_vals, var_values, cov + else: + return obj_val, theta_vals, var_values + + if not kwargs: + return obj_val, theta_vals + elif kwargs and all(arg.value in kwargs for arg in UnsupportedArgsLib): if calc_cov: - return objval, thetavals, var_values, cov + return obj_val, theta_vals, cov else: - return objval, thetavals, var_values + return obj_val, theta_vals - if calc_cov: - return objval, thetavals, cov + else: + raise RuntimeError("Unknown solver in Q_Opt=" + solver) + + def _cov_at_theta(self, method, solver, cov_n, step): + """ + Covariance matrix calculation using all scenarios in the data + + Argument: + method: string ``method`` object specified by the user, + e.g., 'finite_difference' + solver: string ``solver`` object specified by the user, e.g., 'ipopt' + cov_n: integer, number of datapoints specified by the user which is used + in the objective function + step: float used for relative perturbation of the parameters, + e.g., step=0.02 is a 2% perturbation + + Returns: + cov: pd.DataFrame, covariance matrix of the estimated parameters + """ + # Number of data points considered + n = cov_n + + # Extract the number of fitted parameters + l = len(self.estimated_theta) + + # calculate the sum of squared errors at the estimated parameter values + sse_vals = [] + for experiment in self.exp_list: + model = _get_labeled_model_helper(experiment) + + # fix the value of the unknown parameters to the estimated values + params = [k for k, v in model.unknown_parameters.items()] + for param in params: + param.fix(self.estimated_theta[param.name]) + + # re-solve the model with the estimated parameters + try: + res = pyo.SolverFactory(solver).solve(model, tee=self.tee) + pyo.assert_optimal_termination(res) + except Exception as e: + raise RuntimeError( + f"Model from experiment did not solve appropriately. Make sure the " + f"model is well-posed. The original error was {e}." + ) + + # choose and evaluate the sum of squared errors expression + if self.obj_function == ObjectiveLib.SSE: + sse_expr = SSE(model) + elif self.obj_function == ObjectiveLib.SSE_weighted: + sse_expr = SSE_weighted(model) else: - return objval, thetavals + raise ValueError( + f"Invalid objective function: '{self.obj_function.value}'. " + f"Choose from {[e.value for e in ObjectiveLib]}." + ) + + # evaluate the numerical SSE and store it + sse_val = pyo.value(sse_expr) + sse_vals.append(sse_val) + + sse = sum(sse_vals) # total SSE + """Calculate covariance assuming experimental observation errors are + independent and follow a Gaussian distribution with constant variance. + + The formula used in parmest was verified against equations (7-5-15) and + (7-5-16) in "Nonlinear Parameter Estimation", Y. Bard, 1974. + + This formula is also applicable if the objective is scaled by a constant; + the constant cancels out. (was scaled by 1/n because it computes an + expected value.) + """ + # check if the user-supplied covariance method is supported + try: + cov_method = CovarianceMethodLib(method) + except ValueError: + raise ValueError( + f"Invalid method: '{method}'. Choose " + f"from {[e.value for e in CovarianceMethodLib]}." + ) + + # check if the user specified 'SSE' or 'SSE_weighted' as the objective function + if self.obj_function == ObjectiveLib.SSE: + # check if the user defined the 'measurement_error' attribute + if hasattr(model, "measurement_error"): + # get the measurement errors + meas_error = [ + model.measurement_error[y_hat] + for y_hat, y in model.experiment_outputs.items() + ] + + # check if the user supplied the values of the measurement errors + if all(item is None for item in meas_error): + measurement_var = sse / ( + n - l + ) # estimate of the measurement variance + if cov_method == CovarianceMethodLib.reduced_hessian: + cov = ( + 2 * measurement_var * self.inv_red_hes + ) # covariance matrix + cov = pd.DataFrame( + cov, + index=self.estimated_theta.keys(), + columns=self.estimated_theta.keys(), + ) + elif ( + cov_method == CovarianceMethodLib.finite_difference + or cov_method + == CovarianceMethodLib.automatic_differentiation_kaug + ): + cov = compute_covariance_matrix( + self.exp_list, + method, + theta_vals=self.estimated_theta, + solver=solver, + step=step, + tee=self.tee, + estimated_var=measurement_var, + ) + else: + raise NotImplementedError( + 'Only "finite_difference", "reduced_hessian", and ' + '"automatic_differentiation_kaug" methods are supported.' + ) + elif all(item is not None for item in meas_error): + if cov_method == CovarianceMethodLib.reduced_hessian: + cov = 2 * (meas_error[0] ** 2) * self.inv_red_hes + cov = pd.DataFrame( + cov, + index=self.estimated_theta.keys(), + columns=self.estimated_theta.keys(), + ) + elif ( + cov_method == CovarianceMethodLib.finite_difference + or cov_method + == CovarianceMethodLib.automatic_differentiation_kaug + ): + cov = compute_covariance_matrix( + self.exp_list, + method, + theta_vals=self.estimated_theta, + solver=solver, + step=step, + tee=self.tee, + ) + else: + raise NotImplementedError( + 'Only "finite_difference", "reduced_hessian", and ' + '"automatic_differentiation_kaug" methods are supported.' + ) + else: + raise ValueError( + "One or more values of the measurement errors have " + "not been supplied." + ) + else: + raise AttributeError( + 'Experiment model does not have suffix "measurement_error".' + ) + elif self.obj_function == ObjectiveLib.SSE_weighted: + # check if the user defined the 'measurement_error' attribute + if hasattr(model, "measurement_error"): + meas_error = [ + model.measurement_error[y_hat] + for y_hat, y in model.experiment_outputs.items() + ] + + # check if the user supplied the values for the measurement errors + if all(item is not None for item in meas_error): + if ( + cov_method == CovarianceMethodLib.finite_difference + or cov_method + == CovarianceMethodLib.automatic_differentiation_kaug + ): + cov = compute_covariance_matrix( + self.exp_list, + method, + theta_vals=self.estimated_theta, + step=step, + solver=solver, + tee=self.tee, + ) + elif cov_method == CovarianceMethodLib.reduced_hessian: + cov = self.inv_red_hes + cov = pd.DataFrame( + cov, + index=self.estimated_theta.keys(), + columns=self.estimated_theta.keys(), + ) + else: + raise NotImplementedError( + 'Only "finite_difference", "reduced_hessian", and ' + '"automatic_differentiation_kaug" methods are supported.' + ) + else: + raise ValueError( + 'One or more values of the measurement errors have not been ' + 'supplied. All values of the measurement errors are required ' + 'for the "SSE_weighted" objective.' + ) + else: + raise AttributeError( + 'Experiment model does not have suffix "measurement_error".' + ) else: - raise RuntimeError("Unknown solver in Q_Opt=" + solver) + raise NotImplementedError( + 'Covariance calculation is only supported for "SSE" and ' + '"SSE_weighted" objectives.' + ) + + return cov def _Q_at_theta(self, thetavals, initialize_parmest_model=False): """ @@ -855,9 +1639,7 @@ def _get_sample_list(self, samplesize, num_samples, replacement=True): return samplelist - def theta_est( - self, solver="ef_ipopt", return_values=[], calc_cov=False, cov_n=None - ): + def theta_est(self, solver="ef_ipopt", return_values=[], **kwargs): """ Parameter estimation using all scenarios in the data @@ -867,60 +1649,87 @@ def theta_est( Currently only "ef_ipopt" is supported. Default is "ef_ipopt". return_values: list, optional List of Variable names, used to return values from the model for data reconciliation - calc_cov: boolean, optional - If True, calculate and return the covariance matrix (only for "ef_ipopt" solver). - Default is False. - cov_n: int, optional - If calc_cov=True, then the user needs to supply the number of datapoints - that are used in the objective function. Returns ------- objectiveval: float The objective function value - thetavals: pd.Series + theta_vals: pd.Series Estimated values for theta variable values: pd.DataFrame Variable values for each variable name in return_values (only for solver='ef_ipopt') - cov: pd.DataFrame - Covariance matrix of the fitted parameters (only for solver='ef_ipopt') """ # check if we are using deprecated parmest if self.pest_deprecated is not None: - return self.pest_deprecated.theta_est( - solver=solver, - return_values=return_values, - calc_cov=calc_cov, - cov_n=cov_n, - ) + if not kwargs: + return self.pest_deprecated.theta_est( + solver=solver, return_values=return_values + ) + elif kwargs and all(arg.value in kwargs for arg in UnsupportedArgsLib): + calc_cov = kwargs[UnsupportedArgsLib.calc_cov.value] + cov_n = kwargs[UnsupportedArgsLib.cov_n.value] + return self.pest_deprecated.theta_est( + solver=solver, + return_values=return_values, + calc_cov=calc_cov, + cov_n=cov_n, + ) assert isinstance(solver, str) assert isinstance(return_values, list) - assert isinstance(calc_cov, bool) - if calc_cov: - num_unknowns = max( - [ - len(experiment.get_labeled_model().unknown_parameters) - for experiment in self.exp_list - ] - ) - assert isinstance(cov_n, int), ( - "The number of datapoints that are used in the objective function is " - "required to calculate the covariance matrix" - ) - assert ( - cov_n > num_unknowns - ), "The number of datapoints must be greater than the number of parameters to estimate" return self._Q_opt( - solver=solver, - return_values=return_values, - bootlist=None, - calc_cov=calc_cov, - cov_n=cov_n, + solver=solver, return_values=return_values, bootlist=None, **kwargs ) + def cov_est( + self, method="finite_difference", solver="ipopt", cov_n=None, step=1e-3 + ): + """ + Covariance matrix calculation using all scenarios in the data + + Argument: + method: string ``method`` object specified by the user + options - 'finite_difference', 'reduced_hessian', + and 'automatic_differentiation_kaug' + solver: string ``solver`` object specified by the user, e.g., 'ipopt' + cov_n: integer, number of datapoints specified by the user which is used + in the objective function + step: float used for relative perturbation of the parameters, e.g., + step=0.02 is a 2% perturbation + + Returns: + cov: pd.DataFrame, covariance matrix of the estimated parameters + """ + # check if the solver input is a string + if not isinstance(solver, str): + raise TypeError("Expected a string for the solver, e.g., 'ipopt'") + + # check if the method input is a string + if not isinstance(method, str): + raise TypeError( + "Expected a string for the method, e.g., 'finite_difference'" + ) + + # check if the user-supplied number of datapoints is an integer + if not isinstance(cov_n, int): + raise TypeError("Expected an integer for 'cov_n' argument.") + + # number of unknown parameters + num_unknowns = max( + [ + len(experiment.get_labeled_model().unknown_parameters) + for experiment in self.exp_list + ] + ) + assert cov_n > num_unknowns, ( + "The number of datapoints must be greater than the " + "number of parameters to estimate." + ) + + return self._cov_at_theta(method=method, solver=solver, cov_n=cov_n, step=step) + def theta_est_bootstrap( self, bootstrap_samples, diff --git a/pyomo/contrib/parmest/tests/test_parmest.py b/pyomo/contrib/parmest/tests/test_parmest.py index a6a549757f7..6c3c9686675 100644 --- a/pyomo/contrib/parmest/tests/test_parmest.py +++ b/pyomo/contrib/parmest/tests/test_parmest.py @@ -9,12 +9,13 @@ # This software is distributed under the 3-clause BSD License. # ___________________________________________________________________________ -import platform import sys import os import subprocess from itertools import product +import pytest +from parameterized import parameterized, parameterized_class import pyomo.common.unittest as unittest import pyomo.contrib.parmest.parmest as parmest import pyomo.contrib.parmest.graphics as graphics @@ -26,14 +27,520 @@ from pyomo.common.fileutils import this_file_dir from pyomo.contrib.parmest.experiment import Experiment from pyomo.contrib.pynumero.asl import AmplInterface -from pyomo.opt import SolverFactory -is_osx = platform.mac_ver()[0] != "" -ipopt_available = SolverFactory("ipopt").available() +ipopt_available = pyo.SolverFactory("ipopt").available() pynumero_ASL_available = AmplInterface.available() testdir = this_file_dir() +# Test class for the built-in "SSE" and "SSE_weighted" objective functions +# validated the results using the Rooney-Biegler paper example +# Rooney-Biegler paper example is the case when the measurement error is None +# we considered another case when the user supplies the value of the measurement error +@unittest.skipIf( + not parmest.parmest_available, + "Cannot test parmest: required dependencies are missing", +) +@unittest.skipIf(not ipopt_available, "The 'ipopt' command is not available") + +# we use parameterized_class to test the two objective functions +# over the two cases of the measurement error. Included a third objective function +# to test the error message when an incorrect objective function is supplied +@parameterized_class( + ("measurement_std", "objective_function"), + [ + (None, "SSE"), + (None, "SSE_weighted"), + (None, "incorrect_obj"), + (0.1, "SSE"), + (0.1, "SSE_weighted"), + (0.1, "incorrect_obj"), + ], +) +class NewTestRooneyBiegler(unittest.TestCase): + + def setUp(self): + self.data = pd.DataFrame( + data=[[1, 8.3], [2, 10.3], [3, 19.0], [4, 16.0], [5, 15.6], [7, 19.8]], + columns=["hour", "y"], + ) + + # create the Rooney-Biegler model + def rooney_biegler_model(): + """ + Formulates the Pyomo model of the Rooney-Biegler example + + Returns: + m: Pyomo model + """ + m = pyo.ConcreteModel() + + m.asymptote = pyo.Var(within=pyo.NonNegativeReals, initialize=10) + m.rate_constant = pyo.Var(within=pyo.NonNegativeReals, initialize=0.2) + + m.hour = pyo.Var(within=pyo.PositiveReals, initialize=0.1) + m.y = pyo.Var(within=pyo.NonNegativeReals) + + @m.Constraint() + def response_rule(m): + return m.y == m.asymptote * (1 - pyo.exp(-m.rate_constant * m.hour)) + + return m + + # create the Experiment class + class RooneyBieglerExperiment(Experiment): + def __init__(self, experiment_number, hour, y, measurement_error_std): + self.y = y + self.hour = hour + self.experiment_number = experiment_number + self.model = None + self.measurement_error_std = measurement_error_std + + def get_labeled_model(self): + self.create_model() + self.finalize_model() + self.label_model() + + return self.model + + def create_model(self): + m = self.model = rooney_biegler_model() + + return m + + def finalize_model(self): + m = self.model + + # fix the input variable + m.hour.fix(self.hour) + + return m + + def label_model(self): + m = self.model + + # add experiment outputs + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.experiment_outputs.update([(m.y, self.y)]) + + # add unknown parameters + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.unknown_parameters.update( + (k, pyo.value(k)) for k in [m.asymptote, m.rate_constant] + ) + + # add measurement error + m.measurement_error = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.measurement_error.update([(m.y, self.measurement_error_std)]) + + return m + + # extract the input and output variables + hour_data = self.data["hour"] + y_data = self.data["y"] + + # create the experiments list + rooney_biegler_exp_list = [] + for i in range(self.data.shape[0]): + rooney_biegler_exp_list.append( + RooneyBieglerExperiment( + i, hour_data[i], y_data[i], self.measurement_std + ) + ) + + self.exp_list = rooney_biegler_exp_list + + if self.objective_function == "incorrect_obj": + with pytest.raises( + ValueError, + match="Invalid objective function: 'incorrect_obj'\. " + "Choose from \['SSE', 'SSE_weighted'\]\.", + ): + self.pest = parmest.Estimator( + self.exp_list, obj_function=self.objective_function, tee=True + ) + else: + self.pest = parmest.Estimator( + self.exp_list, obj_function=self.objective_function, tee=True + ) + + def check_rooney_biegler_parameters( + self, obj_val, theta_vals, obj_function, measurement_error + ): + """ + Checks if the objective value and parameter estimates are equal to the + expected values and agree with the results of the Rooney-Biegler paper + + Argument: + obj_val: the objective value of the annotated Pyomo model + theta_vals: dictionary of the estimated parameters + obj_function: a string of the objective function supplied by the user, + e.g., 'SSE' + measurement_error: float or integer value of the measurement error + standard deviation + """ + if obj_function == "SSE": + self.assertAlmostEqual(obj_val, 4.33171, places=2) + elif obj_function == "SSE_weighted" and measurement_error is not None: + self.assertAlmostEqual(obj_val, 216.58556, places=2) + + self.assertAlmostEqual( + theta_vals["asymptote"], 19.1426, places=2 + ) # 19.1426 from the paper + self.assertAlmostEqual( + theta_vals["rate_constant"], 0.5311, places=2 + ) # 0.5311 from the paper + + def check_rooney_biegler_covariance( + self, cov, cov_method, obj_function, measurement_error + ): + """ + Checks if the covariance matrix elements are equal to the expected + values and agree with the results of the Rooney-Biegler paper + + Argument: + cov: pd.DataFrame, covariance matrix of the estimated parameters + cov_method: string ``method`` object specified by the user + options - 'finite_difference', 'reduced_hessian', + and 'automatic_differentiation_kaug' + obj_function: a string of the objective function supplied by the user, + e.g., 'SSE' + measurement_error: float or integer value of the measurement error + standard deviation + """ + + # get indices in covariance matrix + cov_cols = cov.columns.to_list() + asymptote_index = [idx for idx, s in enumerate(cov_cols) if "asymptote" in s][0] + rate_constant_index = [ + idx for idx, s in enumerate(cov_cols) if "rate_constant" in s + ][0] + + if measurement_error is None: + if obj_function == "SSE": + if ( + cov_method == "finite_difference" + or cov_method == "automatic_differentiation_kaug" + ): + self.assertAlmostEqual( + cov.iloc[asymptote_index, asymptote_index], 6.229612, places=2 + ) # 6.22864 from paper + self.assertAlmostEqual( + cov.iloc[asymptote_index, rate_constant_index], + -0.432265, + places=2, + ) # -0.4322 from paper + self.assertAlmostEqual( + cov.iloc[rate_constant_index, asymptote_index], + -0.432265, + places=2, + ) # -0.4322 from paper + self.assertAlmostEqual( + cov.iloc[rate_constant_index, rate_constant_index], + 0.041242, + places=2, + ) # 0.04124 from paper + else: + self.assertAlmostEqual( + cov.iloc[asymptote_index, asymptote_index], 36.935351, places=2 + ) # 6.22864 from paper + self.assertAlmostEqual( + cov.iloc[asymptote_index, rate_constant_index], + -2.551392, + places=2, + ) # -0.4322 from paper + self.assertAlmostEqual( + cov.iloc[rate_constant_index, asymptote_index], + -2.551392, + places=2, + ) # -0.4322 from paper + self.assertAlmostEqual( + cov.iloc[rate_constant_index, rate_constant_index], + 0.243428, + places=2, + ) # 0.04124 from paper + else: + if obj_function == "SSE" or obj_function == "SSE_weighted": + if ( + cov_method == "finite_difference" + or cov_method == "automatic_differentiation_kaug" + ): + self.assertAlmostEqual( + cov.iloc[asymptote_index, asymptote_index], 0.009588, places=4 + ) + self.assertAlmostEqual( + cov.iloc[asymptote_index, rate_constant_index], + -0.000665, + places=4, + ) + self.assertAlmostEqual( + cov.iloc[rate_constant_index, asymptote_index], + -0.000665, + places=4, + ) + self.assertAlmostEqual( + cov.iloc[rate_constant_index, rate_constant_index], + 0.000063, + places=4, + ) + else: + self.assertAlmostEqual( + cov.iloc[asymptote_index, asymptote_index], 0.056845, places=4 + ) + self.assertAlmostEqual( + cov.iloc[asymptote_index, rate_constant_index], + -0.003927, + places=4, + ) + self.assertAlmostEqual( + cov.iloc[rate_constant_index, asymptote_index], + -0.003927, + places=4, + ) + self.assertAlmostEqual( + cov.iloc[rate_constant_index, rate_constant_index], + 0.000375, + places=4, + ) + + # test the covariance calculation of the three supported methods + # added a 'unsupported_method' to test the error message when the method supplied + # is not supported + @parameterized.expand( + [ + ("finite_difference"), + ("automatic_differentiation_kaug"), + ("reduced_hessian"), + ("unsupported_method"), + ] + ) + def test_parmest_covariance(self, cov_method): + """ + Estimates the parameters and covariance matrix and compares them + with the results of the Rooney-Biegler paper + + Argument: + cov_method: string ``method`` object specified by the user + options - 'finite_difference', 'reduced_hessian', + and 'automatic_differentiation_kaug' + """ + if self.measurement_std is None: + if self.objective_function == "SSE": + + # estimate the parameters + obj_val, theta_vals = self.pest.theta_est() + + # check the parameter estimation result + self.check_rooney_biegler_parameters( + obj_val, + theta_vals, + obj_function=self.objective_function, + measurement_error=self.measurement_std, + ) + + # calculate the covariance matrix + if cov_method in ( + "finite_difference", + "automatic_differentiation_kaug", + "reduced_hessian", + ): + cov = self.pest.cov_est(cov_n=6, method=cov_method) + + # check the covariance calculation results + self.check_rooney_biegler_covariance( + cov, + cov_method, + obj_function=self.objective_function, + measurement_error=self.measurement_std, + ) + else: + with pytest.raises( + ValueError, + match=r"Invalid method: 'unsupported_method'\. Choose from " + r"\['finite_difference', " + r"'automatic_differentiation_kaug', " + r"'reduced_hessian'\]\.", + ): + cov = self.pest.cov_est(cov_n=6, method=cov_method) + elif self.objective_function == "SSE_weighted": + with pytest.raises( + ValueError, + match='One or more values are missing from ' + '"measurement_error". All values of the measurement errors are ' + 'required for the "SSE_weighted" objective.', + ): + # we expect this error when estimating the parameters + obj_val, theta_vals = self.pest.theta_est() + else: + if ( + self.objective_function == "SSE" + or self.objective_function == "SSE_weighted" + ): + # estimate the parameters + obj_val, theta_vals = self.pest.theta_est() + + # check the parameter estimation results + self.check_rooney_biegler_parameters( + obj_val, + theta_vals, + obj_function=self.objective_function, + measurement_error=self.measurement_std, + ) + + # calculate the covariance matrix + if cov_method in ( + "finite_difference", + "automatic_differentiation_kaug", + "reduced_hessian", + ): + cov = self.pest.cov_est(cov_n=6, method=cov_method) + + # check the covariance calculation results + self.check_rooney_biegler_covariance( + cov, + cov_method, + obj_function=self.objective_function, + measurement_error=self.measurement_std, + ) + else: + with pytest.raises( + ValueError, + match=r"Invalid method: 'unsupported_method'\. Choose from " + r"\['finite_difference', " + r"'automatic_differentiation_kaug', " + r"'reduced_hessian'\]\.", + ): + cov = self.pest.cov_est(cov_n=6, method=cov_method) + + def test_cov_scipy_least_squares_comparison(self): + """ + Scipy results differ in the 3rd decimal place from the paper. It is possible + the paper used an alternative finite difference approximation for the Jacobian. + """ + + def model(theta, t): + """ + Model to be fitted y = model(theta, t) + Arguments: + theta: vector of fitted parameters + t: independent variable [hours] + + Returns: + y: model predictions [need to check paper for units] + """ + asymptote = theta[0] + rate_constant = theta[1] + + return asymptote * (1 - np.exp(-rate_constant * t)) + + def residual(theta, t, y): + """ + Calculate residuals + Arguments: + theta: vector of fitted parameters + t: independent variable [hours] + y: dependent variable [?] + """ + return y - model(theta, t) + + # define data + t = self.data["hour"].to_numpy() + y = self.data["y"].to_numpy() + + # define initial guess + theta_guess = np.array([15, 0.5]) + + ## solve with optimize.least_squares + sol = scipy.optimize.least_squares( + residual, theta_guess, method="trf", args=(t, y), verbose=2 + ) + theta_hat = sol.x + + self.assertAlmostEqual( + theta_hat[0], 19.1426, places=2 + ) # 19.1426 from the paper + self.assertAlmostEqual(theta_hat[1], 0.5311, places=2) # 0.5311 from the paper + + # calculate residuals + r = residual(theta_hat, t, y) + + if self.measurement_std is None: + # calculate variance of the residuals + # -2 because there are 2 fitted parameters + sigre = np.matmul(r.T, r / (len(y) - 2)) + + # approximate covariance + cov = sigre * np.linalg.inv(np.matmul(sol.jac.T, sol.jac)) + + self.assertAlmostEqual(cov[0, 0], 6.22864, places=2) # 6.22864 from paper + self.assertAlmostEqual(cov[0, 1], -0.4322, places=2) # -0.4322 from paper + self.assertAlmostEqual(cov[1, 0], -0.4322, places=2) # -0.4322 from paper + self.assertAlmostEqual(cov[1, 1], 0.04124, places=2) # 0.04124 from paper + else: + sigre = self.measurement_std**2 + + cov = sigre * np.linalg.inv(np.matmul(sol.jac.T, sol.jac)) + + self.assertAlmostEqual(cov[0, 0], 0.009588, places=4) + self.assertAlmostEqual(cov[0, 1], -0.000665, places=4) + self.assertAlmostEqual(cov[1, 0], -0.000665, places=4) + self.assertAlmostEqual(cov[1, 1], 0.000063, places=4) + + def test_cov_scipy_curve_fit_comparison(self): + """ + Scipy results differ in the 3rd decimal place from the paper. It is possible + the paper used an alternative finite difference approximation for the Jacobian. + """ + + ## solve with optimize.curve_fit + def model(t, asymptote, rate_constant): + return asymptote * (1 - np.exp(-rate_constant * t)) + + # define data + t = self.data["hour"].to_numpy() + y = self.data["y"].to_numpy() + + # define initial guess + theta_guess = np.array([15, 0.5]) + + # estimate the parameters and covariance matrix + if self.measurement_std is None: + theta_hat, cov = scipy.optimize.curve_fit(model, t, y, p0=theta_guess) + + self.assertAlmostEqual( + theta_hat[0], 19.1426, places=2 + ) # 19.1426 from the paper + self.assertAlmostEqual( + theta_hat[1], 0.5311, places=2 + ) # 0.5311 from the paper + + self.assertAlmostEqual(cov[0, 0], 6.22864, places=2) # 6.22864 from paper + self.assertAlmostEqual(cov[0, 1], -0.4322, places=2) # -0.4322 from paper + self.assertAlmostEqual(cov[1, 0], -0.4322, places=2) # -0.4322 from paper + self.assertAlmostEqual(cov[1, 1], 0.04124, places=2) # 0.04124 from paper + else: + theta_hat, cov = scipy.optimize.curve_fit( + model, + t, + y, + p0=theta_guess, + sigma=self.measurement_std, + absolute_sigma=True, + ) + + self.assertAlmostEqual( + theta_hat[0], 19.1426, places=2 + ) # 19.1426 from the paper + self.assertAlmostEqual( + theta_hat[1], 0.5311, places=2 + ) # 0.5311 from the paper + + self.assertAlmostEqual(cov[0, 0], 0.0095875, places=4) + self.assertAlmostEqual(cov[0, 1], -0.0006653, places=4) + self.assertAlmostEqual(cov[1, 0], -0.0006653, places=4) + self.assertAlmostEqual(cov[1, 1], 0.00006347, places=4) + + @unittest.skipIf( not parmest.parmest_available, "Cannot test parmest: required dependencies are missing", @@ -800,6 +1307,22 @@ def label_model(self): m = self.model + if isinstance(self.data, pd.DataFrame): + meas_time_points = self.data.index + else: # dictionary + meas_time_points = list(self.data["ca"].keys()) + + m.experiment_outputs = pyo.Suffix(direction=pyo.Suffix.LOCAL) + m.experiment_outputs.update( + (m.ca[t], self.data["ca"][t]) for t in meas_time_points + ) + m.experiment_outputs.update( + (m.cb[t], self.data["cb"][t]) for t in meas_time_points + ) + m.experiment_outputs.update( + (m.cc[t], self.data["cc"][t]) for t in meas_time_points + ) + m.unknown_parameters = pyo.Suffix(direction=pyo.Suffix.LOCAL) m.unknown_parameters.update( (k, pyo.ComponentUID(k)) for k in [m.k1, m.k2]