diff --git a/docker-compose.yaml b/docker-compose.yaml index 47f8c98..2d0341b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -5,3 +5,8 @@ services: - "127.0.0.1:5672:5672" volumes: - "./config/rabbitmq:/etc/rabbitmq:ro" + # using playback server to test Camera on a stream for now. consider using something more general like gstreamer's videotestsrc? + playback-server: + image: waggle/wes-playback-server:0.1.0 + ports: + - "127.0.0.1:8090:8090" diff --git a/docs/writing-a-plugin.md b/docs/writing-a-plugin.md index b1044be..a42d9fd 100644 --- a/docs/writing-a-plugin.md +++ b/docs/writing-a-plugin.md @@ -172,7 +172,6 @@ source: architectures: - "linux/amd64" - "linux/arm64" - - "linux/arm/v7" ``` This file contains metadata about what your plugin is called and what it's supposed to do. It is used by the [Edge Code Repository](https://portal.sagecontinuum.org/apps/explore) when submitting plugins. diff --git a/src/waggle/data/vision.py b/src/waggle/data/vision.py index e92a185..de22711 100644 --- a/src/waggle/data/vision.py +++ b/src/waggle/data/vision.py @@ -14,6 +14,8 @@ from shutil import which import ffmpeg import logging +from contextlib import ExitStack, contextmanager + logger = logging.getLogger(__name__) @@ -90,13 +92,13 @@ def __init__(self, path, timestamp, format=RGB): def __enter__(self): self.capture = cv2.VideoCapture(self.path) if not self.capture.isOpened(): - raise RuntimeError( - f"unable to open video capture for file {self.path!r}" - ) + raise RuntimeError(f"unable to open video capture for file {self.path!r}") self.fps = self.capture.get(cv2.CAP_PROP_FPS) - if self.fps > 100.: - self.fps = 0. - logger.debug(f'pywaggle cannot calculate timestamp because the fps ({self.fps}) is too high.') + if self.fps > 100.0: + self.fps = 0.0 + logger.debug( + f"pywaggle cannot calculate timestamp because the fps ({self.fps}) is too high." + ) self.timestamp_delta = 0 else: self.timestamp_delta = 1 / self.fps @@ -113,31 +115,39 @@ def __iter__(self): def __next__(self): if self.capture == None or not self.capture.isOpened(): - raise RuntimeError("video is not opened. use the Python WITH statement to open the video") + raise RuntimeError( + "video is not opened. use the Python WITH statement to open the video" + ) ok, data = self.capture.read() if not ok or data is None: raise StopIteration # timestamp must be an integer in nanoseconds - approx_timestamp = self.timestamp + int(self.timestamp_delta * self._frame_count * 1e9) + approx_timestamp = self.timestamp + int( + self.timestamp_delta * self._frame_count * 1e9 + ) self._frame_count += 1 return ImageSample(data=data, timestamp=approx_timestamp, format=self.format) +INPUT_TYPE_FILE = "file" +INPUT_TYPE_OTHER = "other" + + def resolve_device(device): if isinstance(device, Path): - return resolve_device_from_path(device) + return resolve_device_from_path(device), INPUT_TYPE_FILE # objects that are not paths or strings are considered already resolved if not isinstance(device, str): - return device + return device, INPUT_TYPE_OTHER match = re.match(r"([A-Za-z0-9]+)://(.*)$", device) # non-url like paths refer to data shim devices if match is None: - return resolve_device_from_data_config(device) + return resolve_device_from_data_config(device), INPUT_TYPE_OTHER # return file:// urls as path if match.group(1) == "file": - return resolve_device_from_path(Path(match.group(2))) + return resolve_device_from_path(Path(match.group(2))), INPUT_TYPE_FILE # return other urls as-is - return device + return device, INPUT_TYPE_OTHER def resolve_device_from_path(path): @@ -154,74 +164,152 @@ def resolve_device_from_data_config(device): except KeyError: raise KeyError(f"missing .handler.args.url field for device {device!r}.") -class Camera: - INPUT_TYPE_FILE = "file" - INPUT_TYPE_OTHER = "other" +class Camera: def __init__(self, device=0, format=RGB): - self.capture = _Capture(resolve_device(device), format) - match = re.match(r"([A-Za-z0-9]+)://(.*)$", device) - if match is not None and match.group(1) == "file": - self.input_type = self.INPUT_TYPE_FILE + self.es = ExitStack() + + device, input_type = resolve_device(device) + self.device = device + self.format = format + if input_type == "file": + self.capture_class = FileCapture + elif input_type == "other": + self.capture_class = StreamCapture else: - self.input_type = self.INPUT_TYPE_OTHER + raise RuntimeError(f"invalid camera input type for device {device}") def __enter__(self): - if self.input_type == self.INPUT_TYPE_FILE: - logger.info(f'input is a file. the background thread disabled for grabbing frames') - self.capture.enable_daemon = False - else: - self.capture.enable_daemon = True - self.capture.__enter__() - return self + capture = self.capture_class(self.device, self.format) + self.es.callback(capture.close) + return capture def __exit__(self, exc_type, exc_val, exc_tb): - self.capture.__exit__(exc_type, exc_val, exc_tb) + self.es.close() def snapshot(self): - with self.capture: - return self.capture.snapshot() + with self as capture: + return capture.snapshot() def stream(self): - with self.capture: - yield from self.capture.stream() + with self as capture: + yield from capture.stream() def record(self, duration, file_path="./sample.mp4", skip_second=1): - return self.capture.record(duration, file_path, skip_second) + if which("ffmpeg") is None: + raise RuntimeError( + "ffmpeg does not exist to record video. please install ffmpeg" + ) + # TODO find cross platform option for webcams since likely to be used during tutorials + if isinstance(self.device, int): + c = ffmpeg.input(str(self.device), ss=skip_second) + elif isinstance(self.device, str) and self.device.startswith("rtsp"): + c = ffmpeg.input(self.device, rtsp_transport="tcp", ss=skip_second) + else: + c = ffmpeg.input(self.device, ss=skip_second) + c = ffmpeg.output( + c, file_path, codec="copy", f="mp4", t=duration + ).overwrite_output() + timestamp = get_timestamp() + + try: + ffmpeg.run(c, quiet=True) + except ffmpeg.Error as e: + raise RuntimeError(f"error while recording: {e.stderr.decode()}") + + return VideoSample(path=file_path, timestamp=timestamp) + + +class FileCapture: + def __init__(self, device, format): + self.device = device + self.format = format + + self.capture = cv2.VideoCapture(self.device) + if not self.capture.isOpened(): + raise RuntimeError( + f"unable to open video capture for device {self.device!r}" + ) + + def close(self): + self.capture.release() + + def snapshot(self): + return self._grab_frame() + + def stream(self): + try: + while True: + yield self._grab_frame() + except RuntimeError: + return + + def record(self): + raise RuntimeError( + "Camera already opened. Camera.record must be called outside of a with block." + ) + + def _grab_frame(self): + ok = self.capture.grab() + if not ok: + raise RuntimeError("failed to grab frame") + timestamp = get_timestamp() + ok, data = self.capture.retrieve() + if not ok: + raise RuntimeError("failed to decode frame") + return ImageSample(data=data, timestamp=timestamp, format=self.format) -class _Capture: +class StreamCapture: def __init__(self, device, format): self.device = device self.format = format - self.context_depth = 0 - self.enable_daemon = False + + self.capture = cv2.VideoCapture(self.device) + if not self.capture.isOpened(): + raise RuntimeError( + f"unable to open video capture for device {self.device!r}" + ) + + self.lock = threading.Lock() self.daemon_need_to_stop = threading.Event() self._ready_for_next_frame = threading.Event() - self.daemon = threading.Thread(target=self._run, daemon=True) - self.lock = threading.Lock() + self.stopped = threading.Event() + threading.Thread(target=self._run, daemon=True).start() - def __enter__(self): - if self.context_depth == 0: - self.capture = cv2.VideoCapture(self.device) - if not self.capture.isOpened(): - raise RuntimeError( - f"unable to open video capture for device {self.device!r}" - ) - # spin up a thread to keep up with the camera frame rate - if self.enable_daemon: - self.daemon_need_to_stop.clear() - self.daemon.start() - self.context_depth += 1 - return self + def close(self): + self.daemon_need_to_stop.set() + self.stopped.wait(timeout=10) + self.capture.release() + + def snapshot(self): + return self._grab_frame() + + def stream(self): + try: + while True: + yield self._grab_frame() + except RuntimeError: + return + + def record(self): + raise RuntimeError( + "Camera already opened. Camera.record must be called outside of a with block." + ) + + def _grab_frame(self): + if not self._ready_for_next_frame.wait(timeout=10.0): + raise RuntimeError( + "failed to grab a frame from the background thread: timed out" + ) + self._ready_for_next_frame.clear() + with acquire_with_timeout(self.lock, timeout=1.0): + timestamp = self.timestamp + ok, data = self.capture.retrieve() + if not ok: + raise RuntimeError("failed to retrieve the taken snapshot") + return ImageSample(data=data, timestamp=timestamp, format=self.format) - def __exit__(self, exc_type, exc_val, exc_tb): - self.context_depth -= 1 - if self.context_depth == 0: - if self.enable_daemon: - self.daemon_need_to_stop.set() - self.capture.release() - def _run(self): # we sleep slighly shorter than FPS to drain the buffer efficiently # NOTE: OpenCV's FPS get function is inaccurate as a USB webcam gives 1 FPS while @@ -232,69 +320,17 @@ def _run(self): # if fps > 0 and fps < 100: # sleep = 1 / (fps + 1) # logging.debug(f'camera FPS is {fps}. the background thread sleeps {sleep} seconds in between grab()') - while not self.daemon_need_to_stop.is_set(): - try: - self.lock.acquire() - ok = self.capture.grab() - if not ok: - raise RuntimeError("failed to grab a frame") - self.timestamp = get_timestamp() - finally: - self.lock.release() - self._ready_for_next_frame.set() - time.sleep(sleep) - - def grab_frame(self): - if self.daemon.is_alive(): - if not self._ready_for_next_frame.wait(timeout=10.): - raise RuntimeError("failed to grab a frame from the background thread: timed out") - self._ready_for_next_frame.clear() - try: - self.lock.acquire(timeout=1) - timestamp = self.timestamp - ok, data = self.capture.retrieve() - if not ok: - raise RuntimeError("failed to retrieve the taken snapshot") - finally: - self.lock.release() - return ImageSample(data=data, timestamp=timestamp, format=self.format) - else: - ok = self.capture.grab() - if not ok: - raise RuntimeError("failed to take a snapshot") - timestamp = get_timestamp() - ok, data = self.capture.retrieve() - if not ok: - raise RuntimeError("failed to retrieve the taken snapshot") - return ImageSample(data=data, timestamp=timestamp, format=self.format) - - def snapshot(self): - return self.grab_frame() - - def stream(self): try: - while True: - yield self.grab_frame() - except: - pass - - def record(self, duration, file_path="./sample.mp4", skip_second=1): - if which("ffmpeg") == None: - raise RuntimeError("ffmpeg does not exist to record video. please install ffmpeg") - if self.context_depth > 0: - raise RuntimeError(f'the stream {self.device} is already open. please close first or use without the Python\'s WITH statement') - if isinstance(self.device, str) and self.device.startswith("rtsp"): - c = ffmpeg.input(self.device, rtsp_transport="tcp", ss=skip_second) - else: - c = ffmpeg.input(self.device, ss=skip_second) - c = ffmpeg.output(c, file_path, codec="copy", f='mp4', t=duration).overwrite_output() - timestamp = get_timestamp() - _, stderr = ffmpeg.run(c, quiet=True) - if os.path.exists(file_path) and os.path.getsize(file_path) > 0: - return VideoSample(path=file_path, timestamp=timestamp) - else: - raise RuntimeError(f'error while recording: {stderr}') - + while not self.daemon_need_to_stop.is_set(): + with acquire_with_timeout(self.lock, timeout=10.0): + ok = self.capture.grab() + if not ok: + raise RuntimeError("failed to grab a frame") + self.timestamp = get_timestamp() + self._ready_for_next_frame.set() + time.sleep(sleep) + finally: + self.stopped.set() class ImageFolder: @@ -320,3 +356,13 @@ def __getitem__(self, i): def __repr__(self): return f"ImageFolder{self.files!r}" + + +@contextmanager +def acquire_with_timeout(lock, timeout): + if not lock.acquire(timeout=timeout): + raise TimeoutError("timed out when acquiring lock") + try: + yield + finally: + lock.release() diff --git a/tests/test.mp4 b/tests/test.mp4 new file mode 100644 index 0000000..aa26a4a Binary files /dev/null and b/tests/test.mp4 differ diff --git a/tests/test_data.py b/tests/test_data.py index fcd5885..0fc5e70 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -1,12 +1,20 @@ import unittest from waggle.data.audio import AudioFolder, AudioSample -from waggle.data.vision import RGB, BGR, ImageFolder, ImageSample, resolve_device +from waggle.data.vision import ( + RGB, + BGR, + ImageFolder, + ImageSample, + resolve_device, + Camera, +) from waggle.data.timestamp import get_timestamp import numpy as np from tempfile import TemporaryDirectory from pathlib import Path import os.path from itertools import product +import subprocess def generate_audio_data(samplerate, channels, dtype): @@ -40,21 +48,21 @@ def test_colors(self): def test_resolve_device(self): self.assertEqual( - resolve_device(Path("test.jpg")), str(Path("test.jpg").absolute()) + resolve_device(Path("test.jpg")), (str(Path("test.jpg").absolute()), "file") ) self.assertEqual( resolve_device("file://path/to/test.jpg"), - str(Path("path/to/test.jpg").absolute()), + (str(Path("path/to/test.jpg").absolute()), "file"), ) self.assertEqual( resolve_device("http://camera-ip.org/image.jpg"), - "http://camera-ip.org/image.jpg", + ("http://camera-ip.org/image.jpg", "other"), ) self.assertEqual( resolve_device("rtsp://camera-ip.org/image.jpg"), - "rtsp://camera-ip.org/image.jpg", + ("rtsp://camera-ip.org/image.jpg", "other"), ) - self.assertEqual(resolve_device(0), 0) + self.assertEqual(resolve_device(0), (0, "other")) def test_image_save(self): with TemporaryDirectory() as dir: @@ -119,5 +127,48 @@ def test_get_timestamp(self): self.assertIsInstance(ts, int) +def playback_server_available(): + try: + output = subprocess.check_output(["docker-compose", "logs", "playback-server"]) + except subprocess.CalledProcessError: + return False + return b"Serving data" in output + + +class TestCamera(unittest.TestCase): + def test_file_snapshot(self): + # open test.mp4 is a test video 90 480x640 frames + cam = Camera("file://tests/test.mp4") + sample = cam.snapshot() + assert sample.data.shape == (640, 480, 3) + + def test_file_stream(self): + # open test.mp4 is a test video 90 480x640 frames + cam = Camera("file://tests/test.mp4") + numframes = 0 + for sample in cam.stream(): + assert sample.data.shape == (640, 480, 3) + numframes += 1 + assert numframes == 90 + + @unittest.skipUnless(playback_server_available(), "playback server not available") + def test_stream_snapshot(self): + # open playback server which provides test video stream of 800x600 frames + cam = Camera("http://127.0.0.1:8090/bottom/live.mp4") + sample = cam.snapshot() + assert sample.data.shape == (600, 800, 3) + + @unittest.skipUnless(playback_server_available(), "playback server not available") + def test_stream_stream(self): + # open playback server which provides test video stream of 800x600 frames + cam = Camera("http://127.0.0.1:8090/bottom/live.mp4") + numframes = 0 + for sample in cam.stream(): + assert sample.data.shape == (600, 800, 3) + numframes += 1 + if numframes > 30: + break + + if __name__ == "__main__": unittest.main()