-
Notifications
You must be signed in to change notification settings - Fork 562
/
Copy pathdata_acquisition.py
324 lines (259 loc) · 14.9 KB
/
data_acquisition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""General client implementation for the main, on-robot data-acquisition service."""
import collections
from google.protobuf import json_format
from bosdyn.api import data_acquisition_pb2 as data_acquisition
from bosdyn.api import data_acquisition_service_pb2_grpc as data_acquisition_service
from bosdyn.client.common import (BaseClient, common_header_errors, custom_params_error,
error_factory, error_pair, handle_common_header_errors,
handle_custom_params_errors, handle_unset_status_error)
from bosdyn.client.exceptions import Error, InternalServerError, ResponseError
from bosdyn.util import now_nsec, now_sec, now_timestamp, seconds_to_duration
class DataAcquisitionResponseError(ResponseError):
    """Base error for a Data Acquisition RPC whose response reports a failure."""
class RequestIdDoesNotExistError(DataAcquisitionResponseError):
    """The supplied request id is invalid or does not match any known request."""
class UnknownCaptureTypeError(DataAcquisitionResponseError):
    """The request asked for one or more capture types that are not known."""
class CancellationFailedError(DataAcquisitionResponseError):
    """The data acquisition request could not be cancelled."""
class DataAcquisitionClient(BaseClient):
    """A client for triggering data acquisition and logging."""

    default_service_name = 'data-acquisition'
    service_type = 'bosdyn.api.DataAcquisitionService'

    def __init__(self):
        super(DataAcquisitionClient,
              self).__init__(data_acquisition_service.DataAcquisitionServiceStub)
        # Populated by update_from() when the source object exposes a time sync endpoint.
        self._timesync_endpoint = None

    def update_from(self, other):
        """Copy shared state from `other` and pick up its time sync endpoint when it has one."""
        super(DataAcquisitionClient, self).update_from(other)

        # Grab a timesync endpoint if it is available.
        try:
            self._timesync_endpoint = other.time_sync.endpoint
        except AttributeError:
            pass  # other doesn't have a time_sync accessor

    def make_acquire_data_request(self, acquisition_requests, action_name, group_name,
                                  data_timestamp=None, metadata=None, min_timeout=None):
        """Helper utility to generate an AcquireDataRequest.

        Args:
            acquisition_requests (bosdyn.api.AcquisitionRequestList): The captures to request.
            action_name (string): The action name placed in the request's CaptureActionId.
            group_name (string): The group name placed in the request's CaptureActionId.
            data_timestamp (google.protobuf.Timestamp): The timestamp placed in the
                CaptureActionId. When None, the current time is used: converted to a robot
                timestamp through the time sync endpoint if one was obtained in update_from(),
                otherwise taken from now_timestamp().
            metadata (bosdyn.api.Metadata | dict): Optional metadata; converted with
                metadata_to_proto().
            min_timeout (float): Optional value converted with seconds_to_duration() and copied
                into the request's min_timeout field. Skipped when falsy (None or 0).

        Raises:
            ValueError: Metadata is not in the right format.

        Returns:
            The populated AcquireDataRequest.
        """
        if data_timestamp is None:
            if not self._timesync_endpoint:
                data_timestamp = now_timestamp()
            else:
                data_timestamp = self._timesync_endpoint.robot_timestamp_from_local_secs(
                    now_sec())
        action_id = data_acquisition.CaptureActionId(action_name=action_name,
                                                     group_name=group_name,
                                                     timestamp=data_timestamp)
        req = data_acquisition.AcquireDataRequest(acquisition_requests=acquisition_requests,
                                                  action_id=action_id,
                                                  metadata=metadata_to_proto(metadata))
        if min_timeout:
            req.min_timeout.CopyFrom(seconds_to_duration(min_timeout))
        return req

    def acquire_data(self, acquisition_requests, action_name, group_name, data_timestamp=None,
                     metadata=None, **kwargs):
        """Trigger a data acquisition to save data and metadata to the data buffer.

        Args:
            acquisition_requests (bosdyn.api.AcquisitionRequestList): The different image sources
                and data sources to capture from and save to the data buffer with the same
                timestamp.
            action_name (string): The unique action name that all data will be saved with.
            group_name (string): The unique group name that all data will be saved with.
            data_timestamp (google.protobuf.Timestamp): The unique timestamp that all data will
                be saved with.
            metadata (bosdyn.api.Metadata | dict): The JSON structured metadata to be associated
                with the data returned by the DataAcquisitionService when logged in the data
                buffer service.
            **kwargs: Forwarded unchanged to the underlying call().

        Raises:
            RpcError: Problem communicating with the robot.
            ValueError: Metadata is not in the right format.

        Returns:
            If the RPC is successful, then it will return the acquire data request id, which can
            be used to check the status of the acquisition and get feedback.
        """
        request = self.make_acquire_data_request(acquisition_requests, action_name, group_name,
                                                 data_timestamp, metadata)
        return self.call(self._stub.AcquireData, request, value_from_response=get_request_id,
                         error_from_response=acquire_data_error, copy_request=False, **kwargs)

    def acquire_data_async(self, acquisition_requests, action_name, group_name,
                           data_timestamp=None, metadata=None, **kwargs):
        """Async version of the acquire_data() RPC."""
        request = self.make_acquire_data_request(acquisition_requests, action_name, group_name,
                                                 data_timestamp, metadata)
        return self.call_async(self._stub.AcquireData, request,
                               value_from_response=get_request_id,
                               error_from_response=acquire_data_error, copy_request=False,
                               **kwargs)

    def acquire_data_from_request(self, request, **kwargs):
        """Alternate version of acquire_data() that takes an AcquireDataRequest directly.

        Args:
            request (bosdyn.api.AcquireDataRequest): The fully formed request to send.
            **kwargs: Forwarded unchanged to the underlying call().

        Returns:
            If the RPC is successful, then it will return the AcquireDataResponse.
        """
        return self.call(self._stub.AcquireData, request, error_from_response=acquire_data_error,
                         **kwargs)

    def acquire_data_from_request_async(self, request, **kwargs):
        """Async version of acquire_data_from_request()."""
        return self.call_async(self._stub.AcquireData, request,
                               error_from_response=acquire_data_error, **kwargs)

    def get_status(self, request_id, **kwargs):
        """Check the status of a data acquisition based on the request id.

        Args:
            request_id (int): The request id associated with an AcquireData request.
            **kwargs: Forwarded unchanged to the underlying call().

        Raises:
            RpcError: Problem communicating with the robot.
            bosdyn.client.data_acquisition.RequestIdDoesNotExistError: The request id provided
                is incorrect.

        Returns:
            If the RPC is successful, then it will return the full status response, which
            includes the status as well as other information about any possible errors.
        """
        request = data_acquisition.GetStatusRequest(request_id=request_id)
        return self.call(self._stub.GetStatus, request, error_from_response=_get_status_error,
                         copy_request=False, **kwargs)

    def get_status_async(self, request_id, **kwargs):
        """Async version of the get_status() RPC."""
        request = data_acquisition.GetStatusRequest(request_id=request_id)
        return self.call_async(self._stub.GetStatus, request,
                               error_from_response=_get_status_error, copy_request=False,
                               **kwargs)

    def get_service_info(self, **kwargs):
        """Get information from a Data Acquisition service to list its capabilities - which data,
        metadata, or processing the Data Acquisition service will perform.

        Args:
            **kwargs: Forwarded unchanged to the underlying call().

        Raises:
            RpcError: Problem communicating with the robot.

        Returns:
            The `capabilities` field of the GetServiceInfoResponse message (the response itself
            is not returned).
        """
        request = data_acquisition.GetServiceInfoRequest()
        return self.call(self._stub.GetServiceInfo, request,
                         value_from_response=_get_service_info_capabilities,
                         error_from_response=common_header_errors, copy_request=False, **kwargs)

    def get_service_info_async(self, **kwargs):
        """Async version of the get_service_info() RPC."""
        request = data_acquisition.GetServiceInfoRequest()
        return self.call_async(self._stub.GetServiceInfo, request,
                               value_from_response=_get_service_info_capabilities,
                               error_from_response=common_header_errors, copy_request=False,
                               **kwargs)

    def cancel_acquisition(self, request_id, **kwargs):
        """Cancel a data acquisition based on the request id.

        Args:
            request_id (int): The request id associated with an AcquireData request.
            **kwargs: Forwarded unchanged to the underlying call().

        Raises:
            RpcError: Problem communicating with the robot.
            CancellationFailedError: The data acquisitions associated with the request id were
                unable to be cancelled.
            bosdyn.client.data_acquisition.RequestIdDoesNotExistError: The request id provided
                is incorrect.

        Returns:
            If the RPC is successful, then it will return the full response to the cancellation
            request, which includes the status as well as other information about any possible
            errors.
        """
        request = data_acquisition.CancelAcquisitionRequest(request_id=request_id)
        return self.call(self._stub.CancelAcquisition, request,
                         error_from_response=_cancel_acquisition_error, copy_request=False,
                         **kwargs)

    def cancel_acquisition_async(self, request_id, **kwargs):
        """Async version of the cancel_acquisition() RPC."""
        request = data_acquisition.CancelAcquisitionRequest(request_id=request_id)
        return self.call_async(self._stub.CancelAcquisition, request,
                               error_from_response=_cancel_acquisition_error, copy_request=False,
                               **kwargs)

    def get_live_data(self, request, **kwargs):
        """Call the GetLiveData RPC of the plugin service.

        Args:
            request: The request message to send to GetLiveData (presumably a
                bosdyn.api.LiveDataRequest - confirm against the service definition).
            **kwargs: Forwarded unchanged to the underlying call(), matching the other RPC
                wrappers on this client.

        Returns:
            The result of call() for the GetLiveData RPC.
        """
        return self.call(self._stub.GetLiveData, request,
                         error_from_response=_get_live_data_error, copy_request=True, **kwargs)

    def get_live_data_async(self, request, **kwargs):
        """Async version of the get_live_data() RPC."""
        return self.call_async(self._stub.GetLiveData, request,
                               error_from_response=_get_live_data_error, copy_request=True,
                               **kwargs)
# AcquireData statuses without an explicit entry fall back to the generic response error.
_ACQUIRE_DATA_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (DataAcquisitionResponseError, None),
    {
        data_acquisition.AcquireDataResponse.STATUS_OK: (None, None),
        data_acquisition.AcquireDataResponse.STATUS_UNKNOWN_CAPTURE_TYPE:
            error_pair(UnknownCaptureTypeError),
    },
)
# GetStatus statuses without an explicit entry map to "no error".
_GET_STATUS_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (None, None),
    {
        data_acquisition.GetStatusResponse.STATUS_REQUEST_ID_DOES_NOT_EXIST:
            error_pair(RequestIdDoesNotExistError),
    },
)
# CancelAcquisition statuses without an explicit entry map to "no error".
_CANCEL_ACQUISITION_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (None, None),
    {
        data_acquisition.CancelAcquisitionResponse.STATUS_REQUEST_ID_DOES_NOT_EXIST:
            error_pair(RequestIdDoesNotExistError),
        data_acquisition.CancelAcquisitionResponse.STATUS_FAILED_TO_CANCEL:
            error_pair(CancellationFailedError),
    },
)
# Per-capability live data statuses without an explicit entry map to "no error".
_CAPABILITY_LIVE_DATA_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (None, None),
    {
        # STATUS_UNKNOWN is not handled directly.
        data_acquisition.LiveDataResponse.CapabilityLiveData.STATUS_OK: (None, None),
        data_acquisition.LiveDataResponse.CapabilityLiveData.STATUS_UNKNOWN_CAPTURE_TYPE:
            error_pair(UnknownCaptureTypeError),
        # STATUS_CUSTOM_PARAMS_ERROR is handled separately.
        data_acquisition.LiveDataResponse.CapabilityLiveData.STATUS_INTERNAL_ERROR:
            error_pair(InternalServerError),
    },
)
def metadata_to_proto(metadata):
    """Return `metadata` as a bosdyn.api.Metadata proto message, converting it when needed.

    Args:
        metadata (bosdyn.api.Metadata or dict): The JSON structured metadata to be associated
            with the data returned by the DataAcquisitionService when logged in the data buffer
            service.

    Raises:
        ValueError: Metadata is not in the right format.

    Returns:
        None when no metadata is given. A Metadata message passed in is returned as-is; a dict
        is copied into the `data` field of a new Metadata message.
    """
    if metadata is None:
        return None

    if isinstance(metadata, data_acquisition.Metadata):
        # Already the right message type; no conversion required.
        return metadata

    if not isinstance(metadata, dict):
        raise ValueError('Invalid metadata, not a dict or data_acquisition.Metadata')

    converted = data_acquisition.Metadata()
    converted.data.update(metadata)
    return converted
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def acquire_data_error(response):
    """Translate the status of an AcquireData response into a custom exception.

    Returns None if the status does not indicate an error.
    """
    status_name = data_acquisition.AcquireDataResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_ACQUIRE_DATA_STATUS_TO_ERROR)
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _get_status_error(response):
    """Translate the status of a GetStatus response into a custom exception.

    Returns None if the status does not indicate an error.
    """
    status_name = data_acquisition.GetStatusResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_GET_STATUS_STATUS_TO_ERROR)
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _cancel_acquisition_error(response):
    """Translate the status of a CancelAcquisition response into a custom exception.

    Returns None if the status does not indicate an error.
    """
    status_name = data_acquisition.CancelAcquisitionResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_CANCEL_ACQUISITION_STATUS_TO_ERROR)
@handle_common_header_errors
def _get_live_data_error(response):
    """Return the exception for the first failing entry in `response.live_data`, else None.

    Each entry is checked for a custom-params error first and then for a status-based error.
    """
    for entry in response.live_data:
        params_error = custom_params_error(entry, total_response=response)
        if params_error is not None:
            return params_error

        status_error = error_factory(
            response, entry.status,
            status_to_string=data_acquisition.LiveDataResponse.CapabilityLiveData.Status.Name,
            status_to_error=_CAPABILITY_LIVE_DATA_STATUS_TO_ERROR)
        if status_error is None:
            continue

        # Make sure the exception carries the full response rather than a single entry.
        status_error.response = response
        return status_error

    return None
def _get_service_info_capabilities(response):
return response.capabilities
def get_request_id(response):
    """Extract the `request_id` field from a response message."""
    return response.request_id