From 0d0c3e3df8b486e1460fffd57e6df5b3d3468e19 Mon Sep 17 00:00:00 2001 From: Mike Date: Thu, 30 Jan 2025 10:49:36 +1300 Subject: [PATCH] HDXDSYS-1318 Simplify poverty rate HAPI pipeline to read from global resource in global dataset on HDX (#222) * Simplify poverty rate * Change Admins functions that take a row to use HXL tags rather than headers * Use Admins functions in poverty rate * Update test data * Update admins test --- CHANGELOG.md | 7 ++ pyproject.toml | 2 +- requirements.txt | 4 +- src/hapi/pipelines/database/admins.py | 74 ++++++++++++++--- .../pipelines/database/humanitarian_needs.py | 17 ++-- .../database/operational_presence.py | 19 +++-- src/hapi/pipelines/database/poverty_rate.py | 64 ++++++--------- .../input/download-global-mpi-trends.csv | 2 +- tests/fixtures/input/download-global-mpi.csv | 2 +- tests/test_main.py | 79 ++++++++++++++----- 10 files changed, 178 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00dfec9f..244a9801 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## [0.10.36] = 2025-01-30 + +### Changed + +- Row functions in Admins use HXL tags instead of headers +- Poverty rate columns updated + ## [0.10.35] = 2025-01-27 ### Changed diff --git a/pyproject.toml b/pyproject.toml index 297b6d31..cc347e86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "hdx-python-country>= 3.8.7", "hdx-python-database[postgresql]>= 1.3.4", "hdx-python-scraper>= 2.5.5", - "hdx-python-utilities>= 3.8.2", + "hdx-python-utilities>= 3.8.3", "libhxl", "sqlalchemy" ] diff --git a/requirements.txt b/requirements.txt index eab5255b..e452653f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -69,7 +69,7 @@ hdx-python-database==1.3.4 # via hapi-pipelines (pyproject.toml) hdx-python-scraper==2.5.5 # via hapi-pipelines (pyproject.toml) -hdx-python-utilities==3.8.2 +hdx-python-utilities==3.8.3 # via # hapi-pipelines (pyproject.toml) # hdx-python-api @@ -295,7 +295,7 @@ xlrd3==1.1.0 # via libhxl xlsx2csv==0.8.4 # via hdx-python-utilities -xlsxwriter==3.2.1 +xlsxwriter==3.2.2 # via tableschema-to-template xlwt==1.3.0 # via hdx-python-utilities diff --git a/src/hapi/pipelines/database/admins.py b/src/hapi/pipelines/database/admins.py index 8de97496..2b2ba39e 100644 --- a/src/hapi/pipelines/database/admins.py +++ b/src/hapi/pipelines/database/admins.py @@ -26,7 +26,7 @@ class Admins(BaseUploader): - admin_name_regex = re.compile(r"Admin (\d) Name") + admin_name_regex = re.compile(r"#adm(\d)\+name") def __init__( self, @@ -202,10 +202,10 @@ def get_admin2_ref( return ref @classmethod - def get_max_admin_from_headers(cls, headers) -> int: + def get_max_admin_from_hxltags(cls, hxltag_to_header: Dict) -> int: max_admin_level = 0 - for header in headers: - match = cls.admin_name_regex.match(header) + for hxltag in hxltag_to_header: + match = cls.admin_name_regex.match(hxltag) if match: admin_level = int(match.group(1)) if admin_level > max_admin_level: @@ -213,18 +213,66 @@ def get_max_admin_from_headers(cls, headers) -> int: return max_admin_level @staticmethod - def get_admin_level_from_row(row: Dict, max_admin_level: int) -> int: + def get_admin_level_from_row( + hxltag_to_header: Dict, + row: Dict, + max_admin_level: int, + ) -> int: for i in range(max_admin_level, 0, -1): - admin_name = row.get(f"Admin {i} Name") + admin_name = row.get(hxltag_to_header[f"#adm{i}+name"]) if admin_name: return i return 0 + def get_admin1_ref_from_row( + self, + hxltag_to_header: Dict, + row: Dict, + dataset_name: str, + pipeline: str, + admin_level: int, + ) -> Optional[int]: + if admin_level == 1: + admin_code = row[hxltag_to_header["#adm1+code"]] + if admin_code: + admin1_ref = self.get_admin1_ref( + "adminone", + admin_code, + dataset_name, + pipeline, + self._error_handler, + ) + if admin1_ref: + return admin1_ref + admin_code = get_admin1_to_location_connector_code( + row[hxltag_to_header["#country+code"]] + ) + return self.get_admin1_ref( + "adminone", + admin_code, + dataset_name, + pipeline, + self._error_handler, + ) + if admin_level == 0: + return self.get_admin1_ref( + "national", + row[hxltag_to_header["#country+code"]], + dataset_name, + pipeline, + self._error_handler, + ) + def get_admin2_ref_from_row( - self, row: Dict, dataset_name: str, pipeline: str, admin_level: int + self, + hxltag_to_header: Dict, + row: Dict, + dataset_name: str, + pipeline: str, + admin_level: int, ) -> Optional[int]: if admin_level == 2: - admin_code = row["Admin 2 PCode"] + admin_code = row[hxltag_to_header["#adm2+code"]] if admin_code: admin2_ref = self.get_admin2_ref( "admintwo", @@ -235,7 +283,7 @@ def get_admin2_ref_from_row( ) if admin2_ref: return admin2_ref - admin_code = row["Admin 1 PCode"] + admin_code = row[hxltag_to_header["#adm1+code"]] if admin_code: admin_code = get_admin2_to_admin1_connector_code(admin_code) admin2_ref = self.get_admin2_ref( @@ -248,7 +296,7 @@ def get_admin2_ref_from_row( if admin2_ref: return admin2_ref admin_code = get_admin2_to_location_connector_code( - row["Country ISO3"] + row[hxltag_to_header["#country+code"]] ) return self.get_admin2_ref( "admintwo", @@ -258,7 +306,7 @@ def get_admin2_ref_from_row( self._error_handler, ) if admin_level == 1: - admin_code = row["Admin 1 PCode"] + admin_code = row[hxltag_to_header["#adm1+code"]] if admin_code: admin2_ref = self.get_admin2_ref( "adminone", @@ -270,7 +318,7 @@ def get_admin2_ref_from_row( if admin2_ref: return admin2_ref admin_code = get_admin1_to_location_connector_code( - row["Country ISO3"] + row[hxltag_to_header["#country+code"]] ) return self.get_admin2_ref( "adminone", @@ -282,7 +330,7 @@ def get_admin2_ref_from_row( if admin_level == 0: return self.get_admin2_ref( "national", - row["Country ISO3"], + row[hxltag_to_header["#country+code"]], dataset_name, pipeline, self._error_handler, diff --git a/src/hapi/pipelines/database/humanitarian_needs.py b/src/hapi/pipelines/database/humanitarian_needs.py index db1f79f2..d5f179cd 100644 --- a/src/hapi/pipelines/database/humanitarian_needs.py +++ b/src/hapi/pipelines/database/humanitarian_needs.py @@ -6,6 +6,7 @@ from hapi_schema.db_humanitarian_needs import DBHumanitarianNeeds from hdx.api.configuration import Configuration from hdx.scraper.framework.utilities.reader import Read +from hdx.utilities.dictandlist import invert_dictionary from hdx.utilities.text import get_numeric_if_possible from sqlalchemy.orm import Session @@ -47,23 +48,27 @@ def populate(self) -> None: time_period_end = datetime(year, 12, 31, 23, 59, 59) url = resource["url"] headers, rows = reader.get_tabular_rows(url, dict_form=True) - max_admin_level = self._admins.get_max_admin_from_headers(headers) + hxltag_to_header = invert_dictionary(next(rows)) + max_admin_level = self._admins.get_max_admin_from_hxltags( + hxltag_to_header + ) # Admin 1 PCode,Admin 2 PCode,Sector,Gender,Age Group,Disabled,Population Group,Population,In Need,Targeted,Affected,Reached for row in rows: error = row.get("Error") if error: continue - countryiso3 = row["Country ISO3"] - if countryiso3 == "#country+code": # ignore HXL row - continue admin_level = self._admins.get_admin_level_from_row( - row, max_admin_level + hxltag_to_header, row, max_admin_level ) # Can't handle higher admin levels if admin_level > 2: continue admin2_ref = self._admins.get_admin2_ref_from_row( - row, dataset_name, "HumanitarianNeeds", admin_level + hxltag_to_header, + row, + dataset_name, + "HumanitarianNeeds", + admin_level, ) if not admin2_ref: continue diff --git a/src/hapi/pipelines/database/operational_presence.py b/src/hapi/pipelines/database/operational_presence.py index 8607fe83..cfa17d91 100644 --- a/src/hapi/pipelines/database/operational_presence.py +++ b/src/hapi/pipelines/database/operational_presence.py @@ -7,6 +7,7 @@ from hdx.api.utilities.hdx_error_handler import HDXErrorHandler from hdx.scraper.framework.utilities.reader import Read from hdx.utilities.dateparse import parse_date +from hdx.utilities.dictandlist import invert_dictionary from sqlalchemy.orm import Session from ..utilities.batch_populate import batch_populate @@ -42,7 +43,10 @@ def populate(self) -> None: resource = dataset.get_resource() url = resource["url"] headers, rows = reader.get_tabular_rows(url, dict_form=True) - max_admin_level = self._admins.get_max_admin_from_headers(headers) + hxltag_to_header = invert_dictionary(next(rows)) + max_admin_level = self._admins.get_max_admin_from_hxltags( + hxltag_to_header + ) resources_to_ignore = [] operational_presence_rows = [] # Country ISO3,Admin 1 PCode,Admin 1 Name,Admin 2 PCode,Admin 2 Name,Admin 3 PCode,Admin 3 Name,Org Name,Org Acronym,Org Type,Sector,Start Date,End Date,Resource Id @@ -50,15 +54,12 @@ def populate(self) -> None: resource_id = row["Resource Id"] if resource_id in resources_to_ignore: continue - countryiso3 = row["Country ISO3"] dataset_id = row["Dataset Id"] - if dataset_id[0] == "#": - continue dataset_name = self._metadata.get_dataset_name(dataset_id) if not dataset_name: dataset_name = dataset_id admin_level = self._admins.get_admin_level_from_row( - row, max_admin_level + hxltag_to_header, row, max_admin_level ) actual_admin_level = admin_level # Higher admin levels treat as admin 2 @@ -68,10 +69,16 @@ def populate(self) -> None: else: error_when_duplicate = True admin2_ref = self._admins.get_admin2_ref_from_row( - row, dataset_name, "OperationalPresence", admin_level + hxltag_to_header, + row, + dataset_name, + "OperationalPresence", + admin_level, ) if not admin2_ref: continue + + countryiso3 = row["Country ISO3"] provider_admin1_name = get_provider_name(row, "Admin 1 Name") provider_admin2_name = get_provider_name(row, "Admin 2 Name") diff --git a/src/hapi/pipelines/database/poverty_rate.py b/src/hapi/pipelines/database/poverty_rate.py index d61ecdda..4cbe1c2f 100644 --- a/src/hapi/pipelines/database/poverty_rate.py +++ b/src/hapi/pipelines/database/poverty_rate.py @@ -8,13 +8,12 @@ from hdx.api.utilities.hdx_error_handler import HDXErrorHandler from hdx.scraper.framework.utilities.reader import Read from hdx.utilities.dateparse import parse_date -from hdx.utilities.dictandlist import dict_of_lists_add +from hdx.utilities.dictandlist import dict_of_lists_add, invert_dictionary from hdx.utilities.text import get_numeric_if_possible from sqlalchemy.orm import Session from ..utilities.provider_admin_names import get_provider_name from . import admins -from .admins import get_admin1_to_location_connector_code from .base_uploader import BaseUploader from .metadata import Metadata @@ -36,29 +35,6 @@ def __init__( self._configuration = configuration self._error_handler = error_handler - def get_admin1_ref(self, row, dataset_name): - countryiso3 = row["country_code"] - if countryiso3 == "#country+code": # ignore HXL row - return None - admin_code = row["admin1_code"] - if admin_code: - admin_level = "adminone" - else: - admin1_name = row["admin1_name"] - if admin1_name: - admin_level = "adminone" - admin_code = get_admin1_to_location_connector_code(countryiso3) - else: - admin_level = "national" - admin_code = countryiso3 - return self._admins.get_admin1_ref( - admin_level, - admin_code, - dataset_name, - "PovertyRate", - self._error_handler, - ) - def populate(self) -> None: logger.info("Populating poverty rate table") reader = Read.get_reader("hdx") @@ -69,9 +45,9 @@ def populate(self) -> None: null_values_by_iso3 = {} def get_value(row: Dict, in_col: str) -> float: - countryiso3 = row["country_code"] + countryiso3 = row["Country ISO3"] value = row[in_col] - admin_name = row["admin1_name"] + admin_name = row["Admin 1 Name"] if not admin_name: admin_name = countryiso3 if value is None: @@ -84,18 +60,24 @@ def get_value(row: Dict, in_col: str) -> float: resource_id = resource["id"] self._metadata.add_resource(dataset_id, resource) url = resource["url"] - _, rows = reader.get_tabular_rows(url, dict_form=True) - - # country_code,admin1_code,admin1_name,mpi,headcount_ratio,intensity_of_deprivation,vulnerable_to_poverty,in_severe_poverty,reference_period_start,reference_period_end + header, rows = reader.get_tabular_rows(url, dict_form=True) + hxltag_to_header = invert_dictionary(next(rows)) for row in rows: - admin1_ref = self.get_admin1_ref(row, dataset_name) + admin_level = self._admins.get_admin_level_from_row( + hxltag_to_header, row, 1 + ) + admin1_ref = self._admins.get_admin1_ref_from_row( + hxltag_to_header, + row, + dataset_name, + "PovertyRate", + admin_level, + ) if not admin1_ref: continue - provider_admin1_name = get_provider_name(row, "admin1_name") - reference_period_start = parse_date( - row["reference_period_start"] - ) - reference_period_end = parse_date(row["reference_period_end"]) + provider_admin1_name = get_provider_name(row, "Admin 1 Name") + reference_period_start = parse_date(row["Start Date"]) + reference_period_end = parse_date(row["End Date"]) key = ( admin1_ref, provider_admin1_name, @@ -118,15 +100,15 @@ def get_value(row: Dict, in_col: str) -> float: provider_admin1_name=provider_admin1_name, reference_period_start=reference_period_start, reference_period_end=reference_period_end, - mpi=get_value(row, "mpi"), - headcount_ratio=get_value(row, "headcount_ratio"), + mpi=get_value(row, "MPI"), + headcount_ratio=get_value(row, "Headcount Ratio"), intensity_of_deprivation=get_value( - row, "intensity_of_deprivation" + row, "Intensity of Deprivation" ), vulnerable_to_poverty=get_value( - row, "vulnerable_to_poverty" + row, "Vulnerable to Poverty" ), - in_severe_poverty=get_value(row, "in_severe_poverty"), + in_severe_poverty=get_value(row, "In Severe Poverty"), ) self._session.add(row) self._session.commit() diff --git a/tests/fixtures/input/download-global-mpi-trends.csv b/tests/fixtures/input/download-global-mpi-trends.csv index c88b900b..6a19f809 100644 --- a/tests/fixtures/input/download-global-mpi-trends.csv +++ b/tests/fixtures/input/download-global-mpi-trends.csv @@ -1,4 +1,4 @@ -country_code,admin1_code,admin1_name,mpi,headcount_ratio,intensity_of_deprivation,vulnerable_to_poverty,in_severe_poverty,reference_period_start,reference_period_end +Country ISO3,Admin 1 PCode,Admin 1 Name,MPI,Headcount Ratio,Intensity of Deprivation,Vulnerable to Poverty,In Severe Poverty,Start Date,End Date #country+code,#adm1+code,#adm1+name,#indicator+mpi,#indicator+headcount_ratio,#indicator+intensity_of_deprivation,#indicator+vulnerable_to_poverty,#indicator+in_severe_poverty,#date+start,#date+end AFG,,,0.2342396091002832,46.93584855784794,49.90633306897293,27.381337677259033,20.80265720520784,2015-01-01 00:00:00+00:00,2016-12-31 00:00:00+00:00 AFG,,,0.2683302947167732,52.177907473390185,51.42603598153431,26.33666305841249,25.971520762600047,2022-01-01 00:00:00+00:00,2023-12-31 00:00:00+00:00 diff --git a/tests/fixtures/input/download-global-mpi.csv b/tests/fixtures/input/download-global-mpi.csv index 97b96085..fca44bb4 100644 --- a/tests/fixtures/input/download-global-mpi.csv +++ b/tests/fixtures/input/download-global-mpi.csv @@ -1,4 +1,4 @@ -country_code,admin1_code,admin1_name,mpi,headcount_ratio,intensity_of_deprivation,vulnerable_to_poverty,in_severe_poverty,reference_period_start,reference_period_end +Country ISO3,Admin 1 PCode,Admin 1 Name,MPI,Headcount Ratio,Intensity of Deprivation,Vulnerable to Poverty,In Severe Poverty,Start Date,End Date #country+code,#adm1+code,#adm1+name,#indicator+mpi,#indicator+headcount_ratio,#indicator+intensity_of_deprivation,#indicator+vulnerable_to_poverty,#indicator+in_severe_poverty,#date+start,#date+end AFG,,,0.3603053189049837,64.8828500219939,55.53167266586595,19.89607719037991,39.10508275614888,2022-01-01 00:00:00+00:00,2023-12-31 00:00:00+00:00 AFG,AF01,Kabul,0.1268307691999727,27.90875807467631,45.44479150975038,34.61702284538088,7.737068424870571,2022-01-01 00:00:00+00:00,2023-12-31 00:00:00+00:00 diff --git a/tests/test_main.py b/tests/test_main.py index 620d45f7..cb9e4ff7 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -122,33 +122,52 @@ def test_admin(self, configuration, folder, pipelines): count = session.scalar(select(func.count(DBAdmin2.id))) check.equal(count, 32102) admins = pipelines._admins - max_admin_level = admins.get_max_admin_from_headers( + max_admin_level = admins.get_max_admin_from_hxltags( [ - "A", - "B", - "Admin 1 Name", - "c", - "123", - "Admin 3 Name", - "4", - "Admin 2 Name", + "#A", + "#B", + "adm1+name", + "#c", + "#lala5+name", + "#adm3+name", + "#e", + "#adm2+name", ] ) check.equal(max_admin_level, 3) row = {"a": 1, "Country ISO3": "AFG"} - admin_level = admins.get_admin_level_from_row(row, max_admin_level) + hxltag_to_header = { + "#lala": "a", + "#country+code": "Country ISO3", + "#adm1+name": "", + "#adm2+name": "", + "#adm3+name": "", + } + admin_level = admins.get_admin_level_from_row( + hxltag_to_header, row, max_admin_level + ) check.equal(admin_level, 0) + hxltag_to_header["#adm1+name"] = "Admin 1 Name" row = {"a": 1, "Country ISO3": "AFG", "Admin 1 Name": "ABC"} - admin_level = admins.get_admin_level_from_row(row, max_admin_level) + admin_level = admins.get_admin_level_from_row( + hxltag_to_header, row, max_admin_level + ) check.equal(admin_level, 1) + hxltag_to_header["#adm2+name"] = "Admin 2 Name" + hxltag_to_header["#adm3+name"] = "Admin 3 Name" row = { "a": 1, "Country ISO3": "AFG", "Admin 3 Name": "ABC", "Admin 2 Name": "ABC", } - admin_level = admins.get_admin_level_from_row(row, max_admin_level) + admin_level = admins.get_admin_level_from_row( + hxltag_to_header, row, max_admin_level + ) check.equal(admin_level, 3) + hxltag_to_header["#adm1+code"] = "Admin 1 PCode" + hxltag_to_header["#adm2+code"] = "Admin 2 PCode" + hxltag_to_header["#adm3+code"] = "Admin 3 PCode" row = { "a": 1, "Country ISO3": "AFG", @@ -156,34 +175,46 @@ def test_admin(self, configuration, folder, pipelines): "Admin 2 PCode": "", "Admin 3 PCode": "", } - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 3) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 3 + ) check.equal(admin2_ref, None) - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 2) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 2 + ) code = session.scalar( select(DBAdmin2.code).where(DBAdmin2.id == admin2_ref) ) assert code == "AFG-XXX-XXX" row["Admin 1 Name"] = "ABC" - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 2) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 2 + ) code = session.scalar( select(DBAdmin2.code).where(DBAdmin2.id == admin2_ref) ) assert code == "AFG-XXX-XXX" del row["Admin 1 Name"] row["Admin 1 PCode"] = "AF01" - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 2) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 2 + ) code = session.scalar( select(DBAdmin2.code).where(DBAdmin2.id == admin2_ref) ) assert code == "AF01-XXX" row["Admin 1 Name"] = "ABC" - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 2) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 2 + ) code = session.scalar( select(DBAdmin2.code).where(DBAdmin2.id == admin2_ref) ) assert code == "AF01-XXX" row["Admin 2 Name"] = "ABC" - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 2) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 2 + ) code = session.scalar( select(DBAdmin2.code).where(DBAdmin2.id == admin2_ref) ) @@ -191,19 +222,25 @@ def test_admin(self, configuration, folder, pipelines): del row["Admin 1 Name"] del row["Admin 2 Name"] row["Admin 2 PCode"] = "AF0101" - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 2) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 2 + ) code = session.scalar( select(DBAdmin2.code).where(DBAdmin2.id == admin2_ref) ) assert code == "AF0101" row["Admin 1 Name"] = "ABC" - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 2) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 2 + ) code = session.scalar( select(DBAdmin2.code).where(DBAdmin2.id == admin2_ref) ) assert code == "AF0101" row["Admin 2 Name"] = "ABC" - admin2_ref = admins.get_admin2_ref_from_row(row, "Test", "Test", 2) + admin2_ref = admins.get_admin2_ref_from_row( + hxltag_to_header, row, "Test", "Test", 2 + ) code = session.scalar( select(DBAdmin2.code).where(DBAdmin2.id == admin2_ref) )