From 6735692969e46b7b93f9a56fd660a1b0b2fcf9aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Traizet?= Date: Thu, 21 Nov 2024 17:02:06 +0100 Subject: [PATCH 1/5] refactor: perform loc inverse in parallel (for now this sequential loc inverse are also realized for comparison) --- .../dense_matching/census_mccnn_sgm.py | 141 +++++++++++++++++- .../dense_matching/dense_matching.py | 3 + cars/pipelines/default/default_pipeline.py | 3 + 3 files changed, 145 insertions(+), 2 deletions(-) diff --git a/cars/applications/dense_matching/census_mccnn_sgm.py b/cars/applications/dense_matching/census_mccnn_sgm.py index e0293840..d3237e7a 100644 --- a/cars/applications/dense_matching/census_mccnn_sgm.py +++ b/cars/applications/dense_matching/census_mccnn_sgm.py @@ -23,6 +23,7 @@ """ # pylint: disable=too-many-lines import collections +import datetime # Standard imports import logging @@ -34,6 +35,7 @@ import affine import numpy as np import pandora +import pandas import rasterio import xarray as xr from affine import Affine @@ -544,6 +546,7 @@ def generate_disparity_grids( # noqa: C901 dem_min=None, dem_max=None, pair_folder=None, + loc_inverse_orchestrator=None, ): """ Generate disparity grids min and max, with given step @@ -575,6 +578,8 @@ def generate_disparity_grids( # noqa: C901 :type dem_max: str :param pair_folder: folder used for current pair :type pair_folder: str + :param loc_inverse_orchestrator: orchestrator to perform inverse locs + :type loc_inverse_orchestrator: Orchestrator :return disparity grid range, containing grid min and max @@ -817,10 +822,12 @@ def generate_disparity_grids( # noqa: C901 dem_min_list = dem_min_list[nan_mask] dem_max_list = dem_max_list[nan_mask] + sequential_begin = datetime.datetime.now() + # sensors physical positions ( - ind_cols_sensor, - ind_rows_sensor, + ind_cols_sensor_seq, + ind_rows_sensor_seq, _, ) = geom_plugin_with_dem_and_geoid.inverse_loc( sensor_image_right["image"], @@ -830,6 +837,82 @@ def generate_disparity_grids( # noqa: C901 z_coord=dem_median_list, ) + sequential_end = datetime.datetime.now() + sequential_duration = sequential_end - sequential_begin + print(f"Loc inverse (sequential): {sequential_duration}") + + parallel_begin = datetime.datetime.now() + + num_points = len(dem_median_list) + + if loc_inverse_orchestrator is None: + loc_inverse_orchestrator = grid_orchestrator + + num_workers = loc_inverse_orchestrator.get_conf().get( + "nb_workers", 1 + ) + + loc_inverse_dataset = cars_dataset.CarsDataset( + "points", name="loc_inverse" + ) + step = int(np.ceil(num_points / num_workers)) + # Create a grid with num_workers elements + loc_inverse_dataset.create_grid(1, num_workers, 1, 1, 0, 0) + + # Get saving infos in order to save tiles when they are computed + [saving_info] = loc_inverse_orchestrator.get_saving_infos( + [loc_inverse_dataset] + ) + + for task_id in range(0, num_workers): + first_elem = task_id * step + last_elem = min((task_id + 1) * step, num_points) + full_saving_info = ocht.update_saving_infos( + saving_info, row=task_id, col=0 + ) + loc_inverse_dataset[ + task_id, 0 + ] = loc_inverse_orchestrator.cluster.create_task( + loc_inverse_wrapper + )( + geom_plugin_with_dem_and_geoid, + sensor_image_right["image"], + sensor_image_right["geomodel"], + lat_mean[first_elem:last_elem], + lon_mean[first_elem:last_elem], + dem_median_list[first_elem:last_elem], + full_saving_info, + ) + + loc_inverse_orchestrator.add_to_replace_lists(loc_inverse_dataset) + + loc_inverse_orchestrator.compute_futures( + only_remaining_delayed=[ + tile[0] for tile in loc_inverse_dataset.tiles + ] + ) + + ind_cols_sensor = [] + ind_rows_sensor = [] + + for tile in loc_inverse_dataset.tiles: + ind_cols_sensor += list(tile[0]["col"]) + ind_rows_sensor += list(tile[0]["row"]) + + parallel_end = datetime.datetime.now() + parallel_duration = parallel_end - parallel_begin + print(f"Loc inverse (parallel): {parallel_duration}") + + is_same = ( + np.array(ind_cols_sensor_seq) == np.array(ind_cols_sensor) + ).all() + print(f"is_same {is_same}") + + is_same = ( + np.array(ind_rows_sensor_seq) == np.array(ind_rows_sensor) + ).all() + print(f"is_same {is_same}") + # Generate epipolar disp grids # Get epipolar positions (epipolar_positions_row, epipolar_positions_col) = np.meshgrid( @@ -1426,3 +1509,57 @@ def compute_disparity_wrapper( ) return disp_dataset + + +def loc_inverse_wrapper( + geom_plugin, + image, + geomodel, + latitudes, + longitudes, + altitudes, + saving_info=None, +) -> pandas.DataFrame: + """ + Perform inverse localizations on input coordinates + This function will be run as a delayed task. + + :param geom_plugin: geometry plugin used to perform localizations + :type geom_plugin: SharelocGeometry + :param image: input image path + :type image: str + :param geomodel: input geometric model + :type geomodel: str + :param latitudes: input latitude coordinates + :type latitudes: np.array + :param longitudes: input longitudes coordinates + :type longitudes: np.array + :param altitudes: input latitude coordinates + :type altitudes: np.array + :param saving_info: saving info for cars orchestrator + :type saving_info: dict + + """ + print(f"type {type(geom_plugin)}") + + wrapper_begin = datetime.datetime.now() + + col, row, _ = geom_plugin.inverse_loc( + image, + geomodel, + latitudes, + longitudes, + z_coord=altitudes, + ) + output = pandas.DataFrame({"col": col, "row": row}, copy=False) + cars_dataset.fill_dataframe( + output, saving_info=saving_info, attributes=None + ) + + wrapper_end = datetime.datetime.now() + wrapper_duration = wrapper_end - wrapper_begin + print( + f"Loc inverse (wrapper): {wrapper_duration}, {len(longitudes)} points" + ) + + return output diff --git a/cars/applications/dense_matching/dense_matching.py b/cars/applications/dense_matching/dense_matching.py index 9e475ff4..3e32d052 100644 --- a/cars/applications/dense_matching/dense_matching.py +++ b/cars/applications/dense_matching/dense_matching.py @@ -132,6 +132,7 @@ def generate_disparity_grids( dem_min=None, dem_max=None, pair_folder=None, + loc_inverse_orchestrator=None, ): """ Generate disparity grids min and max, with given step @@ -165,6 +166,8 @@ def generate_disparity_grids( :type dem_max: str :param pair_folder: folder used for current pair :type pair_folder: str + :param loc_inverse_orchestrator: orchestrator to perform inverse locs + :type loc_inverse_orchestrator: Orchestrator :return disparity grid range, containing grid min and max diff --git a/cars/pipelines/default/default_pipeline.py b/cars/pipelines/default/default_pipeline.py index c982abe2..8cd4f428 100644 --- a/cars/pipelines/default/default_pipeline.py +++ b/cars/pipelines/default/default_pipeline.py @@ -1689,6 +1689,7 @@ def sensor_to_depth_maps(self): # noqa: C901 dmin=dmin, dmax=dmax, pair_folder=dense_matching_pair_folder, + loc_inverse_orchestrator=self.cars_orchestrator, ) ) elif None in (altitude_delta_min, altitude_delta_max): @@ -1702,6 +1703,7 @@ def sensor_to_depth_maps(self): # noqa: C901 dem_max=dem_max, dem_median=dem_median, pair_folder=dense_matching_pair_folder, + loc_inverse_orchestrator=self.cars_orchestrator, ) ) else: @@ -1715,6 +1717,7 @@ def sensor_to_depth_maps(self): # noqa: C901 altitude_delta_max=altitude_delta_max, dem_median=dem_median, pair_folder=dense_matching_pair_folder, + loc_inverse_orchestrator=self.cars_orchestrator, ) ) From 33b54915ebd74aee4fbfd0e10d34e8aa73849b53 Mon Sep 17 00:00:00 2001 From: Marian RASSAT Date: Thu, 6 Feb 2025 16:37:44 +0000 Subject: [PATCH 2/5] feat: perform inverse locs in parallel when libgeo is used, sequential when shareloc is used --- .../dense_matching/census_mccnn_sgm.py | 142 ++++++++---------- 1 file changed, 64 insertions(+), 78 deletions(-) diff --git a/cars/applications/dense_matching/census_mccnn_sgm.py b/cars/applications/dense_matching/census_mccnn_sgm.py index d3237e7a..b5e251f9 100644 --- a/cars/applications/dense_matching/census_mccnn_sgm.py +++ b/cars/applications/dense_matching/census_mccnn_sgm.py @@ -34,8 +34,8 @@ # Third party imports import affine import numpy as np -import pandora import pandas +import pandora import rasterio import xarray as xr from affine import Affine @@ -822,96 +822,82 @@ def generate_disparity_grids( # noqa: C901 dem_min_list = dem_min_list[nan_mask] dem_max_list = dem_max_list[nan_mask] - sequential_begin = datetime.datetime.now() - - # sensors physical positions - ( - ind_cols_sensor_seq, - ind_rows_sensor_seq, - _, - ) = geom_plugin_with_dem_and_geoid.inverse_loc( - sensor_image_right["image"], - sensor_image_right["geomodel"], - lat_mean, - lon_mean, - z_coord=dem_median_list, - ) - - sequential_end = datetime.datetime.now() - sequential_duration = sequential_end - sequential_begin - print(f"Loc inverse (sequential): {sequential_duration}") + # if shareloc is used, perform inverse locs sequentially + if geom_plugin_with_dem_and_geoid.plugin_name == "SharelocGeometry": - parallel_begin = datetime.datetime.now() - - num_points = len(dem_median_list) + # sensors physical positions + ( + ind_cols_sensor, + ind_rows_sensor, + _, + ) = geom_plugin_with_dem_and_geoid.inverse_loc( + sensor_image_right["image"], + sensor_image_right["geomodel"], + lat_mean, + lon_mean, + z_coord=dem_median_list, + ) - if loc_inverse_orchestrator is None: - loc_inverse_orchestrator = grid_orchestrator + # else (if libgeo is used) perform inverse locs in parallel + else: - num_workers = loc_inverse_orchestrator.get_conf().get( - "nb_workers", 1 - ) + num_points = len(dem_median_list) - loc_inverse_dataset = cars_dataset.CarsDataset( - "points", name="loc_inverse" - ) - step = int(np.ceil(num_points / num_workers)) - # Create a grid with num_workers elements - loc_inverse_dataset.create_grid(1, num_workers, 1, 1, 0, 0) + if loc_inverse_orchestrator is None: + loc_inverse_orchestrator = grid_orchestrator - # Get saving infos in order to save tiles when they are computed - [saving_info] = loc_inverse_orchestrator.get_saving_infos( - [loc_inverse_dataset] - ) - - for task_id in range(0, num_workers): - first_elem = task_id * step - last_elem = min((task_id + 1) * step, num_points) - full_saving_info = ocht.update_saving_infos( - saving_info, row=task_id, col=0 - ) - loc_inverse_dataset[ - task_id, 0 - ] = loc_inverse_orchestrator.cluster.create_task( - loc_inverse_wrapper - )( - geom_plugin_with_dem_and_geoid, - sensor_image_right["image"], - sensor_image_right["geomodel"], - lat_mean[first_elem:last_elem], - lon_mean[first_elem:last_elem], - dem_median_list[first_elem:last_elem], - full_saving_info, + num_workers = loc_inverse_orchestrator.get_conf().get( + "nb_workers", 1 ) - loc_inverse_orchestrator.add_to_replace_lists(loc_inverse_dataset) + loc_inverse_dataset = cars_dataset.CarsDataset( + "points", name="loc_inverse" + ) + step = int(np.ceil(num_points / num_workers)) + # Create a grid with num_workers elements + loc_inverse_dataset.create_grid(1, num_workers, 1, 1, 0, 0) - loc_inverse_orchestrator.compute_futures( - only_remaining_delayed=[ - tile[0] for tile in loc_inverse_dataset.tiles - ] - ) + # Get saving info in order to save tiles when they are computed + [saving_info] = loc_inverse_orchestrator.get_saving_infos( + [loc_inverse_dataset] + ) - ind_cols_sensor = [] - ind_rows_sensor = [] + for task_id in range(0, num_workers): + first_elem = task_id * step + last_elem = min((task_id + 1) * step, num_points) + full_saving_info = ocht.update_saving_infos( + saving_info, row=task_id, col=0 + ) + loc_inverse_dataset[ + task_id, 0 + ] = loc_inverse_orchestrator.cluster.create_task( + loc_inverse_wrapper + )( + geom_plugin_with_dem_and_geoid, + sensor_image_right["image"], + sensor_image_right["geomodel"], + lat_mean[first_elem:last_elem], + lon_mean[first_elem:last_elem], + dem_median_list[first_elem:last_elem], + full_saving_info, + ) - for tile in loc_inverse_dataset.tiles: - ind_cols_sensor += list(tile[0]["col"]) - ind_rows_sensor += list(tile[0]["row"]) + loc_inverse_orchestrator.add_to_replace_lists( + loc_inverse_dataset + ) - parallel_end = datetime.datetime.now() - parallel_duration = parallel_end - parallel_begin - print(f"Loc inverse (parallel): {parallel_duration}") + loc_inverse_orchestrator.compute_futures( + only_remaining_delayed=[ + tile[0] for tile in loc_inverse_dataset.tiles + ] + ) - is_same = ( - np.array(ind_cols_sensor_seq) == np.array(ind_cols_sensor) - ).all() - print(f"is_same {is_same}") + ind_cols_sensor = [] + ind_rows_sensor = [] - is_same = ( - np.array(ind_rows_sensor_seq) == np.array(ind_rows_sensor) - ).all() - print(f"is_same {is_same}") + for tile in loc_inverse_dataset.tiles: + ind_cols_sensor += list(tile[0]["col"]) + ind_rows_sensor += list(tile[0]["row"]) # Generate epipolar disp grids # Get epipolar positions From 274780241855a6ede53736dd74644f3d42e097ee Mon Sep 17 00:00:00 2001 From: Marian RASSAT Date: Mon, 10 Feb 2025 16:20:54 +0000 Subject: [PATCH 3/5] perf: sample values using a single window read, instead of n reads for n points --- cars/core/inputs.py | 48 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/cars/core/inputs.py b/cars/core/inputs.py index df89f165..055d709f 100644 --- a/cars/core/inputs.py +++ b/cars/core/inputs.py @@ -37,6 +37,7 @@ import xarray as xr from json_checker import Checker from rasterio.warp import Resampling, calculate_default_transform, reproject +from rasterio.windows import Window from shapely.geometry import shape # CARS imports @@ -104,19 +105,44 @@ def rasterio_get_values(raster_file: str, x_list, y_list, proj_function): cloud_in = np.stack([x_list, y_list], axis=1) cloud_out = proj_function(cloud_in, 4326, file_espg) - new_x = cloud_out[:, 0] - new_y = cloud_out[:, 1] - - # get z list - z_list = list( - descriptor.sample( - [(new_x[row], new_y[row]) for row in range(new_x.shape[0])] - ) + # get the transform and inverse + aff_tr = descriptor.transform + np_tr = np.array( + [ + [aff_tr[0], aff_tr[1], aff_tr[2]], + [aff_tr[3], aff_tr[4], aff_tr[5]], + [0, 0, 1], + ] ) - z_list = np.array(z_list, dtype=float) - z_list[z_list == nodata_value] = np.nan + inv_tr = np.linalg.inv(np_tr) + + # convert sensor to pixel coordinates + pix_pos = np.hstack([cloud_out, np.ones((len(cloud_out), 1))]) + pix_pos = inv_tr @ pix_pos.T + pix_pos = pix_pos.T[:, [1, 0]].astype(int) + + # get the data needed + min_pt = pix_pos.min(axis=0) + max_pt = pix_pos.max(axis=0) + + width = max_pt[0] - min_pt[0] + 1 + height = max_pt[1] - min_pt[1] + 1 + window = Window(min_pt[1], min_pt[0], height, width) + + data = descriptor.read(1, window=window) + + # read the data for all points + max_sampled_pos = np.array(data.shape)[:2] - 1 + pix_pos -= min_pt + pix_pos[:, 0] = np.clip(pix_pos[:, 0], 0, max_sampled_pos[0]) + pix_pos[:, 1] = np.clip(pix_pos[:, 1], 0, max_sampled_pos[1]) + + z_list = data[pix_pos[:, 0], pix_pos[:, 1]].astype(float) + + if nodata_value is not None: + z_list[z_list == nodata_value] = np.nan - return z_list[:, 0] + return z_list def rasterio_get_nb_bands(raster_file: str) -> int: From ba1df8c11df568e9d5701188f1929128d0f232c4 Mon Sep 17 00:00:00 2001 From: Marian Rassat Date: Thu, 13 Feb 2025 11:25:54 +0100 Subject: [PATCH 4/5] lint: removed unused if statement --- cars/applications/dense_matching/census_mccnn_sgm.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cars/applications/dense_matching/census_mccnn_sgm.py b/cars/applications/dense_matching/census_mccnn_sgm.py index b5e251f9..1f74b539 100644 --- a/cars/applications/dense_matching/census_mccnn_sgm.py +++ b/cars/applications/dense_matching/census_mccnn_sgm.py @@ -662,8 +662,6 @@ def generate_disparity_grids( # noqa: C901 altitude_delta_max, ): # use local disparity - if None not in (dmin, dmax): - raise RuntimeError("Mix between local and global mode") # Get associated alti mean / min / max values dem_median_shape = inputs.rasterio_get_size(dem_median) From 8be94f71ddbe05eb40652636c6410aa770402902 Mon Sep 17 00:00:00 2001 From: Marian Rassat Date: Wed, 19 Feb 2025 14:44:24 +0100 Subject: [PATCH 5/5] fix: removed prints --- cars/applications/dense_matching/census_mccnn_sgm.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/cars/applications/dense_matching/census_mccnn_sgm.py b/cars/applications/dense_matching/census_mccnn_sgm.py index 1f74b539..79a2d50f 100644 --- a/cars/applications/dense_matching/census_mccnn_sgm.py +++ b/cars/applications/dense_matching/census_mccnn_sgm.py @@ -23,7 +23,6 @@ """ # pylint: disable=too-many-lines import collections -import datetime # Standard imports import logging @@ -1524,10 +1523,6 @@ def loc_inverse_wrapper( :type saving_info: dict """ - print(f"type {type(geom_plugin)}") - - wrapper_begin = datetime.datetime.now() - col, row, _ = geom_plugin.inverse_loc( image, geomodel, @@ -1540,10 +1535,4 @@ def loc_inverse_wrapper( output, saving_info=saving_info, attributes=None ) - wrapper_end = datetime.datetime.now() - wrapper_duration = wrapper_end - wrapper_begin - print( - f"Loc inverse (wrapper): {wrapper_duration}, {len(longitudes)} points" - ) - return output