Skip to content

Commit

Permalink
HDXDSYS-1318 Simplify poverty rate HAPI pipeline to read from global …
Browse files Browse the repository at this point in the history
…resource in global dataset on HDX (#222)

* Simplify poverty rate
* Change Admins functions that take a row to use HXL tags rather than headers
* Use Admins functions in poverty rate
* Update test data
* Update admins test
  • Loading branch information
mcarans authored Jan 29, 2025
1 parent 6068787 commit 0d0c3e3
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 92 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.10.36] = 2025-01-30

### Changed

- Row functions in Admins use HXL tags instead of headers
- Poverty rate columns updated

## [0.10.35] = 2025-01-27

### Changed
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies = [
"hdx-python-country>= 3.8.7",
"hdx-python-database[postgresql]>= 1.3.4",
"hdx-python-scraper>= 2.5.5",
"hdx-python-utilities>= 3.8.2",
"hdx-python-utilities>= 3.8.3",
"libhxl",
"sqlalchemy"
]
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ hdx-python-database==1.3.4
# via hapi-pipelines (pyproject.toml)
hdx-python-scraper==2.5.5
# via hapi-pipelines (pyproject.toml)
hdx-python-utilities==3.8.2
hdx-python-utilities==3.8.3
# via
# hapi-pipelines (pyproject.toml)
# hdx-python-api
Expand Down Expand Up @@ -295,7 +295,7 @@ xlrd3==1.1.0
# via libhxl
xlsx2csv==0.8.4
# via hdx-python-utilities
xlsxwriter==3.2.1
xlsxwriter==3.2.2
# via tableschema-to-template
xlwt==1.3.0
# via hdx-python-utilities
74 changes: 61 additions & 13 deletions src/hapi/pipelines/database/admins.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


class Admins(BaseUploader):
admin_name_regex = re.compile(r"Admin (\d) Name")
admin_name_regex = re.compile(r"#adm(\d)\+name")

def __init__(
self,
Expand Down Expand Up @@ -202,29 +202,77 @@ def get_admin2_ref(
return ref

@classmethod
def get_max_admin_from_headers(cls, headers) -> int:
def get_max_admin_from_hxltags(cls, hxltag_to_header: Dict) -> int:
max_admin_level = 0
for header in headers:
match = cls.admin_name_regex.match(header)
for hxltag in hxltag_to_header:
match = cls.admin_name_regex.match(hxltag)
if match:
admin_level = int(match.group(1))
if admin_level > max_admin_level:
max_admin_level = admin_level
return max_admin_level

@staticmethod
def get_admin_level_from_row(row: Dict, max_admin_level: int) -> int:
def get_admin_level_from_row(
hxltag_to_header: Dict,
row: Dict,
max_admin_level: int,
) -> int:
for i in range(max_admin_level, 0, -1):
admin_name = row.get(f"Admin {i} Name")
admin_name = row.get(hxltag_to_header[f"#adm{i}+name"])
if admin_name:
return i
return 0

def get_admin1_ref_from_row(
self,
hxltag_to_header: Dict,
row: Dict,
dataset_name: str,
pipeline: str,
admin_level: int,
) -> Optional[int]:
if admin_level == 1:
admin_code = row[hxltag_to_header["#adm1+code"]]
if admin_code:
admin1_ref = self.get_admin1_ref(
"adminone",
admin_code,
dataset_name,
pipeline,
self._error_handler,
)
if admin1_ref:
return admin1_ref
admin_code = get_admin1_to_location_connector_code(
row[hxltag_to_header["#country+code"]]
)
return self.get_admin1_ref(
"adminone",
admin_code,
dataset_name,
pipeline,
self._error_handler,
)
if admin_level == 0:
return self.get_admin1_ref(
"national",
row[hxltag_to_header["#country+code"]],
dataset_name,
pipeline,
self._error_handler,
)

def get_admin2_ref_from_row(
self, row: Dict, dataset_name: str, pipeline: str, admin_level: int
self,
hxltag_to_header: Dict,
row: Dict,
dataset_name: str,
pipeline: str,
admin_level: int,
) -> Optional[int]:
if admin_level == 2:
admin_code = row["Admin 2 PCode"]
admin_code = row[hxltag_to_header["#adm2+code"]]
if admin_code:
admin2_ref = self.get_admin2_ref(
"admintwo",
Expand All @@ -235,7 +283,7 @@ def get_admin2_ref_from_row(
)
if admin2_ref:
return admin2_ref
admin_code = row["Admin 1 PCode"]
admin_code = row[hxltag_to_header["#adm1+code"]]
if admin_code:
admin_code = get_admin2_to_admin1_connector_code(admin_code)
admin2_ref = self.get_admin2_ref(
Expand All @@ -248,7 +296,7 @@ def get_admin2_ref_from_row(
if admin2_ref:
return admin2_ref
admin_code = get_admin2_to_location_connector_code(
row["Country ISO3"]
row[hxltag_to_header["#country+code"]]
)
return self.get_admin2_ref(
"admintwo",
Expand All @@ -258,7 +306,7 @@ def get_admin2_ref_from_row(
self._error_handler,
)
if admin_level == 1:
admin_code = row["Admin 1 PCode"]
admin_code = row[hxltag_to_header["#adm1+code"]]
if admin_code:
admin2_ref = self.get_admin2_ref(
"adminone",
Expand All @@ -270,7 +318,7 @@ def get_admin2_ref_from_row(
if admin2_ref:
return admin2_ref
admin_code = get_admin1_to_location_connector_code(
row["Country ISO3"]
row[hxltag_to_header["#country+code"]]
)
return self.get_admin2_ref(
"adminone",
Expand All @@ -282,7 +330,7 @@ def get_admin2_ref_from_row(
if admin_level == 0:
return self.get_admin2_ref(
"national",
row["Country ISO3"],
row[hxltag_to_header["#country+code"]],
dataset_name,
pipeline,
self._error_handler,
Expand Down
17 changes: 11 additions & 6 deletions src/hapi/pipelines/database/humanitarian_needs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from hapi_schema.db_humanitarian_needs import DBHumanitarianNeeds
from hdx.api.configuration import Configuration
from hdx.scraper.framework.utilities.reader import Read
from hdx.utilities.dictandlist import invert_dictionary
from hdx.utilities.text import get_numeric_if_possible
from sqlalchemy.orm import Session

Expand Down Expand Up @@ -47,23 +48,27 @@ def populate(self) -> None:
time_period_end = datetime(year, 12, 31, 23, 59, 59)
url = resource["url"]
headers, rows = reader.get_tabular_rows(url, dict_form=True)
max_admin_level = self._admins.get_max_admin_from_headers(headers)
hxltag_to_header = invert_dictionary(next(rows))
max_admin_level = self._admins.get_max_admin_from_hxltags(
hxltag_to_header
)
# Admin 1 PCode,Admin 2 PCode,Sector,Gender,Age Group,Disabled,Population Group,Population,In Need,Targeted,Affected,Reached
for row in rows:
error = row.get("Error")
if error:
continue
countryiso3 = row["Country ISO3"]
if countryiso3 == "#country+code": # ignore HXL row
continue
admin_level = self._admins.get_admin_level_from_row(
row, max_admin_level
hxltag_to_header, row, max_admin_level
)
# Can't handle higher admin levels
if admin_level > 2:
continue
admin2_ref = self._admins.get_admin2_ref_from_row(
row, dataset_name, "HumanitarianNeeds", admin_level
hxltag_to_header,
row,
dataset_name,
"HumanitarianNeeds",
admin_level,
)
if not admin2_ref:
continue
Expand Down
19 changes: 13 additions & 6 deletions src/hapi/pipelines/database/operational_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
from hdx.scraper.framework.utilities.reader import Read
from hdx.utilities.dateparse import parse_date
from hdx.utilities.dictandlist import invert_dictionary
from sqlalchemy.orm import Session

from ..utilities.batch_populate import batch_populate
Expand Down Expand Up @@ -42,23 +43,23 @@ def populate(self) -> None:
resource = dataset.get_resource()
url = resource["url"]
headers, rows = reader.get_tabular_rows(url, dict_form=True)
max_admin_level = self._admins.get_max_admin_from_headers(headers)
hxltag_to_header = invert_dictionary(next(rows))
max_admin_level = self._admins.get_max_admin_from_hxltags(
hxltag_to_header
)
resources_to_ignore = []
operational_presence_rows = []
# Country ISO3,Admin 1 PCode,Admin 1 Name,Admin 2 PCode,Admin 2 Name,Admin 3 PCode,Admin 3 Name,Org Name,Org Acronym,Org Type,Sector,Start Date,End Date,Resource Id
for row in rows:
resource_id = row["Resource Id"]
if resource_id in resources_to_ignore:
continue
countryiso3 = row["Country ISO3"]
dataset_id = row["Dataset Id"]
if dataset_id[0] == "#":
continue
dataset_name = self._metadata.get_dataset_name(dataset_id)
if not dataset_name:
dataset_name = dataset_id
admin_level = self._admins.get_admin_level_from_row(
row, max_admin_level
hxltag_to_header, row, max_admin_level
)
actual_admin_level = admin_level
# Higher admin levels treat as admin 2
Expand All @@ -68,10 +69,16 @@ def populate(self) -> None:
else:
error_when_duplicate = True
admin2_ref = self._admins.get_admin2_ref_from_row(
row, dataset_name, "OperationalPresence", admin_level
hxltag_to_header,
row,
dataset_name,
"OperationalPresence",
admin_level,
)
if not admin2_ref:
continue

countryiso3 = row["Country ISO3"]
provider_admin1_name = get_provider_name(row, "Admin 1 Name")
provider_admin2_name = get_provider_name(row, "Admin 2 Name")

Expand Down
64 changes: 23 additions & 41 deletions src/hapi/pipelines/database/poverty_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
from hdx.scraper.framework.utilities.reader import Read
from hdx.utilities.dateparse import parse_date
from hdx.utilities.dictandlist import dict_of_lists_add
from hdx.utilities.dictandlist import dict_of_lists_add, invert_dictionary
from hdx.utilities.text import get_numeric_if_possible
from sqlalchemy.orm import Session

from ..utilities.provider_admin_names import get_provider_name
from . import admins
from .admins import get_admin1_to_location_connector_code
from .base_uploader import BaseUploader
from .metadata import Metadata

Expand All @@ -36,29 +35,6 @@ def __init__(
self._configuration = configuration
self._error_handler = error_handler

def get_admin1_ref(self, row, dataset_name):
countryiso3 = row["country_code"]
if countryiso3 == "#country+code": # ignore HXL row
return None
admin_code = row["admin1_code"]
if admin_code:
admin_level = "adminone"
else:
admin1_name = row["admin1_name"]
if admin1_name:
admin_level = "adminone"
admin_code = get_admin1_to_location_connector_code(countryiso3)
else:
admin_level = "national"
admin_code = countryiso3
return self._admins.get_admin1_ref(
admin_level,
admin_code,
dataset_name,
"PovertyRate",
self._error_handler,
)

def populate(self) -> None:
logger.info("Populating poverty rate table")
reader = Read.get_reader("hdx")
Expand All @@ -69,9 +45,9 @@ def populate(self) -> None:
null_values_by_iso3 = {}

def get_value(row: Dict, in_col: str) -> float:
countryiso3 = row["country_code"]
countryiso3 = row["Country ISO3"]
value = row[in_col]
admin_name = row["admin1_name"]
admin_name = row["Admin 1 Name"]
if not admin_name:
admin_name = countryiso3
if value is None:
Expand All @@ -84,18 +60,24 @@ def get_value(row: Dict, in_col: str) -> float:
resource_id = resource["id"]
self._metadata.add_resource(dataset_id, resource)
url = resource["url"]
_, rows = reader.get_tabular_rows(url, dict_form=True)

# country_code,admin1_code,admin1_name,mpi,headcount_ratio,intensity_of_deprivation,vulnerable_to_poverty,in_severe_poverty,reference_period_start,reference_period_end
header, rows = reader.get_tabular_rows(url, dict_form=True)
hxltag_to_header = invert_dictionary(next(rows))
for row in rows:
admin1_ref = self.get_admin1_ref(row, dataset_name)
admin_level = self._admins.get_admin_level_from_row(
hxltag_to_header, row, 1
)
admin1_ref = self._admins.get_admin1_ref_from_row(
hxltag_to_header,
row,
dataset_name,
"PovertyRate",
admin_level,
)
if not admin1_ref:
continue
provider_admin1_name = get_provider_name(row, "admin1_name")
reference_period_start = parse_date(
row["reference_period_start"]
)
reference_period_end = parse_date(row["reference_period_end"])
provider_admin1_name = get_provider_name(row, "Admin 1 Name")
reference_period_start = parse_date(row["Start Date"])
reference_period_end = parse_date(row["End Date"])
key = (
admin1_ref,
provider_admin1_name,
Expand All @@ -118,15 +100,15 @@ def get_value(row: Dict, in_col: str) -> float:
provider_admin1_name=provider_admin1_name,
reference_period_start=reference_period_start,
reference_period_end=reference_period_end,
mpi=get_value(row, "mpi"),
headcount_ratio=get_value(row, "headcount_ratio"),
mpi=get_value(row, "MPI"),
headcount_ratio=get_value(row, "Headcount Ratio"),
intensity_of_deprivation=get_value(
row, "intensity_of_deprivation"
row, "Intensity of Deprivation"
),
vulnerable_to_poverty=get_value(
row, "vulnerable_to_poverty"
row, "Vulnerable to Poverty"
),
in_severe_poverty=get_value(row, "in_severe_poverty"),
in_severe_poverty=get_value(row, "In Severe Poverty"),
)
self._session.add(row)
self._session.commit()
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/input/download-global-mpi-trends.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
country_code,admin1_code,admin1_name,mpi,headcount_ratio,intensity_of_deprivation,vulnerable_to_poverty,in_severe_poverty,reference_period_start,reference_period_end
Country ISO3,Admin 1 PCode,Admin 1 Name,MPI,Headcount Ratio,Intensity of Deprivation,Vulnerable to Poverty,In Severe Poverty,Start Date,End Date
#country+code,#adm1+code,#adm1+name,#indicator+mpi,#indicator+headcount_ratio,#indicator+intensity_of_deprivation,#indicator+vulnerable_to_poverty,#indicator+in_severe_poverty,#date+start,#date+end
AFG,,,0.2342396091002832,46.93584855784794,49.90633306897293,27.381337677259033,20.80265720520784,2015-01-01 00:00:00+00:00,2016-12-31 00:00:00+00:00
AFG,,,0.2683302947167732,52.177907473390185,51.42603598153431,26.33666305841249,25.971520762600047,2022-01-01 00:00:00+00:00,2023-12-31 00:00:00+00:00
Expand Down
Loading

0 comments on commit 0d0c3e3

Please sign in to comment.