Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #7 from nihar1024/main
Browse files Browse the repository at this point in the history
Move SQL functions out of this project and into goat-core for consistency
  • Loading branch information
majkshkurti authored Nov 15, 2024
2 parents 897bee1 + 2c26aa6 commit 020a478
Show file tree
Hide file tree
Showing 20 changed files with 242 additions and 1,364 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pyarrow = "^14.0.1"
celery = "^5.3.6"
redis = "^5.0.1"
tqdm = "^4.66.1"
sentry-sdk = {extras = ["celery", "fastapi"], version = "^2.14.0"}


[tool.poetry.group.dev.dependencies]
Expand Down
15 changes: 10 additions & 5 deletions src/core/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Dict, Optional
from uuid import UUID

from pydantic import BaseSettings, PostgresDsn, validator

Expand All @@ -13,7 +14,9 @@ class SyncPostgresDsn(PostgresDsn):


class Settings(BaseSettings):
DEBUG_MODE: bool = True
# Monitoring
SENTRY_DSN: Optional[str] = None
ENVIRONMENT: Optional[str] = "dev"

CUSTOMER_SCHEMA: str = "customer"
USER_DATA_SCHEMA: str = "user_data"
Expand All @@ -22,13 +25,15 @@ class Settings(BaseSettings):
PROJECT_NAME: Optional[str] = "GOAT Routing API"
CACHE_DIR: str = "/app/src/cache"

STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID = 36126
STREET_NETWORK_NODE_DEFAULT_LAYER_PROJECT_ID = 37319

NETWORK_REGION_TABLE = "basic.geofence_active_mobility"

CATCHMENT_AREA_CAR_BUFFER_DEFAULT_SPEED = 80 # km/h
CATCHMENT_AREA_HOLE_THRESHOLD_SQM = 10000 # 100m x 100m
CATCHMENT_AREA_HOLE_THRESHOLD_SQM = 200000 # 20 hectares, ~450m x 450m

BASE_STREET_NETWORK: Optional[UUID] = "903ecdca-b717-48db-bbce-0219e41439cf"
DEFAULT_STREET_NETWORK_NODE_LAYER_PROJECT_ID = (
37319 # Hardcoded until node layers are added to GOAT projects by default
)

DATA_INSERT_BATCH_SIZE = 800

Expand Down
45 changes: 17 additions & 28 deletions src/core/street_network/street_network_cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from uuid import UUID

import polars as pl
from polars import DataFrame
Expand All @@ -15,56 +16,50 @@ def __init__(self):

def _get_edge_cache_file_name(
self,
edge_layer_project_id: int,
edge_layer_id: UUID,
h3_short: str,
):
"""Get edge cache file path for the specified H3_3 cell."""

return os.path.join(
settings.CACHE_DIR,
f"{edge_layer_project_id}_{h3_short}_edge.parquet",
f"{str(edge_layer_id)}_{h3_short}_edge.parquet",
)

def _get_node_cache_file_name(
self,
node_layer_project_id: int,
node_layer_id: UUID,
h3_short: str,
):
"""Get node cache file path for the specified H3_3 cell."""

return os.path.join(
settings.CACHE_DIR,
f"{node_layer_project_id}_{h3_short}_node.parquet",
f"{node_layer_id}_{h3_short}_node.parquet",
)

def edge_cache_exists(self, edge_layer_project_id: int, h3_short: str):
def edge_cache_exists(self, edge_layer_id: UUID, h3_short: str):
"""Check if edge data for the specified H3_3 cell is cached."""

edge_cache_file = self._get_edge_cache_file_name(
edge_layer_project_id, h3_short
)
edge_cache_file = self._get_edge_cache_file_name(edge_layer_id, h3_short)
return os.path.exists(edge_cache_file)

def node_cache_exists(self, node_layer_project_id: int, h3_short: str):
def node_cache_exists(self, node_layer_id: UUID, h3_short: str):
"""Check if node data for the specified H3_3 cell is cached."""

node_cache_file = self._get_node_cache_file_name(
node_layer_project_id, h3_short
)
node_cache_file = self._get_node_cache_file_name(node_layer_id, h3_short)
return os.path.exists(node_cache_file)

def read_edge_cache(
self,
edge_layer_project_id: int,
edge_layer_id: UUID,
h3_short: str,
):
"""Read edge data for the specified H3_3 cell from cache."""

edge_df: DataFrame = None

edge_cache_file = self._get_edge_cache_file_name(
edge_layer_project_id, h3_short
)
edge_cache_file = self._get_edge_cache_file_name(edge_layer_id, h3_short)

try:
with open(edge_cache_file, "rb") as file:
Expand All @@ -78,16 +73,14 @@ def read_edge_cache(

def read_node_cache(
self,
node_layer_project_id: int,
node_layer_id: UUID,
h3_short: str,
):
"""Read node data for the specified H3_3 cell from cache."""

node_df: DataFrame = None

node_cache_file = self._get_node_cache_file_name(
node_layer_project_id, h3_short
)
node_cache_file = self._get_node_cache_file_name(node_layer_id, h3_short)

try:
with open(node_cache_file, "rb") as file:
Expand All @@ -101,15 +94,13 @@ def read_node_cache(

def write_edge_cache(
self,
edge_layer_project_id: int,
edge_layer_id: UUID,
h3_short: str,
edge_df: DataFrame,
):
"""Write edge data for the specified H3_3 cell into cache."""

edge_cache_file = self._get_edge_cache_file_name(
edge_layer_project_id, h3_short
)
edge_cache_file = self._get_edge_cache_file_name(edge_layer_id, h3_short)

try:
with open(edge_cache_file, "wb") as file:
Expand All @@ -124,15 +115,13 @@ def write_edge_cache(

def write_node_cache(
self,
node_layer_project_id: int,
node_layer_id: UUID,
h3_short: str,
node_df: DataFrame,
):
"""Write node data for the specified H3_3 cell into cache."""

node_cache_file = self._get_node_cache_file_name(
node_layer_project_id, h3_short
)
node_cache_file = self._get_node_cache_file_name(node_layer_id, h3_short)

try:
with open(node_cache_file, "wb") as file:
Expand Down
102 changes: 36 additions & 66 deletions src/core/street_network/street_network_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,12 @@ class StreetNetworkUtil:
def __init__(self, db_connection: AsyncSession):
self.db_connection = db_connection

async def _get_layer_and_user_id(self, layer_project_id: int):
"""Get the layer ID and user ID of the specified layer project ID."""
async def _get_user_id(self, layer_id: UUID):
"""Get the user ID of the specified layer ID."""

layer_id: UUID = None
user_id: UUID = None

try:
# Get the associated layer ID
result = await self.db_connection.execute(
text(
f"""SELECT layer_id
FROM {settings.CUSTOMER_SCHEMA}.layer_project
WHERE id = {layer_project_id};"""
)
)
layer_id = UUID(str(result.fetchone()[0]))

# Get the user ID of the layer
result = await self.db_connection.execute(
text(
Expand All @@ -41,55 +30,47 @@ async def _get_layer_and_user_id(self, layer_project_id: int):
)
user_id = UUID(str(result.fetchone()[0]))
except Exception:
raise ValueError(
f"Could not fetch layer and user ID for layer project ID {layer_project_id}."
)
raise ValueError(f"Could not fetch user ID for layer ID {layer_id}.")

return layer_id, user_id
return user_id

async def _get_street_network_tables(
self,
street_network_edge_layer_project_id: int,
street_network_node_layer_project_id: int,
edge_layer_id: UUID,
node_layer_id: UUID,
):
"""Get table names and layer IDs of the edge and node tables."""

edge_table: str = None
edge_layer_id: UUID = None
node_table: str = None
node_layer_id: UUID = None

# Get edge table name if a layer project ID is specified
if street_network_edge_layer_project_id:
# Get edge table name if a layer ID is specified
if edge_layer_id:
try:
# Get the edge layer ID and associated user ID
edge_layer_id, user_id = await self._get_layer_and_user_id(
street_network_edge_layer_project_id
)
user_id = await self._get_user_id(edge_layer_id)

# Produce the edge table name
edge_table = f"{settings.USER_DATA_SCHEMA}.street_network_line_{str(user_id).replace('-', '')}"
except Exception:
raise ValueError(
f"Could not fetch edge table name for layer project ID {street_network_edge_layer_project_id}."
f"Could not fetch edge table name for layer ID {edge_layer_id}."
)

# Get node table name if a layer project ID is specified
if street_network_node_layer_project_id:
# Get node table name if a layer ID is specified
if node_layer_id:
try:
# Get the node layer ID and associated user ID
node_layer_id, user_id = await self._get_layer_and_user_id(
street_network_node_layer_project_id
)
user_id = await self._get_user_id(node_layer_id)

# Produce the node table name
node_table = f"{settings.USER_DATA_SCHEMA}.street_network_point_{str(user_id).replace('-', '')}"
except Exception:
raise ValueError(
f"Could not fetch node table name for layer project ID {street_network_node_layer_project_id}."
f"Could not fetch node table name for layer ID {node_layer_id}."
)

return edge_table, edge_layer_id, node_table, node_layer_id
return edge_table, node_table

async def _get_street_network_region_h3_3_cells(self, region_geofence_table: str):
"""Get list of H3_3 cells covering the street network region."""
Expand Down Expand Up @@ -118,8 +99,8 @@ async def _get_street_network_region_h3_3_cells(self, region_geofence_table: str

async def fetch(
self,
edge_layer_project_id: int,
node_layer_project_id: int,
edge_layer_id: UUID,
node_layer_id: UUID,
region_geofence_table: str,
):
"""Fetch street network from specified layer and load into Polars dataframes."""
Expand All @@ -139,28 +120,22 @@ async def fetch(
# Get table names and layer IDs of the edge and node tables
(
street_network_edge_table,
street_network_edge_layer_id,
street_network_node_table,
street_network_node_layer_id,
) = await self._get_street_network_tables(
edge_layer_project_id, node_layer_project_id
)
) = await self._get_street_network_tables(edge_layer_id, node_layer_id)

# Initialize cache
street_network_cache = StreetNetworkCache()

try:
for h3_short in street_network_region_h3_3_cells:
if edge_layer_project_id is not None:
if street_network_cache.edge_cache_exists(
edge_layer_project_id, h3_short
):
if edge_layer_id is not None:
if street_network_cache.edge_cache_exists(edge_layer_id, h3_short):
# Read edge data from cache
edge_df = street_network_cache.read_edge_cache(
edge_layer_project_id, h3_short
edge_layer_id, h3_short
)
else:
if settings.DEBUG_MODE:
if settings.ENVIRONMENT == "dev":
print(
f"Fetching street network edge data for H3_3 cell {h3_short}"
)
Expand All @@ -174,7 +149,7 @@ async def fetch(
maxspeed_backward, source, target, h3_3, h3_6
FROM {street_network_edge_table}
WHERE h3_3 = {h3_short}
AND layer_id = '{str(street_network_edge_layer_id)}'
AND layer_id = '{str(edge_layer_id)}'
""",
uri=settings.POSTGRES_DATABASE_URI,
schema_overrides=SEGMENT_DATA_SCHEMA,
Expand All @@ -185,22 +160,20 @@ async def fetch(

# Write edge data into cache
street_network_cache.write_edge_cache(
edge_layer_project_id, h3_short, edge_df
edge_layer_id, h3_short, edge_df
)
# Update street network edge dictionary and memory usage
street_network_edge[h3_short] = edge_df
street_network_size += edge_df.estimated_size("gb")

if node_layer_project_id is not None:
if street_network_cache.node_cache_exists(
node_layer_project_id, h3_short
):
if node_layer_id is not None:
if street_network_cache.node_cache_exists(node_layer_id, h3_short):
# Read node data from cache
node_df = street_network_cache.read_node_cache(
node_layer_project_id, h3_short
node_layer_id, h3_short
)
else:
if settings.DEBUG_MODE:
if settings.ENVIRONMENT == "dev":
print(
f"Fetching street network node data for H3_3 cell {h3_short}"
)
Expand All @@ -211,15 +184,15 @@ async def fetch(
SELECT node_id AS id, h3_3, h3_6
FROM {street_network_node_table}
WHERE h3_3 = {h3_short}
AND layer_id = '{str(street_network_node_layer_id)}'
AND layer_id = '{str(node_layer_id)}'
""",
uri=settings.POSTGRES_DATABASE_URI,
schema_overrides=CONNECTOR_DATA_SCHEMA,
)

# Write node data into cache
street_network_cache.write_node_cache(
node_layer_project_id, h3_short, node_df
node_layer_id, h3_short, node_df
)

# Update street network node dictionary and memory usage
Expand All @@ -231,23 +204,20 @@ async def fetch(
)

# Raise error if a edge layer project ID is specified but no edge data is fetched
if edge_layer_project_id is not None and len(street_network_edge) == 0:
if edge_layer_id is not None and len(street_network_edge) == 0:
raise RuntimeError(
f"Failed to fetch street network edge data for layer project ID {edge_layer_project_id}."
f"Failed to fetch street network edge data for layer project ID {edge_layer_id}."
)

# Raise error if a node layer project ID is specified but no node data is fetched
if node_layer_project_id is not None and len(street_network_node) == 0:
if node_layer_id is not None and len(street_network_node) == 0:
raise RuntimeError(
f"Failed to fetch street network node data for layer project ID {node_layer_project_id}."
f"Failed to fetch street network node data for layer project ID {node_layer_id}."
)

end_time = time.time()

if settings.DEBUG_MODE:
print(
f"Street network load time: {round((end_time - start_time) / 60, 1)} min"
)
print(f"Street network in-memory size: {round(street_network_size, 1)} GB")
print(f"Street network load time: {round((end_time - start_time) / 60, 1)} min")
print(f"Street network in-memory size: {round(street_network_size, 1)} GB")

return street_network_edge, street_network_node
Loading

0 comments on commit 020a478

Please sign in to comment.