From c9c5f2fe634a65b39733d919500de9ea71425f9f Mon Sep 17 00:00:00 2001 From: John Ryan Date: Mon, 15 Apr 2024 14:50:47 -0500 Subject: [PATCH] Add Pareto lognormal dist as per LW16 --- iot/inverse_optimal_tax.py | 146 ++++++++++++------------------------- 1 file changed, 46 insertions(+), 100 deletions(-) diff --git a/iot/inverse_optimal_tax.py b/iot/inverse_optimal_tax.py index 4b7870b..5806a28 100644 --- a/iot/inverse_optimal_tax.py +++ b/iot/inverse_optimal_tax.py @@ -40,9 +40,6 @@ def __init__( income_measure="e00200", weight_var="s006", eti=0.25, - bandwidth=1000, - lower_bound=0, - upper_bound=500000, pareto_cutoff=200_000, dist_type="log_normal", kde_bw=None, @@ -188,7 +185,7 @@ def compute_income_dist( * f_prime (array_like): slope of the density function for income bin z """ - z_line = np.linspace(1, 1000000, 100000) + z_line = np.linspace(100, 1000000, 100000) # drop zero income observations data = data[data[income_measure] > 0] if dist_type == "log_normal": @@ -228,102 +225,6 @@ def compute_income_dist( / (z_line**2 * sigma**3 * np.sqrt(2 * np.pi)) ) ) - elif dist_type == "log_normal_pareto_mle": - # lognormal before cutoff, pareto after - def lognormal_pareto_pdf(x, mu, sigma, b, cutoff): - # for continuity at cutoff - c = st.lognorm.pdf(cutoff, sigma, scale=np.exp(mu)) * cutoff / b - # for renormalization, derived analytically - integral = st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c - return np.where(x <= cutoff, st.lognorm.pdf(x, sigma, scale=np.exp(mu)), c*b*cutoff**b / x**(b+1)) / integral - - if pareto_cutoff is None: - def weighted_log_likelihood(params): - mu, sigma, b, cutoff = params - # penalize negative values - if sigma < 0 or b < 0 or cutoff < 0: - return float('inf') - # return negative log likelihood for minimization - return -np.sum(np.log(lognormal_pareto_pdf(data[income_measure], mu, sigma, b, cutoff)) * data[weight_var]) - else: - def weighted_log_likelihood(params): - mu, sigma, b = params - # penalize negative values - if sigma < 0 or b < 0: - return float('inf') - # return negative log likelihood for minimization - return -np.sum(np.log(lognormal_pareto_pdf(data[income_measure], mu, sigma, b, pareto_cutoff)) * data[weight_var]) - - mu_guess = ( - np.log(data[income_measure]) * data[weight_var] - ).sum() / data[weight_var].sum() - sigmasq_guess = ( - ( - ((np.log(data[income_measure]) - mu_guess) ** 2) - * data[weight_var] - ).values - / data[weight_var].sum() - ).sum() - b_guess = 2 - - # maximize log likelihood - if pareto_cutoff is None: - cutoff_guess = 200_000 - params_guess = np.array([mu_guess, np.sqrt(sigmasq_guess), b_guess, cutoff_guess]) - mu, sigma, b, cutoff = scipy.optimize.minimize(weighted_log_likelihood, params_guess, method='Nelder-Mead').x - else: - params_guess = np.array([mu_guess, np.sqrt(sigmasq_guess), b_guess]) - mu, sigma, b = scipy.optimize.minimize(weighted_log_likelihood, params_guess, method='Nelder-Mead').x - cutoff = pareto_cutoff - - # assert that mu, sigma, b, and cutoff are all positive - assert mu > 0 and sigma > 0 and b > 0 and cutoff > 0 - # assert that sigma, b and cutoff not too large - assert sigma < 25 and b < 25 and cutoff < 1_000_000 - - c = st.lognorm.pdf(cutoff, sigma, scale=np.exp(mu)) * cutoff / b - integral = st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c - - f = lognormal_pareto_pdf(z_line, mu, sigma, b, cutoff) - F = np.where(z_line <= cutoff, st.lognorm.cdf(z_line, sigma, scale=np.exp(mu)), - st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c*(1 - (cutoff / z_line)**b)) / integral - f_prime = np.where(z_line <= cutoff, -1 * - np.exp(-((np.log(z_line) - mu) ** 2) / (2 * sigma**2)) - * ( - (np.log(z_line) + sigma**2 - mu) - / (z_line**2 * sigma**3 * np.sqrt(2 * np.pi)) - ), - (-c * b * (b+1) * cutoff**b / z_line**(b+2))) / integral - elif dist_type == "log_normal_pareto_sequential": - # requires a cutoff - cutoff = pareto_cutoff - data_pareto = data[data[income_measure] > pareto_cutoff] - b = sum(data_pareto[weight_var]) / sum(data_pareto[weight_var] * np.log(data_pareto[income_measure] / pareto_cutoff)) - # for continuity at cutoff - mu = ( - np.log(data[income_measure]) * data[weight_var] - ).sum() / data[weight_var].sum() - sigma = np.sqrt(( - ( - ((np.log(data[income_measure]) - mu) ** 2) - * data[weight_var] - ).values - / data[weight_var].sum() - ).sum()) - - c = st.lognorm.pdf(cutoff, sigma, scale=np.exp(mu)) * cutoff / b - # for renormalization, derived analytically - integral = st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c - f = np.where(z_line < cutoff, st.lognorm.pdf(z_line, sigma, scale=np.exp(mu)), c*b*cutoff**b / z_line**(b+1)) / integral - F = np.where(z_line <= cutoff, st.lognorm.cdf(z_line, sigma, scale=np.exp(mu)), - st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c*(1 - (cutoff / z_line)**b)) / integral - f_prime = np.where(z_line <= cutoff, -1 * - np.exp(-((np.log(z_line) - mu) ** 2) / (2 * sigma**2)) - * ( - (np.log(z_line) + sigma**2 - mu) - / (z_line**2 * sigma**3 * np.sqrt(2 * np.pi)) - ), - (-c * b * (b+1) * cutoff**b / z_line**(b+2))) / integral elif dist_type == "kde": # uses the original full data for kde estimation f_function = st.gaussian_kde( @@ -353,6 +254,51 @@ def weighted_log_likelihood(params): f = np.where(z_line < cutoff, kde.pdf(z_line), c*b*cutoff**b / z_line**(b+1)) / integral F = np.where(z_line < cutoff, np.cumsum(f), (integral - c + c*(1 - (cutoff / z_line)**b)) / integral) f_prime = np.where(z_line < cutoff, np.gradient(f, edge_order=2) , -c * b * (b+1) * cutoff**b / z_line**(b+2) / integral) + + elif dist_type == "Pln": + def pln_pdf(y, mu, sigma, alpha): + x1 = alpha * sigma - (np.log(y) - mu) / sigma + phi = st.norm.pdf((np.log(y) - mu) / sigma) + R = (1 - st.norm.cdf(x1)) / (st.norm.pdf(x1) + 1e-15) # 1e-15 to avoid division by zero + pdf = alpha / y * phi * R + return pdf + + def neg_weighted_log_likelihood(params, data, weights): + mu, sigma, alpha = params + likelihood = np.sum(weights * np.log(pln_pdf(data, mu, sigma, alpha) + 1e-15)) # 1e-15 to avoid log(0) + return -likelihood + + def fit_pln(data, weights, initial_guess): + bounds = [(None, None), (0.01, None), (0.01, None)] + result = scipy.optimize.minimize(neg_weighted_log_likelihood, initial_guess, args=(data, weights), + method='L-BFGS-B', bounds=bounds) + return result.x + + mu_initial = (np.log(data[income_measure]) * data[weight_var] + ).sum() / data[weight_var].sum() + sigmasq = ((((np.log(data[income_measure]) - mu_initial) ** 2) + * data[weight_var]).values / data[weight_var].sum()).sum() + sigma_initial = np.sqrt(sigmasq) + + + initial_guess = np.array([mu_initial, sigma_initial, 1.5]) # Initial guess for m, sigma, alpha + mu, sigma, alpha = fit_pln(data[income_measure], data[weight_var], initial_guess) + + def pln_cdf(y, mu, sigma, alpha): + x1 = alpha * sigma - (np.log(y) - mu) / sigma + R = (1 - st.norm.cdf(x1)) / (st.norm.pdf(x1) + 1e-12) + return st.norm.cdf((np.log(y) - mu) / sigma) - st.norm.pdf((np.log(y) - mu) / sigma)*R + + def pln_dpdf(y, mu, sigma, alpha): + x = (np.log(y) - mu) / sigma + R = (1 - st.norm.cdf(alpha * sigma - x)) / (st.norm.pdf(alpha * sigma - x) + 1e-15) + left = (1 + x / sigma) * pln_pdf(y, mu, sigma, alpha) + right = alpha * st.norm.pdf(x) * ((alpha * sigma - x) * R - 1) / (sigma * y) + return -(left + right) / y + + f = pln_pdf(z_line, mu, sigma, alpha) + F = pln_cdf(z_line, mu, sigma, alpha) + f_prime = pln_dpdf(z_line, mu, sigma, alpha) else: print("Please enter a valid value for dist_type") assert False