1
+ import subprocess
2
+ import time
3
+ import os
4
+ from collections import deque
5
+
6
+ import zmq
7
+
8
+ from pybmt .fictrac .state import FicTracState
9
+ from pybmt .tools import which
10
+
11
+
12
+ class FicTracDriver :
13
+ """
14
+ This class drives the tracking of the fly via a separate software called FicTrac. It invokes this process and
15
+ calls a control function once for each time the tracking state is updated.
16
+ """
17
+ def __init__ (self , config_file = None , remote_endpoint_url = None , console_ouput_file = "output.txt" ,
18
+ track_change_callback = None , pgr_enable = False , plot_on = True , fic_trac_bin_path = None ):
19
+ """
20
+ Create the FicTrac driver object. This function will perform a check to see if the FicTrac program is present
21
+ on the path. If it is not, it will throw an exception.
22
+
23
+ :param str config_file: The path to the configuration file that should be passed to the FicTrac command.
24
+ :param str remote_endpoint_url: If FicTrac is already running on another machine, this is the url.
25
+ :param str console_output_file: The path to the file where console output should be stored.
26
+ :param track_change_callback: A FlyVRCallback class which is called once everytime tracking status changes. See
27
+ control.FlyVRCallback for example.
28
+ :param bool pgr_enable: Is Point Grey camera support needed. This just decides which executable to call, either
29
+ 'FicTrac' or 'FicTrac-PGR'.
30
+ :param str fic_trac_bin_path: The path the the fictrac binary to use. Default is None. If None, we will try to
31
+ find fictrac on the path.
32
+ :param str remote_enpoint_url
33
+ """
34
+
35
+ self .track_change_callback = track_change_callback
36
+ self .plot_on = plot_on
37
+
38
+ # The message loop has to stay above this average number of frames per second. If it falls below we are
39
+ # not grabbing messages fast enough and will fall behind FicTrac in state. I don't like this solution that
40
+ # much, with shared memory this was easier to detect.
41
+ self .average_fps_threshold = 400
42
+
43
+ # If fictrac is already running, for example, on another machine, then we don't need to worry about running it.
44
+ if remote_endpoint_url is not None :
45
+ self .remote_endpoint_url = "tcp://" + remote_endpoint_url
46
+ self .start_fictrac = False
47
+ else :
48
+ self .start_fictrac = True
49
+ self .remote_endpoint_url = "tcp://localhost:5556"
50
+
51
+ self .config_file = config_file
52
+
53
+ # Get the directory that the config file is in, this will be the current working directory
54
+ # of FicTrac.
55
+ self .config_dir = os .path .dirname (self .config_file )
56
+ if self .config_dir == "" :
57
+ self .config_dir = None
58
+
59
+ # Get base config file name
60
+ self .config_file_base = os .path .basename (self .config_file )
61
+
62
+ self .console_output_file = console_ouput_file
63
+ self .track_change_callback = track_change_callback
64
+ self .pgr_enable = pgr_enable
65
+ self .plot_on = plot_on
66
+
67
+ # If the user didn't specify the path to fictrac, look for it on the path.
68
+ if fic_trac_bin_path is None :
69
+ self .fictrac_bin = 'fictrac'
70
+ if self .pgr_enable :
71
+ self .fictrac_bin = 'fictrac-pgr'
72
+
73
+ # If this is Windows, we need to add the .exe extension.
74
+ if os .name == 'nt' :
75
+ self .fictrac_bin = self .fictrac_bin + ".exe"
76
+
77
+ # Lets make sure FicTrac exists on the path
78
+ self .fictrac_bin_fullpath = which (self .fictrac_bin )
79
+
80
+ if self .fictrac_bin_fullpath is None :
81
+ raise RuntimeError ("Could not find " + self .fictrac_bin + " on the PATH!" )
82
+
83
+ else :
84
+ self .fictrac_bin_fullpath = fic_trac_bin_path
85
+
86
+ # TODO: Make sure we are using the correct version of fictrac.
87
+
88
+ self .fictrac_process = None
89
+
90
+ def run (self ):
91
+ """
92
+ Start the the FicTrac process and block till it closes. This function will poll a shared memory region for
93
+ changes in tracking data and invoke a control function when they occur. FicTrac is assumed to exist on the
94
+ system path.
95
+
96
+ :return:
97
+ """
98
+
99
+ # Setup anything the callback needs.
100
+ self .track_change_callback .setup_callback ()
101
+
102
+ try :
103
+ # Start FicTrac if we need to
104
+ if self .start_fictrac :
105
+ with open (self .console_output_file , "wb" ) as out :
106
+ self .fictrac_process = subprocess .Popen ([self .fictrac_bin_fullpath , "--exit_when_done" ,
107
+ self .config_file_base ],
108
+ stdout = out , stderr = subprocess .STDOUT ,
109
+ cwd = self .config_dir )
110
+ self ._process_messages ()
111
+ else :
112
+ self ._process_messages ()
113
+
114
+ # Call poll one last time to get the return value
115
+ self .fictrac_process .poll ()
116
+
117
+ # Get the fic trac process return code
118
+ if self .fictrac_process is not None and self .fictrac_process .returncode is not None and self .fictrac_process .returncode != 0 :
119
+ raise RuntimeError ("FicTrac failed because of an application error. " +
120
+ "Consult the FicTrac console output file ({}). " .format (self .console_output_file ))
121
+ if self .frame_cnt == 0 :
122
+ raise RuntimeError ("Zero frames processed. FicTrac failed because of an application error. " +
123
+ "Consult the FicTrac console output file ({}). " .format (self .console_output_file ))
124
+
125
+
126
+ except Exception as ex :
127
+ if self .fictrac_process is not None :
128
+ self .fictrac_process .terminate ()
129
+
130
+ raise Exception ("PyBMT Error!" ) from ex
131
+ finally :
132
+ self .track_change_callback .shutdown_callback ()
133
+ self ._cleanup ()
134
+
135
+ def _process_messages (self ):
136
+
137
+ # Setup ZeroMQ context and socket to talk to server
138
+ context = zmq .Context ()
139
+ socket = context .socket (zmq .SUB )
140
+
141
+ # Set a timeout on this socket so we don't block forever
142
+ socket .RCVTIMEO = 1000 # in milliseconds
143
+
144
+ # This is the receiver high water mark, zero mq will start to drop incoming messages after it
145
+ # has queued this many. This will let us detect if we are not picking up messages quick enough because of a
146
+ # slow callback process. This isn't perfect though since OS buffers messages as well.
147
+ socket .setsockopt (zmq .RCVHWM , 1 )
148
+
149
+ # Bind and subscribe
150
+ socket .connect (self .remote_endpoint_url )
151
+ socket .setsockopt (zmq .SUBSCRIBE , b"" )
152
+
153
+ # if self.plot_on:
154
+ # self.plot_task = ConcurrentTask(task=plot_task_fictrac, comms="pipe",
155
+ # taskinitargs=[state])
156
+ # self.plot_task.start()
157
+
158
+ # Lets track average frame rate we are receiving the messages
159
+ # Our buffer of past speeds
160
+ time_history = deque (maxlen = 10 )
161
+ avg_fps = 0
162
+ self .frame_cnt = 0
163
+
164
+ # Lets keep track of the last fictrac state we received
165
+ last_fstate : FicTracState = None
166
+ fstate : FicTracState = None
167
+ isOK = True
168
+ while isOK :
169
+
170
+ # If we are running FicTrac, make sure it is running still
171
+ #if frame_cnt > 0 and self.fictrac_process is not None and self.fictrac_process.poll() is None:
172
+ # print("FicTrac process gone!")
173
+ # break
174
+
175
+ # Receive state update from FicTrac process
176
+ try :
177
+ data = socket .recv_string ()
178
+ except zmq .error .Again :
179
+
180
+ # If we get socket error, probably means fictrac is gone. If we started it, just break.
181
+ # If we didn't start it, signal the connection error.
182
+ if self .start_fictrac :
183
+ break
184
+ else :
185
+ raise Exception ("Socket timed out. Couldn't reach fictrac!" )
186
+
187
+ # Message received start the timer, want to keep track of how long it takes to process the message.
188
+ t0 = time .perf_counter ()
189
+
190
+ # If FicTrac sent and END signal, its time to clean up
191
+ if data == "END" :
192
+ break
193
+
194
+ # Lets keep track of the last fictrac state we received
195
+ last_fstate = fstate
196
+
197
+ # Parse the data packet into our state structure. Get our new state.
198
+ fstate = FicTracState .zmq_string_msg_to_state (data )
199
+
200
+ if last_fstate is not None and fstate .frame_cnt - last_fstate .frame_cnt != 1 :
201
+ self .fictrac_process .terminate ()
202
+ raise Exception (("FicTrac frame counter jumped by more than 1! oldFrame = " +
203
+ str (last_fstate .frame_cnt ) + ", newFrame = " + str (fstate .frame_cnt )))
204
+
205
+ # Call the main callback function with the current state
206
+ isOK = self .track_change_callback .process_callback (fstate )
207
+
208
+ # Stop the clock
209
+ t1 = time .perf_counter ()
210
+
211
+ # Add the speed to our history
212
+ time_history .append (t1 - t0 )
213
+
214
+ # Get the running average fps
215
+ avg_fps = 1 / (sum (time_history ) / len (time_history ))
216
+
217
+ if avg_fps < self .average_fps_threshold and frame_cnt > 300 :
218
+ self .fictrac_process .terminate ()
219
+ raise Exception ("Average FPS fell below avg_fps_threshold({}). Processing callback is " +
220
+ "probably operating too slow." )
221
+
222
+ self .frame_cnt = self .frame_cnt + 1
223
+
224
+ def _cleanup (self ):
225
+ """
226
+ This is method is called whenever PyBMT run is shutting down things.
227
+
228
+ :return:
229
+ """
230
+ pass
0 commit comments