diff --git a/iot/inverse_optimal_tax.py b/iot/inverse_optimal_tax.py index 9e478ca..5a02492 100644 --- a/iot/inverse_optimal_tax.py +++ b/iot/inverse_optimal_tax.py @@ -4,6 +4,7 @@ import scipy from statsmodels.nonparametric.kernel_regression import KernelReg from scipy.interpolate import UnivariateSpline +from scipy.linalg import lstsq class IOT: @@ -127,22 +128,23 @@ def compute_mtr_dist( * mtr_prime (array_like): rate of change in marginal tax rates for each income bin """ - bins = 1000 # number of equal-width bins - data.loc[:, ["z_bin"]] = pd.cut( - data[income_measure], bins, include_lowest=True - ) - binned_data = pd.DataFrame( - data[["mtr", income_measure, "z_bin", weight_var]] - .groupby(["z_bin"], observed=False) - .apply(lambda x: wm(x[["mtr", income_measure]], x[weight_var])) - ) - # make column 0 into two columns - binned_data[["mtr", income_measure]] = pd.DataFrame( - binned_data[0].tolist(), index=binned_data.index - ) - binned_data.drop(columns=0, inplace=True) - binned_data.reset_index(inplace=True) + if mtr_smoother == "kreg": + bins = 1000 # number of equal-width bins + data.loc[:, ["z_bin"]] = pd.cut( + data[income_measure], bins, include_lowest=True + ) + binned_data = pd.DataFrame( + data[["mtr", income_measure, "z_bin", weight_var]] + .groupby(["z_bin"], observed=False) + .apply(lambda x: wm(x[["mtr", income_measure]], x[weight_var])) + ) + # make column 0 into two columns + binned_data[["mtr", income_measure]] = pd.DataFrame( + binned_data[0].tolist(), index=binned_data.index + ) + binned_data.drop(columns=0, inplace=True) + binned_data.reset_index(inplace=True) mtr_function = KernelReg( binned_data["mtr"].dropna(), binned_data[income_measure].dropna(), @@ -150,10 +152,25 @@ def compute_mtr_dist( reg_type="ll", ) mtr, _ = mtr_function.fit(self.z) + mtr_prime = np.gradient(mtr, edge_order=2) + elif mtr_smoother == "HSV": + # estimate the HSV function on mtrs via weighted least squares + X = np.log(data[income_measure].values) + X = np.column_stack((np.ones(len(X)), X)) + w = np.array(data[weight_var].values) + w_sqrt = np.sqrt(w) + y = np.log(1-data["mtr"].values) + X_weighted = X * w_sqrt[:, np.newaxis] + y_weighted = y * w_sqrt + coef, _, _, _ = lstsq(X_weighted, y_weighted) + tau = -coef[1] + lambda_param = np.exp(coef[0]) / (1-tau) + mtr = 1 - lambda_param * (1-tau) * self.z ** (-tau) + mtr_prime = lambda_param * tau * (1-tau) * self.z ** (-tau - 1) else: print("Please enter a value mtr_smoother method") assert False - mtr_prime = np.gradient(mtr, edge_order=2) + return mtr, mtr_prime @@ -336,20 +353,22 @@ def sw_weights(self): + ((self.theta_z * self.eti * self.mtr) / (1 - self.mtr)) + ((self.eti * self.z * self.mtr_prime) / (1 - self.mtr) ** 2) ) - integral = np.trapz(g_z, self.z) - # g_z = g_z / integral + integral = np.trapz(g_z * self.f, self.z) + g_z = g_z / integral + # use Lockwood and Weinzierl formula, which should be equivalent but using numerical differentiation bracket_term = ( 1 - self.F - (self.mtr / (1 - self.mtr)) * self.eti * self.z * self.f ) - # d_dz_bracket = np.gradient(bracket_term, edge_order=2) - d_dz_bracket = np.diff(bracket_term) / np.diff(self.z) - d_dz_bracket = np.append(d_dz_bracket, d_dz_bracket[-1]) + d_dz_bracket = np.gradient(bracket_term, edge_order=2) + # d_dz_bracket = np.diff(bracket_term) / np.diff(self.z) + # d_dz_bracket = np.append(d_dz_bracket, d_dz_bracket[-1]) g_z_numerical = -(1 / self.f) * d_dz_bracket - integral = np.trapz(g_z_numerical, self.z) - # g_z_numerical = g_z_numerical / integral + integral = np.trapz(g_z_numerical * self.f, self.z) + g_z_numerical = g_z_numerical / integral + return g_z, g_z_numerical