HDXDSYS-1318 Simplify poverty rate HAPI pipeline to read from global… #53

Merged 1 commit on Jan 30, 2025
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.10.36] - 2025-01-30

### Changed

- Row functions in Admins use HXL tags instead of headers
- Poverty rate columns updated

## [0.10.35] - 2025-01-27

### Changed
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -39,7 +39,7 @@ dependencies = [
"hdx-python-country>= 3.8.7",
"hdx-python-database[postgresql]>= 1.3.4",
"hdx-python-scraper>= 2.5.5",
"hdx-python-utilities>= 3.8.2",
"hdx-python-utilities>= 3.8.3",
"libhxl",
"sqlalchemy"
]
4 changes: 2 additions & 2 deletions requirements.txt
@@ -69,7 +69,7 @@ hdx-python-database==1.3.4
# via hapi-pipelines (pyproject.toml)
hdx-python-scraper==2.5.5
# via hapi-pipelines (pyproject.toml)
hdx-python-utilities==3.8.2
hdx-python-utilities==3.8.3
# via
# hapi-pipelines (pyproject.toml)
# hdx-python-api
@@ -295,7 +295,7 @@ xlrd3==1.1.0
# via libhxl
xlsx2csv==0.8.4
# via hdx-python-utilities
xlsxwriter==3.2.1
xlsxwriter==3.2.2
# via tableschema-to-template
xlwt==1.3.0
# via hdx-python-utilities
74 changes: 61 additions & 13 deletions src/hapi/pipelines/database/admins.py
@@ -26,7 +26,7 @@


class Admins(BaseUploader):
admin_name_regex = re.compile(r"Admin (\d) Name")
admin_name_regex = re.compile(r"#adm(\d)\+name")

def __init__(
self,
@@ -202,29 +202,77 @@ def get_admin2_ref(
return ref

@classmethod
def get_max_admin_from_headers(cls, headers) -> int:
def get_max_admin_from_hxltags(cls, hxltag_to_header: Dict) -> int:
max_admin_level = 0
for header in headers:
match = cls.admin_name_regex.match(header)
for hxltag in hxltag_to_header:
match = cls.admin_name_regex.match(hxltag)
if match:
admin_level = int(match.group(1))
if admin_level > max_admin_level:
max_admin_level = admin_level
return max_admin_level

@staticmethod
def get_admin_level_from_row(row: Dict, max_admin_level: int) -> int:
def get_admin_level_from_row(
hxltag_to_header: Dict,
row: Dict,
max_admin_level: int,
) -> int:
for i in range(max_admin_level, 0, -1):
admin_name = row.get(f"Admin {i} Name")
admin_name = row.get(hxltag_to_header[f"#adm{i}+name"])
if admin_name:
return i
return 0

def get_admin1_ref_from_row(
self,
hxltag_to_header: Dict,
row: Dict,
dataset_name: str,
pipeline: str,
admin_level: int,
) -> Optional[int]:
if admin_level == 1:
admin_code = row[hxltag_to_header["#adm1+code"]]
if admin_code:
admin1_ref = self.get_admin1_ref(
"adminone",
admin_code,
dataset_name,
pipeline,
self._error_handler,
)
if admin1_ref:
return admin1_ref
admin_code = get_admin1_to_location_connector_code(
row[hxltag_to_header["#country+code"]]
)
return self.get_admin1_ref(
"adminone",
admin_code,
dataset_name,
pipeline,
self._error_handler,
)
if admin_level == 0:
return self.get_admin1_ref(
"national",
row[hxltag_to_header["#country+code"]],
dataset_name,
pipeline,
self._error_handler,
)

def get_admin2_ref_from_row(
self, row: Dict, dataset_name: str, pipeline: str, admin_level: int
self,
hxltag_to_header: Dict,
row: Dict,
dataset_name: str,
pipeline: str,
admin_level: int,
) -> Optional[int]:
if admin_level == 2:
admin_code = row["Admin 2 PCode"]
admin_code = row[hxltag_to_header["#adm2+code"]]
if admin_code:
admin2_ref = self.get_admin2_ref(
"admintwo",
@@ -235,7 +283,7 @@ def get_admin2_ref_from_row(
)
if admin2_ref:
return admin2_ref
admin_code = row["Admin 1 PCode"]
admin_code = row[hxltag_to_header["#adm1+code"]]
if admin_code:
admin_code = get_admin2_to_admin1_connector_code(admin_code)
admin2_ref = self.get_admin2_ref(
@@ -248,7 +296,7 @@
if admin2_ref:
return admin2_ref
admin_code = get_admin2_to_location_connector_code(
row["Country ISO3"]
row[hxltag_to_header["#country+code"]]
)
return self.get_admin2_ref(
"admintwo",
@@ -258,7 +306,7 @@
self._error_handler,
)
if admin_level == 1:
admin_code = row["Admin 1 PCode"]
admin_code = row[hxltag_to_header["#adm1+code"]]
if admin_code:
admin2_ref = self.get_admin2_ref(
"adminone",
@@ -270,7 +318,7 @@
if admin2_ref:
return admin2_ref
admin_code = get_admin1_to_location_connector_code(
row["Country ISO3"]
row[hxltag_to_header["#country+code"]]
)
return self.get_admin2_ref(
"adminone",
@@ -282,7 +330,7 @@
if admin_level == 0:
return self.get_admin2_ref(
"national",
row["Country ISO3"],
row[hxltag_to_header["#country+code"]],
dataset_name,
pipeline,
self._error_handler,
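To make the new HXL-tag handling concrete: with `dict_form=True` the first yielded row maps each header to its HXL tag, and inverting it gives the `hxltag_to_header` lookup the row functions now use. Below is a minimal standalone sketch of `get_max_admin_from_hxltags` and `get_admin_level_from_row`; the sample columns and the plain-dict inversion are stand-ins for the pipeline's reader output and `invert_dictionary`.

```python
import re
from typing import Dict

ADMIN_NAME_REGEX = re.compile(r"#adm(\d)\+name")

# Illustrative only: with dict_form=True the first yielded row is the HXL tag
# row, mapping each header to its tag.
hxl_row = {
    "Country ISO3": "#country+code",
    "Admin 1 PCode": "#adm1+code",
    "Admin 1 Name": "#adm1+name",
    "Admin 2 PCode": "#adm2+code",
    "Admin 2 Name": "#adm2+name",
}
# Stand-in for invert_dictionary(next(rows)): HXL tag -> header.
hxltag_to_header = {tag: header for header, tag in hxl_row.items()}


def max_admin_from_hxltags(hxltag_to_header: Dict) -> int:
    """Mirrors Admins.get_max_admin_from_hxltags: deepest admin name column."""
    max_level = 0
    for hxltag in hxltag_to_header:
        match = ADMIN_NAME_REGEX.match(hxltag)
        if match:
            max_level = max(max_level, int(match.group(1)))
    return max_level


def admin_level_from_row(hxltag_to_header: Dict, row: Dict, max_level: int) -> int:
    """Mirrors Admins.get_admin_level_from_row: deepest level with a name set."""
    for i in range(max_level, 0, -1):
        if row.get(hxltag_to_header[f"#adm{i}+name"]):
            return i
    return 0


row = {"Country ISO3": "AFG", "Admin 1 PCode": "AF01", "Admin 1 Name": "Kabul",
       "Admin 2 PCode": "", "Admin 2 Name": ""}
max_level = max_admin_from_hxltags(hxltag_to_header)
print(max_level, admin_level_from_row(hxltag_to_header, row, max_level))  # 2 1
```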
17 changes: 11 additions & 6 deletions src/hapi/pipelines/database/humanitarian_needs.py
@@ -6,6 +6,7 @@
from hapi_schema.db_humanitarian_needs import DBHumanitarianNeeds
from hdx.api.configuration import Configuration
from hdx.scraper.framework.utilities.reader import Read
from hdx.utilities.dictandlist import invert_dictionary
from hdx.utilities.text import get_numeric_if_possible
from sqlalchemy.orm import Session

@@ -47,23 +48,27 @@ def populate(self) -> None:
time_period_end = datetime(year, 12, 31, 23, 59, 59)
url = resource["url"]
headers, rows = reader.get_tabular_rows(url, dict_form=True)
max_admin_level = self._admins.get_max_admin_from_headers(headers)
hxltag_to_header = invert_dictionary(next(rows))
max_admin_level = self._admins.get_max_admin_from_hxltags(
hxltag_to_header
)
# Admin 1 PCode,Admin 2 PCode,Sector,Gender,Age Group,Disabled,Population Group,Population,In Need,Targeted,Affected,Reached
for row in rows:
error = row.get("Error")
if error:
continue
countryiso3 = row["Country ISO3"]
if countryiso3 == "#country+code": # ignore HXL row
continue
admin_level = self._admins.get_admin_level_from_row(
row, max_admin_level
hxltag_to_header, row, max_admin_level
)
# Can't handle higher admin levels
if admin_level > 2:
continue
admin2_ref = self._admins.get_admin2_ref_from_row(
row, dataset_name, "HumanitarianNeeds", admin_level
hxltag_to_header,
row,
dataset_name,
"HumanitarianNeeds",
admin_level,
)
if not admin2_ref:
continue
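The same pattern drives this module: `next(rows)` consumes the HXL tag row up front, so the old per-row `#country+code` skip is no longer needed. A self-contained sketch under that assumption, using an illustrative in-memory CSV rather than the real HAPI resource:

```python
import csv
import io

# Illustrative two-header CSV (column names, then HXL tags, then data);
# the real pipeline reads the resource via reader.get_tabular_rows.
DATA = """\
Country ISO3,Admin 1 PCode,Admin 2 PCode,Sector,In Need
#country+code,#adm1+code,#adm2+code,#sector+code,#affected+inneed
AFG,AF01,AF0101,SHL,1000
"""

rows = csv.DictReader(io.StringIO(DATA))  # one dict per row, keyed by header
# Consume the HXL tag row once and invert it to an HXL tag -> header lookup
# (the job done by invert_dictionary(next(rows)) in the pipeline).
hxltag_to_header = {tag: header for header, tag in next(rows).items()}

for row in rows:
    # Only data rows remain, so no per-row "#country+code" check is needed.
    countryiso3 = row[hxltag_to_header["#country+code"]]
    in_need = int(row[hxltag_to_header["#affected+inneed"]])
    print(countryiso3, in_need)  # AFG 1000
```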
19 changes: 13 additions & 6 deletions src/hapi/pipelines/database/operational_presence.py
@@ -7,6 +7,7 @@
from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
from hdx.scraper.framework.utilities.reader import Read
from hdx.utilities.dateparse import parse_date
from hdx.utilities.dictandlist import invert_dictionary
from sqlalchemy.orm import Session

from ..utilities.batch_populate import batch_populate
@@ -42,23 +43,23 @@ def populate(self) -> None:
resource = dataset.get_resource()
url = resource["url"]
headers, rows = reader.get_tabular_rows(url, dict_form=True)
max_admin_level = self._admins.get_max_admin_from_headers(headers)
hxltag_to_header = invert_dictionary(next(rows))
max_admin_level = self._admins.get_max_admin_from_hxltags(
hxltag_to_header
)
resources_to_ignore = []
operational_presence_rows = []
# Country ISO3,Admin 1 PCode,Admin 1 Name,Admin 2 PCode,Admin 2 Name,Admin 3 PCode,Admin 3 Name,Org Name,Org Acronym,Org Type,Sector,Start Date,End Date,Resource Id
for row in rows:
resource_id = row["Resource Id"]
if resource_id in resources_to_ignore:
continue
countryiso3 = row["Country ISO3"]
dataset_id = row["Dataset Id"]
if dataset_id[0] == "#":
continue
dataset_name = self._metadata.get_dataset_name(dataset_id)
if not dataset_name:
dataset_name = dataset_id
admin_level = self._admins.get_admin_level_from_row(
row, max_admin_level
hxltag_to_header, row, max_admin_level
)
actual_admin_level = admin_level
# Higher admin levels treat as admin 2
@@ -68,10 +69,16 @@ def populate(self) -> None:
else:
error_when_duplicate = True
admin2_ref = self._admins.get_admin2_ref_from_row(
row, dataset_name, "OperationalPresence", admin_level
hxltag_to_header,
row,
dataset_name,
"OperationalPresence",
admin_level,
)
if not admin2_ref:
continue

countryiso3 = row["Country ISO3"]
provider_admin1_name = get_provider_name(row, "Admin 1 Name")
provider_admin2_name = get_provider_name(row, "Admin 2 Name")

64 changes: 23 additions & 41 deletions src/hapi/pipelines/database/poverty_rate.py
@@ -8,13 +8,12 @@
from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
from hdx.scraper.framework.utilities.reader import Read
from hdx.utilities.dateparse import parse_date
from hdx.utilities.dictandlist import dict_of_lists_add
from hdx.utilities.dictandlist import dict_of_lists_add, invert_dictionary
from hdx.utilities.text import get_numeric_if_possible
from sqlalchemy.orm import Session

from ..utilities.provider_admin_names import get_provider_name
from . import admins
from .admins import get_admin1_to_location_connector_code
from .base_uploader import BaseUploader
from .metadata import Metadata

@@ -36,29 +35,6 @@ def __init__(
self._configuration = configuration
self._error_handler = error_handler

def get_admin1_ref(self, row, dataset_name):
countryiso3 = row["country_code"]
if countryiso3 == "#country+code": # ignore HXL row
return None
admin_code = row["admin1_code"]
if admin_code:
admin_level = "adminone"
else:
admin1_name = row["admin1_name"]
if admin1_name:
admin_level = "adminone"
admin_code = get_admin1_to_location_connector_code(countryiso3)
else:
admin_level = "national"
admin_code = countryiso3
return self._admins.get_admin1_ref(
admin_level,
admin_code,
dataset_name,
"PovertyRate",
self._error_handler,
)

def populate(self) -> None:
logger.info("Populating poverty rate table")
reader = Read.get_reader("hdx")
@@ -69,9 +45,9 @@ def populate(self) -> None:
null_values_by_iso3 = {}

def get_value(row: Dict, in_col: str) -> float:
countryiso3 = row["country_code"]
countryiso3 = row["Country ISO3"]
value = row[in_col]
admin_name = row["admin1_name"]
admin_name = row["Admin 1 Name"]
if not admin_name:
admin_name = countryiso3
if value is None:
@@ -84,18 +60,24 @@
resource_id = resource["id"]
self._metadata.add_resource(dataset_id, resource)
url = resource["url"]
_, rows = reader.get_tabular_rows(url, dict_form=True)

# country_code,admin1_code,admin1_name,mpi,headcount_ratio,intensity_of_deprivation,vulnerable_to_poverty,in_severe_poverty,reference_period_start,reference_period_end
header, rows = reader.get_tabular_rows(url, dict_form=True)
hxltag_to_header = invert_dictionary(next(rows))
for row in rows:
admin1_ref = self.get_admin1_ref(row, dataset_name)
admin_level = self._admins.get_admin_level_from_row(
hxltag_to_header, row, 1
)
admin1_ref = self._admins.get_admin1_ref_from_row(
hxltag_to_header,
row,
dataset_name,
"PovertyRate",
admin_level,
)
if not admin1_ref:
continue
provider_admin1_name = get_provider_name(row, "admin1_name")
reference_period_start = parse_date(
row["reference_period_start"]
)
reference_period_end = parse_date(row["reference_period_end"])
provider_admin1_name = get_provider_name(row, "Admin 1 Name")
reference_period_start = parse_date(row["Start Date"])
reference_period_end = parse_date(row["End Date"])
key = (
admin1_ref,
provider_admin1_name,
@@ -118,15 +100,15 @@
provider_admin1_name=provider_admin1_name,
reference_period_start=reference_period_start,
reference_period_end=reference_period_end,
mpi=get_value(row, "mpi"),
headcount_ratio=get_value(row, "headcount_ratio"),
mpi=get_value(row, "MPI"),
headcount_ratio=get_value(row, "Headcount Ratio"),
intensity_of_deprivation=get_value(
row, "intensity_of_deprivation"
row, "Intensity of Deprivation"
),
vulnerable_to_poverty=get_value(
row, "vulnerable_to_poverty"
row, "Vulnerable to Poverty"
),
in_severe_poverty=get_value(row, "in_severe_poverty"),
in_severe_poverty=get_value(row, "In Severe Poverty"),
)
self._session.add(row)
self._session.commit()
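The poverty rate pipeline now delegates admin resolution to the shared `Admins.get_admin1_ref_from_row` instead of its own `get_admin1_ref`. A condensed sketch of that fallback chain, where `lookup_ref` and `connector_code_for` are hypothetical stand-ins for `Admins.get_admin1_ref` and `get_admin1_to_location_connector_code` (not the library's actual signatures):

```python
from typing import Callable, Dict, Optional


def resolve_admin1_ref(
    hxltag_to_header: Dict[str, str],
    row: Dict[str, str],
    admin_level: int,
    lookup_ref: Callable[[str, str], Optional[int]],
    connector_code_for: Callable[[str], str],
) -> Optional[int]:
    """Sketch of the fallback chain in Admins.get_admin1_ref_from_row.

    lookup_ref and connector_code_for are hypothetical stand-ins for
    Admins.get_admin1_ref and get_admin1_to_location_connector_code.
    """
    countryiso3 = row[hxltag_to_header["#country+code"]]
    if admin_level == 1:
        admin_code = row[hxltag_to_header["#adm1+code"]]
        if admin_code:
            ref = lookup_ref("adminone", admin_code)
            if ref:
                return ref  # admin 1 p-code resolved directly
        # Fall back to the country-level connector code in the adminone table.
        return lookup_ref("adminone", connector_code_for(countryiso3))
    if admin_level == 0:
        return lookup_ref("national", countryiso3)
    return None  # higher admin levels are not handled here
```

In the pipeline the admin level itself comes from `get_admin_level_from_row` with `max_admin_level` of 1, so rows without an admin 1 name resolve at national level.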
2 changes: 1 addition & 1 deletion tests/fixtures/input/download-global-mpi-trends.csv
@@ -1,4 +1,4 @@
country_code,admin1_code,admin1_name,mpi,headcount_ratio,intensity_of_deprivation,vulnerable_to_poverty,in_severe_poverty,reference_period_start,reference_period_end
Country ISO3,Admin 1 PCode,Admin 1 Name,MPI,Headcount Ratio,Intensity of Deprivation,Vulnerable to Poverty,In Severe Poverty,Start Date,End Date
#country+code,#adm1+code,#adm1+name,#indicator+mpi,#indicator+headcount_ratio,#indicator+intensity_of_deprivation,#indicator+vulnerable_to_poverty,#indicator+in_severe_poverty,#date+start,#date+end
AFG,,,0.2342396091002832,46.93584855784794,49.90633306897293,27.381337677259033,20.80265720520784,2015-01-01 00:00:00+00:00,2016-12-31 00:00:00+00:00
AFG,,,0.2683302947167732,52.177907473390185,51.42603598153431,26.33666305841249,25.971520762600047,2022-01-01 00:00:00+00:00,2023-12-31 00:00:00+00:00