-
Notifications
You must be signed in to change notification settings - Fork 562
/
Copy pathptz.py
234 lines (191 loc) · 10.6 KB
/
ptz.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to the Spot CAM Ptz service."""
import logging
_LOGGER = logging.getLogger(__name__)
from google.protobuf.wrappers_pb2 import FloatValue, Int32Value
from bosdyn.api.spot_cam import ptz_pb2, service_pb2_grpc
from bosdyn.client.common import BaseClient, common_header_errors, handle_common_header_errors
from bosdyn.client.math_helpers import recenter_value_mod
class PtzClient(BaseClient):
"""A client calling Spot CAM Ptz service.
"""
default_service_name = 'spot-cam-ptz'
service_type = 'bosdyn.api.spot_cam.PtzService'
def __init__(self):
super(PtzClient, self).__init__(service_pb2_grpc.PtzServiceStub)
def list_ptz(self, **kwargs):
"""List all the available ptzs"""
request = ptz_pb2.ListPtzRequest()
return self.call(self._stub.ListPtz, request, self._list_ptz_from_response,
common_header_errors, copy_request=False, **kwargs)
def list_ptz_async(self, **kwargs):
"""Async version of list_ptz()"""
request = ptz_pb2.ListPtzRequest()
return self.call_async(self._stub.ListPtz, request, self._list_ptz_from_response,
common_header_errors, copy_request=False, **kwargs)
def get_ptz_position(self, ptz_desc, **kwargs):
"""Position of the specified ptz"""
request = ptz_pb2.GetPtzPositionRequest(ptz=ptz_desc)
return self.call(self._stub.GetPtzPosition, request, self._get_ptz_position_from_response,
common_header_errors, copy_request=False, **kwargs)
def get_ptz_position_async(self, ptz_desc, **kwargs):
"""Async version of get_ptz_position()"""
request = ptz_pb2.GetPtzPositionRequest(ptz=ptz_desc)
return self.call_async(self._stub.GetPtzPosition, request,
self._get_ptz_position_from_response, common_header_errors,
copy_request=False, **kwargs)
def get_ptz_velocity(self, ptz_desc, **kwargs):
"""Velocity of the specified ptz"""
request = ptz_pb2.GetPtzVelocityRequest(ptz=ptz_desc)
return self.call(self._stub.GetPtzVelocity, request, self._get_ptz_velocity_from_response,
common_header_errors, copy_request=False, **kwargs)
def get_ptz_velocity_async(self, ptz_desc, **kwargs):
"""Async version of get_ptz_velocity()"""
request = ptz_pb2.GetPtzVelocityRequest(ptz=ptz_desc)
return self.call_async(self._stub.GetPtzVelocity, request,
self._get_ptz_velocity_from_response, common_header_errors,
copy_request=False, **kwargs)
def set_ptz_position(self, ptz_desc, pan, tilt, zoom, **kwargs):
"""Set position of the specified ptz in PTZ-space"""
p = FloatValue(value=pan)
t = FloatValue(value=tilt)
z = FloatValue(value=zoom)
ptz_position = ptz_pb2.PtzPosition(ptz=ptz_desc, pan=p, tilt=t, zoom=z)
request = ptz_pb2.SetPtzPositionRequest(position=ptz_position)
return self.call(self._stub.SetPtzPosition, request, self._set_ptz_position_from_response,
common_header_errors, copy_request=False, **kwargs)
def set_ptz_position_async(self, ptz_desc, pan, tilt, zoom, **kwargs):
"""Async version of set_ptz_position()"""
p = FloatValue(value=pan)
t = FloatValue(value=tilt)
z = FloatValue(value=zoom)
ptz_position = ptz_pb2.PtzPosition(ptz=ptz_desc, pan=p, tilt=t, zoom=z)
request = ptz_pb2.SetPtzPositionRequest(position=ptz_position)
return self.call_async(self._stub.SetPtzPosition, request,
self._set_ptz_position_from_response, common_header_errors,
copy_request=False, **kwargs)
def set_ptz_velocity(self, ptz_desc, pan, tilt, zoom, **kwargs):
"""Set velocity of the specified ptz in PTZ-space"""
p = FloatValue(value=pan)
t = FloatValue(value=tilt)
z = FloatValue(value=zoom)
ptz_velocity = ptz_pb2.PtzVelocity(ptz=ptz_desc, pan=p, tilt=t, zoom=z)
request = ptz_pb2.SetPtzVelocityRequest(velocity=ptz_velocity)
return self.call(self._stub.SetPtzVelocity, request, self._set_ptz_velocity_from_response,
common_header_errors, copy_request=False, **kwargs)
def set_ptz_velocity_async(self, ptz_desc, pan, tilt, zoom, **kwargs):
"""Async version of set_ptz_velocity()"""
p = FloatValue(value=pan)
t = FloatValue(value=tilt)
z = FloatValue(value=zoom)
ptz_velocity = ptz_pb2.PtzVelocity(ptz=ptz_desc, pan=p, tilt=t, zoom=z)
request = ptz_pb2.SetPtzVelocityRequest(velocity=ptz_velocity)
return self.call_async(self._stub.SetPtzVelocity, request,
self._set_ptz_velocity_from_response, common_header_errors,
copy_request=False, **kwargs)
def initialize_lens(self, **kwargs):
"""Initializes the PTZ autofocus or resets it if already initialized"""
request = ptz_pb2.InitializeLensRequest()
return self.call(self._stub.InitializeLens, request, self._initialize_lens_from_response,
common_header_errors, copy_request=False, **kwargs)
def initialize_lens_async(self, **kwargs):
"""Async version of initialize_lens()"""
request = ptz_pb2.InitializeLensRequest()
return self.call_async(self._stub.InitializeLens, request,
self._initialize_lens_from_response, common_header_errors,
copy_request=False, **kwargs)
# Manual Focus RPCs
def get_ptz_focus_state(self, **kwargs):
"""Retrieve focus of the mechanical ptz
Args:
focus_mode (PtzFocusMode): Enum indicating whether to autofocus or manually focus
distance (float): Approximate distance to focus on, most accurate between 1.2m and 20m,
only settable in PTZ_FOCUS_MANUAL mode
focus_position (int32): Precise lens position for the camera for repeatable operations,
overrides distance if specified, only settable in PTZ_FOCUS_MANUAL mode
Returns:
PtzFocusState containing the current focus mode and position
"""
request = ptz_pb2.GetPtzFocusStateRequest()
return self.call(self._stub.GetPtzFocusState, request,
self._get_ptz_focus_state_from_response, common_header_errors,
copy_request=False, **kwargs)
def get_ptz_focus_state_async(self, **kwargs):
"""Async version of get_ptz_focus_state()"""
request = ptz_pb2.GetPtzFocusStateRequest()
return self.call_async(self._stub.GetPtzFocusState, request,
self._get_ptz_focus_state_from_response, common_header_errors,
copy_request=False, **kwargs)
def set_ptz_focus_state(self, focus_mode, distance=None, focus_position=None, **kwargs):
"""Set focus of the mechanical ptz
Args:
focus_mode (PtzFocusMode): Enum indicating whether to autofocus or manually focus
distance (float): Approximate distance to focus on, most accurate between 1.2m and 20m,
only settable in PTZ_FOCUS_MANUAL mode
focus_position (int32): Precise lens position for the camera for repeatable operations,
overrides distance if specified, only settable in PTZ_FOCUS_MANUAL mode
Returns:
SetPtzFocusStateResponse indicating whether the call was successful
"""
if focus_position is not None:
focus_position_val = Int32Value(value=focus_position)
approx_distance = None
elif distance is not None:
approx_distance = FloatValue(value=distance)
focus_position_val = None
else:
raise ValueError("One of distance or focus_position must be specified.")
ptz_focus_state = ptz_pb2.PtzFocusState(mode=focus_mode, approx_distance=approx_distance,
focus_position=focus_position_val)
request = ptz_pb2.SetPtzFocusStateRequest(focus_state=ptz_focus_state)
return self.call(self._stub.SetPtzFocusState, request,
self._set_ptz_focus_state_from_response, common_header_errors,
copy_request=False, **kwargs)
def set_ptz_focus_state_async(self, focus_mode, distance=None, focus_position=None, **kwargs):
"""Async version of set_ptz_focus_state()"""
if focus_position is not None:
focus_position_val = Int32Value(value=focus_position)
approx_distance = None
elif distance is not None:
approx_distance = FloatValue(value=distance)
focus_position_val = None
else:
raise ValueError("One of distance or focus_position must be specified.")
ptz_focus_state = ptz_pb2.PtzFocusState(mode=focus_mode, approx_distance=approx_distance,
focus_position=focus_position_val)
request = ptz_pb2.SetPtzFocusStateRequest(focus_state=ptz_focus_state)
return self.call_async(self._stub.SetPtzFocusState, request,
self._set_ptz_focus_state_from_response, common_header_errors,
copy_request=False, **kwargs)
@staticmethod
def _list_ptz_from_response(response):
return response.ptzs
@staticmethod
def _get_ptz_position_from_response(response):
return response.position
@staticmethod
def _get_ptz_velocity_from_response(response):
return response.velocity
@staticmethod
def _set_ptz_position_from_response(response):
return response.position
@staticmethod
def _set_ptz_velocity_from_response(response):
return response.velocity
@staticmethod
def _initialize_lens_from_response(response):
return response
# Focus methods
@staticmethod
def _get_ptz_focus_state_from_response(response):
return response.focus_state
@staticmethod
def _set_ptz_focus_state_from_response(response):
return response
def shift_pan_angle(pan):
"""Shift the pan angle (degrees) so that it is in the [0,360] range."""
return recenter_value_mod(pan, 180, 360)