Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/routers/organisation_router.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the API Contract with this API endpoint

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure , will do

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChanukaUOJ , and there is an issue with the organisation api contract , some syntax error ,can you check it ?

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from src.dependencies import get_config
from src.models.organisation_schemas import Date
from src.services import OpenGINService, OrganisationService
from typing import Sequence

router = APIRouter(prefix="/v1/organisation", tags=["Organisation"])

Expand Down Expand Up @@ -35,6 +36,14 @@ async def prime_minister(
service_response = await service.fetch_prime_minister(selected_date=body.date)
return service_response

@router.post("/cabinet-flow/{president_id}")
async def cabinet_flow(
president_id: str = Path(..., description="ID of the president"),
dates: Sequence[str] = Body(...),
service: OrganisationService = Depends(get_organisation_service),
):
service_response = await service.fetch_cabinet_flow (president_id=president_id, dates=dates)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an unwanted space between function name and the open parentheses.

return service_response
@router.get('/department-history/{department_id}', summary="Get department history timeline.", description="Returns a timeline of a department including ministry relations and ministers.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put a new line to separate these 2 functions

async def department_history_timeline(
department_id: str = Path(..., description="ID of the department"),
Expand Down
29 changes: 1 addition & 28 deletions src/routers/payload_incoming_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def write_metadata(writer: WriteAttributes = Depends(get_writer_service)):
else:
return "❌ Could not connect to MongoDB"

return writer.create_parent_categories_and_children_categories_v2(result)
# return writer.create_parent_categories_and_children_categories_v2(result)

@router.get("/data/yearswithdata")
async def yearswithdata(
Expand Down Expand Up @@ -103,16 +103,6 @@ async def get_ministers_and_departments(
result = await statService.get_ministers_and_departments(entityId, activeDate, session)
return result

@router.post("/data/orgchart/sankey/{entityId}")
async def get_sankey_data(
entityId: str,
dates: Sequence[str],
statService: IncomingServiceAttributes = Depends(get_stat_service),
session: ClientSession = Depends(get_http_session),
):
result = await statService.get_sankey_data(session, entityId, dates)
return result

@router.get("/data/orgchart/president/{presidentId}")
async def get_president_tenure(
presidentId: str,
Expand All @@ -122,20 +112,3 @@ async def get_president_tenure(
result = await statService.get_president_tenure(presidentId, session)
return result

# Get the timeline for the orgchart
# @router.get("/data/orgchart/timeline")
# async def get_timeline_for_orgchart(orgchartService: IncomingServiceOrgchart = Depends(get_orgchart_service)):
# documentData = await orgchartService.get_documents()
# presidentData = await orgchartService.get_presidents()
# timeLine = orgchartService.get_timeline(documentData, presidentData)
# return timeLine

# # Get ministries for the selected date
# @router.post("/data/orgchart/ministries")
# async def get__for_orgchart(orgchartService: IncomingServiceOrgchart = Depends(get_orgchart_service)):
# return

# # Get departments for the selected ministry at the selected date
# @router.post("/data/orgchart/departments")
# async def get__for_orgchart(orgchartService: IncomingServiceOrgchart = Depends(get_orgchart_service)):
# return
169 changes: 167 additions & 2 deletions src/services/organisation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from aiohttp import ClientSession
from src.utils import http_client
from src.models.organisation_schemas import Entity, Relation
from typing import Optional
import re
from typing import Optional, Sequence
import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -421,6 +420,172 @@ async def fetch_prime_minister(self, selected_date):
except Exception as e:
raise InternalServerError("An unexpected error occurred") from e

# API: cabinet flow fot the given president id and date range of the presidency
async def get_active_ministers(self, entityId, dateActive):

relation = Relation(name=RelationNameEnum.AS_MINISTER.value,activeAt=Util.normalize_timestamp(dateActive),direction=RelationDirectionEnum.OUTGOING.value)

minister_relations = await self.opengin_service.fetch_relation(
entityId=entityId,
relation=relation
)
activeMinisterIds = []

for item in minister_relations:
activeMinisterIds.append(item.relatedEntityId)
return activeMinisterIds

async def get_active_departments(self, entityId, dateActive):

relation = Relation(name=RelationNameEnum.AS_DEPARTMENT.value,activeAt=Util.normalize_timestamp(dateActive),direction=RelationDirectionEnum.OUTGOING.value)

department_relations = await self.opengin_service.fetch_relation(
entityId=entityId,
relation=relation
)
activeDepartments = []

for item in department_relations:
activeDepartments.append({
"ministerId": entityId,
"departmentId": item.relatedEntityId
})
return activeDepartments

async def get_ministers_and_departments(self, president_id: str, selected_date: str):
"""
Get Ministers and Departments

:param president_id: President ID
:param selected_date: Selected Date

output format:
{
"body": {
"id": "",
"name": "",
"isNew": false,
"term": ""
}
}
"""
departments_results = []
try:
# Step 1: Get active ministers (sequential, must happen first)
minister_ids = await self.get_active_ministers(president_id, selected_date)

if len(minister_ids) == 0:
return departments_results

# Step 2: Get departments for each minister in parallel
tasks_for_departments = [
self.get_active_departments(minister_id, selected_date)
for minister_id in minister_ids
]

# Execute all department fetches in parallel
departments_results = await asyncio.gather(*tasks_for_departments, return_exceptions=True)

# Step 3: Flatten the nested list
flattened_results = []
for result in departments_results:
if isinstance(result, list):
flattened_results.extend(result)

return flattened_results

except (BadRequestError, NotFoundError):
raise
except Exception as e:
raise InternalServerError("An unexpected error occurred") from e

async def fetch_cabinet_flow(self, president_id: str, dates: Sequence[str]):
"""
Fetch Cabinet Flow

:param president_id: President ID
:param dates: List of dates

output format:
{to be added}
"""
try:
tasks_for_dates = [
self.get_ministers_and_departments(president_id, date)
for date in dates
]
dates_gov_struct = await asyncio.gather(*tasks_for_dates, return_exceptions=True)

departments_by_ministers = {}
expected_slots = len(dates)
nodes: list[dict[str, str]] = []
node_indices: dict[tuple[str, int], int] = {} # key: (minister_id, date_index), value: node_index
links_counter: dict[tuple[int, int], int] = {}

for date_index, result in enumerate(dates_gov_struct):
if isinstance(result, Exception):
continue

if isinstance(result, dict) and "error" in result:
continue

if not isinstance(result, list):
continue

for relation in result:
if not isinstance(relation, dict):
continue

department_id = relation.get("departmentId")
minister_id = relation.get("ministerId")

if not department_id:
continue

# create nodes dict
node_index = None
if minister_id:
node_key = (minister_id, date_index)
node_index = node_indices.get(node_key)
if node_index is None:
node_index = len(nodes)
node_indices[node_key] = node_index
nodes.append({
"name": minister_id,
"time": dates[date_index]
})

# create departments_by_ministers dict for comparison:
# key is the department
# value is the ministers index in the nodes list (for each date)
timeline = departments_by_ministers.get(department_id) # check if dept already in dict
if timeline is None:
timeline = [None] * expected_slots
departments_by_ministers[department_id] = timeline

# get previous and current minister index the department is under
previous_index = timeline[date_index - 1] if date_index > 0 else None
timeline[date_index] = node_index
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once this timeline variable updated, what happen to the newly updated value and what it does?


# if we not on the first date only then we can create a link between the previous and current minister
if previous_index is not None and node_index is not None:
key = (previous_index, node_index)
links_counter[key] = links_counter.get(key, 0) + 1 # increment the number of departments moved from m1->m2

links = [
{"source": source, "target": target, "value": value}
for (source, target), value in links_counter.items()
]

return {
"nodes": nodes,
"links": links,
}
except (BadRequestError, NotFoundError):
raise
except Exception as e:
raise InternalServerError("An unexpected error occurred") from e

# helper : get renamed lineage for a given entity id using BFS
async def _get_renamed_lineage(self, start_id: str) -> set[str]:
"""BFS to find all related entity IDs via RENAMED_TO relations."""
Expand Down