-
Notifications
You must be signed in to change notification settings - Fork 499
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
import threading | ||
import cv2 # type: ignore | ||
from threading import Thread | ||
from typing import Optional | ||
from typing import Optional, Union, Type, Dict | ||
|
||
from .enforce_types import enforce_types | ||
|
||
|
@@ -60,6 +60,8 @@ class Tello: | |
'tof', 'h', 'bat', 'time' | ||
) | ||
FLOAT_STATE_FIELDS = ('baro', 'agx', 'agy', 'agz') | ||
|
||
state_field_converters: Dict[str, Union[Type[int], Type[float]]] | ||
state_field_converters = {key : int for key in INT_STATE_FIELDS} | ||
state_field_converters.update({key : float for key in FLOAT_STATE_FIELDS}) | ||
|
||
|
@@ -157,7 +159,7 @@ def udp_state_receiver(): | |
break | ||
|
||
@staticmethod | ||
def parse_state(state: str) -> dict: | ||
def parse_state(state: str) -> Dict[str, Union[int, float, str]]: | ||
"""Parse a state line to a dictionary | ||
Internal method, you normally wouldn't call this yourself. | ||
""" | ||
|
@@ -174,16 +176,17 @@ def parse_state(state: str) -> dict: | |
continue | ||
|
||
key = split[0] | ||
value = split[1] | ||
value: Union[int, float, str] = split[1] | ||
|
||
if key in Tello.state_field_converters: | ||
num_type = Tello.state_field_converters[key] | ||
try: | ||
value = num_type(value) | ||
except Exception as e: | ||
except ValueError as e: | ||
Tello.LOGGER.debug('Error parsing state value for {}: {} to {}' | ||
.format(key, value, num_type)) | ||
Tello.LOGGER.error(e) | ||
continue | ||
|
||
state_dict[key] = value | ||
|
||
|
@@ -410,7 +413,7 @@ def send_command_with_return(self, command: str, timeout: int = RESPONSE_TIMEOUT | |
diff = time.time() - self.last_received_command_timestamp | ||
if diff < self.TIME_BTW_COMMANDS: | ||
self.LOGGER.debug('Waiting {} seconds to execute command: {}...'.format(diff, command)) | ||
self.sleep(diff) | ||
time.sleep(diff) | ||
|
||
self.LOGGER.info("Send command: '{}'".format(command)) | ||
timestamp = time.time() | ||
|
@@ -424,7 +427,7 @@ def send_command_with_return(self, command: str, timeout: int = RESPONSE_TIMEOUT | |
message = "Aborting command '{}'. Did not receive a response after {} seconds".format(command, timeout) | ||
self.LOGGER.warning(message) | ||
return message | ||
self.sleep(0.1) # Sleep during send command | ||
time.sleep(0.1) # Sleep during send command | ||
|
||
self.last_received_command_timestamp = time.time() | ||
|
||
|
@@ -517,16 +520,11 @@ def connect(self, wait_for_state=True): | |
t = i / REPS # in seconds | ||
Tello.LOGGER.debug("'.connect()' received first state packet after {} seconds".format(t)) | ||
break | ||
self.sleep(1 / REPS) | ||
time.sleep(1 / REPS) | ||
|
||
if not self.get_current_state(): | ||
raise Exception('Did not receive a state packet from the Tello') | ||
|
||
def sleep(time_sleep=5): | ||
"""Sleep. | ||
""" | ||
time.sleep(time_sleep) | ||
|
||
def takeoff(self): | ||
"""Automatic takeoff. | ||
""" | ||
|
@@ -776,7 +774,7 @@ def send_rc_control(self, left_right_velocity: int, forward_backward_velocity: i | |
up_down_velocity: -100~100 (up/down) | ||
yaw_velocity: -100~100 (yaw) | ||
""" | ||
def clamp100(x: int): int: | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
Nikolaj-K
Contributor
|
||
def clamp100(x: int) -> int: | ||
return max(-100, min(100, x)) | ||
|
||
if time.time() - self.last_rc_control_timestamp > self.TIME_BTW_RC_CONTROL_COMMANDS: | ||
|
Yeah I just saw this too.
In sleep, there was only
self
missing.