Skip to content

Commit

Permalink
Merge pull request PSLmodels#31 from john-p-ryan/main
Browse files Browse the repository at this point in the history
Add HSV tax estimation
  • Loading branch information
jdebacker authored Jan 9, 2025
2 parents 8599cfb + 8f9e24a commit 4d86863
Showing 1 changed file with 42 additions and 23 deletions.
65 changes: 42 additions & 23 deletions iot/inverse_optimal_tax.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import scipy
from statsmodels.nonparametric.kernel_regression import KernelReg
from scipy.interpolate import UnivariateSpline
from scipy.linalg import lstsq


class IOT:
Expand Down Expand Up @@ -127,33 +128,49 @@ def compute_mtr_dist(
* mtr_prime (array_like): rate of change in marginal tax rates
for each income bin
"""
bins = 1000 # number of equal-width bins
data.loc[:, ["z_bin"]] = pd.cut(
data[income_measure], bins, include_lowest=True
)
binned_data = pd.DataFrame(
data[["mtr", income_measure, "z_bin", weight_var]]
.groupby(["z_bin"], observed=False)
.apply(lambda x: wm(x[["mtr", income_measure]], x[weight_var]))
)
# make column 0 into two columns
binned_data[["mtr", income_measure]] = pd.DataFrame(
binned_data[0].tolist(), index=binned_data.index
)
binned_data.drop(columns=0, inplace=True)
binned_data.reset_index(inplace=True)

if mtr_smoother == "kreg":
bins = 1000 # number of equal-width bins
data.loc[:, ["z_bin"]] = pd.cut(
data[income_measure], bins, include_lowest=True
)
binned_data = pd.DataFrame(
data[["mtr", income_measure, "z_bin", weight_var]]
.groupby(["z_bin"], observed=False)
.apply(lambda x: wm(x[["mtr", income_measure]], x[weight_var]))
)
# make column 0 into two columns
binned_data[["mtr", income_measure]] = pd.DataFrame(
binned_data[0].tolist(), index=binned_data.index
)
binned_data.drop(columns=0, inplace=True)
binned_data.reset_index(inplace=True)
mtr_function = KernelReg(
binned_data["mtr"].dropna(),
binned_data[income_measure].dropna(),
var_type="c",
reg_type="ll",
)
mtr, _ = mtr_function.fit(self.z)
mtr_prime = np.gradient(mtr, edge_order=2)
elif mtr_smoother == "HSV":
# estimate the HSV function on mtrs via weighted least squares
X = np.log(data[income_measure].values)
X = np.column_stack((np.ones(len(X)), X))
w = np.array(data[weight_var].values)
w_sqrt = np.sqrt(w)
y = np.log(1-data["mtr"].values)
X_weighted = X * w_sqrt[:, np.newaxis]
y_weighted = y * w_sqrt
coef, _, _, _ = lstsq(X_weighted, y_weighted)
tau = -coef[1]
lambda_param = np.exp(coef[0]) / (1-tau)
mtr = 1 - lambda_param * (1-tau) * self.z ** (-tau)
mtr_prime = lambda_param * tau * (1-tau) * self.z ** (-tau - 1)
else:
print("Please enter a value mtr_smoother method")
assert False
mtr_prime = np.gradient(mtr, edge_order=2)


return mtr, mtr_prime

Expand Down Expand Up @@ -336,20 +353,22 @@ def sw_weights(self):
+ ((self.theta_z * self.eti * self.mtr) / (1 - self.mtr))
+ ((self.eti * self.z * self.mtr_prime) / (1 - self.mtr) ** 2)
)
integral = np.trapz(g_z, self.z)
# g_z = g_z / integral
integral = np.trapz(g_z * self.f, self.z)
g_z = g_z / integral

# use Lockwood and Weinzierl formula, which should be equivalent but using numerical differentiation
bracket_term = (
1
- self.F
- (self.mtr / (1 - self.mtr)) * self.eti * self.z * self.f
)
# d_dz_bracket = np.gradient(bracket_term, edge_order=2)
d_dz_bracket = np.diff(bracket_term) / np.diff(self.z)
d_dz_bracket = np.append(d_dz_bracket, d_dz_bracket[-1])
d_dz_bracket = np.gradient(bracket_term, edge_order=2)
# d_dz_bracket = np.diff(bracket_term) / np.diff(self.z)
# d_dz_bracket = np.append(d_dz_bracket, d_dz_bracket[-1])
g_z_numerical = -(1 / self.f) * d_dz_bracket
integral = np.trapz(g_z_numerical, self.z)
# g_z_numerical = g_z_numerical / integral
integral = np.trapz(g_z_numerical * self.f, self.z)
g_z_numerical = g_z_numerical / integral

return g_z, g_z_numerical


Expand Down

0 comments on commit 4d86863

Please sign in to comment.