From cfc1f6e3dd944a045d145c9629ff340fd0109551 Mon Sep 17 00:00:00 2001 From: Nihar Thakkar Date: Tue, 25 Jun 2024 11:52:20 +0000 Subject: [PATCH 1/8] Ensure catchment area polygon is valid before writing to database --- src/crud/crud_catchment_area.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/crud/crud_catchment_area.py b/src/crud/crud_catchment_area.py index 0cf12ca..b77f568 100644 --- a/src/crud/crud_catchment_area.py +++ b/src/crud/crud_catchment_area.py @@ -495,7 +495,7 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): FROM isochrones_filled ) INSERT INTO {obj_in.result_table} (layer_id, geom, integer_attr1) - SELECT '{obj_in.layer_id}', COALESCE(j.geom, a.geom) AS geom, a.MINUTE + SELECT '{obj_in.layer_id}', ST_MakeValid(COALESCE(j.geom, a.geom)) AS geom, a.MINUTE FROM isochrones_with_id a LEFT JOIN LATERAL ( From 55f05ce4eac6caafb18f1e1e03b58a879f36608f Mon Sep 17 00:00:00 2001 From: Nihar Thakkar Date: Wed, 24 Jul 2024 09:37:42 +0000 Subject: [PATCH 2/8] Adapt DB session to configure datatypes correctly --- src/crud/crud_catchment_area.py | 49 ++++++++++++++++------- src/db/session.py | 70 ++++++++++++++++++++++++++++++++- 2 files changed, 103 insertions(+), 16 deletions(-) diff --git a/src/crud/crud_catchment_area.py b/src/crud/crud_catchment_area.py index b77f568..97274ec 100644 --- a/src/crud/crud_catchment_area.py +++ b/src/crud/crud_catchment_area.py @@ -7,6 +7,7 @@ import numpy as np import polars as pl from redis import Redis +from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from tqdm import tqdm @@ -42,13 +43,15 @@ async def fetch(self): # Get network H3 cells h3_3_grid = [] try: - sql_get_h3_3_grid = f""" + sql_get_h3_3_grid = text( + f""" WITH region AS ( SELECT ST_Union(geom) AS geom FROM {settings.NETWORK_REGION_TABLE} ) SELECT g.h3_short FROM region r, LATERAL basic.fill_polygon_h3_3(r.geom) g; """ + ) result = (await self.db_connection.execute(sql_get_h3_3_grid)).fetchall() for h3_index in result: h3_3_grid.append(h3_index[0]) @@ -143,7 +146,8 @@ async def read_network( h3_3_cells = set() h3_6_cells = set() - sql_get_relevant_cells = f""" + sql_get_relevant_cells = text( + f""" WITH point AS ( SELECT geom FROM temporal.\"{input_table}\" LIMIT {num_points} ), @@ -160,9 +164,10 @@ async def read_network( FROM cells GROUP BY h3_3; """ - for h3_3_cell in ( - await self.db_connection.execute(sql_get_relevant_cells) - ).fetchall(): + ) + result = (await self.db_connection.execute(sql_get_relevant_cells)).fetchall() + + for h3_3_cell in result: h3_3_cells.add(h3_3_cell[0]) for h3_6_cell in h3_3_cell[1]: h3_6_cells.add(h3_6_cell) @@ -191,7 +196,8 @@ async def read_network( origin_point_cell_index = [] origin_point_h3_3 = [] segments_to_discard = [] - sql_get_artificial_segments = f""" + sql_get_artificial_segments = text( + f""" SELECT point_id, old_id, @@ -207,6 +213,7 @@ async def read_network( 10 ); """ + ) result = ( await self.db_connection.execute(sql_get_artificial_segments) ).fetchall() # TODO Check if artificial segments are even required for car routing @@ -305,12 +312,14 @@ async def create_input_table(self, obj_in): # Create temporary table for storing catchment area starting points await self.db_connection.execute( - f""" + text( + f""" CREATE TABLE temporal.\"{table_name}\" ( id serial PRIMARY KEY, geom geometry(Point, 4326) ); """ + ) ) # Insert catchment area starting points into the temporary table @@ -322,10 +331,12 @@ async def create_input_table(self, obj_in): f"(ST_SetSRID(ST_MakePoint({longitude}, {latitude}), 4326))," ) await self.db_connection.execute( - f""" + text( + f""" INSERT INTO temporal.\"{table_name}\" (geom) VALUES {insert_string.rstrip(",")}; """ + ) ) await self.db_connection.commit() @@ -335,7 +346,7 @@ async def create_input_table(self, obj_in): async def delete_input_table(self, table_name: str): """Delete the input table after reading the relevant sub-network.""" - await self.db_connection.execute(f'DROP TABLE temporal."{table_name}";') + await self.db_connection.execute(text(f'DROP TABLE temporal."{table_name}";')) await self.db_connection.commit() def compute_segment_cost(self, sub_network, mode, speed): @@ -435,7 +446,8 @@ async def get_h3_10_grid(self, db_connection, obj_in, origin_h3_10: str): buffer_dist = obj_in.travel_cost.max_distance # Fetch H3_10 cell grid relevant to the catchment area calculation - sql_get_relevant_cells = f""" + sql_get_relevant_cells = text( + f""" WITH cells AS ( SELECT DISTINCT(h3_grid_disk(sub.origin_h3_index, radius.value)) AS h3_index FROM (SELECT UNNEST(ARRAY{origin_h3_10}::h3index[]) AS origin_h3_index) sub, @@ -448,6 +460,7 @@ async def get_h3_10_grid(self, db_connection, obj_in, origin_h3_10: str): FROM h3_cell_to_lat_lng(h3_index) AS point ) sub; """ + ) result = (await db_connection.execute(sql_get_relevant_cells)).fetchall() h3_index = [] @@ -474,7 +487,8 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): insert_string += f"SELECT ST_SetSRID(ST_GeomFromText('{geom}'), 4326) AS geom, {minute} AS minute UNION ALL " insert_string, _, _ = insert_string.rpartition(" UNION ALL ") - sql_insert_into_table = f""" + sql_insert_into_table = text( + f""" WITH isochrones AS ( SELECT p."minute", (ST_DUMP(p.geom)).geom @@ -505,6 +519,7 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): ) j ON {"TRUE" if obj_in.polygon_difference else "FALSE"} ORDER BY a.MINUTE DESC; """ + ) await self.db_connection.execute(sql_insert_into_table) await self.db_connection.commit() @@ -524,10 +539,12 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): {cost} ),""" if i % batch_size == 0 or i == (len(network["features"]) - 1): - insert_string = f""" + insert_string = text( + f""" INSERT INTO {obj_in.result_table} (layer_id, geom, float_attr1) VALUES {insert_string.rstrip(",")}; """ + ) await self.db_connection.execute(insert_string) await self.db_connection.commit() insert_string = "" @@ -544,18 +561,22 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): {grid[i]} ),""" if i % batch_size == 0 or i == (len(grid_index) - 1): - insert_string = f""" + insert_string = text( + f""" INSERT INTO {obj_in.result_table} (layer_id, text_attr1, integer_attr1) VALUES {insert_string.rstrip(",")}; """ + ) await self.db_connection.execute(insert_string) await self.db_connection.commit() insert_string = "" - sql_update_geom = f""" + sql_update_geom = text( + f""" UPDATE {obj_in.result_table} SET geom = ST_SetSRID(h3_cell_to_boundary(text_attr1::h3index)::geometry, 4326) WHERE layer_id = '{obj_in.layer_id}'; """ + ) await self.db_connection.execute(sql_update_geom) await self.db_connection.commit() diff --git a/src/db/session.py b/src/db/session.py index e7e4fc9..7c21b63 100644 --- a/src/db/session.py +++ b/src/db/session.py @@ -1,4 +1,5 @@ -from sqlalchemy.engine import create_engine +import asyncpg +from sqlalchemy import event from sqlalchemy.ext.asyncio import ( AsyncSession, create_async_engine, @@ -7,10 +8,74 @@ from src.core.config import settings + +async def set_type_codec( + conn, + typenames, + encode=lambda a: a, + decode=lambda a: a, + schema="pg_catalog", + format="text", +): + conn._check_open() + for typename in typenames: + typeinfo = await conn.fetchrow( + asyncpg.introspection.TYPE_BY_NAME, typename, schema + ) + if not typeinfo: + raise ValueError(f"unknown type: {schema}.{typename}") + + oid = typeinfo["oid"] + conn._protocol.get_settings().add_python_codec( + oid, typename, schema, "scalar", encode, decode, format + ) + + # Statement cache is no longer valid due to codec changes. + conn._drop_local_statement_cache() + + +async def setup(conn): + # Register geometry type + await conn.set_type_codec( + "geometry", encoder=str, decoder=str, schema="public", format="text" + ) + + # Register integer array type + await set_type_codec( + conn, + ["_int4"], + encode=lambda a: "{" + + ",".join(map(str, a)) + + "}", # Convert list to PostgreSQL array literal + decode=lambda a: ( + list(map(int, a.strip("{}").split(","))) if a else [] + ), # Convert PostgreSQL array literal to list + schema="pg_catalog", + format="text", + ) + + # Register UUID array type + await set_type_codec( + conn, + ["_uuid"], + encode=lambda a: "{" + ",".join(a) + "}", # Directly join UUID strings + decode=lambda a: ( + a.strip("{}").split(",") if a else [] + ), # Split string into UUID strings + schema="pg_catalog", + format="text", + ) + + +def register_event_listeners(async_engine): + @event.listens_for(async_engine.sync_engine, "connect") + def register_custom_types(dbapi_connection, connection_record): + dbapi_connection.run_async(setup) + + async_engine = create_async_engine( settings.ASYNC_SQLALCHEMY_DATABASE_URI, pool_pre_ping=True ) -sync_engine = create_engine(settings.SQLALCHEMY_DATABASE_URI, future=False) async_session = sessionmaker( bind=async_engine, @@ -19,3 +84,4 @@ autoflush=False, expire_on_commit=False, ) +register_event_listeners(async_engine) From 8e3d9cf533ce1294fafb982e2ecc72cebf6c510e Mon Sep 17 00:00:00 2001 From: Nihar Thakkar Date: Mon, 29 Jul 2024 07:49:54 +0000 Subject: [PATCH 3/8] Implement support for routing with street network scenarios --- src/core/config.py | 3 + src/crud/crud_catchment_area.py | 93 ++- src/crud/crud_catchment_area_sync.py | 5 +- src/db/functions/create_intersection_line.sql | 10 + src/db/functions/extend_line.sql | 55 ++ src/db/functions/get_artificial_segments.sql | 50 +- .../functions/get_network_modifications.sql | 644 ++++++++++++++++++ src/db/functions/split_by_drawn_lines.sql | 37 + src/schemas/catchment_area.py | 2 +- 9 files changed, 869 insertions(+), 30 deletions(-) create mode 100644 src/db/functions/create_intersection_line.sql create mode 100644 src/db/functions/extend_line.sql create mode 100644 src/db/functions/get_network_modifications.sql create mode 100644 src/db/functions/split_by_drawn_lines.sql diff --git a/src/core/config.py b/src/core/config.py index 79bb4a8..b6edf0a 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -17,6 +17,9 @@ class Settings(BaseSettings): PROJECT_NAME: Optional[str] = "GOAT Routing API" CACHE_DIR: str = "/app/src/cache" + STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID = 36126 + STREET_NETWORK_NODE_DEFAULT_LAYER_PROJECT_ID = 37319 + NETWORK_REGION_TABLE = "basic.geofence_active_mobility" CATCHMENT_AREA_CAR_BUFFER_DEFAULT_SPEED = 80 # km/h diff --git a/src/crud/crud_catchment_area.py b/src/crud/crud_catchment_area.py index 97274ec..e872a6f 100644 --- a/src/crud/crud_catchment_area.py +++ b/src/crud/crud_catchment_area.py @@ -40,6 +40,38 @@ async def fetch(self): start_time = time.time() + # Get edge network table + edge_network_table = None + try: + layer_id = str( + ( + await self.db_connection.execute( + text( + f"""SELECT layer_id + FROM customer.layer_project + WHERE id = {settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID};""" + ) + ) + ).fetchall()[0][0] + ) + user_id = str( + ( + await self.db_connection.execute( + text( + f"""SELECT user_id + FROM customer.layer + WHERE id = '{layer_id}';""" + ) + ) + ).fetchall()[0][0] + ) + edge_network_table = ( + f"user_data.street_network_line_{user_id.replace('-', '')}" + ) + except Exception as e: + print(e) + return + # Get network H3 cells h3_3_grid = [] try: @@ -79,10 +111,10 @@ async def fetch(self): segments_df[h3_index] = pl.read_database_uri( query=f""" SELECT - id, length_m, length_3857, class_, impedance_slope, impedance_slope_reverse, + edge_id AS id, length_m, length_3857, class_, impedance_slope, impedance_slope_reverse, impedance_surface, CAST(coordinates_3857 AS TEXT) AS coordinates_3857, maxspeed_forward, maxspeed_backward, source, target, h3_3, h3_6 - FROM basic.segment + FROM {edge_network_table} WHERE h3_3 = {h3_index} """, uri=settings.POSTGRES_DATABASE_URI, @@ -207,6 +239,7 @@ async def read_network( maxspeed_forward, maxspeed_backward, source, target, h3_3, h3_6, point_cell_index, point_h3_3 FROM basic.get_artificial_segments( + {settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID}, '{input_table}', {num_points}, '{",".join(valid_segment_classes)}', @@ -253,10 +286,60 @@ async def read_network( "Starting point(s) are disconnected from the street network." ) - # Remove segments which are now replaced by artificial segments - sub_network = sub_network.filter(~pl.col("id").is_in(segments_to_discard)) + # Process network modifications according to the specified scenario + scenario_id = ( + f"'{obj_in.scenario_id}'" if obj_in.scenario_id is not None else "NULL" + ) + sql_get_network_modifications = text( + f""" + SELECT r_edit_type, r_id, r_class_, r_source, r_target, + r_length_m, r_length_3857, CAST(r_coordinates_3857 AS TEXT) AS coordinates_3857, + r_impedance_slope, r_impedance_slope_reverse, r_impedance_surface, r_maxspeed_forward, + r_maxspeed_backward, r_h3_6, r_h3_3 + FROM basic.get_network_modifications( + {scenario_id}, + {settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID}, + {settings.STREET_NETWORK_NODE_DEFAULT_LAYER_PROJECT_ID} + ); + """ + ) + result = ( + await self.db_connection.execute(sql_get_network_modifications) + ).fetchall() + + for modification in result: + if modification[0] == "d": + segments_to_discard.append(modification[1]) + continue - # TODO: We need to read the scenario network dynamically from DB if a scenario is selected. + new_segment = pl.DataFrame( + [ + { + "id": modification[1], + "length_m": modification[5], + "length_3857": modification[6], + "class_": modification[2], + "impedance_slope": modification[8], + "impedance_slope_reverse": modification[9], + "impedance_surface": modification[10], + "coordinates_3857": modification[7], + "maxspeed_forward": modification[11], + "maxspeed_backward": modification[12], + "source": modification[3], + "target": modification[4], + "h3_3": modification[13], + "h3_6": modification[14], + } + ], + schema_overrides=SEGMENT_DATA_SCHEMA, + ) + new_segment = new_segment.with_columns( + pl.col("coordinates_3857").str.json_extract() + ) + sub_network.extend(new_segment) + + # Remove segments which are replaced by artificial segments or modified due to the scenario + sub_network = sub_network.filter(~pl.col("id").is_in(segments_to_discard)) # Replace all NULL values in the impedance columns with 0 sub_network = sub_network.with_columns(pl.col("impedance_surface").fill_null(0)) diff --git a/src/crud/crud_catchment_area_sync.py b/src/crud/crud_catchment_area_sync.py index f841809..11077b3 100644 --- a/src/crud/crud_catchment_area_sync.py +++ b/src/crud/crud_catchment_area_sync.py @@ -72,10 +72,10 @@ def fetch(self): segments_df[h3_index] = pl.read_database_uri( query=f""" SELECT - id, length_m, length_3857, class_, impedance_slope, impedance_slope_reverse, + edge_id AS id, length_m, length_3857, class_, impedance_slope, impedance_slope_reverse, impedance_surface, CAST(coordinates_3857 AS TEXT) AS coordinates_3857, maxspeed_forward, maxspeed_backward, source, target, h3_3, h3_6 - FROM basic.segment + FROM user_data.street_network_line_b6ddb594bfed4a8788b214af45378d75 WHERE h3_3 = {h3_index} """, uri=settings.POSTGRES_DATABASE_URI, @@ -196,6 +196,7 @@ def read_network( maxspeed_forward, maxspeed_backward, source, target, h3_3, h3_6, point_cell_index, point_h3_3 FROM basic.get_artificial_segments( + {settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID}, '{input_table}', {num_points}, '{",".join(valid_segment_classes)}', diff --git a/src/db/functions/create_intersection_line.sql b/src/db/functions/create_intersection_line.sql new file mode 100644 index 0000000..507b3a6 --- /dev/null +++ b/src/db/functions/create_intersection_line.sql @@ -0,0 +1,10 @@ +CREATE OR REPLACE FUNCTION basic.create_intersection_line(point_geom geometry, length_line double precision) + RETURNS SETOF geometry + LANGUAGE sql + IMMUTABLE +AS $function$ + SELECT ST_SETSRID(ST_MAKELINE( + ST_Translate(point_geom, length_line, length_line), + ST_Translate(point_geom, -length_line, -length_line) + ), 4326) +$function$; \ No newline at end of file diff --git a/src/db/functions/extend_line.sql b/src/db/functions/extend_line.sql new file mode 100644 index 0000000..067d234 --- /dev/null +++ b/src/db/functions/extend_line.sql @@ -0,0 +1,55 @@ +CREATE OR REPLACE FUNCTION basic.extend_line(geom geometry, extend_distance NUMERIC, point_to_extend text) +RETURNS geometry +LANGUAGE plpgsql +AS $function$ +DECLARE + start_geom geometry; + end_geom geometry; + azimuth_A float; + azimuth_B float; + length_A NUMERIC; + length_B NUMERIC; + newpoint_A geometry; + newpoint_B geometry; + new_line geometry; +BEGIN + + -- get the points A and B given a line L + start_geom = ST_STARTPOINT(geom); + end_geom = ST_ENDPOINT(geom); + + -- Start line section + azimuth_A = ST_AZIMUTH(ST_POINTN(geom,2),start_geom); + + -- End line section + azimuth_B = ST_AZIMUTH(ST_POINTN(geom,-2),end_geom); + + -- get the length of the line A --> B + length_A = ST_DISTANCE(ST_STARTPOINT(geom),ST_POINTN(geom,2)); + length_B = ST_DISTANCE(ST_ENDPOINT(geom),ST_POINTN(geom,-2)); + + newpoint_A = ST_TRANSLATE(start_geom, sin(azimuth_A) * extend_distance, cos(azimuth_A) * extend_distance); + newpoint_B = ST_TRANSLATE(end_geom, sin(azimuth_B) * extend_distance, cos(azimuth_B) * extend_distance); + + IF point_to_extend = 'start' THEN + new_line = st_addpoint(geom, newpoint_a, 0); + ELSEIF point_to_extend = 'end' THEN + new_line = st_addpoint(geom, newpoint_b, -1); + ELSEIF point_to_extend = 'both' THEN + new_line = st_addpoint(st_addpoint(geom,newpoint_B), newpoint_A, 0); + ELSE + RAISE EXCEPTION 'Please specify a valid point_to_extend type.'; + END IF; + + If new_line IS NULL THEN + RAISE NOTICE 'The new line is NULL. Please check the input parameters.'; + new_line = geom; + END IF; + + RETURN new_line; +END +$function$ +/*point_to_extend = 'start', 'end', 'both'*/ +--1 meter in Germany approx. 0.0000127048 +--SELECT basic.extend_line(geom, 0.0127048, 'both') +--FROM customer.way_modified WHERE id = 112; diff --git a/src/db/functions/get_artificial_segments.sql b/src/db/functions/get_artificial_segments.sql index 9698648..3128ff3 100644 --- a/src/db/functions/get_artificial_segments.sql +++ b/src/db/functions/get_artificial_segments.sql @@ -1,48 +1,54 @@ DROP TYPE IF EXISTS basic.origin_segment; CREATE TYPE basic.origin_segment AS ( - id int, class_ text, impedance_slope float8, impedance_slope_reverse float8, - impedance_surface float8, maxspeed_forward int, maxspeed_backward int, source int, - target int, geom geometry, h3_3 int2, h3_6 int4, fraction float[], fraction_geom geometry[], - point_id int2[], point_geom geometry[] + id INT, class_ TEXT, impedance_slope FLOAT8, impedance_slope_reverse FLOAT8, + impedance_surface FLOAT8, maxspeed_forward INT, maxspeed_backward INT, source INT, + target INT, geom GEOMETRY, h3_3 INT2, h3_6 INT, fraction FLOAT[], fraction_geom GEOMETRY[], + point_id INT2[], point_geom GEOMETRY[] ); DROP TYPE IF EXISTS basic.artificial_segment CASCADE; CREATE TYPE basic.artificial_segment AS ( - point_id int2, point_geom geometry, point_cell_index h3index, point_h3_3 int, old_id int, - id int, length_m float, length_3857 float, class_ text, impedance_slope float8, - impedance_slope_reverse float8, impedance_surface float8, coordinates_3857 jsonb, - maxspeed_forward int, maxspeed_backward int, source int, target int, geom geometry, - h3_3 int2, h3_6 int4 + point_id INT2, point_geom GEOMETRY, point_cell_index H3INDEX, point_h3_3 INT, old_id INT, + id INT, length_m FLOAT, length_3857 FLOAT, class_ TEXT, impedance_slope FLOAT8, + impedance_slope_reverse FLOAT8, impedance_surface FLOAT8, coordinates_3857 JSONB, + maxspeed_forward INT, maxspeed_backward INT, source INT, target INT, geom GEOMETRY, + h3_3 INT2, h3_6 INT ); DROP FUNCTION IF EXISTS basic.get_artificial_segments; CREATE OR REPLACE FUNCTION basic.get_artificial_segments( - input_table text, - num_points integer, - classes text, - point_cell_resolution int + edge_layer_project_id INT, + input_table TEXT, + num_points INT, + classes TEXT, + point_cell_resolution INT ) RETURNS SETOF basic.artificial_segment LANGUAGE plpgsql AS $function$ DECLARE + edge_layer_id UUID := (SELECT layer_id FROM customer.layer_project WHERE id = edge_layer_project_id); + edge_network_table TEXT := 'user_data.street_network_line_' || REPLACE(( + SELECT user_id FROM customer.layer WHERE id = edge_layer_id + )::TEXT, '-', ''); + custom_cursor REFCURSOR; origin_segment basic.origin_segment; artificial_segment basic.artificial_segment; -- Increment everytime a new artificial segment is created - artificial_seg_index int = 1000000000; -- Defaults to 1 billion + artificial_seg_index INT = 1000000000; -- Defaults to 1 billion -- Increment everytime a new artificial connector/node is created - artificial_con_index int = 1000000000; -- Defaults to 1 billion + artificial_con_index INT = 1000000000; -- Defaults to 1 billion -- Increment everytime a new artificial origin node is created (for isochrone starting points) - artifical_origin_index int = 2000000000; -- Defaults to 2 billion + artifical_origin_index INT = 2000000000; -- Defaults to 2 billion - fraction float; - new_geom geometry; + fraction FLOAT; + new_geom GEOMETRY; BEGIN OPEN custom_cursor FOR EXECUTE @@ -50,7 +56,7 @@ BEGIN 'WITH origin AS ( SELECT id, geom, - ST_SETSRID(ST_Buffer(geom::geography, 100)::geometry, 4326) AS buffer_geom, + ST_SETSRID(ST_Buffer(geom::geography, 100)::GEOMETRY, 4326) AS buffer_geom, basic.to_short_h3_3(h3_lat_lng_to_cell(ST_Centroid(geom)::point, 3)::bigint) AS h3_3 FROM temporal."%s" LIMIT %s::int @@ -58,12 +64,12 @@ BEGIN best_segment AS ( SELECT DISTINCT ON (o.id) o.id AS point_id, o.geom AS point_geom, o.buffer_geom AS point_buffer, - s.id, s.class_, s.impedance_slope, s.impedance_slope_reverse, + s.edge_id AS id, s.class_, s.impedance_slope, s.impedance_slope_reverse, s.impedance_surface, s.maxspeed_forward, s.maxspeed_backward, s."source", s.target, s.geom, s.h3_3, s.h3_6, ST_LineLocatePoint(s.geom, o.geom) AS fraction, ST_ClosestPoint(s.geom, o.geom) AS fraction_geom - FROM basic.segment s, origin o + FROM %s s, origin o WHERE s.h3_3 = o.h3_3 AND s.class_ = ANY(string_to_array(''%s'', '','')) @@ -84,7 +90,7 @@ BEGIN bs.id, bs.class_, bs.impedance_slope, bs.impedance_slope_reverse, bs.impedance_surface, bs.maxspeed_forward, bs.maxspeed_backward, bs."source", bs.target, bs.geom, bs.h3_3, bs.h3_6;' - , input_table, num_points, classes); + , input_table, num_points, edge_network_table, classes); LOOP FETCH custom_cursor INTO origin_segment; diff --git a/src/db/functions/get_network_modifications.sql b/src/db/functions/get_network_modifications.sql new file mode 100644 index 0000000..e5c7ce7 --- /dev/null +++ b/src/db/functions/get_network_modifications.sql @@ -0,0 +1,644 @@ +DROP FUNCTION IF EXISTS basic.get_network_modifications; +CREATE OR REPLACE FUNCTION basic.get_network_modifications( + scenario_id_input UUID, + edge_layer_project_id INTEGER, + node_layer_project_id INTEGER +) +RETURNS TABLE ( + r_edit_type TEXT, + r_id INTEGER, + r_class_ TEXT, + r_source INTEGER, + r_target INTEGER, + r_length_m FLOAT8, + r_length_3857 FLOAT8, + r_coordinates_3857 JSONB, + r_impedance_slope FLOAT8, + r_impedance_slope_reverse FLOAT8, + r_impedance_surface FLOAT8, + r_maxspeed_forward INTEGER, + r_maxspeed_backward INTEGER, + r_geom GEOMETRY(LINESTRING, 4326), + r_h3_6 INTEGER, + r_h3_3 INTEGER +) +LANGUAGE plpgsql +AS $function$ +DECLARE + edge_layer_id UUID := (SELECT layer_id FROM customer.layer_project WHERE id = edge_layer_project_id); + edge_network_table TEXT := 'user_data.street_network_line_' || REPLACE(( + SELECT user_id FROM customer.layer WHERE id = edge_layer_id + )::TEXT, '-', ''); + node_layer_id UUID := (SELECT layer_id FROM customer.layer_project WHERE id = node_layer_project_id); + node_network_table TEXT := 'user_data.street_network_point_' || REPLACE(( + SELECT user_id FROM customer.layer WHERE id = node_layer_id + )::TEXT, '-', ''); + + cnt integer; + rec record; + + max_node_id integer; + max_node_id_increment integer := 1; + max_edge_id integer; + max_edge_id_increment integer := 1; +BEGIN + --------------------------------------------------------------------------------------------------------------------- + --Proceed only if the scenario contains features which apply to the specified edge layer + --------------------------------------------------------------------------------------------------------------------- + + IF NOT EXISTS ( + SELECT 1 + FROM customer.scenario_scenario_feature ssf + INNER JOIN customer.scenario_feature sf ON sf.id = ssf.scenario_feature_id + WHERE ssf.scenario_id = scenario_id_input + AND sf.layer_project_id = edge_layer_project_id + ) THEN + RETURN; + END IF; + + --------------------------------------------------------------------------------------------------------------------- + --Prepare Table + --------------------------------------------------------------------------------------------------------------------- + + EXECUTE FORMAT('SELECT max(node_id) FROM %s;', node_network_table) INTO max_node_id; + EXECUTE FORMAT('SELECT max(edge_id) FROM %s;', edge_network_table) INTO max_edge_id; + + DROP TABLE IF EXISTS modified_attributes_only; + EXECUTE FORMAT(' + CREATE TEMP TABLE modified_attributes_only AS + SELECT w.*, e.edge_id AS original_id + FROM %s e, ( + SELECT sf.* + FROM customer.scenario_scenario_feature ssf + INNER JOIN customer.scenario_feature sf ON sf.id = ssf.scenario_feature_id + WHERE ssf.scenario_id = %L + AND sf.layer_project_id = %s + ) w + WHERE e.h3_3 = w.h3_3 + AND e.id = w.feature_id + AND ST_ASTEXT(ST_ReducePrecision(w.geom,0.00001)) = ST_ASTEXT(ST_ReducePrecision(e.geom,0.00001)) + AND edit_type = ''m''; + ', edge_network_table, scenario_id_input, edge_layer_project_id); + CREATE INDEX ON modified_attributes_only USING GIST(geom); + + DROP TABLE IF EXISTS drawn_features; + EXECUTE FORMAT(' + CREATE TEMP TABLE drawn_features AS + WITH scenario_features AS ( + SELECT sf.*, ssf.scenario_id + FROM customer.scenario_scenario_feature ssf + INNER JOIN customer.scenario_feature sf ON sf.id = ssf.scenario_feature_id + WHERE ssf.scenario_id = %L + AND sf.layer_project_id = %s + AND sf.edit_type IN (''n'', ''m'') + ) + SELECT w.id, ST_RemoveRepeatedPoints(w.geom) AS geom, ''road'' AS way_type, + e.class_, e.impedance_surface, e.maxspeed_forward, e.maxspeed_backward, + w.scenario_id, e.edge_id AS original_id, e.h3_6, e.h3_3 + FROM %s e, + scenario_features w + WHERE w.feature_id IS NOT NULL + AND e.h3_3 = w.h3_3 + AND e.id = w.feature_id + UNION ALL + SELECT w.id, ST_RemoveRepeatedPoints(w.geom) AS geom, ''road'' AS way_type, + ''tertiary'' AS class_, NULL AS impedance_surface, 50 AS maxspeed_forward, + 50 AS maxspeed_backward, w.scenario_id, NULL AS original_id, NULL AS h3_6, + NULL AS h3_3 + FROM scenario_features w + WHERE w.feature_id IS NULL; + ', scenario_id_input, edge_layer_project_id, edge_network_table); + CREATE INDEX ON drawn_features USING GIST(geom); + + --------------------------------------------------------------------------------------------------------------------- + --Snap start and end points + --------------------------------------------------------------------------------------------------------------------- + /*Round start and end point for snapping*/ + DROP TABLE IF EXISTS snapped_drawn_features; + CREATE TEMP TABLE snapped_drawn_features AS + WITH start_end_point AS + ( + SELECT d.id AS did, st_startpoint(d.geom) geom, 's' AS point_type, FALSE AS snapped, NULL::integer AS node_id + FROM drawn_features d + UNION ALL + SELECT d.id AS did, st_endpoint(d.geom) geom, 'e' AS point_type, FALSE AS snapped, NULL AS node_id + FROM drawn_features d + ), + clusters AS + ( + SELECT did, geom, ST_ClusterDBSCAN(geom, eps := 0.00001, minpoints := 1) OVER() AS cid, point_type + FROM start_end_point + ), + grouped AS + ( + SELECT ARRAY_AGG(did) AS did, ST_CENTROID(ST_COLLECT(geom)) AS geom, ARRAY_AGG(point_type)::text[] AS point_types + FROM clusters + GROUP BY cid + ) + SELECT UNNEST(did) AS did, geom, UNNEST(point_types) AS point_type, FALSE AS snapped, + NULL::integer AS node_id, ARRAY_LENGTH(point_types, 1) AS group_size, + basic.to_short_h3_3(h3_lat_lng_to_cell(geom::point, 3)::bigint) AS h3_3 + FROM grouped; + + ALTER TABLE snapped_drawn_features ADD COLUMN id serial; + CREATE INDEX ON snapped_drawn_features USING GIST(geom); + CREATE INDEX ON snapped_drawn_features (id); + + /*Snapping to existing Nodes*/ + DROP TABLE IF EXISTS snapped_to_node; + EXECUTE FORMAT(' + CREATE TEMP TABLE snapped_to_node AS + SELECT r.id, r.did, r.point_type, r.geom original_geom, + s.geom node_geom, s.node_id, s.h3_3 + FROM snapped_drawn_features r + CROSS JOIN LATERAL + ( + SELECT n.node_id, n.geom, n.h3_3 + FROM %s n + WHERE ST_Intersects(ST_BUFFER(r.geom,0.00001), n.geom) + ORDER BY r.geom <-> n.geom + LIMIT 1 + ) s; + CREATE INDEX ON snapped_to_node USING GIST(node_geom); + + UPDATE snapped_drawn_features d + SET geom = node_geom, snapped = TRUE, node_id = s.node_id, h3_3 = s.h3_3 + FROM snapped_to_node s + WHERE s.did = d.did + AND d.point_type = s.point_type; + ', node_network_table); + + /*Snapping to existing edges*/ + DROP TABLE IF EXISTS snapped_to_edge; + EXECUTE FORMAT(' + CREATE TEMP TABLE snapped_to_edge AS + WITH closest_points AS ( + SELECT + r.id, r.did, r.point_type, r.geom AS original_geom, ST_CLOSESTPOINT(n.geom, r.geom) AS closest_point_edge, + basic.to_short_h3_3(h3_lat_lng_to_cell(ST_CLOSESTPOINT(n.geom, r.geom)::point, 3)::bigint) AS h3_3, + ROW_NUMBER() OVER (PARTITION BY r.geom ORDER BY r.geom <-> ST_CLOSESTPOINT(n.geom, r.geom)) AS rnk + FROM snapped_drawn_features r + JOIN %s n + ON n.h3_3 = r.h3_3 + WHERE ST_Intersects(ST_BUFFER(r.geom, 0.00001), n.geom) + AND r.snapped = False + ) + SELECT id, did, point_type, original_geom, closest_point_edge, h3_3 + FROM closest_points + WHERE rnk = 1; + ', edge_network_table); + + /*Update based on snapped to new*/ + UPDATE snapped_drawn_features d + SET geom = closest_point_edge, snapped = True, h3_3 = s.h3_3 + FROM snapped_to_edge s + WHERE s.did = d.did + AND d.point_type = s.point_type; + + /*Snapping to each other*/ + DROP TABLE IF EXISTS snapped_to_each_other; + CREATE TEMP TABLE snapped_to_each_other AS + SELECT r.id, r.did, r.point_type, r.geom original_geom, s.geom closest_point_edge + FROM snapped_drawn_features r + CROSS JOIN LATERAL + ( + SELECT n.id, ST_CLOSESTPOINT(n.geom, r.geom) AS geom + FROM drawn_features n + WHERE ST_Intersects(ST_BUFFER(r.geom,0.00001), n.geom) + AND r.did <> n.id + ORDER BY r.geom <-> ST_CLOSESTPOINT(n.geom, r.geom) + LIMIT 1 + ) s + WHERE r.snapped = FALSE + AND r.group_size = 1 + UNION ALL + SELECT s.id, s.did, s.point_type, s.geom, s.geom + FROM snapped_drawn_features s + WHERE s.group_size > 1; + + /*Update based on snapped to each other*/ + UPDATE snapped_drawn_features d + SET geom = closest_point_edge, snapped = True + FROM snapped_to_each_other s + WHERE s.did = d.did + AND d.point_type = s.point_type; + + /*Update drawn features*/ + UPDATE drawn_features d + SET geom = st_setpoint(d.geom, 0, s.geom) + FROM snapped_drawn_features s + WHERE d.id = s.did + AND s.snapped = TRUE + AND s.point_type = 's'; + + UPDATE drawn_features d + SET geom = st_setpoint(d.geom, -1, s.geom) + FROM snapped_drawn_features s + WHERE d.id = s.did + AND s.snapped = TRUE + AND s.point_type = 'e'; + + UPDATE drawn_features d + SET geom = st_setpoint(d.geom, 0, s.geom) + FROM snapped_drawn_features s + WHERE s.snapped = FALSE + AND s.point_type = 's' + AND d.id = s.did; + + UPDATE drawn_features d + SET geom = st_setpoint(d.geom, -1, s.geom) + FROM snapped_drawn_features s + WHERE s.snapped = FALSE + AND s.point_type = 'e' + AND d.id = s.did; + + --------------------------------------------------------------------------------------------------------------------- + --Cut network + --------------------------------------------------------------------------------------------------------------------- + + /*Extend lines to cut network*/ + DROP TABLE IF EXISTS extended_lines; + CREATE TEMP TABLE extended_lines AS + WITH agg_snapped_nodes AS + ( + SELECT d.id, ARRAY_AGG(point_type) AS point_type + FROM snapped_to_node s, drawn_features d + WHERE d.id = s.did + GROUP BY d.id + ) + SELECT CASE WHEN ARRAY['e', 's'] && point_type THEN d.geom + WHEN ARRAY['s'] = point_type THEN basic.extend_line(d.geom, 0.00001, 'end') + WHEN ARRAY['e'] = point_type THEN basic.extend_line(d.geom, 0.00001, 'start') + END AS geom, d.original_id + FROM agg_snapped_nodes a, drawn_features d + WHERE a.id = d.id + AND (d.way_type = 'road' OR d.way_type IS NULL) + UNION ALL + SELECT basic.extend_line(d.geom, 0.00001, 'both'), d.original_id + FROM drawn_features d + LEFT JOIN snapped_to_node s + ON d.id = s.did + WHERE s.id IS NULL + AND (d.way_type = 'road' OR d.way_type IS NULL); + + /*Intersects drawn bridges*/ + DROP TABLE IF EXISTS start_end_bridges; + CREATE TEMP TABLE start_end_bridges AS + SELECT st_startpoint(geom) AS geom, + basic.to_short_h3_3(h3_lat_lng_to_cell(st_startpoint(geom)::point, 3)::bigint) AS h3_3 + FROM drawn_features + WHERE way_type = 'bridge' + UNION + SELECT ST_endpoint(geom) AS geom, + basic.to_short_h3_3(h3_lat_lng_to_cell(st_endpoint(geom)::point, 3)::bigint) AS h3_3 + FROM drawn_features + WHERE way_type = 'bridge'; + CREATE INDEX ON start_end_bridges USING GIST(geom); + + /*Intersect drawn ways with existing ways*/ + DROP TABLE IF EXISTS intersection_existing_network; + EXECUTE FORMAT(' + CREATE TEMP TABLE intersection_existing_network AS + WITH intersection_result AS + ( + SELECT (ST_DUMP(ST_Intersection(d.geom, w.geom))).geom AS geom, w.edge_id AS id + FROM extended_lines d, %s w + WHERE ST_Intersects(ST_BUFFER(d.geom, 0.00001), w.geom) + ) + SELECT i.* + FROM intersection_result i + LEFT JOIN extended_lines e + ON i.id = e.original_id + WHERE e.original_id IS NULL + AND st_geometrytype(i.geom) = ''ST_Point''; + ', edge_network_table); + + EXECUTE FORMAT(' + INSERT INTO intersection_existing_network (geom) + WITH closest_points AS ( + SELECT + %L AS scenario_id, + ST_CLOSESTPOINT(w.geom, s.geom) AS closest_point, + ST_LineLocatePoint(w.geom, s.geom) AS fraction, + s.geom AS s_geom, + ROW_NUMBER() OVER (PARTITION BY s.geom ORDER BY ST_CLOSESTPOINT(w.geom, s.geom) <-> s.geom) AS rnk + FROM start_end_bridges s + JOIN %s w + ON w.h3_3 = s.h3_3 + WHERE ST_Intersects(ST_Buffer(s.geom, 0.00001), w.geom) + ) + SELECT closest_point + FROM closest_points + WHERE rnk = 1 + AND NOT EXISTS ( + SELECT 1 + FROM intersection_existing_network i + WHERE ST_Intersects(ST_Buffer(closest_points.closest_point, 0.00001), i.geom) + ); + ', scenario_id_input, edge_network_table); + + DROP TABLE IF EXISTS distinct_intersection_existing_network; + CREATE TABLE distinct_intersection_existing_network AS + SELECT DISTINCT geom + FROM intersection_existing_network i; + + CREATE INDEX ON distinct_intersection_existing_network USING GIST(geom); + ALTER TABLE distinct_intersection_existing_network ADD COLUMN id serial; + ALTER TABLE distinct_intersection_existing_network ADD PRIMARY key(id); + + /*Filter out snapped start or end point*/ + DELETE FROM intersection_existing_network h + USING + ( + SELECT h.geom + FROM snapped_to_node n, distinct_intersection_existing_network h + WHERE ST_Intersects(ST_BUFFER(n.node_geom,0.00001), h.geom) + ) d + WHERE h.geom = d.geom; + + DROP TABLE IF EXISTS split_drawn_features; + /*Split network with itself*/ + SELECT count(*) + INTO cnt + FROM drawn_features + WHERE (way_type IS NULL OR way_type <> 'bridge') + LIMIT 2; + + IF cnt <= 1 THEN + CREATE TEMP TABLE split_drawn_features as + SELECT id as did, geom, class_, impedance_surface, maxspeed_forward, + maxspeed_backward, way_type, scenario_id, original_id, h3_6, h3_3 + FROM drawn_features; + ELSE + CREATE TEMP TABLE split_drawn_features AS + SELECT id AS did, basic.split_by_drawn_lines(id, geom) AS geom, class_, + impedance_surface, maxspeed_forward, maxspeed_backward, way_type, + scenario_id, original_id, h3_6, h3_3 + FROM drawn_features x + WHERE (way_type IS NULL OR way_type <> 'bridge') + UNION ALL + SELECT id AS did, geom, class_, impedance_surface, maxspeed_forward, + maxspeed_backward, way_type, scenario_id, original_id, h3_6, h3_3 + FROM drawn_features + WHERE way_type = 'bridge'; + END IF; + CREATE INDEX ON split_drawn_features USING GIST(geom); + + /*Create perpendicular lines to split new network*/ + DROP TABLE IF EXISTS perpendicular_split_lines; + CREATE TEMP TABLE perpendicular_split_lines AS + SELECT basic.create_intersection_line(i.geom, 0.000001) AS geom + FROM intersection_existing_network i; + + DROP TABLE IF EXISTS union_perpendicular_split_lines; + CREATE TEMP TABLE union_perpendicular_split_lines AS + SELECT ST_Union(geom) AS geom + FROM perpendicular_split_lines p; + + /*Split new network with existing network*/ + DROP TABLE IF EXISTS new_network; + CREATE TEMP TABLE new_network AS + SELECT d.did, (dp.geom).geom, class_, impedance_surface, maxspeed_forward, + maxspeed_backward, way_type, scenario_id, original_id, 'geom' AS edit_type, + h3_6, h3_3 + FROM split_drawn_features d, union_perpendicular_split_lines w, + LATERAL (SELECT ST_DUMP(ST_CollectionExtract(ST_SPLIT(d.geom,w.geom),2)) AS geom) dp + WHERE (d.way_type IS NULL OR d.way_type <> 'bridge'); + CREATE INDEX ON new_network USING GIST(geom); + + /*Delete extended part*/ + DELETE FROM new_network + WHERE st_length(geom) < 0.0000011; + + /*Inject drawn bridges*/ + INSERT INTO new_network(did, geom, way_type, scenario_id, original_id, edit_type) + SELECT id, geom, way_type, scenario_id, original_id, 'geom' + FROM drawn_features + WHERE way_type = 'bridge'; + + ALTER TABLE new_network ADD COLUMN id serial; + ALTER TABLE new_network ADD COLUMN source integer; + ALTER TABLE new_network ADD COLUMN target integer; + + --------------------------------------------------------------------------------------------------------------------- + --Prepare source and target + --------------------------------------------------------------------------------------------------------------------- + /*Existing network is split using perpendicular lines*/ + DROP TABLE IF EXISTS existing_network; + EXECUTE FORMAT(' + CREATE TEMP TABLE existing_network as + SELECT w.edge_id AS original_id, (dp.geom).geom, w.source, w.target, + w.class_, w.impedance_slope, w.impedance_slope_reverse, w.impedance_surface, + w.maxspeed_forward, w.maxspeed_backward, w.h3_6, w.h3_3 + FROM %s w, union_perpendicular_split_lines p, + LATERAL (SELECT ST_DUMP(ST_CollectionExtract(ST_SPLIT(w.geom,p.geom),2)) AS geom) dp + WHERE ST_Intersects(w.geom, p.geom) + AND w.edge_id NOT IN (SELECT original_id FROM modified_attributes_only); + ALTER TABLE existing_network ADD COLUMN id serial; + ALTER TABLE existing_network ADD PRIMARY KEY(id); + CREATE INDEX ON existing_network USING GIST(geom); + ', edge_network_table); + + /*Assign vertices that where snapped to new features*/ + UPDATE new_network n + SET SOURCE = s.node_id + FROM snapped_drawn_features s + WHERE n.did = s.did + AND s.node_id IS NOT NULL + AND s.point_type = 's' + AND ST_ASTEXT(st_startpoint(n.geom)) = ST_ASTEXT(s.geom); + + UPDATE new_network n + SET target = s.node_id + FROM snapped_drawn_features s + WHERE n.did = s.did + AND s.node_id IS NOT NULL + AND ST_ASTEXT(st_endpoint(n.geom)) = ST_ASTEXT(s.geom); + + /*Create new vertices*/ + DROP TABLE IF EXISTS loop_vertices; + CREATE TEMP TABLE loop_vertices AS + WITH start_end_point AS + ( + SELECT e.id, st_startpoint(geom) geom, 's' AS point_type + FROM new_network e + WHERE SOURCE IS NULL + UNION ALL + SELECT e.id, st_endpoint(geom) geom, 'e' AS point_type + FROM new_network e + WHERE target IS NULL + ), + clusters AS + ( + SELECT s.id, s.geom, ST_ClusterDBSCAN(geom, eps := 0.000001, minpoints := 1) OVER() AS cid, point_type + FROM start_end_point s + ), + grouped AS + ( + SELECT ST_CENTROID(ST_COLLECT(geom)) AS geom, ARRAY_AGG(point_type)::text[] AS point_types, ARRAY_AGG(id)::integer[] new_network_ids + FROM clusters c + GROUP BY cid + ) + SELECT geom, point_types, new_network_ids + FROM grouped; + + DROP TABLE IF EXISTS new_vertices; + CREATE TEMP TABLE new_vertices + ( + node_id integer, + new_network_ids integer[], + point_types text[], + geom geometry + ); + /* + DO $$ + DECLARE + rec record; + max_id integer; + BEGIN*/ + FOR rec IN SELECT * FROM loop_vertices v + LOOP + INSERT INTO new_vertices(node_id, new_network_ids, point_types, geom) + SELECT max_node_id + max_node_id_increment AS id, rec.new_network_ids, + rec.point_types, rec.geom; + max_node_id_increment := max_node_id_increment + 1; + END LOOP; + /*END $$;*/ + CREATE INDEX ON new_vertices USING GIST(geom); + + WITH unnest_to_update AS + ( + SELECT v.node_id, UNNEST(v.new_network_ids) new_network_id, UNNEST(v.point_types) point_type + FROM new_vertices v + ) + UPDATE new_network n + SET SOURCE = u.node_id + FROM unnest_to_update u + WHERE n.id = u.new_network_id + AND point_type = 's'; + + WITH unnest_to_update AS + ( + SELECT v.node_id, UNNEST(v.new_network_ids) new_network_id, UNNEST(v.point_types) point_type + FROM new_vertices v + ) + UPDATE new_network n + SET target = u.node_id + FROM unnest_to_update u + WHERE n.id = u.new_network_id + AND point_type = 'e'; + + DROP TABLE IF EXISTS new_source_target_existing; + CREATE TEMP TABLE new_source_target_existing AS + WITH start_and_end AS + ( + SELECT e.id, st_startpoint(geom) geom, 's' AS point_type + FROM existing_network e + UNION ALL + SELECT e.id, st_endpoint(geom) geom, 'e' AS point_type + FROM existing_network e + ) + SELECT v.id, point_type, c.node_id, v.geom + FROM start_and_end v + CROSS JOIN LATERAL + ( + SELECT n.node_id + FROM new_vertices n + WHERE ST_Intersects(ST_BUFFER(v.geom, 0.00001), n.geom) + ORDER BY n.geom <-> v.geom + LIMIT 1 + ) c; + + UPDATE existing_network e + SET SOURCE = n.node_id + FROM new_source_target_existing n + WHERE e.id = n.id + AND n.point_type = 's'; + + UPDATE existing_network e + SET target = n.node_id + FROM new_source_target_existing n + WHERE e.id = n.id + AND n.point_type = 'e'; + + ---------------------------------------------------------------------------------------------------------------------- + --PRODUCE FINAL LIST OF NETWORK MODIFICATIONS (SIMPLIFIED INTO ADDITIONS AND DELETIONS) + ---------------------------------------------------------------------------------------------------------------------- + + DROP TABLE IF EXISTS network_modifications; + CREATE TEMP TABLE network_modifications ( + edit_type TEXT, + id INTEGER, + class_ TEXT, + source INTEGER, + target INTEGER, + length_m FLOAT8, + length_3857 FLOAT8, + coordinates_3857 JSONB, + impedance_slope FLOAT8, + impedance_slope_reverse FLOAT8, + impedance_surface FLOAT8, + maxspeed_forward INTEGER, + maxspeed_backward INTEGER, + geom GEOMETRY(LINESTRING, 4326), + h3_6 INTEGER, + h3_3 INTEGER + ); + + -- Edges to be explicitly deleted according to the scenario + EXECUTE FORMAT(' + INSERT INTO network_modifications (edit_type, id, h3_6, h3_3) + SELECT ''d'' AS edit_type, e.edge_id AS id, e.h3_6, e.h3_3 + FROM %s e, ( + SELECT sf.* + FROM customer.scenario_scenario_feature ssf + INNER JOIN customer.scenario_feature sf ON sf.id = ssf.scenario_feature_id + WHERE ssf.scenario_id = %L + AND sf.layer_project_id = %s + AND sf.edit_type = ''d'' + ) w + WHERE e.h3_3 = w.h3_3 + AND e.id = w.feature_id; + ', edge_network_table, scenario_id_input, edge_layer_project_id); + + -- Existing edges to be deleted due to a modified or new edge replacing it + INSERT INTO network_modifications (edit_type, id, h3_6, h3_3) + SELECT 'd' AS edit_type, original_id AS id, h3_6, h3_3 + FROM existing_network + UNION + SELECT 'd' AS edit_type, original_id AS id, h3_6, h3_3 + FROM new_network + WHERE original_id IS NOT NULL; + + -- Modified edges to be added to the network + FOR rec IN SELECT * FROM existing_network + LOOP + INSERT INTO network_modifications + SELECT 'n', max_edge_id + max_edge_id_increment AS id, rec.class_, rec.source, rec.target, + ST_Length(rec.geom::geography) AS length_m, ST_Length(ST_Transform(rec.geom, 3857)) AS length_3857, + ST_AsGeoJSON(ST_Transform(rec.geom, 3857))::JSONB->'coordinates' AS coordinates_3857, + rec.impedance_slope, rec.impedance_slope_reverse, rec.impedance_surface, + rec.maxspeed_forward, rec.maxspeed_backward, rec.geom, rec.h3_6, rec.h3_3; + max_edge_id_increment := max_edge_id_increment + 1; + END LOOP; + + -- New edges to be added to the network + -- TODO: Compute slope impedances + FOR rec IN SELECT * FROM new_network + LOOP + INSERT INTO network_modifications + SELECT 'n', max_edge_id + max_edge_id_increment AS id, rec.class_, rec.source, rec.target, + ST_Length(rec.geom::geography) AS length_m, ST_Length(ST_Transform(rec.geom, 3857)) AS length_3857, + ST_AsGeoJSON(ST_Transform(rec.geom, 3857))::JSONB->'coordinates' AS coordinates_3857, + NULL AS impedance_slope, NULL AS impedance_slope_reverse, rec.impedance_surface, + rec.maxspeed_forward, rec.maxspeed_backward, rec.geom, + basic.to_short_h3_6(h3_lat_lng_to_cell(ST_Centroid(rec.geom)::point, 6)::bigint) AS h3_6, + basic.to_short_h3_3(h3_lat_lng_to_cell(ST_Centroid(rec.geom)::point, 3)::bigint) AS h3_3; + max_edge_id_increment := max_edge_id_increment + 1; + END LOOP; + + RETURN QUERY + SELECT * FROM network_modifications; + +END +$function$; diff --git a/src/db/functions/split_by_drawn_lines.sql b/src/db/functions/split_by_drawn_lines.sql new file mode 100644 index 0000000..fc45a2c --- /dev/null +++ b/src/db/functions/split_by_drawn_lines.sql @@ -0,0 +1,37 @@ +CREATE OR REPLACE FUNCTION basic.split_by_drawn_lines(id_input UUID, input_geom geometry) + RETURNS SETOF geometry + LANGUAGE plpgsql + AS $function$ + DECLARE + union_geom geometry; + does_intersect boolean := FALSE; + BEGIN + + does_intersect = ( + SELECT TRUE + FROM drawn_features d + WHERE ST_Intersects(basic.extend_line(d.geom, 0.00001, 'both'), + (SELECT geom FROM drawn_features WHERE id = id_input)) + LIMIT 1 + ); + + IF does_intersect = TRUE THEN + union_geom = + ( + SELECT ST_UNION(basic.extend_line(geom, 0.00001, 'both')) AS geom + FROM drawn_features + WHERE id <> id_input + AND ST_Intersects(input_geom, basic.extend_line(geom, 0.00001, 'both')) + AND (way_type IS NULL OR way_type <> 'bridge') + ); + END IF; + + IF union_geom IS NOT NULL THEN + RETURN query + SELECT (dump).geom + FROM (SELECT ST_DUMP(ST_CollectionExtract(ST_SPLIT(input_geom, union_geom),2)) AS dump) d; + ELSE + RETURN query SELECT input_geom; + END IF; + END +$function$; diff --git a/src/schemas/catchment_area.py b/src/schemas/catchment_area.py index 790d7bf..8c55656 100644 --- a/src/schemas/catchment_area.py +++ b/src/schemas/catchment_area.py @@ -249,7 +249,7 @@ class ICatchmentAreaActiveMobility(BaseModel): scenario_id: UUID | None = Field( None, title="Scenario ID", - description="The ID of the scenario that is used for the routing.", + description="The ID of the scenario that is to be applied on the base network.", ) catchment_area_type: CatchmentAreaType = Field( ..., From bc36f3d5ea4384aa9c04046bd9bebbea30d608ac Mon Sep 17 00:00:00 2001 From: Nihar Thakkar Date: Mon, 5 Aug 2024 05:09:59 +0000 Subject: [PATCH 4/8] Adapt artificial segments function to use modified network according to the specified scenario --- src/crud/crud_catchment_area.py | 165 +++++++++++------- src/db/functions/get_artificial_segments.sql | 42 +++-- .../functions/get_network_modifications.sql | 106 ++++++----- 3 files changed, 179 insertions(+), 134 deletions(-) diff --git a/src/crud/crud_catchment_area.py b/src/crud/crud_catchment_area.py index 62f40e9..e5891ea 100644 --- a/src/crud/crud_catchment_area.py +++ b/src/crud/crud_catchment_area.py @@ -181,7 +181,7 @@ async def read_network( sql_get_relevant_cells = text( f""" WITH point AS ( - SELECT geom FROM temporal.\"{input_table}\" LIMIT {num_points} + SELECT geom FROM "{input_table}" LIMIT {num_points} ), buffer AS ( SELECT ST_Buffer(point.geom::geography, {buffer_dist})::geometry AS geom @@ -223,6 +223,74 @@ async def read_network( else: sub_network = sub_df + # Produce all network modifications required to apply the specified scenario + network_modifications_table = f"temporal.{str(uuid.uuid4()).replace('-', '_')}" + print(network_modifications_table) + scenario_id = ( + f"'{obj_in.scenario_id}'" if obj_in.scenario_id is not None else "NULL" + ) + await self.db_connection.execute( + text( + f""" + SELECT basic.produce_network_modifications( + {scenario_id}, + {settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID}, + {settings.STREET_NETWORK_NODE_DEFAULT_LAYER_PROJECT_ID}, + '{network_modifications_table}' + ); + """ + ) + ) + + # Apply network modifications to the sub-network + segments_to_discard = [] + sql_get_network_modifications = text( + f""" + SELECT edit_type, id, class_, source, target, + length_m, length_3857, CAST(coordinates_3857 AS TEXT) AS coordinates_3857, + impedance_slope, impedance_slope_reverse, impedance_surface, maxspeed_forward, + maxspeed_backward, h3_6, h3_3 + FROM "{network_modifications_table}"; + """ + ) + result = ( + await self.db_connection.execute(sql_get_network_modifications) + ).fetchall() + + for modification in result: + if modification[0] == "d": + segments_to_discard.append(modification[1]) + continue + + new_segment = pl.DataFrame( + [ + { + "id": modification[1], + "length_m": modification[5], + "length_3857": modification[6], + "class_": modification[2], + "impedance_slope": modification[8], + "impedance_slope_reverse": modification[9], + "impedance_surface": modification[10], + "coordinates_3857": modification[7], + "maxspeed_forward": modification[11], + "maxspeed_backward": modification[12], + "source": modification[3], + "target": modification[4], + "h3_3": modification[13], + "h3_6": modification[14], + } + ], + schema_overrides=SEGMENT_DATA_SCHEMA, + ) + new_segment = new_segment.with_columns( + pl.col("coordinates_3857").str.json_extract() + ) + sub_network.extend(new_segment) + + # Remove segments which are deleted or modified due to the scenario + sub_network = sub_network.filter(~pl.col("id").is_in(segments_to_discard)) + # Create necessary artifical segments and add them to our sub network origin_point_connectors = [] origin_point_cell_index = [] @@ -240,6 +308,7 @@ async def read_network( h3_3, h3_6, point_cell_index, point_h3_3 FROM basic.get_artificial_segments( {settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID}, + '{network_modifications_table}', '{input_table}', {num_points}, '{",".join(valid_segment_classes)}', @@ -286,58 +355,6 @@ async def read_network( "Starting point(s) are disconnected from the street network." ) - # Process network modifications according to the specified scenario - scenario_id = ( - f"'{obj_in.scenario_id}'" if obj_in.scenario_id is not None else "NULL" - ) - sql_get_network_modifications = text( - f""" - SELECT r_edit_type, r_id, r_class_, r_source, r_target, - r_length_m, r_length_3857, CAST(r_coordinates_3857 AS TEXT) AS coordinates_3857, - r_impedance_slope, r_impedance_slope_reverse, r_impedance_surface, r_maxspeed_forward, - r_maxspeed_backward, r_h3_6, r_h3_3 - FROM basic.get_network_modifications( - {scenario_id}, - {settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID}, - {settings.STREET_NETWORK_NODE_DEFAULT_LAYER_PROJECT_ID} - ); - """ - ) - result = ( - await self.db_connection.execute(sql_get_network_modifications) - ).fetchall() - - for modification in result: - if modification[0] == "d": - segments_to_discard.append(modification[1]) - continue - - new_segment = pl.DataFrame( - [ - { - "id": modification[1], - "length_m": modification[5], - "length_3857": modification[6], - "class_": modification[2], - "impedance_slope": modification[8], - "impedance_slope_reverse": modification[9], - "impedance_surface": modification[10], - "coordinates_3857": modification[7], - "maxspeed_forward": modification[11], - "maxspeed_backward": modification[12], - "source": modification[3], - "target": modification[4], - "h3_3": modification[13], - "h3_6": modification[14], - } - ], - schema_overrides=SEGMENT_DATA_SCHEMA, - ) - new_segment = new_segment.with_columns( - pl.col("coordinates_3857").str.json_extract() - ) - sub_network.extend(new_segment) - # Remove segments which are replaced by artificial segments or modified due to the scenario sub_network = sub_network.filter(~pl.col("id").is_in(segments_to_discard)) @@ -382,6 +399,7 @@ async def read_network( return ( sub_network, + network_modifications_table, origin_point_connectors, origin_point_cell_index, origin_point_h3_3, @@ -391,13 +409,13 @@ async def create_input_table(self, obj_in): """Create the input table for the catchment area calculation.""" # Generate random table name - table_name = str(uuid.uuid4()).replace("-", "_") + table_name = f"temporal.{str(uuid.uuid4()).replace('-', '_')}" # Create temporary table for storing catchment area starting points await self.db_connection.execute( text( f""" - CREATE TABLE temporal.\"{table_name}\" ( + CREATE TABLE "{table_name}" ( id serial PRIMARY KEY, geom geometry(Point, 4326) ); @@ -416,7 +434,7 @@ async def create_input_table(self, obj_in): await self.db_connection.execute( text( f""" - INSERT INTO temporal.\"{table_name}\" (geom) + INSERT INTO "{table_name}" (geom) VALUES {insert_string.rstrip(",")}; """ ) @@ -426,10 +444,19 @@ async def create_input_table(self, obj_in): return table_name, len(obj_in.starting_points.latitude) - async def delete_input_table(self, table_name: str): - """Delete the input table after reading the relevant sub-network.""" + async def drop_temp_tables( + self, input_table: str, network_modifications_table: str + ): + """Delete the temporary input and network modifications tables.""" - await self.db_connection.execute(text(f'DROP TABLE temporal."{table_name}";')) + await self.db_connection.execute( + text( + f""" + DROP TABLE "{input_table}"; + DROP TABLE "{network_modifications_table}"; + """ + ) + ) await self.db_connection.commit() def compute_segment_cost(self, sub_network, mode, speed): @@ -687,17 +714,21 @@ async def run(self, obj_in: ICatchmentAreaActiveMobility | ICatchmentAreaCar): input_table, num_points = await self.create_input_table(obj_in) # Read & process routing network to extract relevant sub-network - sub_routing_network, origin_connector_ids, origin_point_h3_10, _ = ( - await self.read_network( - routing_network, - obj_in, - input_table, - num_points, - ) + ( + sub_routing_network, + network_modifications_table, + origin_connector_ids, + origin_point_h3_10, + _, + ) = await self.read_network( + routing_network, + obj_in, + input_table, + num_points, ) # Delete input table for catchment area origin points - await self.delete_input_table(input_table) + await self.drop_temp_tables(input_table, network_modifications_table) except Exception as e: self.redis.set(str(obj_in.layer_id), ProcessingStatus.failure.value) await self.db_connection.rollback() diff --git a/src/db/functions/get_artificial_segments.sql b/src/db/functions/get_artificial_segments.sql index 3128ff3..82f4a40 100644 --- a/src/db/functions/get_artificial_segments.sql +++ b/src/db/functions/get_artificial_segments.sql @@ -20,8 +20,9 @@ CREATE TYPE basic.artificial_segment AS ( DROP FUNCTION IF EXISTS basic.get_artificial_segments; CREATE OR REPLACE FUNCTION basic.get_artificial_segments( edge_layer_project_id INT, - input_table TEXT, - num_points INT, + network_modifications_table TEXT, + origin_points_table TEXT, + num_origin_points INT, classes TEXT, point_cell_resolution INT ) @@ -58,22 +59,42 @@ BEGIN id, geom, ST_SETSRID(ST_Buffer(geom::geography, 100)::GEOMETRY, 4326) AS buffer_geom, basic.to_short_h3_3(h3_lat_lng_to_cell(ST_Centroid(geom)::point, 3)::bigint) AS h3_3 - FROM temporal."%s" + FROM %I LIMIT %s::int ), + modified_network AS ( + SELECT original_features.* + FROM ( + SELECT s.edge_id AS id, s.class_, s.source, s.target, s.length_m, s.length_3857, + s.coordinates_3857, s.impedance_slope, s.impedance_slope_reverse, + s.impedance_surface, s.maxspeed_forward, s.maxspeed_backward, s.geom, + s.h3_6, s.h3_3 + FROM %s s, + origin o + WHERE s.class_ = ANY(string_to_array(''%s'', '','')) + AND ST_Intersects(s.geom, o.buffer_geom) + ) original_features + LEFT JOIN %I scenario_features ON original_features.id = scenario_features.id + WHERE scenario_features.id IS NULL + UNION ALL + SELECT id, class_, source, target, length_m, length_3857, + coordinates_3857::json, impedance_slope, impedance_slope_reverse, + impedance_surface, maxspeed_forward, maxspeed_backward, geom, + h3_6, h3_3 + FROM %I scenario_features + WHERE edit_type = ''n'' + AND class_ = ANY(string_to_array(''%s'', '','')) + ), best_segment AS ( SELECT DISTINCT ON (o.id) o.id AS point_id, o.geom AS point_geom, o.buffer_geom AS point_buffer, - s.edge_id AS id, s.class_, s.impedance_slope, s.impedance_slope_reverse, + s.id, s.class_, s.impedance_slope, s.impedance_slope_reverse, s.impedance_surface, s.maxspeed_forward, s.maxspeed_backward, s."source", s.target, s.geom, s.h3_3, s.h3_6, ST_LineLocatePoint(s.geom, o.geom) AS fraction, ST_ClosestPoint(s.geom, o.geom) AS fraction_geom - FROM %s s, origin o - WHERE - s.h3_3 = o.h3_3 - AND s.class_ = ANY(string_to_array(''%s'', '','')) - AND ST_Intersects(s.geom, o.buffer_geom) + FROM modified_network s, origin o + WHERE ST_Intersects(s.geom, o.buffer_geom) ORDER BY o.id, ST_ClosestPoint(s.geom, o.geom) <-> o.geom ) SELECT @@ -90,7 +111,8 @@ BEGIN bs.id, bs.class_, bs.impedance_slope, bs.impedance_slope_reverse, bs.impedance_surface, bs.maxspeed_forward, bs.maxspeed_backward, bs."source", bs.target, bs.geom, bs.h3_3, bs.h3_6;' - , input_table, num_points, edge_network_table, classes); + , origin_points_table, num_origin_points, edge_network_table, classes, + network_modifications_table, network_modifications_table, classes); LOOP FETCH custom_cursor INTO origin_segment; diff --git a/src/db/functions/get_network_modifications.sql b/src/db/functions/get_network_modifications.sql index e5c7ce7..74edc06 100644 --- a/src/db/functions/get_network_modifications.sql +++ b/src/db/functions/get_network_modifications.sql @@ -1,27 +1,11 @@ -DROP FUNCTION IF EXISTS basic.get_network_modifications; -CREATE OR REPLACE FUNCTION basic.get_network_modifications( +DROP FUNCTION IF EXISTS basic.produce_network_modifications; +CREATE OR REPLACE FUNCTION basic.produce_network_modifications( scenario_id_input UUID, edge_layer_project_id INTEGER, - node_layer_project_id INTEGER -) -RETURNS TABLE ( - r_edit_type TEXT, - r_id INTEGER, - r_class_ TEXT, - r_source INTEGER, - r_target INTEGER, - r_length_m FLOAT8, - r_length_3857 FLOAT8, - r_coordinates_3857 JSONB, - r_impedance_slope FLOAT8, - r_impedance_slope_reverse FLOAT8, - r_impedance_surface FLOAT8, - r_maxspeed_forward INTEGER, - r_maxspeed_backward INTEGER, - r_geom GEOMETRY(LINESTRING, 4326), - r_h3_6 INTEGER, - r_h3_3 INTEGER + node_layer_project_id INTEGER, + network_modifications_table TEXT ) +RETURNS void LANGUAGE plpgsql AS $function$ DECLARE @@ -46,6 +30,19 @@ BEGIN --Proceed only if the scenario contains features which apply to the specified edge layer --------------------------------------------------------------------------------------------------------------------- + -- Create network modifications table + EXECUTE FORMAT(' + DROP TABLE IF EXISTS %I; + CREATE TABLE %I ( + edit_type TEXT, id INTEGER, class_ TEXT, source INTEGER, + target INTEGER, length_m FLOAT8, length_3857 FLOAT8, + coordinates_3857 JSONB, impedance_slope FLOAT8, + impedance_slope_reverse FLOAT8, impedance_surface FLOAT8, + maxspeed_forward INTEGER, maxspeed_backward INTEGER, + geom GEOMETRY(LINESTRING, 4326), h3_6 INTEGER, h3_3 INTEGER + ); + ', network_modifications_table, network_modifications_table); + IF NOT EXISTS ( SELECT 1 FROM customer.scenario_scenario_feature ssf @@ -565,29 +562,9 @@ BEGIN --PRODUCE FINAL LIST OF NETWORK MODIFICATIONS (SIMPLIFIED INTO ADDITIONS AND DELETIONS) ---------------------------------------------------------------------------------------------------------------------- - DROP TABLE IF EXISTS network_modifications; - CREATE TEMP TABLE network_modifications ( - edit_type TEXT, - id INTEGER, - class_ TEXT, - source INTEGER, - target INTEGER, - length_m FLOAT8, - length_3857 FLOAT8, - coordinates_3857 JSONB, - impedance_slope FLOAT8, - impedance_slope_reverse FLOAT8, - impedance_surface FLOAT8, - maxspeed_forward INTEGER, - maxspeed_backward INTEGER, - geom GEOMETRY(LINESTRING, 4326), - h3_6 INTEGER, - h3_3 INTEGER - ); - -- Edges to be explicitly deleted according to the scenario EXECUTE FORMAT(' - INSERT INTO network_modifications (edit_type, id, h3_6, h3_3) + INSERT INTO %I (edit_type, id, h3_6, h3_3) SELECT ''d'' AS edit_type, e.edge_id AS id, e.h3_6, e.h3_3 FROM %s e, ( SELECT sf.* @@ -599,22 +576,34 @@ BEGIN ) w WHERE e.h3_3 = w.h3_3 AND e.id = w.feature_id; - ', edge_network_table, scenario_id_input, edge_layer_project_id); + ', network_modifications_table, edge_network_table, scenario_id_input, edge_layer_project_id); -- Existing edges to be deleted due to a modified or new edge replacing it - INSERT INTO network_modifications (edit_type, id, h3_6, h3_3) - SELECT 'd' AS edit_type, original_id AS id, h3_6, h3_3 - FROM existing_network - UNION - SELECT 'd' AS edit_type, original_id AS id, h3_6, h3_3 - FROM new_network - WHERE original_id IS NOT NULL; + EXECUTE FORMAT(' + INSERT INTO %I (edit_type, id, h3_6, h3_3) + SELECT ''d'' AS edit_type, original_id AS id, h3_6, h3_3 + FROM existing_network + UNION + SELECT ''d'' AS edit_type, original_id AS id, h3_6, h3_3 + FROM new_network + WHERE original_id IS NOT NULL; + ', network_modifications_table); + + -- Create temp table to store all new edges before copying them into the final network modifications table + DROP TABLE IF EXISTS new_edges; + EXECUTE FORMAT(' + CREATE TEMP TABLE new_edges AS + SELECT * FROM %I LIMIT 0; + ', network_modifications_table); -- Modified edges to be added to the network - FOR rec IN SELECT * FROM existing_network + FOR rec IN SELECT e.* + FROM existing_network e + LEFT JOIN new_network n ON e.original_id = n.original_id + WHERE n.original_id IS NULL LOOP - INSERT INTO network_modifications - SELECT 'n', max_edge_id + max_edge_id_increment AS id, rec.class_, rec.source, rec.target, + INSERT INTO new_edges + SELECT 'n' AS edit_type, max_edge_id + max_edge_id_increment AS id, rec.class_, rec.source, rec.target, ST_Length(rec.geom::geography) AS length_m, ST_Length(ST_Transform(rec.geom, 3857)) AS length_3857, ST_AsGeoJSON(ST_Transform(rec.geom, 3857))::JSONB->'coordinates' AS coordinates_3857, rec.impedance_slope, rec.impedance_slope_reverse, rec.impedance_surface, @@ -626,8 +615,8 @@ BEGIN -- TODO: Compute slope impedances FOR rec IN SELECT * FROM new_network LOOP - INSERT INTO network_modifications - SELECT 'n', max_edge_id + max_edge_id_increment AS id, rec.class_, rec.source, rec.target, + INSERT INTO new_edges + SELECT 'n' as edit_type, max_edge_id + max_edge_id_increment AS id, rec.class_, rec.source, rec.target, ST_Length(rec.geom::geography) AS length_m, ST_Length(ST_Transform(rec.geom, 3857)) AS length_3857, ST_AsGeoJSON(ST_Transform(rec.geom, 3857))::JSONB->'coordinates' AS coordinates_3857, NULL AS impedance_slope, NULL AS impedance_slope_reverse, rec.impedance_surface, @@ -637,8 +626,11 @@ BEGIN max_edge_id_increment := max_edge_id_increment + 1; END LOOP; - RETURN QUERY - SELECT * FROM network_modifications; + -- Copy new edges into the final network modifications table + EXECUTE FORMAT(' + INSERT INTO %I + SELECT * FROM new_edges; + ', network_modifications_table); END $function$; From 647bb1e823d3cdf9c1612a315937cc6581fd7c9f Mon Sep 17 00:00:00 2001 From: Nihar Thakkar Date: Mon, 5 Aug 2024 12:19:53 +0000 Subject: [PATCH 5/8] Minor fixes, retain holes in isochrone geometry if they are larger than a certain threshold --- src/core/config.py | 1 + src/crud/crud_catchment_area.py | 25 ++++++--------- src/db/functions/fill_polygon_holes.sql | 42 +++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 15 deletions(-) create mode 100644 src/db/functions/fill_polygon_holes.sql diff --git a/src/core/config.py b/src/core/config.py index b6edf0a..a62c716 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -23,6 +23,7 @@ class Settings(BaseSettings): NETWORK_REGION_TABLE = "basic.geofence_active_mobility" CATCHMENT_AREA_CAR_BUFFER_DEFAULT_SPEED = 80 # km/h + CATCHMENT_AREA_HOLE_THRESHOLD_SQM = 10000 # 100m x 100m CELERY_BROKER_URL: Optional[str] = "pyamqp://guest@rabbitmq//" REDIS_HOST: Optional[str] = "redis" diff --git a/src/crud/crud_catchment_area.py b/src/crud/crud_catchment_area.py index e5891ea..3cad627 100644 --- a/src/crud/crud_catchment_area.py +++ b/src/crud/crud_catchment_area.py @@ -225,7 +225,6 @@ async def read_network( # Produce all network modifications required to apply the specified scenario network_modifications_table = f"temporal.{str(uuid.uuid4()).replace('-', '_')}" - print(network_modifications_table) scenario_id = ( f"'{obj_in.scenario_id}'" if obj_in.scenario_id is not None else "NULL" ) @@ -449,13 +448,9 @@ async def drop_temp_tables( ): """Delete the temporary input and network modifications tables.""" + await self.db_connection.execute(text(f'DROP TABLE "{input_table}";')) await self.db_connection.execute( - text( - f""" - DROP TABLE "{input_table}"; - DROP TABLE "{network_modifications_table}"; - """ - ) + text(f'DROP TABLE "{network_modifications_table}";') ) await self.db_connection.commit() @@ -608,10 +603,11 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): ), isochrones_filled AS ( - SELECT "minute", ST_COLLECT(ST_MAKEPOLYGON(st_exteriorring(geom))) geom - FROM isochrones + SELECT "minute", ST_COLLECT(ST_MAKEPOLYGON(ST_ExteriorRing(geom))) AS orig_geom, ST_COLLECT(filled) AS filled_geom + FROM isochrones, + LATERAL basic.fill_polygon_holes(geom, {settings.CATCHMENT_AREA_HOLE_THRESHOLD_SQM}) filled GROUP BY "minute" - ORDER BY "minute" + ORDER BY "minute" DESC ), isochrones_with_id AS ( @@ -619,15 +615,14 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): FROM isochrones_filled ) INSERT INTO {obj_in.result_table} (layer_id, geom, integer_attr1) - SELECT '{obj_in.layer_id}', ST_MakeValid(COALESCE(j.geom, a.geom)) AS geom, a.MINUTE + SELECT '{obj_in.layer_id}', ST_MakeValid(COALESCE(j.geom, a.filled_geom)) AS geom, a."minute" FROM isochrones_with_id a LEFT JOIN LATERAL ( - SELECT ST_DIFFERENCE(a.geom, b.geom) AS geom + SELECT ST_DIFFERENCE(a.filled_geom, b.orig_geom) AS geom FROM isochrones_with_id b - WHERE a.id - 1 = b.id - ) j ON {"TRUE" if obj_in.polygon_difference else "FALSE"} - ORDER BY a.MINUTE DESC; + WHERE a.id + 1 = b.id + ) j ON {"TRUE" if obj_in.polygon_difference else "FALSE"}; """ ) diff --git a/src/db/functions/fill_polygon_holes.sql b/src/db/functions/fill_polygon_holes.sql new file mode 100644 index 0000000..e94f5dd --- /dev/null +++ b/src/db/functions/fill_polygon_holes.sql @@ -0,0 +1,42 @@ +-- Function to remove small holes from a polygon +-- threshold: minimum area of a hole to keep (in square meters) +DROP FUNCTION IF EXISTS basic.fill_polygon_holes; +CREATE OR REPLACE FUNCTION basic.fill_polygon_holes(geom geometry, threshold float8) +RETURNS geometry AS $$ +DECLARE + outer_ring geometry; + inner_ring geometry; + num_rings integer; + ring_index integer; + new_rings geometry[]; +BEGIN + -- Ensure geometry is a valid polygon + IF NOT ST_IsValid(geom) OR NOT ST_GeometryType(geom) = 'ST_Polygon' THEN + RETURN NULL; + END IF; + + -- Extract the outer ring + outer_ring := ST_ExteriorRing(geom); + + -- Get the number of interior rings + num_rings := ST_NumInteriorRings(geom); + + -- Initialize array to store remaining interior rings + new_rings := ARRAY[]::geometry[]; + + -- Loop through each interior ring + FOR ring_index IN 1..num_rings LOOP + -- Extract the current interior ring + inner_ring := ST_InteriorRingN(geom, ring_index); + + -- Check if the area of the interior ring is larger than the threshold + IF ST_Area(ST_MakePolygon(inner_ring)::geography) >= threshold THEN + -- Add the ring to the new_rings array + new_rings := array_append(new_rings, inner_ring); + END IF; + END LOOP; + + -- Construct a new polygon from the outer ring and the remaining interior rings + RETURN ST_MakePolygon(outer_ring, new_rings) AS geom; +END; +$$ LANGUAGE plpgsql; From 2c80335b58158621b88edb820822f42bf167b803 Mon Sep 17 00:00:00 2001 From: Nihar Thakkar Date: Thu, 8 Aug 2024 08:02:47 +0000 Subject: [PATCH 6/8] Refactor routing network fetch code, improve error handling, add support for dynamically specfied street networks --- src/core/config.py | 5 + .../street_network/street_network_cache.py | 146 ++++++++++ .../street_network/street_network_util.py | 253 ++++++++++++++++++ src/crud/crud_catchment_area.py | 132 +-------- src/schemas/catchment_area.py | 6 + 5 files changed, 424 insertions(+), 118 deletions(-) create mode 100644 src/core/street_network/street_network_cache.py create mode 100644 src/core/street_network/street_network_util.py diff --git a/src/core/config.py b/src/core/config.py index a62c716..dd8d7a0 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -13,6 +13,11 @@ class SyncPostgresDsn(PostgresDsn): class Settings(BaseSettings): + DEBUG_MODE: bool = True + + CUSTOMER_SCHEMA: str = "customer" + USER_DATA_SCHEMA: str = "user_data" + API_V2_STR: str = "/api/v2" PROJECT_NAME: Optional[str] = "GOAT Routing API" CACHE_DIR: str = "/app/src/cache" diff --git a/src/core/street_network/street_network_cache.py b/src/core/street_network/street_network_cache.py new file mode 100644 index 0000000..7e245a4 --- /dev/null +++ b/src/core/street_network/street_network_cache.py @@ -0,0 +1,146 @@ +import os + +import polars as pl +from polars import DataFrame + +from src.core.config import settings + + +class StreetNetworkCache: + def __init__(self): + """Initialize the cache directory if it does not exist.""" + + if not os.path.exists(settings.CACHE_DIR): + os.makedirs(settings.CACHE_DIR) + + def _get_edge_cache_file_name( + self, + edge_layer_project_id: int, + h3_short: str, + ): + """Get edge cache file path for the specified H3_3 cell.""" + + return os.path.join( + settings.CACHE_DIR, + f"{edge_layer_project_id}_{h3_short}_edge.parquet", + ) + + def _get_node_cache_file_name( + self, + node_layer_project_id: int, + h3_short: str, + ): + """Get node cache file path for the specified H3_3 cell.""" + + return os.path.join( + settings.CACHE_DIR, + f"{node_layer_project_id}_{h3_short}_node.parquet", + ) + + def edge_cache_exists(self, edge_layer_project_id: int, h3_short: str): + """Check if edge data for the specified H3_3 cell is cached.""" + + edge_cache_file = self._get_edge_cache_file_name( + edge_layer_project_id, h3_short + ) + return os.path.exists(edge_cache_file) + + def node_cache_exists(self, node_layer_project_id: int, h3_short: str): + """Check if node data for the specified H3_3 cell is cached.""" + + node_cache_file = self._get_node_cache_file_name( + node_layer_project_id, h3_short + ) + return os.path.exists(node_cache_file) + + def read_edge_cache( + self, + edge_layer_project_id: int, + h3_short: str, + ): + """Read edge data for the specified H3_3 cell from cache.""" + + edge_df: DataFrame = None + + edge_cache_file = self._get_edge_cache_file_name( + edge_layer_project_id, h3_short + ) + + try: + with open(edge_cache_file, "rb") as file: + edge_df = pl.read_parquet(file) + except Exception: + raise ValueError( + f"Failed to read edge data for H3_3 cell {h3_short} from cache." + ) + + return edge_df + + def read_node_cache( + self, + node_layer_project_id: int, + h3_short: str, + ): + """Read node data for the specified H3_3 cell from cache.""" + + node_df: DataFrame = None + + node_cache_file = self._get_node_cache_file_name( + node_layer_project_id, h3_short + ) + + try: + with open(node_cache_file, "rb") as file: + node_df = pl.read_parquet(file) + except Exception: + raise ValueError( + f"Failed to read node data for H3_3 cell {h3_short} from cache." + ) + + return node_df + + def write_edge_cache( + self, + edge_layer_project_id: int, + h3_short: str, + edge_df: DataFrame, + ): + """Write edge data for the specified H3_3 cell into cache.""" + + edge_cache_file = self._get_edge_cache_file_name( + edge_layer_project_id, h3_short + ) + + try: + with open(edge_cache_file, "wb") as file: + edge_df.write_parquet(file) + except Exception: + # Clean up cache file if writing fails + if os.path.exists(edge_cache_file): + os.remove(edge_cache_file) + raise RuntimeError( + f"Failed to write edge data for H3_3 cell {h3_short} into cache." + ) + + def write_node_cache( + self, + node_layer_project_id: int, + h3_short: str, + node_df: DataFrame, + ): + """Write node data for the specified H3_3 cell into cache.""" + + node_cache_file = self._get_node_cache_file_name( + node_layer_project_id, h3_short + ) + + try: + with open(node_cache_file, "wb") as file: + node_df.write_parquet(file) + except Exception: + # Clean up cache file if writing fails + if os.path.exists(node_cache_file): + os.remove(node_cache_file) + raise RuntimeError( + f"Failed to write node data for H3_3 cell {h3_short} into cache." + ) diff --git a/src/core/street_network/street_network_util.py b/src/core/street_network/street_network_util.py new file mode 100644 index 0000000..f95b10b --- /dev/null +++ b/src/core/street_network/street_network_util.py @@ -0,0 +1,253 @@ +import time +from uuid import UUID + +import polars as pl +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.config import settings +from src.core.street_network.street_network_cache import StreetNetworkCache +from src.schemas.catchment_area import CONNECTOR_DATA_SCHEMA, SEGMENT_DATA_SCHEMA + + +class StreetNetworkUtil: + def __init__(self, db_connection: AsyncSession): + self.db_connection = db_connection + + async def _get_layer_and_user_id(self, layer_project_id: int): + """Get the layer ID and user ID of the specified layer project ID.""" + + layer_id: UUID = None + user_id: UUID = None + + try: + # Get the associated layer ID + result = await self.db_connection.execute( + text( + f"""SELECT layer_id + FROM {settings.CUSTOMER_SCHEMA}.layer_project + WHERE id = {layer_project_id};""" + ) + ) + layer_id = UUID(str(result.fetchone()[0])) + + # Get the user ID of the layer + result = await self.db_connection.execute( + text( + f"""SELECT user_id + FROM {settings.CUSTOMER_SCHEMA}.layer + WHERE id = '{layer_id}';""" + ) + ) + user_id = UUID(str(result.fetchone()[0])) + except Exception: + raise ValueError( + f"Could not fetch layer and user ID for layer project ID {layer_project_id}." + ) + + return layer_id, user_id + + async def _get_street_network_tables( + self, + street_network_edge_layer_project_id: int, + street_network_node_layer_project_id: int, + ): + """Get table names and layer IDs of the edge and node tables.""" + + edge_table: str = None + edge_layer_id: UUID = None + node_table: str = None + node_layer_id: UUID = None + + # Get edge table name if a layer project ID is specified + if street_network_edge_layer_project_id: + try: + # Get the edge layer ID and associated user ID + edge_layer_id, user_id = await self._get_layer_and_user_id( + street_network_edge_layer_project_id + ) + + # Produce the edge table name + edge_table = f"{settings.USER_DATA_SCHEMA}.street_network_line_{str(user_id).replace('-', '')}" + except Exception: + raise ValueError( + f"Could not fetch edge table name for layer project ID {street_network_edge_layer_project_id}." + ) + + # Get node table name if a layer project ID is specified + if street_network_node_layer_project_id: + try: + # Get the node layer ID and associated user ID + node_layer_id, user_id = await self._get_layer_and_user_id( + street_network_node_layer_project_id + ) + + # Produce the node table name + node_table = f"{settings.USER_DATA_SCHEMA}.street_network_point_{str(user_id).replace('-', '')}" + except Exception: + raise ValueError( + f"Could not fetch node table name for layer project ID {street_network_node_layer_project_id}." + ) + + return edge_table, edge_layer_id, node_table, node_layer_id + + async def _get_street_network_region_h3_3_cells(self, region_geofence_table: str): + """Get list of H3_3 cells covering the street network region.""" + + h3_3_cells = [] + try: + sql_fetch_h3_3_cells = f""" + WITH region AS ( + SELECT ST_Union(geom) AS geom FROM {region_geofence_table} + ) + SELECT g.h3_short FROM region r, + LATERAL basic.fill_polygon_h3_3(r.geom) g; + """ + result = ( + await self.db_connection.execute(text(sql_fetch_h3_3_cells)) + ).fetchall() + + for h3_short in result: + h3_3_cells.append(h3_short[0]) + except Exception: + raise ValueError( + f"Could not fetch H3_3 grid for street network geofence {settings.NETWORK_REGION_TABLE}." + ) + + return h3_3_cells + + async def fetch( + self, + edge_layer_project_id: int, + node_layer_project_id: int, + region_geofence_table: str, + ): + """Fetch street network from specified layer and load into Polars dataframes.""" + + # Street network is stored as a dictionary of Polars dataframes, with the H3_3 index as the key + street_network_edge: dict = {} + street_network_node: dict = {} + + start_time = time.time() + street_network_size: float = 0.0 + + # Get H3_3 cells covering the street network region + street_network_region_h3_3_cells = ( + await self._get_street_network_region_h3_3_cells(region_geofence_table) + ) + + # Get table names and layer IDs of the edge and node tables + ( + street_network_edge_table, + street_network_edge_layer_id, + street_network_node_table, + street_network_node_layer_id, + ) = await self._get_street_network_tables( + edge_layer_project_id, node_layer_project_id + ) + + # Initialize cache + street_network_cache = StreetNetworkCache() + + try: + for h3_short in street_network_region_h3_3_cells: + if edge_layer_project_id is not None: + if street_network_cache.edge_cache_exists( + edge_layer_project_id, h3_short + ): + # Read edge data from cache + edge_df = street_network_cache.read_edge_cache( + edge_layer_project_id, h3_short + ) + else: + if settings.DEBUG_MODE: + print( + f"Fetching street network edge data for H3_3 cell {h3_short}" + ) + + # Read edge data from database + edge_df = pl.read_database_uri( + query=f""" + SELECT + edge_id AS id, length_m, length_3857, class_, impedance_slope, impedance_slope_reverse, + impedance_surface, CAST(coordinates_3857 AS TEXT) AS coordinates_3857, maxspeed_forward, + maxspeed_backward, source, target, h3_3, h3_6 + FROM {street_network_edge_table} + WHERE h3_3 = {h3_short} + AND layer_id = '{str(street_network_edge_layer_id)}' + """, + uri=settings.POSTGRES_DATABASE_URI, + schema_overrides=SEGMENT_DATA_SCHEMA, + ) + edge_df = edge_df.with_columns( + pl.col("coordinates_3857").str.json_extract() + ) + + # Write edge data into cache + street_network_cache.write_edge_cache( + edge_layer_project_id, h3_short, edge_df + ) + # Update street network edge dictionary and memory usage + street_network_edge[h3_short] = edge_df + street_network_size += edge_df.estimated_size("gb") + + if node_layer_project_id is not None: + if street_network_cache.node_cache_exists( + node_layer_project_id, h3_short + ): + # Read node data from cache + node_df = street_network_cache.read_node_cache( + node_layer_project_id, h3_short + ) + else: + if settings.DEBUG_MODE: + print( + f"Fetching street network node data for H3_3 cell {h3_short}" + ) + + # Read node data from database + node_df = pl.read_database_uri( + query=f""" + SELECT node_id AS id, h3_3, h3_6 + FROM {street_network_node_table} + WHERE h3_3 = {h3_short} + AND layer_id = '{str(street_network_node_layer_id)}' + """, + uri=settings.POSTGRES_DATABASE_URI, + schema_overrides=CONNECTOR_DATA_SCHEMA, + ) + + # Write node data into cache + street_network_cache.write_node_cache( + node_layer_project_id, h3_short, node_df + ) + + # Update street network node dictionary and memory usage + street_network_node[h3_short] = node_df + street_network_size += node_df.estimated_size("gb") + except Exception as e: + raise RuntimeError( + f"Failed to fetch street network data from database, error: {e}" + ) + + # Raise error if a edge layer project ID is specified but no edge data is fetched + if edge_layer_project_id is not None and len(street_network_edge) == 0: + raise RuntimeError( + f"Failed to fetch street network edge data for layer project ID {edge_layer_project_id}." + ) + + # Raise error if a node layer project ID is specified but no node data is fetched + if node_layer_project_id is not None and len(street_network_node) == 0: + raise RuntimeError( + f"Failed to fetch street network node data for layer project ID {node_layer_project_id}." + ) + + end_time = time.time() + + if settings.DEBUG_MODE: + print( + f"Street network load time: {round((end_time - start_time) / 60, 1)} min" + ) + print(f"Street network in-memory size: {round(street_network_size, 1)} GB") + + return street_network_edge, street_network_node diff --git a/src/crud/crud_catchment_area.py b/src/crud/crud_catchment_area.py index 3cad627..db95cd6 100644 --- a/src/crud/crud_catchment_area.py +++ b/src/crud/crud_catchment_area.py @@ -1,5 +1,4 @@ import math -import os import time import uuid from typing import Any @@ -9,11 +8,11 @@ from redis import Redis from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession -from tqdm import tqdm from src.core.config import settings from src.core.isochrone import compute_isochrone, compute_isochrone_h3 from src.core.jsoline import generate_jsolines +from src.core.street_network.street_network_util import StreetNetworkUtil from src.schemas.catchment_area import ( SEGMENT_DATA_SCHEMA, VALID_BICYCLE_CLASSES, @@ -28,113 +27,6 @@ ) from src.schemas.error import BufferExceedsNetworkError, DisconnectedOriginError from src.schemas.status import ProcessingStatus -from src.utils import make_dir - - -class FetchRoutingNetwork: - def __init__(self, db_connection: AsyncSession): - self.db_connection = db_connection - - async def fetch(self): - """Fetch routing network (processed segments) and load into memory.""" - - start_time = time.time() - - # Get edge network table - edge_network_table = None - try: - layer_id = str( - ( - await self.db_connection.execute( - text( - f"""SELECT layer_id - FROM customer.layer_project - WHERE id = {settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID};""" - ) - ) - ).fetchall()[0][0] - ) - user_id = str( - ( - await self.db_connection.execute( - text( - f"""SELECT user_id - FROM customer.layer - WHERE id = '{layer_id}';""" - ) - ) - ).fetchall()[0][0] - ) - edge_network_table = ( - f"user_data.street_network_line_{user_id.replace('-', '')}" - ) - except Exception as e: - print(e) - return - - # Get network H3 cells - h3_3_grid = [] - try: - sql_get_h3_3_grid = text( - f""" - WITH region AS ( - SELECT ST_Union(geom) AS geom FROM {settings.NETWORK_REGION_TABLE} - ) - SELECT g.h3_short FROM region r, - LATERAL basic.fill_polygon_h3_3(r.geom) g; - """ - ) - result = (await self.db_connection.execute(sql_get_h3_3_grid)).fetchall() - for h3_index in result: - h3_3_grid.append(h3_index[0]) - except Exception as e: - print(e) - # TODO Throw appropriate exception - - # Load segments into polars data frames - segments_df = {} - df_size = 0.0 - try: - # Create cache dir if it doesn't exist - make_dir(settings.CACHE_DIR) - - for index in tqdm( - range(len(h3_3_grid)), desc="Loading H3_3 grid", unit=" cell" - ): - h3_index = h3_3_grid[index] - - if os.path.exists(f"{settings.CACHE_DIR}/{h3_index}.parquet"): - segments_df[h3_index] = pl.read_parquet( - f"{settings.CACHE_DIR}/{h3_index}.parquet" - ) - else: - segments_df[h3_index] = pl.read_database_uri( - query=f""" - SELECT - edge_id AS id, length_m, length_3857, class_, impedance_slope, impedance_slope_reverse, - impedance_surface, CAST(coordinates_3857 AS TEXT) AS coordinates_3857, - maxspeed_forward, maxspeed_backward, source, target, h3_3, h3_6 - FROM {edge_network_table} - WHERE h3_3 = {h3_index} - """, - uri=settings.POSTGRES_DATABASE_URI, - schema_overrides=SEGMENT_DATA_SCHEMA, - ) - segments_df[h3_index] = segments_df[h3_index].with_columns( - pl.col("coordinates_3857").str.json_extract() - ) - - with open(f"{settings.CACHE_DIR}/{h3_index}.parquet", "wb") as file: - segments_df[h3_index].write_parquet(file) - - df_size += segments_df[h3_index].estimated_size("gb") - except Exception as e: - print(e) - - print(f"Network load time: {round((time.time() - start_time) / 60, 1)} min") - print(f"Network in-memory size: {round(df_size, 1)} GB") - - return segments_df class CRUDCatchmentArea: @@ -153,7 +45,7 @@ async def read_network( """Read relevant sub-network for catchment area calculation from polars dataframe.""" # Get valid segment classes based on transport mode - if type(obj_in) == ICatchmentAreaActiveMobility: + if type(obj_in) is ICatchmentAreaActiveMobility: valid_segment_classes = ( VALID_WALKING_CLASSES if obj_in.routing_type == "walking" @@ -163,11 +55,11 @@ async def read_network( valid_segment_classes = VALID_CAR_CLASSES # Compute buffer distance for identifying relevant H3_6 cells - if type(obj_in.travel_cost) == CatchmentAreaTravelTimeCostActiveMobility: + if type(obj_in.travel_cost) is CatchmentAreaTravelTimeCostActiveMobility: buffer_dist = obj_in.travel_cost.max_traveltime * ( (obj_in.travel_cost.speed * 1000) / 60 ) - elif type(obj_in.travel_cost) == CatchmentAreaTravelTimeCostMotorizedMobility: + elif type(obj_in.travel_cost) is CatchmentAreaTravelTimeCostMotorizedMobility: buffer_dist = obj_in.travel_cost.max_traveltime * ( (settings.CATCHMENT_AREA_CAR_BUFFER_DEFAULT_SPEED * 1000) / 60 ) @@ -372,7 +264,7 @@ async def read_network( speed=( obj_in.travel_cost.speed / 3.6 if type(obj_in.travel_cost) - == CatchmentAreaTravelTimeCostActiveMobility + is CatchmentAreaTravelTimeCostActiveMobility else None ), ) @@ -539,11 +431,11 @@ async def get_h3_10_grid(self, db_connection, obj_in, origin_h3_10: str): """Get H3_10 cell grid required for computing a grid-type catchment area.""" # Compute buffer distance for identifying relevant H3_10 cells - if type(obj_in.travel_cost) == CatchmentAreaTravelTimeCostActiveMobility: + if type(obj_in.travel_cost) is CatchmentAreaTravelTimeCostActiveMobility: buffer_dist = obj_in.travel_cost.max_traveltime * ( (obj_in.travel_cost.speed * 1000) / 60 ) - elif type(obj_in.travel_cost) == CatchmentAreaTravelTimeCostMotorizedMobility: + elif type(obj_in.travel_cost) is CatchmentAreaTravelTimeCostMotorizedMobility: buffer_dist = obj_in.travel_cost.max_traveltime * ( (settings.CATCHMENT_AREA_CAR_BUFFER_DEFAULT_SPEED * 1000) / 60 ) @@ -690,7 +582,11 @@ async def run(self, obj_in: ICatchmentAreaActiveMobility | ICatchmentAreaCar): # Fetch routing network (processed segments) and load into memory if self.routing_network is None: - self.routing_network = await FetchRoutingNetwork(self.db_connection).fetch() + self.routing_network, _ = await StreetNetworkUtil(self.db_connection).fetch( + edge_layer_project_id=settings.STREET_NETWORK_EDGE_DEFAULT_LAYER_PROJECT_ID, + node_layer_project_id=None, + region_geofence_table=settings.NETWORK_REGION_TABLE, + ) routing_network = self.routing_network total_start = time.time() @@ -747,14 +643,14 @@ async def run(self, obj_in: ICatchmentAreaActiveMobility | ICatchmentAreaCar): ] if ( - type(obj_in) == ICatchmentAreaActiveMobility + type(obj_in) is ICatchmentAreaActiveMobility and is_travel_time_catchment_area ): speed = obj_in.travel_cost.speed / 3.6 else: speed = None - if type(obj_in) == ICatchmentAreaActiveMobility: + if type(obj_in) is ICatchmentAreaActiveMobility: zoom = 12 else: zoom = 10 # Use lower resolution grid for car catchment areas diff --git a/src/schemas/catchment_area.py b/src/schemas/catchment_area.py index 8c55656..acf98e2 100644 --- a/src/schemas/catchment_area.py +++ b/src/schemas/catchment_area.py @@ -22,6 +22,12 @@ "h3_6": pl.Int32, } +CONNECTOR_DATA_SCHEMA = { + "id": pl.Int64, + "h3_3": pl.Int32, + "h3_6": pl.Int32, +} + VALID_WALKING_CLASSES = [ "secondary", "tertiary", From 875766869eff885e2b7da12b1e764c964432449e Mon Sep 17 00:00:00 2001 From: Nihar Thakkar Date: Sat, 10 Aug 2024 08:26:32 +0000 Subject: [PATCH 7/8] Produce rounded integer cost value of all catchment area types --- src/crud/crud_catchment_area.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/crud/crud_catchment_area.py b/src/crud/crud_catchment_area.py index db95cd6..25b2565 100644 --- a/src/crud/crud_catchment_area.py +++ b/src/crud/crud_catchment_area.py @@ -507,7 +507,7 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): FROM isochrones_filled ) INSERT INTO {obj_in.result_table} (layer_id, geom, integer_attr1) - SELECT '{obj_in.layer_id}', ST_MakeValid(COALESCE(j.geom, a.filled_geom)) AS geom, a."minute" + SELECT '{obj_in.layer_id}', ST_MakeValid(COALESCE(j.geom, a.filled_geom)) AS geom, ROUND(a."minute") FROM isochrones_with_id a LEFT JOIN LATERAL ( @@ -533,12 +533,12 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): insert_string += f"""( '{obj_in.layer_id}', ST_Transform(ST_SetSRID(ST_MakeLine(ARRAY[{points_string.rstrip(',')}]), 3857), 4326), - {cost} + ROUND({cost}) ),""" if i % batch_size == 0 or i == (len(network["features"]) - 1): insert_string = text( f""" - INSERT INTO {obj_in.result_table} (layer_id, geom, float_attr1) + INSERT INTO {obj_in.result_table} (layer_id, geom, integer_attr1) VALUES {insert_string.rstrip(",")}; """ ) @@ -555,7 +555,7 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): insert_string += f"""( '{obj_in.layer_id}', '{grid_index[i]}', - {grid[i]} + ROUND({grid[i]}) ),""" if i % batch_size == 0 or i == (len(grid_index) - 1): insert_string = text( From 9b4a6f4e63d1bfcf19297bf7088b501e89875049 Mon Sep 17 00:00:00 2001 From: Nihar Thakkar Date: Sat, 10 Aug 2024 09:39:29 +0000 Subject: [PATCH 8/8] Refactor network and grid type catchment area insert code to fix bug which occurred while inserting a small number of features --- src/core/config.py | 2 + src/crud/crud_catchment_area.py | 105 ++++++++++++++++---------------- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/src/core/config.py b/src/core/config.py index dd8d7a0..f718af4 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -30,6 +30,8 @@ class Settings(BaseSettings): CATCHMENT_AREA_CAR_BUFFER_DEFAULT_SPEED = 80 # km/h CATCHMENT_AREA_HOLE_THRESHOLD_SQM = 10000 # 100m x 100m + DATA_INSERT_BATCH_SIZE = 800 + CELERY_BROKER_URL: Optional[str] = "pyamqp://guest@rabbitmq//" REDIS_HOST: Optional[str] = "redis" REDIS_PORT: Optional[str] = 6379 diff --git a/src/crud/crud_catchment_area.py b/src/crud/crud_catchment_area.py index 25b2565..7f2e7de 100644 --- a/src/crud/crud_catchment_area.py +++ b/src/crud/crud_catchment_area.py @@ -522,60 +522,61 @@ async def save_result(self, obj_in, shapes, network, grid_index, grid): await self.db_connection.commit() elif obj_in.catchment_area_type == "network": # Save catchment area network data - batch_size = 1000 - insert_string = "" - for i in range(0, len(network["features"])): - coordinates = network["features"][i]["geometry"]["coordinates"] - cost = network["features"][i]["properties"]["cost"] - points_string = "" - for pair in coordinates: - points_string += f"ST_MakePoint({pair[0]}, {pair[1]})," - insert_string += f"""( - '{obj_in.layer_id}', - ST_Transform(ST_SetSRID(ST_MakeLine(ARRAY[{points_string.rstrip(',')}]), 3857), 4326), - ROUND({cost}) - ),""" - if i % batch_size == 0 or i == (len(network["features"]) - 1): - insert_string = text( - f""" - INSERT INTO {obj_in.result_table} (layer_id, geom, integer_attr1) - VALUES {insert_string.rstrip(",")}; - """ - ) - await self.db_connection.execute(insert_string) - await self.db_connection.commit() - insert_string = "" + for batch_index in range( + 0, len(network["features"]), settings.DATA_INSERT_BATCH_SIZE + ): + insert_string = "" + for i in range( + batch_index, + min( + len(network["features"]), + batch_index + settings.DATA_INSERT_BATCH_SIZE, + ), + ): + coordinates = network["features"][i]["geometry"]["coordinates"] + cost = network["features"][i]["properties"]["cost"] + points_string = "" + for pair in coordinates: + points_string += f"ST_MakePoint({pair[0]}, {pair[1]})," + insert_string += f"""( + '{obj_in.layer_id}', + ST_Transform(ST_SetSRID(ST_MakeLine(ARRAY[{points_string.rstrip(',')}]), 3857), 4326), + ROUND({cost}) + ),""" + insert_string = text( + f""" + INSERT INTO {obj_in.result_table} (layer_id, geom, integer_attr1) + VALUES {insert_string.rstrip(",")}; + """ + ) + await self.db_connection.execute(insert_string) + await self.db_connection.commit() else: # Save catchment area grid data - batch_size = 1000 - insert_string = "" - for i in range(0, len(grid_index)): - if math.isnan(grid[i]): - continue - insert_string += f"""( - '{obj_in.layer_id}', - '{grid_index[i]}', - ROUND({grid[i]}) - ),""" - if i % batch_size == 0 or i == (len(grid_index) - 1): - insert_string = text( - f""" - INSERT INTO {obj_in.result_table} (layer_id, text_attr1, integer_attr1) - VALUES {insert_string.rstrip(",")}; - """ - ) - await self.db_connection.execute(insert_string) - await self.db_connection.commit() - insert_string = "" - sql_update_geom = text( - f""" - UPDATE {obj_in.result_table} - SET geom = ST_SetSRID(h3_cell_to_boundary(text_attr1::h3index)::geometry, 4326) - WHERE layer_id = '{obj_in.layer_id}'; - """ - ) - await self.db_connection.execute(sql_update_geom) - await self.db_connection.commit() + for batch_index in range( + 0, len(grid_index), settings.DATA_INSERT_BATCH_SIZE + ): + insert_string = "" + for i in range( + batch_index, + min(len(grid_index), batch_index + settings.DATA_INSERT_BATCH_SIZE), + ): + if math.isnan(grid[i]): + continue + insert_string += f"""( + '{obj_in.layer_id}', + ST_SetSRID(h3_cell_to_boundary('{grid_index[i]}'::h3index)::geometry, 4326), + '{grid_index[i]}', + ROUND({grid[i]}) + ),""" + insert_string = text( + f""" + INSERT INTO {obj_in.result_table} (layer_id, geom, text_attr1, integer_attr1) + VALUES {insert_string.rstrip(",")}; + """ + ) + await self.db_connection.execute(insert_string) + await self.db_connection.commit() async def run(self, obj_in: ICatchmentAreaActiveMobility | ICatchmentAreaCar): """Compute catchment areas for the given request parameters."""