Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 32 additions & 20 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@
from autoptz.engine.discovery.onvif import ONVIFDevice, ONVIFDiscovery
from autoptz.engine.discovery.usb import USBDevice, USBDiscovery, enumerate_cameras


def _wait_until(predicate, timeout: float = 3.0, interval: float = 0.01) -> bool:
"""Poll ``predicate`` until true or ``timeout`` elapses; return its final value.

Discovery runs a background poll thread, so waiting for the asserted condition
(rather than sleeping a fixed window) keeps these tests fast and immune to a
loaded CI runner stalling the thread.
"""
deadline = time.monotonic() + timeout
while not predicate() and time.monotonic() < deadline:
time.sleep(interval)
return predicate()


# ── USBDiscovery ───────────────────────────────────────────────────────────────


Expand All @@ -41,7 +55,7 @@ def mock_probe(max_index: int = 10) -> set[int]:
discovery = USBDiscovery(poll_interval=0.1)
discovery.on_change(lambda ev, dev: events.append((ev, dev)))
discovery.start()
time.sleep(0.3)
_wait_until(lambda: {0, 1} <= {e[1].index for e in events if e[0] == "added"})
discovery.stop()

added_indices = {e[1].index for e in events if e[0] == "added"}
Expand All @@ -64,7 +78,7 @@ def mock_probe(max_index: int = 10) -> set[int]:
discovery = USBDiscovery(poll_interval=0.05)
discovery.on_change(lambda ev, dev: events.append((ev, dev)))
discovery.start()
time.sleep(0.4)
_wait_until(lambda: 2 in {e[1].index for e in events if e[0] == "added"})
discovery.stop()

added = [e for e in events if e[0] == "added"]
Expand All @@ -86,7 +100,7 @@ def mock_probe(max_index: int = 10) -> set[int]:
discovery = USBDiscovery(poll_interval=0.05)
discovery.on_change(lambda ev, dev: events.append((ev, dev)))
discovery.start()
time.sleep(0.4)
_wait_until(lambda: any(e[0] == "removed" and e[1].index == 1 for e in events))
discovery.stop()

removed = [e for e in events if e[0] == "removed"]
Expand All @@ -99,7 +113,7 @@ def mock_probe(max_index: int = 10) -> set[int]:
with patch("autoptz.engine.discovery.usb._probe_indices", mock_probe):
discovery = USBDiscovery(poll_interval=0.05)
discovery.start()
time.sleep(0.2)
_wait_until(lambda: {0, 1, 3} <= {d.index for d in discovery.devices})
devices = discovery.devices
discovery.stop()

Expand All @@ -116,7 +130,7 @@ def mock_probe(max_index: int = 10) -> set[int]:
discovery = USBDiscovery(poll_interval=0.05)
discovery.on_change(lambda ev, dev: events.append((ev, dev)))
discovery.start()
time.sleep(0.3)
_wait_until(lambda: any(e[0] == "added" and e[1].index == 0 for e in events))
discovery.stop()

# Only the initial "added" event for device 0, nothing else
Expand All @@ -135,7 +149,7 @@ def mock_probe(max_index: int = 10) -> set[int]:
discovery.on_change(lambda ev, _: events1.append(ev))
discovery.on_change(lambda ev, _: events2.append(ev))
discovery.start()
time.sleep(0.2)
_wait_until(lambda: events1 and events2)
discovery.stop()

assert len(events1) > 0
Expand Down Expand Up @@ -314,7 +328,7 @@ def test_new_source_fires_added_event(self) -> None:
discovery = NDIDiscovery(poll_interval=0.05)
discovery.on_change(lambda ev, src: events.append((ev, src)))
discovery.start()
time.sleep(0.5)
_wait_until(lambda: any(e[0] == "added" and e[1].name == "LAPTOP (TEST)" for e in events))
discovery.stop()

added = [e for e in events if e[0] == "added"]
Expand All @@ -334,7 +348,7 @@ def test_removed_source_fires_removed_event(self) -> None:
discovery = NDIDiscovery(poll_interval=0.05)
discovery.on_change(lambda ev, src: events.append((ev, src)))
discovery.start()
time.sleep(0.5)
_wait_until(lambda: any(e[0] == "removed" and e[1].name == "NDI_CAM_1" for e in events))
discovery.stop()

removed = [e for e in events if e[0] == "removed"]
Expand All @@ -350,7 +364,7 @@ def test_sources_property(self) -> None:

discovery = NDIDiscovery(poll_interval=0.05)
discovery.start()
time.sleep(0.3)
_wait_until(lambda: {"SOURCE_A", "SOURCE_B"} <= {s.name for s in discovery.sources})
sources = discovery.sources
discovery.stop()

Expand All @@ -363,9 +377,8 @@ def test_cyndilib_unavailable_does_not_raise(self) -> None:
_remove_mock_cyndilib_for_discovery() # ensure cyndilib not in sys.modules

discovery = NDIDiscovery(poll_interval=0.05)
# Should NOT raise even without cyndilib
# Should NOT raise even without cyndilib (no poll thread starts).
discovery.start()
time.sleep(0.1)
discovery.stop()

assert discovery.sources == []
Expand All @@ -383,10 +396,10 @@ def test_no_events_when_sources_stable(self) -> None:
discovery = NDIDiscovery(poll_interval=0.05)
discovery.on_change(lambda ev, src: events.append((ev, src)))
discovery.start()
time.sleep(0.3)
_wait_until(lambda: any(e[0] == "added" for e in events))
discovery.stop()

# "STABLE" should appear as added exactly once
# "STABLE" should appear as added exactly once (dedup holds across polls).
added = [e for e in events if e[0] == "added"]
removed = [e for e in events if e[0] == "removed"]
assert len(added) == 1
Expand Down Expand Up @@ -449,7 +462,7 @@ def test_discovered_device_fires_added(self) -> None:
discovery = ONVIFDiscovery(rescan_interval=60.0)
discovery.on_change(lambda ev, dev: events.append((ev, dev)))
discovery.start()
time.sleep(0.5)
_wait_until(lambda: any(e[0] == "added" for e in events))
discovery.stop()

added = [e for e in events if e[0] == "added"]
Expand All @@ -472,7 +485,7 @@ def test_device_removed_after_miss_threshold(self) -> None:
discovery = ONVIFDiscovery(rescan_interval=0.05)
discovery.on_change(lambda ev, dev: events.append((ev, dev)))
discovery.start()
time.sleep(0.6)
_wait_until(lambda: any(e[0] == "removed" for e in events))
discovery.stop()

added = [e for e in events if e[0] == "added"]
Expand All @@ -495,10 +508,10 @@ def test_stable_device_not_re_reported(self) -> None:
discovery = ONVIFDiscovery(rescan_interval=0.05)
discovery.on_change(lambda ev, dev: events.append((ev, dev)))
discovery.start()
time.sleep(0.4)
_wait_until(lambda: any(e[0] == "added" for e in events))
discovery.stop()

# Should only fire "added" once, not on every rescan
# Should only fire "added" once, not on every rescan (dedup holds).
added = [e for e in events if e[0] == "added"]
assert len(added) == 1

Expand All @@ -507,7 +520,6 @@ def test_wsdiscovery_unavailable_does_not_raise(self) -> None:

discovery = ONVIFDiscovery(rescan_interval=0.05)
discovery.start()
time.sleep(0.1)
discovery.stop()

assert discovery.devices == []
Expand All @@ -518,7 +530,7 @@ def test_devices_property(self) -> None:

discovery = ONVIFDiscovery(rescan_interval=0.05)
discovery.start()
time.sleep(0.3)
_wait_until(lambda: len(discovery.devices) >= 1)
devices = discovery.devices
discovery.stop()

Expand All @@ -535,7 +547,7 @@ def test_multiple_callbacks_all_called(self) -> None:
discovery.on_change(lambda ev, _: results1.append(ev))
discovery.on_change(lambda ev, _: results2.append(ev))
discovery.start()
time.sleep(0.5)
_wait_until(lambda: results1 and results2)
discovery.stop()

assert len(results1) > 0
Expand Down
6 changes: 5 additions & 1 deletion tests/test_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,11 @@ def fake_read(self: RTSPAdapter) -> np.ndarray | None:
patch.object(RTSPAdapter, "_close", lambda self: None),
):
adapter.start()
time.sleep(1.0)
# Poll for the stall-triggered reconnect (≥2 opens) instead of a fixed
# 1 s sleep, which flakes when a loaded CI runner is slow to reconnect.
deadline = time.monotonic() + 3.0
while open_count < 2 and time.monotonic() < deadline:
time.sleep(0.01)
adapter.stop()

assert open_count >= 2, f"Expected ≥ 2 opens after stall, got {open_count}"
Expand Down
Loading