Skip to content

Commit

Permalink
Mock socket instead of using fake UDP server
Browse files Browse the repository at this point in the history
  • Loading branch information
sindrehan committed Jan 15, 2025
1 parent dc048a7 commit 48aa246
Showing 1 changed file with 29 additions and 67 deletions.
96 changes: 29 additions & 67 deletions tests/test_udp_client.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
#!/usr/bin/env python3
import json
import socket
import struct
import threading
import time
import unittest
import pytest
from unittest.mock import *
from unittest.mock import Mock

from blueye.protocol import UdpClient

UDP_PORT = 32011

fake_json = json.loads("""
fake_json = json.loads(
"""
[
{
"version": "2",
Expand Down Expand Up @@ -56,74 +53,39 @@
]
}
]
""")
"""
)


class UDPServer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self._stop_loop = False
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._addr = ("127.0.0.1", UDP_PORT)
self._msg2 = struct.pack("<BBb", 2, 2, 2)
self._msg3 = struct.pack("<BBh", 2, 3, 3)
self.start()
@pytest.fixture
def udp_client():
client = UdpClient(
protocol_description=fake_json, port=UDP_PORT, drone_ip="127.0.0.1"
)
client._sock = Mock()
recvfrom_side_effect = [
(struct.pack("<BBb", 2, 2, 2), ("127.0.0.1", UDP_PORT)),
(struct.pack("<BBh", 2, 3, 3), ("127.0.0.1", UDP_PORT)),
]
client._sock.recvfrom.side_effect = recvfrom_side_effect
return client

def __del__(self):
self._socket.close()

def stop_thread(self):
self._stop_loop = True
self.join()
def test_get_raw_data(udp_client):
data = udp_client._get_raw_data()
assert data == struct.pack("<BBb", 2, 2, 2)

def run(self):
for n in range(3):
if self._stop_loop:
continue
self._socket.sendto(self._msg2, self._addr)
time.sleep(0.01)
while not self._stop_loop:
self._socket.sendto(self._msg3, self._addr)
time.sleep(0.01)

def test_get_data(udp_client):
data = udp_client.get_data()
assert data == (2, 2, 2)

class TestUdpClient(unittest.TestCase):
def setUp(self):
pass

@pytest.mark.timeout(1)
def test_get_raw_data(self):
us = UDPServer()
uc = UdpClient(protocol_description=fake_json, port=UDP_PORT, drone_ip="127.0.0.1")
data = uc._get_raw_data()
us.stop_thread()
self.assertEqual(data, struct.pack("<BBb", 2, 2, 2))
def test_get_data_t3(udp_client):
data = udp_client.get_data(packet_type=3)
assert data == (2, 3, 3)

@pytest.mark.timeout(1)
def test_get_data(self):
us = UDPServer()
uc = UdpClient(protocol_description=fake_json, port=UDP_PORT, drone_ip="127.0.0.1")
data = uc.get_data()
us.stop_thread()
self.assertEqual(data, (2, 2, 2))

@pytest.mark.timeout(1)
def test_get_data_t3(self):
us = UDPServer()
uc = UdpClient(protocol_description=fake_json, port=UDP_PORT, drone_ip="127.0.0.1")
data = uc.get_data(packet_type=3)
us.stop_thread()
self.assertEqual(data, (2, 3, 3))

@pytest.mark.timeout(1)
def test_get_data_dict(self):
us = UDPServer()
uc = UdpClient(protocol_description=fake_json, port=UDP_PORT, drone_ip="127.0.0.1")
data = uc.get_data_dict()
us.stop_thread()
self.assertEqual(data, {'u1-2-v': 2, 'u1-2-t': 2, 'i1-2': 2})


if __name__ == '__main__':
unittest.main()
def test_get_data_dict(udp_client):
data = udp_client.get_data_dict()
assert data == {"u1-2-v": 2, "u1-2-t": 2, "i1-2": 2}

0 comments on commit 48aa246

Please sign in to comment.