|
| 1 | +"""Fetch rescuer candidates from PostgreSQL within a radius determined by urgency.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import math |
| 6 | + |
| 7 | +import asyncpg |
| 8 | +import structlog |
| 9 | + |
| 10 | +from config.settings import get_settings |
| 11 | +from graphs.state import CandidateData, MatchingState |
| 12 | + |
| 13 | +logger = structlog.get_logger(__name__) |
| 14 | + |
| 15 | +# Radius in km per urgency level |
| 16 | +URGENCY_RADIUS: dict[str, float] = { |
| 17 | + "LOW": 15.0, |
| 18 | + "MEDIUM": 25.0, |
| 19 | + "HIGH": 50.0, |
| 20 | + "CRITICAL": 100.0, |
| 21 | +} |
| 22 | + |
| 23 | +# Earth radius in km for Haversine |
| 24 | +_EARTH_RADIUS_KM = 6371.0 |
| 25 | + |
| 26 | + |
| 27 | +def _haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float: |
| 28 | + """Return distance in km between two lat/lng points using the Haversine formula.""" |
| 29 | + rlat1, rlon1, rlat2, rlon2 = ( |
| 30 | + math.radians(lat1), |
| 31 | + math.radians(lon1), |
| 32 | + math.radians(lat2), |
| 33 | + math.radians(lon2), |
| 34 | + ) |
| 35 | + dlat = rlat2 - rlat1 |
| 36 | + dlon = rlon2 - rlon1 |
| 37 | + a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2 |
| 38 | + return 2 * _EARTH_RADIUS_KM * math.asin(math.sqrt(a)) |
| 39 | + |
| 40 | + |
| 41 | +async def fetch_candidates(state: MatchingState) -> MatchingState: |
| 42 | + """Query PostgreSQL for users with RESCUER or HELPER role within the urgency-based radius. |
| 43 | +
|
| 44 | + Filters: |
| 45 | + - Active users only |
| 46 | + - Role is RESCUER or HELPER |
| 47 | + - Not currently assigned to an active (non-completed) rescue |
| 48 | + - Within Haversine distance of the alert coordinates |
| 49 | + """ |
| 50 | + settings = get_settings() |
| 51 | + animal = state["animal_info"] |
| 52 | + max_radius = URGENCY_RADIUS.get(animal.urgency, 25.0) |
| 53 | + |
| 54 | + logger.info( |
| 55 | + "fetching_candidates", |
| 56 | + rescue_alert_id=animal.rescue_alert_id, |
| 57 | + urgency=animal.urgency, |
| 58 | + max_radius_km=max_radius, |
| 59 | + ) |
| 60 | + |
| 61 | + try: |
| 62 | + conn: asyncpg.Connection = await asyncpg.connect(settings.database_url) |
| 63 | + except Exception: |
| 64 | + logger.exception("database_connection_failed") |
| 65 | + return {**state, "candidates": [], "total_evaluated": 0, "error": "database_connection_failed"} |
| 66 | + |
| 67 | + try: |
| 68 | + # Fetch active users with RESCUER or HELPER roles who are not on an active rescue. |
| 69 | + # AltruPets schema: roles is a postgres array column on users table, |
| 70 | + # rescue_alerts tracks active rescues (auxiliarId/rescuerId columns). |
| 71 | + query = """ |
| 72 | + SELECT |
| 73 | + u.id, |
| 74 | + COALESCE(u."firstName", u.username) AS name, |
| 75 | + u.latitude, |
| 76 | + u.longitude, |
| 77 | + u.roles |
| 78 | + FROM users u |
| 79 | + WHERE u."isActive" = true |
| 80 | + AND (u.roles && ARRAY['RESCUER', 'HELPER']::varchar[]) |
| 81 | + AND u.latitude IS NOT NULL |
| 82 | + AND u.longitude IS NOT NULL |
| 83 | + AND u.id NOT IN ( |
| 84 | + SELECT COALESCE(ra."auxiliarId", ra."rescuerId") |
| 85 | + FROM rescue_alerts ra |
| 86 | + WHERE ra.status NOT IN ('COMPLETED', 'CANCELLED', 'REJECTED', 'EXPIRED') |
| 87 | + AND (ra."auxiliarId" IS NOT NULL OR ra."rescuerId" IS NOT NULL) |
| 88 | + ) |
| 89 | + """ |
| 90 | + rows = await conn.fetch(query) |
| 91 | + |
| 92 | + candidates: list[CandidateData] = [] |
| 93 | + for row in rows: |
| 94 | + dist = _haversine(animal.latitude, animal.longitude, float(row["latitude"]), float(row["longitude"])) |
| 95 | + if dist <= max_radius: |
| 96 | + candidates.append( |
| 97 | + CandidateData( |
| 98 | + user_id=str(row["id"]), |
| 99 | + name=row["name"] or "Unknown", |
| 100 | + distance_km=round(dist, 2), |
| 101 | + available_capacity=0, # enriched later from casa_cuna data |
| 102 | + roles=list(row["roles"]) if row["roles"] else [], |
| 103 | + ) |
| 104 | + ) |
| 105 | + |
| 106 | + logger.info("candidates_fetched", count=len(candidates), total_rows=len(rows)) |
| 107 | + return {**state, "candidates": candidates, "total_evaluated": len(candidates)} |
| 108 | + |
| 109 | + except Exception: |
| 110 | + logger.exception("fetch_candidates_query_failed") |
| 111 | + return {**state, "candidates": [], "total_evaluated": 0, "error": "fetch_candidates_query_failed"} |
| 112 | + finally: |
| 113 | + await conn.close() |
0 commit comments