# +-------------------------------------------------------------------------------------+
# | Calibration algorithm inspired by:                                                   |
# | Repository: https://github.com/ros-perception/image_pipeline                         |
# | File: `image_pipeline/camera_calibration/src/camera_calibration/mono_calibrator.py`  |
# | Commit: 722ca08b98f37b7b148d429753da133ff1e2c7cf                                     |
# +-------------------------------------------------------------------------------------+
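
"""
Single-camera intrinsic calibration.

The procedure reads a video, detects a calibration chessboard in each frame,
keeps only detections that are slow-moving and sufficiently different from the
samples collected so far, and finally estimates the camera intrinsics with
OpenCV's `calibrateCamera`.

A sketch of typical usage (constructor arguments are elided; see the
respective classes for details):

    configuration = Configuration(board_properties=...)
    context = VideoIoContext(name=..., reader=...)
    summary = Procedure(configuration, context).run()

    if summary is not None:
        result = summary.calibrate()
        print(result)
"""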

import random
from collections.abc import Callable
from dataclasses import dataclass
from textwrap import dedent
from typing import Self

import cv2 as opencv
import numpy
from jaxtyping import Float
from marker_detection import chessboard
from marker_detection.chessboard import (
    AggregatedDetectionDetails,
    BoardProperties,
    DetectionDetails,
    Detector,
    VisualizationContext,
)
from video_io.calibration import Calibration
from video_io.reader import Reader
from video_io.writer import Writer

from child_lab_procedures.garbage_collection import no_garbage_collection


@dataclass(slots=True)
class Configuration:
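    """
    Parameters controlling sample collection and calibration.

    `max_samples`: if set, at most this many detections (chosen uniformly at
    random) are used for calibration.
    `max_speed`: detections whose average corner speed relative to the
    previous detection reaches this value are rejected.
    `min_distance`: detections within this distance (in detection-metric
    space) of an already collected sample are rejected as too similar.
    """
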
    board_properties: BoardProperties
    max_samples: int | None = None
    max_speed: float = float('inf')
    min_distance: float = 0.3


@dataclass(slots=True)
class VideoIoContext:
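    """
    Input/output handles for a single video: a named `Reader` and an optional
    `Writer` used to visualize accepted detections.
    """
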
    name: str
    reader: Reader
    writer: Writer[VisualizationContext] | None = None


@dataclass(slots=True)
class VideoAnalysisContext:
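    """
    `VideoIoContext` extended with a chessboard `Detector` configured for the
    calibration board. Usually constructed via `from_io`.
    """
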
    name: str
    reader: Reader
    detector: Detector
    writer: Writer[VisualizationContext] | None = None

    @classmethod
    def from_io(cls, io: VideoIoContext, configuration: Configuration) -> Self:
        detector = Detector(configuration.board_properties)

        return cls(
            io.name,
            io.reader,
            detector,
            io.writer,
        )


@dataclass(slots=True, repr=False)
class Result:
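    """
    Outcome of a calibration: the estimated `Calibration`, its reprojection
    error, and statistics describing the collected samples.
    """
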
    calibration: Calibration
    reprojection_error: float

    samples: int
    speed_rejections: int
    similarity_rejections: int

    average_metrics: DetectionDetails
    progress_metrics: DetectionDetails

    def __repr__(self) -> str:
        calibration = self.calibration
        average_metrics = self.average_metrics
        progress_metrics = self.progress_metrics

        def format_float_tuple(values: tuple[float, ...], format_specifier: str) -> str:
            elements = ', '.join(format(x, format_specifier) for x in values)
            return f'({elements})'

        def percent(value: float) -> str:
            return f'{value * 100.0:.2f}%'

        return dedent(
            f"""\
            Result:
                samples: {self.samples}
                speed_rejections: {self.speed_rejections}
                similarity_rejections: {self.similarity_rejections}
                reprojection_error: {self.reprojection_error:.3e}

            calibration: Calibration:
                focal_length: {format_float_tuple(calibration.focal_length, '.2f')}
                optical_center: {format_float_tuple(calibration.optical_center, '.2f')}
                distortion: {format_float_tuple(calibration.distortion, '.3e')}

            average_metrics: DetectionDetails:
                area: {average_metrics.area:.3e}
                skew: {average_metrics.skew:.3e}
                x_offset: {average_metrics.x_offset:.3e}
                y_offset: {average_metrics.y_offset:.3e}
                perspective_offset: {average_metrics.perspective_offset:.3e}

            progress_metrics: DetectionDetails:
                skew: {percent(progress_metrics.skew)}
                x_offset: {percent(progress_metrics.x_offset)}
                y_offset: {percent(progress_metrics.y_offset)}
                perspective_offset: {percent(progress_metrics.perspective_offset)}
            """
        )


@dataclass(slots=True)
class SamplingSummary:
    """
    Container which stores intermediate results required to compute the calibration.
    """

    image_size: tuple[int, int]
    board_properties: BoardProperties

    samples: list[Float[numpy.ndarray, 'n_points 1 2']]
    metrics: list[DetectionDetails]
    speed_rejections: int
    similarity_rejections: int

    def calibrate(self) -> Result:
        """
        Compute the calibration based on the collected samples.
        """

        board_3d_points = self.board_properties.rigid_model
        samples = self.samples

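        # OpenCV expects the image size as (width, height), while `image_size`
        # is stored as (height, width); hence the swap below. The same rigid
        # board model is paired with every detected set of image points.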
        reprojection_error, intrinsics, distortion, *_ = opencv.calibrateCamera(
            [board_3d_points for _ in range(len(samples))],
            samples,
            (self.image_size[1], self.image_size[0]),
            numpy.eye(3, 3, dtype=numpy.float32),
            numpy.zeros(5, dtype=numpy.float32),
        )
        assert distortion.shape == (5, 1)
        assert intrinsics.shape == (3, 3)

        calibration = Calibration(
            focal_length=(float(intrinsics[0, 0]), float(intrinsics[1, 1])),
            optical_center=(float(intrinsics[0, 2]), float(intrinsics[1, 2])),
            distortion=tuple(distortion.flatten().tolist()),
        )

        aggregated_metrics = AggregatedDetectionDetails(self.metrics)
        average_metrics = aggregated_metrics.mean()
        progress_metrics = aggregated_metrics.progress()

        return Result(
            calibration,
            reprojection_error,
            len(samples),
            self.speed_rejections,
            self.similarity_rejections,
            average_metrics,
            progress_metrics,
        )


class Procedure:
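    """
    Calibration procedure for a single video.

    Collects chessboard detections frame by frame (see `run`) and produces a
    `SamplingSummary` from which the final `Result` can be computed.
    """
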
    configuration: Configuration
    context: VideoAnalysisContext

    def __init__(
        self,
        configuration: Configuration,
        context: VideoIoContext,
    ) -> None:
        self.configuration = configuration
        self.context = VideoAnalysisContext.from_io(context, configuration)

    def length_estimate(self) -> int:
        return self.context.reader.metadata.frames

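    # `no_garbage_collection` disables Python's garbage collector for the
    # duration of the call, presumably to keep per-frame latency predictable.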
    @no_garbage_collection()
    def run(self, on_step: Callable[[], object] = lambda: None) -> SamplingSummary | None:
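        """
        Read the video and collect chessboard detections suitable for
        calibration, calling `on_step` once per frame read.

        Detections are rejected when the board moves too fast or when they are
        too similar to an already collected sample; if `max_samples` is set,
        only a random subset of the accepted samples is kept.
        """
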
        reader = self.context.reader
        detector = self.context.detector
        writer = self.context.writer

        frames_since_previous_result = 0
        time_delta = 1.0 / reader.metadata.fps
        max_speed = self.configuration.max_speed
        min_distance = self.configuration.min_distance
        speed_rejections = 0
        similarity_rejections = 0

        image_points: list[Float[numpy.ndarray, 'n_points 1 2']] = []
        previous_result: chessboard.Result | None = None
        metrics: list[chessboard.DetectionDetails] = []

        while (tensor_frame := reader.read()) is not None:
            on_step()

            result = detector.predict(tensor_frame.permute((1, 2, 0)).numpy())
            if result is None:
                frames_since_previous_result += 1
                continue

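            # Estimate how fast the board moved since the last detection and
            # reject fast-moving boards: their corners are likely blurred and
            # would degrade the calibration.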
            speed = (
                result.average_speed(
                    previous_result,
                    time_delta * frames_since_previous_result,
                )
                if previous_result is not None
                else 0.0
            )
            frames_since_previous_result = 1
            previous_result = result

            if speed >= max_speed:
                speed_rejections += 1
                continue

            result_metrics = result.details

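            # Accept a detection only if it differs sufficiently from every
            # sample collected so far, so the final set covers diverse board
            # poses.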
            closest_sample_distance = min(
                (result_metrics.distance(other) for other in metrics),
                default=float('inf'),
            )
            if closest_sample_distance <= min_distance:
                similarity_rejections += 1
                continue

            metrics.append(result_metrics)
            image_points.append(result.corners)

            if writer is not None:
                writer.write(tensor_frame, [result])

        n_samples = len(image_points)
        max_samples = self.configuration.max_samples

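        # Bound the cost of calibration by keeping a uniformly random subset
        # of at most `max_samples` detections.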
        if max_samples is not None and n_samples > max_samples:
            chosen = random.sample(range(n_samples), max_samples)
            image_points = [image_points[i] for i in chosen]
            metrics = [metrics[i] for i in chosen]

        return SamplingSummary(
            (reader.metadata.height, reader.metadata.width),
            self.configuration.board_properties,
            image_points,
            metrics,
            speed_rejections,
            similarity_rejections,
        )