Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/routers/organisation_router.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the API contract to include this new endpoint.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChanukaUOJ, there is also an issue with the organisation API contract (a syntax error). Can you check it?

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from src.dependencies import get_config
from src.models.organisation_schemas import Date
from src.services import OpenGINService, OrganisationService
from typing import Sequence

router = APIRouter(prefix="/v1/organisation", tags=["Organisation"])

Expand Down Expand Up @@ -35,6 +36,14 @@ async def prime_minister(
service_response = await service.fetch_prime_minister(selected_date=body.date)
return service_response

@router.post("/cabinet-flow/{president_id}")
async def cabinet_flow(
    president_id: str = Path(..., description="ID of the president"),
    dates: Sequence[str] = Body(...),
    service: OrganisationService = Depends(get_organisation_service),
):
    # Route handler for the cabinet-flow graph.
    #   president_id: taken from the URL path.
    #   dates:        taken from the request body.
    #   service:      OrganisationService instance supplied through Depends.
    # Both inputs are handed to OrganisationService.fetch_cabinet_flow and the
    # value it produces is returned unchanged.
    service_response = await service.fetch_cabinet_flow(president_id=president_id, dates=dates)
    return service_response
@router.get('/department-history/{department_id}', summary="Get department history timeline.", description="Returns a timeline of a department including ministry relations and ministers.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a blank line to separate these two functions.

async def department_history_timeline(
department_id: str = Path(..., description="ID of the department"),
Expand Down
29 changes: 1 addition & 28 deletions src/routers/payload_incoming_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def write_metadata(writer: WriteAttributes = Depends(get_writer_service)):
else:
return "❌ Could not connect to MongoDB"

return writer.create_parent_categories_and_children_categories_v2(result)
# return writer.create_parent_categories_and_children_categories_v2(result)

@router.get("/data/yearswithdata")
async def yearswithdata(
Expand Down Expand Up @@ -103,16 +103,6 @@ async def get_ministers_and_departments(
result = await statService.get_ministers_and_departments(entityId, activeDate, session)
return result

@router.post("/data/orgchart/sankey/{entityId}")
async def get_sankey_data(
    entityId: str,
    dates: Sequence[str],
    statService: IncomingServiceAttributes = Depends(get_stat_service),
    session: ClientSession = Depends(get_http_session),
):
    # Thin pass-through: the entity id from the path and the requested dates
    # are forwarded, together with the injected HTTP session, to
    # statService.get_sankey_data; whatever it produces is returned as-is.
    return await statService.get_sankey_data(session, entityId, dates)

@router.get("/data/orgchart/president/{presidentId}")
async def get_president_tenure(
presidentId: str,
Expand All @@ -122,20 +112,3 @@ async def get_president_tenure(
result = await statService.get_president_tenure(presidentId, session)
return result

# Get the timeline for the orgchart
# @router.get("/data/orgchart/timeline")
# async def get_timeline_for_orgchart(orgchartService: IncomingServiceOrgchart = Depends(get_orgchart_service)):
# documentData = await orgchartService.get_documents()
# presidentData = await orgchartService.get_presidents()
# timeLine = orgchartService.get_timeline(documentData, presidentData)
# return timeLine

# # Get ministries for the selected date
# @router.post("/data/orgchart/ministries")
# async def get__for_orgchart(orgchartService: IncomingServiceOrgchart = Depends(get_orgchart_service)):
# return

# # Get departments for the selected ministry at the selected date
# @router.post("/data/orgchart/departments")
# async def get__for_orgchart(orgchartService: IncomingServiceOrgchart = Depends(get_orgchart_service)):
# return
164 changes: 162 additions & 2 deletions src/services/organisation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from aiohttp import ClientSession
from src.utils import http_client
from src.models.organisation_schemas import Entity, Relation
from typing import Optional
import re
from typing import Optional, Sequence
import logging

logger = logging.getLogger(__name__)
Expand All @@ -21,6 +20,7 @@ class OrganisationService:
def __init__(self, config: dict, opengin_service):
self.config = config
self.opengin_service = opengin_service
self.department_semaphore = asyncio.Semaphore(10)

@property
def session(self) -> ClientSession:
Expand Down Expand Up @@ -421,6 +421,166 @@ async def fetch_prime_minister(self, selected_date):
except Exception as e:
raise InternalServerError("An unexpected error occurred") from e

# API: cabinet flow for the given president id and date range of the presidency
async def get_active_ministers(self, entity_id, date_active):
    """
    List the entity IDs related to ``entity_id`` as ministers.

    :param entity_id: entity whose outgoing AS_MINISTER relations are fetched
    :param date_active: date, normalized with ``Util.normalize_timestamp`` and
        sent as the relation's ``activeAt`` value
    :return: list with the ``relatedEntityId`` of every relation returned
    """
    minister_filter = Relation(
        name=RelationNameEnum.AS_MINISTER.value,
        activeAt=Util.normalize_timestamp(date_active),
        direction=RelationDirectionEnum.OUTGOING.value,
    )

    fetched = await self.opengin_service.fetch_relation(
        entityId=entity_id,
        relation=minister_filter,
    )

    minister_ids = []
    for found in fetched:
        minister_ids.append(found.relatedEntityId)
    return minister_ids

async def get_active_departments(self, entity_id, date_active):
    """
    List the departments related to ``entity_id``, paired with that entity.

    :param entity_id: entity whose outgoing AS_DEPARTMENT relations are
        fetched; it is echoed back as ``ministerId`` in every result row
    :param date_active: date, normalized with ``Util.normalize_timestamp`` and
        sent as the relation's ``activeAt`` value
    :return: list of ``{"ministerId": entity_id, "departmentId": <relatedEntityId>}``
    """
    department_filter = Relation(
        name=RelationNameEnum.AS_DEPARTMENT.value,
        activeAt=Util.normalize_timestamp(date_active),
        direction=RelationDirectionEnum.OUTGOING.value,
    )

    fetched = await self.opengin_service.fetch_relation(
        entityId=entity_id,
        relation=department_filter,
    )

    pairs = []
    for found in fetched:
        pairs.append({"ministerId": entity_id, "departmentId": found.relatedEntityId})
    return pairs

async def get_ministers_and_departments(self, president_id: str, selected_date: str):
    """
    Collect the minister/department pairs found under a president for a date.

    The ministers are looked up first; then one department lookup per
    minister is run concurrently, each guarded by ``self.department_semaphore``.

    Department lookups are best effort: a lookup that fails for one minister
    is logged and left out of the result instead of failing the whole call.

    :param president_id: entity ID passed to ``get_active_ministers``
    :param selected_date: date passed through to both lookups
    :return: flat list of ``{"ministerId": ..., "departmentId": ...}`` dicts;
        an empty list when no ministers are found
    :raises BadRequestError: re-raised unchanged from the minister lookup
    :raises NotFoundError: re-raised unchanged from the minister lookup
    :raises InternalServerError: any other unexpected failure
    """
    try:
        minister_ids = await self.get_active_ministers(president_id, selected_date)

        if not minister_ids:
            return []

        async def limited_department_fetch(minister_id: str):
            async with self.department_semaphore:
                return await self.get_active_departments(minister_id, selected_date)

        # return_exceptions=True hands failures back as values (in the same
        # order as minister_ids) so one bad minister cannot abort the rest.
        outcomes = await asyncio.gather(
            *(limited_department_fetch(minister_id) for minister_id in minister_ids),
            return_exceptions=True,
        )

        pairs = []
        for minister_id, outcome in zip(minister_ids, outcomes):
            if isinstance(outcome, list):
                pairs.extend(outcome)
            elif isinstance(outcome, BaseException):
                # Previously dropped without a trace; keep skipping, but say so.
                logger.warning(
                    "Department lookup failed for minister %s on %s; skipping",
                    minister_id,
                    selected_date,
                    exc_info=outcome,
                )
        return pairs

    except (BadRequestError, NotFoundError):
        raise
    except Exception as e:
        raise InternalServerError("An unexpected error occurred") from e

async def fetch_cabinet_flow(self, president_id: str, dates: Sequence[str]):
    """
    Fetch Cabinet Flow

    Builds a graph of how departments move between ministers across the
    requested dates: one node per (minister, date) pair, and one link from a
    department's minister at the previous date to its minister at the current
    date, weighted by how many departments made that same move.

    A date whose lookup fails is logged and skipped (best effort): it adds no
    nodes, and no links are drawn into or out of it.

    :param president_id: President ID
    :param dates: List of dates (at most 3); consecutive entries are compared
        in the order given
    :return: dict in the output format below
    :raises BadRequestError: more than 3 dates were requested
    :raises InternalServerError: any unexpected failure while building the graph

    output format:
    {
        "nodes": [
            {"name": "<minister_id>", "time": "<date>"}
        ],
        "links": [
            {"source": <node_index>, "target": <node_index>, "value": <count>}
        ]
    }
    """
    MAX_DATES = 3

    if len(dates) > MAX_DATES:
        raise BadRequestError(f"Too many dates requested, only {MAX_DATES} dates are allowed")

    # NOTE(review): no per-call semaphore here. With at most MAX_DATES lookups
    # in flight, a limit of the same size could never block. Reviewers also
    # pointed out that connection-level limiting is presumably already done by
    # aiohttp's TCPConnector (configured outside this method; confirm).

    try:
        # NOTE(review): this endpoint was reported to take roughly 13-16s for
        # 2-3 dates. Suggested follow-up (TODO): fetch the active ministers for
        # all dates at once, then fan out one department task per minister.
        #
        # return_exceptions=True hands failures back as values, one per date and
        # in the same order as `dates`, so a failing date cannot abort the others.
        dates_gov_struct = await asyncio.gather(
            *(self.get_ministers_and_departments(president_id, date) for date in dates),
            return_exceptions=True,
        )

        nodes: list[dict[str, str]] = []
        # key: (minister_id, date_index), value: index of that node in `nodes`
        node_indices: dict[tuple[str, int], int] = {}
        # key: (source node index, target node index), value: departments moved
        links_counter: dict[tuple[int, int], int] = {}
        # key: department id, value: per-date list of the node index of the
        # minister holding that department (None where unknown)
        departments_by_ministers: dict[str, list] = {}

        for date_index, result in enumerate(dates_gov_struct):
            if isinstance(result, BaseException):
                # Previously skipped without a trace; still skipped, now logged.
                logger.warning(
                    "Cabinet flow lookup failed for president %s on %s; skipping date",
                    president_id,
                    dates[date_index],
                    exc_info=result,
                )
                continue

            if not isinstance(result, list):
                continue

            for relation in result:
                if not isinstance(relation, dict):
                    continue

                department_id = relation.get("departmentId")
                if not department_id:
                    continue

                minister_id = relation.get("ministerId")

                # Find or create the node for this minister on this date.
                node_index = None
                if minister_id:
                    node_key = (minister_id, date_index)
                    node_index = node_indices.get(node_key)
                    if node_index is None:
                        node_index = len(nodes)
                        node_indices[node_key] = node_index
                        nodes.append({
                            "name": minister_id,
                            "time": dates[date_index],
                        })

                timeline = departments_by_ministers.get(department_id)
                if timeline is None:
                    timeline = [None] * len(dates)
                    departments_by_ministers[department_id] = timeline

                # Minister node this department sat under at the previous date.
                previous_index = timeline[date_index - 1] if date_index > 0 else None

                # NOTE(review): `timeline` is the same list object stored in
                # departments_by_ministers, so this write is exactly what the
                # next date's pass reads back as `previous_index`.
                timeline[date_index] = node_index

                # A link needs a known minister on both sides, so none is
                # created for the first date.
                if previous_index is not None and node_index is not None:
                    key = (previous_index, node_index)
                    links_counter[key] = links_counter.get(key, 0) + 1

        links = [
            {"source": source, "target": target, "value": value}
            for (source, target), value in links_counter.items()
        ]

        return {
            "nodes": nodes,
            "links": links,
        }
    except (BadRequestError, NotFoundError):
        # Per-date errors come back from gather as values, so they do not
        # reach this handler; it is kept as a safety net for the caller contract.
        raise
    except Exception as e:
        raise InternalServerError("An unexpected error occurred") from e

# helper : get renamed lineage for a given entity id using BFS
async def _get_renamed_lineage(self, start_id: str) -> set[str]:
"""BFS to find all related entity IDs via RENAMED_TO relations."""
Expand Down