Skip to content

Commit

Permalink
Add Pareto lognormal dist as per LW16
Browse files Browse the repository at this point in the history
  • Loading branch information
john-p-ryan committed Apr 15, 2024
1 parent 8ebb323 commit c9c5f2f
Showing 1 changed file with 46 additions and 100 deletions.
146 changes: 46 additions & 100 deletions iot/inverse_optimal_tax.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ def __init__(
income_measure="e00200",
weight_var="s006",
eti=0.25,
bandwidth=1000,
lower_bound=0,
upper_bound=500000,
pareto_cutoff=200_000,
dist_type="log_normal",
kde_bw=None,
Expand Down Expand Up @@ -188,7 +185,7 @@ def compute_income_dist(
* f_prime (array_like): slope of the density function for
income bin z
"""
z_line = np.linspace(1, 1000000, 100000)
z_line = np.linspace(100, 1000000, 100000)
# drop zero income observations
data = data[data[income_measure] > 0]
if dist_type == "log_normal":
Expand Down Expand Up @@ -228,102 +225,6 @@ def compute_income_dist(
/ (z_line**2 * sigma**3 * np.sqrt(2 * np.pi))
)
)
elif dist_type == "log_normal_pareto_mle":
# lognormal before cutoff, pareto after
def lognormal_pareto_pdf(x, mu, sigma, b, cutoff):
# for continuity at cutoff
c = st.lognorm.pdf(cutoff, sigma, scale=np.exp(mu)) * cutoff / b
# for renormalization, derived analytically
integral = st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c
return np.where(x <= cutoff, st.lognorm.pdf(x, sigma, scale=np.exp(mu)), c*b*cutoff**b / x**(b+1)) / integral

if pareto_cutoff is None:
def weighted_log_likelihood(params):
mu, sigma, b, cutoff = params
# penalize negative values
if sigma < 0 or b < 0 or cutoff < 0:
return float('inf')
# return negative log likelihood for minimization
return -np.sum(np.log(lognormal_pareto_pdf(data[income_measure], mu, sigma, b, cutoff)) * data[weight_var])
else:
def weighted_log_likelihood(params):
mu, sigma, b = params
# penalize negative values
if sigma < 0 or b < 0:
return float('inf')
# return negative log likelihood for minimization
return -np.sum(np.log(lognormal_pareto_pdf(data[income_measure], mu, sigma, b, pareto_cutoff)) * data[weight_var])

mu_guess = (
np.log(data[income_measure]) * data[weight_var]
).sum() / data[weight_var].sum()
sigmasq_guess = (
(
((np.log(data[income_measure]) - mu_guess) ** 2)
* data[weight_var]
).values
/ data[weight_var].sum()
).sum()
b_guess = 2

# maximize log likelihood
if pareto_cutoff is None:
cutoff_guess = 200_000
params_guess = np.array([mu_guess, np.sqrt(sigmasq_guess), b_guess, cutoff_guess])
mu, sigma, b, cutoff = scipy.optimize.minimize(weighted_log_likelihood, params_guess, method='Nelder-Mead').x
else:
params_guess = np.array([mu_guess, np.sqrt(sigmasq_guess), b_guess])
mu, sigma, b = scipy.optimize.minimize(weighted_log_likelihood, params_guess, method='Nelder-Mead').x
cutoff = pareto_cutoff

# assert that mu, sigma, b, and cutoff are all positive
assert mu > 0 and sigma > 0 and b > 0 and cutoff > 0
# assert that sigma, b and cutoff not too large
assert sigma < 25 and b < 25 and cutoff < 1_000_000

c = st.lognorm.pdf(cutoff, sigma, scale=np.exp(mu)) * cutoff / b
integral = st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c

f = lognormal_pareto_pdf(z_line, mu, sigma, b, cutoff)
F = np.where(z_line <= cutoff, st.lognorm.cdf(z_line, sigma, scale=np.exp(mu)),
st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c*(1 - (cutoff / z_line)**b)) / integral
f_prime = np.where(z_line <= cutoff, -1 *
np.exp(-((np.log(z_line) - mu) ** 2) / (2 * sigma**2))
* (
(np.log(z_line) + sigma**2 - mu)
/ (z_line**2 * sigma**3 * np.sqrt(2 * np.pi))
),
(-c * b * (b+1) * cutoff**b / z_line**(b+2))) / integral
elif dist_type == "log_normal_pareto_sequential":
# requires a cutoff
cutoff = pareto_cutoff
data_pareto = data[data[income_measure] > pareto_cutoff]
b = sum(data_pareto[weight_var]) / sum(data_pareto[weight_var] * np.log(data_pareto[income_measure] / pareto_cutoff))
# for continuity at cutoff
mu = (
np.log(data[income_measure]) * data[weight_var]
).sum() / data[weight_var].sum()
sigma = np.sqrt((
(
((np.log(data[income_measure]) - mu) ** 2)
* data[weight_var]
).values
/ data[weight_var].sum()
).sum())

c = st.lognorm.pdf(cutoff, sigma, scale=np.exp(mu)) * cutoff / b
# for renormalization, derived analytically
integral = st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c
f = np.where(z_line < cutoff, st.lognorm.pdf(z_line, sigma, scale=np.exp(mu)), c*b*cutoff**b / z_line**(b+1)) / integral
F = np.where(z_line <= cutoff, st.lognorm.cdf(z_line, sigma, scale=np.exp(mu)),
st.lognorm.cdf(cutoff, sigma, scale=np.exp(mu)) + c*(1 - (cutoff / z_line)**b)) / integral
f_prime = np.where(z_line <= cutoff, -1 *
np.exp(-((np.log(z_line) - mu) ** 2) / (2 * sigma**2))
* (
(np.log(z_line) + sigma**2 - mu)
/ (z_line**2 * sigma**3 * np.sqrt(2 * np.pi))
),
(-c * b * (b+1) * cutoff**b / z_line**(b+2))) / integral
elif dist_type == "kde":
# uses the original full data for kde estimation
f_function = st.gaussian_kde(
Expand Down Expand Up @@ -353,6 +254,51 @@ def weighted_log_likelihood(params):
f = np.where(z_line < cutoff, kde.pdf(z_line), c*b*cutoff**b / z_line**(b+1)) / integral
F = np.where(z_line < cutoff, np.cumsum(f), (integral - c + c*(1 - (cutoff / z_line)**b)) / integral)
f_prime = np.where(z_line < cutoff, np.gradient(f, edge_order=2) , -c * b * (b+1) * cutoff**b / z_line**(b+2) / integral)

elif dist_type == "Pln":
def pln_pdf(y, mu, sigma, alpha):
x1 = alpha * sigma - (np.log(y) - mu) / sigma
phi = st.norm.pdf((np.log(y) - mu) / sigma)
R = (1 - st.norm.cdf(x1)) / (st.norm.pdf(x1) + 1e-15) # 1e-15 to avoid division by zero
pdf = alpha / y * phi * R
return pdf

def neg_weighted_log_likelihood(params, data, weights):
mu, sigma, alpha = params
likelihood = np.sum(weights * np.log(pln_pdf(data, mu, sigma, alpha) + 1e-15)) # 1e-15 to avoid log(0)
return -likelihood

def fit_pln(data, weights, initial_guess):
bounds = [(None, None), (0.01, None), (0.01, None)]
result = scipy.optimize.minimize(neg_weighted_log_likelihood, initial_guess, args=(data, weights),
method='L-BFGS-B', bounds=bounds)
return result.x

mu_initial = (np.log(data[income_measure]) * data[weight_var]
).sum() / data[weight_var].sum()
sigmasq = ((((np.log(data[income_measure]) - mu_initial) ** 2)
* data[weight_var]).values / data[weight_var].sum()).sum()
sigma_initial = np.sqrt(sigmasq)


initial_guess = np.array([mu_initial, sigma_initial, 1.5]) # Initial guess for m, sigma, alpha
mu, sigma, alpha = fit_pln(data[income_measure], data[weight_var], initial_guess)

def pln_cdf(y, mu, sigma, alpha):
x1 = alpha * sigma - (np.log(y) - mu) / sigma
R = (1 - st.norm.cdf(x1)) / (st.norm.pdf(x1) + 1e-12)
return st.norm.cdf((np.log(y) - mu) / sigma) - st.norm.pdf((np.log(y) - mu) / sigma)*R

def pln_dpdf(y, mu, sigma, alpha):
x = (np.log(y) - mu) / sigma
R = (1 - st.norm.cdf(alpha * sigma - x)) / (st.norm.pdf(alpha * sigma - x) + 1e-15)
left = (1 + x / sigma) * pln_pdf(y, mu, sigma, alpha)
right = alpha * st.norm.pdf(x) * ((alpha * sigma - x) * R - 1) / (sigma * y)
return -(left + right) / y

f = pln_pdf(z_line, mu, sigma, alpha)
F = pln_cdf(z_line, mu, sigma, alpha)
f_prime = pln_dpdf(z_line, mu, sigma, alpha)
else:
print("Please enter a valid value for dist_type")
assert False
Expand Down

0 comments on commit c9c5f2f

Please sign in to comment.