|
49 | 49 | import argparse |
50 | 50 | import time |
51 | 51 | import platform |
| 52 | +import threading |
52 | 53 | import multiprocessing as mp |
53 | 54 | from typing import Dict, Any, List, Optional, Tuple, Literal, Generator |
54 | 55 | from datetime import datetime |
|
142 | 143 | class FFMPEGVideoWriter: |
143 | 144 | """ |
144 | 145 | Video writer using ffmpeg subprocess for encoding with 10-bit support. |
145 | | - |
| 146 | +
|
146 | 147 | Provides cv2.VideoWriter-compatible interface (write, isOpened, release) while |
147 | 148 | using ffmpeg for encoding. Enables 10-bit output (yuv420p10le with x265) which |
148 | 149 | reduces banding artifacts in gradients compared to 8-bit opencv output. |
149 | | - |
| 150 | +
|
150 | 151 | Args: |
151 | 152 | path: Output video file path |
152 | 153 | width: Frame width in pixels |
153 | 154 | height: Frame height in pixels |
154 | 155 | fps: Frames per second |
155 | 156 | use_10bit: If True, uses x265 codec with yuv420p10le pixel format. |
156 | 157 | If False, uses x264 with yuv420p (default: False) |
157 | | - |
| 158 | +
|
158 | 159 | Raises: |
159 | 160 | RuntimeError: If ffmpeg is not found in system PATH |
160 | | - |
| 161 | +
|
161 | 162 | Note: |
162 | 163 | Frames must be passed to write() in BGR format (same as cv2.VideoWriter). |
163 | 164 | Internally converts to RGB for ffmpeg rawvideo input. |
164 | 165 | """ |
165 | | - |
| 166 | + |
| 167 | + # Timeout (seconds) for ffmpeg to finalize encoding after stdin is closed. |
| 168 | + # x265 flush can be slow on large videos, so this is generous. |
| 169 | + WAIT_TIMEOUT = 120 |
| 170 | + TERMINATE_TIMEOUT = 15 |
| 171 | + |
166 | 172 | def __init__(self, path: str, width: int, height: int, fps: float, use_10bit: bool = False): |
167 | 173 | pix_fmt = 'yuv420p10le' if use_10bit else 'yuv420p' |
168 | 174 | codec = 'libx265' if use_10bit else 'libx264' |
169 | | - |
| 175 | + |
170 | 176 | self.proc = subprocess.Popen( |
171 | | - ['ffmpeg', '-y', '-f', 'rawvideo', '-pix_fmt', 'rgb24', |
| 177 | + ['ffmpeg', '-y', '-loglevel', 'warning', |
| 178 | + '-f', 'rawvideo', '-pix_fmt', 'rgb24', |
172 | 179 | '-s', f'{width}x{height}', '-r', str(fps), '-i', '-', |
173 | 180 | '-c:v', codec, '-pix_fmt', pix_fmt, '-preset', 'medium', '-crf', '12', path], |
174 | | - stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL |
| 181 | + stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE |
175 | 182 | ) |
176 | | - |
| 183 | + # Drain stderr in a background thread to prevent pipe buffer filling up |
| 184 | + # and blocking ffmpeg. Only warnings/errors are emitted (-loglevel warning). |
| 185 | + self._stderr_lines: list = [] |
| 186 | + self._stderr_thread = threading.Thread(target=self._drain_stderr, daemon=True) |
| 187 | + self._stderr_thread.start() |
| 188 | + |
| 189 | + def _drain_stderr(self): |
| 190 | + """Read stderr in background to prevent pipe deadlock.""" |
| 191 | + try: |
| 192 | + for line in self.proc.stderr: |
| 193 | + self._stderr_lines.append(line) |
| 194 | + # Keep only last 50 lines to limit memory usage |
| 195 | + if len(self._stderr_lines) > 50: |
| 196 | + self._stderr_lines = self._stderr_lines[-50:] |
| 197 | + except (ValueError, OSError): |
| 198 | + pass # Pipe closed or process exited |
| 199 | + |
177 | 200 | def write(self, frame_bgr: np.ndarray): |
178 | 201 | if not self.isOpened(): |
179 | | - raise RuntimeError("FFMPEGVideoWriter: ffmpeg process is not running") |
180 | | - |
| 202 | + stderr_hint = self._get_stderr_summary() |
| 203 | + raise RuntimeError( |
| 204 | + f"FFMPEGVideoWriter: ffmpeg process is not running.{stderr_hint}" |
| 205 | + ) |
| 206 | + |
181 | 207 | frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) |
182 | 208 | try: |
183 | 209 | self.proc.stdin.write(frame_rgb.astype(np.uint8).tobytes()) |
184 | 210 | self.proc.stdin.flush() # Critical: prevent buffering issues |
185 | 211 | except BrokenPipeError: |
| 212 | + stderr_hint = self._get_stderr_summary() |
186 | 213 | raise RuntimeError( |
187 | 214 | "FFMPEGVideoWriter: ffmpeg process terminated unexpectedly. " |
188 | | - "Check video path, codec support, and disk space." |
| 215 | + f"Check video path, codec support, and disk space.{stderr_hint}" |
189 | 216 | ) |
190 | | - |
| 217 | + |
191 | 218 | def isOpened(self) -> bool: |
192 | 219 | return self.proc is not None and self.proc.poll() is None |
193 | | - |
| 220 | + |
194 | 221 | def release(self): |
195 | | - if self.proc: |
| 222 | + if self.proc is None: |
| 223 | + return |
| 224 | + |
| 225 | + # Close stdin to signal EOF to ffmpeg |
| 226 | + try: |
| 227 | + self.proc.stdin.close() |
| 228 | + except Exception: |
| 229 | + pass |
| 230 | + |
| 231 | + # Wait for ffmpeg to finalize encoding with a timeout |
| 232 | + try: |
| 233 | + self.proc.wait(timeout=self.WAIT_TIMEOUT) |
| 234 | + except subprocess.TimeoutExpired: |
| 235 | + debug.log( |
| 236 | + f"ffmpeg did not exit within {self.WAIT_TIMEOUT}s after closing input, sending SIGTERM", |
| 237 | + level="WARNING", force=True, category="file" |
| 238 | + ) |
| 239 | + self.proc.terminate() |
196 | 240 | try: |
197 | | - self.proc.stdin.close() |
198 | | - except Exception: |
199 | | - pass # Ignore errors on close |
200 | | - |
201 | | - self.proc.wait() |
202 | | - |
203 | | - if self.proc.returncode != 0: |
| 241 | + self.proc.wait(timeout=self.TERMINATE_TIMEOUT) |
| 242 | + except subprocess.TimeoutExpired: |
204 | 243 | debug.log( |
205 | | - f"ffmpeg exited with code {self.proc.returncode}. " |
206 | | - "Check output file for corruption.", |
| 244 | + "ffmpeg did not respond to SIGTERM, sending SIGKILL", |
207 | 245 | level="WARNING", force=True, category="file" |
208 | 246 | ) |
209 | | - self.proc = None |
| 247 | + self.proc.kill() |
| 248 | + self.proc.wait() |
| 249 | + |
| 250 | + # Wait for stderr thread to finish |
| 251 | + self._stderr_thread.join(timeout=5) |
| 252 | + |
| 253 | + if self.proc.returncode != 0: |
| 254 | + stderr_hint = self._get_stderr_summary() |
| 255 | + debug.log( |
| 256 | + f"ffmpeg exited with code {self.proc.returncode}. " |
| 257 | + f"Check output file for corruption.{stderr_hint}", |
| 258 | + level="WARNING", force=True, category="file" |
| 259 | + ) |
| 260 | + self.proc = None |
| 261 | + |
| 262 | + def _get_stderr_summary(self) -> str: |
| 263 | + """Return last stderr lines from ffmpeg for diagnostics.""" |
| 264 | + if not self._stderr_lines: |
| 265 | + return "" |
| 266 | + try: |
| 267 | + text = b"".join(self._stderr_lines).decode("utf-8", errors="replace").strip() |
| 268 | + if text: |
| 269 | + return f"\nffmpeg stderr: {text[-500:]}" |
| 270 | + except Exception: |
| 271 | + pass |
| 272 | + return "" |
| 273 | + |
| 274 | + def __del__(self): |
| 275 | + """Safety net: ensure ffmpeg process is cleaned up on garbage collection.""" |
| 276 | + if getattr(self, 'proc', None) is not None and self.proc.poll() is None: |
| 277 | + try: |
| 278 | + self.proc.stdin.close() |
| 279 | + except Exception: |
| 280 | + pass |
| 281 | + try: |
| 282 | + self.proc.terminate() |
| 283 | + self.proc.wait(timeout=5) |
| 284 | + except Exception: |
| 285 | + try: |
| 286 | + self.proc.kill() |
| 287 | + except Exception: |
| 288 | + pass |
210 | 289 |
|
211 | 290 |
|
212 | 291 | # ============================================================================= |
|
0 commit comments