From b3a64db3e077ddab4ce6ddf5a7952f004a6c427f Mon Sep 17 00:00:00 2001 From: Graham Rowlands Date: Mon, 12 Apr 2021 13:36:15 -0400 Subject: [PATCH 1/3] Revamp multiprocessing to use pathos multiprocess and enable using fork and spawn. Move to class methods as the basis for processes. --- setup.py | 1 + src/auspex/experiment.py | 104 +++++++++++------ src/auspex/filters/average.py | 153 ++++++++++++------------ src/auspex/filters/filter.py | 195 +++++++++++++++++++------------ src/auspex/filters/io.py | 96 +++++++++------ src/auspex/filters/plot.py | 4 +- src/auspex/filters/singleshot.py | 2 +- src/auspex/instruments/X6.py | 55 ++++++--- src/auspex/instruments/alazar.py | 2 +- src/auspex/qubit/qubit_exp.py | 28 +++-- src/auspex/stream.py | 52 +++++++-- test/test_alazar.py | 2 +- 12 files changed, 438 insertions(+), 256 deletions(-) diff --git a/setup.py b/setup.py index bb868fe2f..0639b72e6 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ "ipywidgets >= 7.0.0", "sqlalchemy >= 1.2.15", "pyserial >= 3.4", + "multiprocess == 0.70.11.1" "setproctitle", "progress" ] diff --git a/src/auspex/experiment.py b/src/auspex/experiment.py index 642322b17..17d86d8e2 100644 --- a/src/auspex/experiment.py +++ b/src/auspex/experiment.py @@ -11,16 +11,24 @@ import uuid import json -if sys.platform == 'win32' or 'NOFORKING' in os.environ: - from queue import Queue as Queue - from threading import Event - from threading import Thread as Process - from threading import Thread as Thread -else: - from multiprocessing import Queue as Queue - from multiprocessing import Event - from multiprocessing import Process - from threading import Thread as Thread +from auspex.log import logger + +# if sys.platform == 'win32' or 'NOFORKING' in os.environ: +# logger.info("Using threads") +# from queue import Queue as Queue +# from threading import Event +# from threading import Thread as Process +# from threading import Thread as Thread +# else: + +from multiprocess import Queue as Queue +from multiprocess import Event +from multiprocess import Process +import multiprocess as mp +from threading import Thread as Thread +import multiprocess.context as ctx +# ctx._force_start_method('fork') +logger.info(f"Using pathos multiprocess with {mp.get_start_method()}") from IPython.core.getipython import get_ipython in_notebook = False @@ -53,7 +61,7 @@ from auspex.sweep import Sweeper from auspex.stream import DataStream, DataAxis, SweepAxis, DataStreamDescriptor, InputConnector, OutputConnector from auspex.filters import Plotter, MeshPlotter, ManualPlotter, WriteToFile, DataBuffer, Filter -from auspex.log import logger + import auspex.config from auspex.config import isnotebook @@ -85,9 +93,10 @@ def update_filename(filename, add_date=True): return "{}-{:04d}".format(basename,i) class ExperimentGraph(object): - def __init__(self, edges): + def __init__(self, edges, manager): self.dag = None self.edges = [] + self.manager = manager self.create_graph(edges) def dfs_edges(self): @@ -110,7 +119,8 @@ def create_graph(self, edges): dag = nx.DiGraph() self.edges = [] for edge in edges: - obj = DataStream(name="{}_TO_{}".format(edge[0].name, edge[1].name)) + logger.debug(f"{edge[0].name}_TO_{edge[1].name}: {self.manager}") + obj = DataStream(name="{}_TO_{}".format(edge[0].name, edge[1].name), manager=self.manager) edge[0].add_output_stream(obj) edge[1].add_input_stream(obj) self.edges.append(obj) @@ -170,6 +180,9 @@ def __init__(self): # Should we show the dashboard? self.dashboard = False + # Multiprocessing manager? 
+ self.manager = mp.Manager() + # Create and use plots? self.do_plotting = False @@ -246,6 +259,12 @@ def __init__(self): # Run the stream init self.init_streams() + # Event for filters to register a panic + self.filter_panic = Event() + + # Event to tell all filters to exit + self.filter_exit = Event() + def set_graph(self, edges): unique_nodes = [] for eb, ee in edges: @@ -254,7 +273,7 @@ def set_graph(self, edges): if ee.parent not in unique_nodes: unique_nodes.append(ee.parent) self.nodes = unique_nodes - self.graph = ExperimentGraph(edges) + self.graph = ExperimentGraph(edges, manager=self.manager) def init_streams(self): """Establish the base descriptors for any internal data streams and connectors.""" @@ -372,6 +391,12 @@ def sweep(self): # Run the procedure self.run() + # Check for any filter panics + if self.filter_panic.is_set(): + logger.warning("Panic detected in listeners/filters. Exiting.") + self.filter_exit.set() + break + # See if the axes want to extend themselves. They will push updates # directly to the output_connecters as messages that will be passed # through the filter pipeline. @@ -573,23 +598,15 @@ def run_sweeps(self): #initialize instruments self.init_instruments() - # def catch_ctrl_c(signum, frame): - # import ipdb; ipdb.set_trace(); - # logger.info("Caught SIGINT. Shutting down.") - # self.declare_done() # Ask nicely - - # self.shutdown() - # raise NameError("Shutting down.") - # sys.exit(0) - - # signal.signal(signal.SIGINT, catch_ctrl_c) - # We want to wait for the sweep method above, # not the experiment's run method, so replace this # in the list of tasks. self.other_nodes = self.nodes[:] self.other_nodes.extend(self.extra_plotters) self.other_nodes.remove(self) + self.other_node_processes = {} + self.other_node_configs = {} + self.other_node_outputs = {} # If we are launching the process dashboard, # setup the bokeh server and establish a queue for @@ -601,7 +618,15 @@ def run_sweeps(self): # Start the filter processes for n in self.other_nodes: - n.start() + conf = n.get_config() + oq = Queue() + p = Process(target=n.run, args=(conf, self.filter_exit, n.done, self.filter_panic, + n.output_connectors, n.input_connectors, + self.profile, n.perf_queue, oq)) + p.start() + self.other_node_configs[n] = conf + self.other_node_processes[n] = p + self.other_node_outputs[n] = oq # Run the main experiment loop self.sweep() @@ -611,7 +636,7 @@ def run_sweeps(self): if callback: callback(plot) - # Wait for the + # Wait for the filters to finish. time.sleep(0.1) times = {n: 0 for n in self.other_nodes} dones = {n: n.done.is_set() for n in self.other_nodes} @@ -630,7 +655,19 @@ def run_sweeps(self): n.final_buffer = n._final_buffer.get() for n in self.other_nodes: - n.join() + p = self.other_node_processes[n] + conf = self.other_node_configs[n] + oq = self.other_node_outputs[n] + + while True: + try: + # Get the Output Queue + param, v = oq.get(False) + setattr(n, param, v) + except: + break + + p.join() for buff in self.buffers: buff.output_data, buff.descriptor = buff.get_data() @@ -640,9 +677,11 @@ def run_sweeps(self): self.perf_thread.join() except KeyboardInterrupt as e: - print("Caught KeyboardInterrupt, terminating.") - #self.shutdown() #Commented out, since this is already in the finally. - #sys.exit(0) + logger.warning("Caught KeyboardInterrupt, shutting down.") + except Exception as e: + just_the_string = traceback.format_exc() + logger.warning(f"Experiment raised exception {e} (traceback follows). 
Bailing.") + logger.warning(just_the_string) finally: self.shutdown() @@ -655,7 +694,6 @@ def stop_manual_plotters(self): mp.stop() def shutdown(self): - logger.debug("Shutting down instruments") # This includes stopping the flow of data, and must be done before terminating nodes self.shutdown_instruments() @@ -663,7 +701,7 @@ def shutdown(self): ct_max = 5 #arbitrary waiting 10 s before giving up for ct in range(ct_max): if hasattr(self, 'other_nodes'): - if any([n.is_alive() for n in self.other_nodes]): + if any([p.is_alive() for p in self.other_node_processes.values()]): logger.warning("Filter pipeline appears stuck. Use keyboard interrupt to terminate.") time.sleep(2.0) else: diff --git a/src/auspex/filters/average.py b/src/auspex/filters/average.py index 1a34ea5cb..acfaa4b03 100644 --- a/src/auspex/filters/average.py +++ b/src/auspex/filters/average.py @@ -188,21 +188,42 @@ def final_init(self): if self.points_before_final_average is None: raise Exception("Average has not been initialized. Run 'update_descriptors'") - self.completed_averages = 0 - self.idx_frame = 0 - self.idx_global = 0 - # We only need to accumulate up to the averaging axis - # BUT we may get something longer at any given time! - self.carry = np.zeros(0, dtype=self.source.descriptor.dtype) - - def process_data(self, data): - - if self.passthrough: - for os in self.source.output_streams: + def get_config(self): + config = super().get_config() + config['dtype'] = self.source.descriptor.dtype + config['completed_averages'] = 0 + config['idx_frame'] = 0 + config['idx_global'] = 0 + config['passthrough'] = self.passthrough + config['pbfa'] = self.points_before_final_average + config['pbpa'] = self.points_before_partial_average + config['reshape_dims'] = self.reshape_dims + config['mean_axis'] = self.mean_axis + config['threshold'] = self.threshold.value + config['is_adaptive'] = self.sink.descriptor.is_adaptive() + config['num_averages'] = self.num_averages + config['axis_num'] = self.axis_num + config['avg_dims'] = self.avg_dims + config['data_dims'] = self.data_dims + config['last_update'] = self.last_update + config['update_interval'] = self.update_interval + return config + + @classmethod + def execute_on_run(cls, config): + config['sum_so_far'] = np.zeros(config['avg_dims'], dtype=config['dtype']) + config['current_avg_frame'] = np.zeros(config['pbfa'], dtype=config['dtype']) + config['excited_counts'] = np.zeros(config['data_dims'], dtype=np.int64) + config['carry'] = np.zeros(0, dtype=config['dtype']) + + @classmethod + def process_data(cls, config, ocs, ics, data): + if config['passthrough']: + for os in ocs['source'].output_streams: os.push(data) - for os in self.final_variance.output_streams: + for os in ocs['final_variance'].output_streams: os.push(data*0.0) - for os in self.partial_average.output_streams: + for os in ocs['partial_average'].output_streams: os.push(data) return @@ -213,100 +234,86 @@ def process_data(self, data): elif not isinstance(data, np.ndarray) and (data.size == 1): data = np.array([data]) - if self.carry.size > 0: - data = np.concatenate((self.carry, data)) - self.carry = np.zeros(0, dtype=self.source.descriptor.dtype) + if config['carry'].size > 0: + data = np.concatenate((config['carry'], data)) + config['carry'] = np.zeros(0, dtype=config['dtype']) idx = 0 while idx < data.size: #check whether we have enough data to fill an averaging frame - if data.size - idx >= self.points_before_final_average: + if data.size - idx >= config['pbfa']: #logger.debug("Have {} points, enough for final 
avg.".format(data.size)) # How many chunks can we process at once? - num_chunks = int((data.size - idx)/self.points_before_final_average) - new_points = num_chunks*self.points_before_final_average - reshaped = data[idx:idx+new_points].reshape(self.reshape_dims) - averaged = reshaped.mean(axis=self.mean_axis) + num_chunks = int((data.size - idx)/config['pbfa']) + new_points = num_chunks*config['pbfa'] + reshaped = data[idx:idx+new_points].reshape(config['reshape_dims']) + averaged = reshaped.mean(axis=config['mean_axis']) idx += new_points # do state assignment - excited_states = (np.real(reshaped) > self.threshold.value).sum(axis=self.mean_axis) - ground_states = self.num_averages - excited_states - - if self.sink.descriptor.is_adaptive(): - new_tuples = self.sink.descriptor.tuples()[self.idx_global:self.idx_global + new_points] - new_tuples_stripped = remove_fields(new_tuples, self.axis.value) - take_axis = -1 if self.axis_num > 0 else 0 - reduced_tuples = new_tuples_stripped.reshape(self.reshape_dims).take((0,), axis=take_axis) - self.idx_global += new_points - - # Add to Visited tuples - if self.sink.descriptor.is_adaptive(): - for os in self.source.output_streams + self.final_variance.output_streams + self.partial_average.output_streams: - os.descriptor.visited_tuples = np.append(os.descriptor.visited_tuples, reduced_tuples) - - for os in self.source.output_streams: - os.push(averaged) + excited_states = (np.real(reshaped) > config['threshold']).sum(axis=config['mean_axis']) + ground_states = config['num_averages'] - excited_states - for os in self.final_variance.output_streams: - os.push(reshaped.var(axis=self.mean_axis, ddof=1)) # N-1 in the denominator - - for os in self.partial_average.output_streams: + for os in ocs['source'].output_streams: os.push(averaged) - - for os in self.final_counts.output_streams: + for os in ocs['final_variance'].output_streams: + os.push(reshaped.var(axis=config['mean_axis'], ddof=1)) # N-1 in the denominator + for os in ocs['partial_average'].output_streams: + os.push(averaged) + for os in ocs['final_counts'].output_streams: os.push(ground_states) os.push(excited_states) # Maybe we can fill a partial frame - elif data.size - idx >= self.points_before_partial_average: + elif data.size - idx >= config['pbpa']: # logger.info("Have {} points, enough for partial avg.".format(data.size)) # How many chunks can we process at once? 
- num_chunks = int((data.size - idx)/self.points_before_partial_average) - new_points = num_chunks*self.points_before_partial_average + num_chunks = int((data.size - idx)/config['pbpa']) + new_points = num_chunks*config['pbpa'] # Find the appropriate dimensions for the partial - partial_reshape_dims = self.reshape_dims[:] - partial_reshape_dims[self.mean_axis] = -1 - partial_reshape_dims = partial_reshape_dims[self.mean_axis:] + partial_reshape_dims = config['reshape_dims'][:] + partial_reshape_dims[config['mean_axis']] = -1 + partial_reshape_dims = partial_reshape_dims[config['mean_axis']:] reshaped = data[idx:idx+new_points].reshape(partial_reshape_dims) - summed = reshaped.sum(axis=self.mean_axis) - self.sum_so_far += summed + summed = reshaped.sum(axis=config['mean_axis']) + config['sum_so_far'] += summed - self.current_avg_frame[self.idx_frame:self.idx_frame+new_points] = data[idx:idx+new_points] + config['current_avg_frame'][config['idx_frame']:config['idx_frame']+new_points] = data[idx:idx+new_points] idx += new_points - self.idx_frame += new_points + config['idx_frame'] += new_points - self.completed_averages += num_chunks + config['completed_averages'] += num_chunks # If we now have enoough for the final average, push to both partial and final... - if self.completed_averages == self.num_averages: - reshaped = self.current_avg_frame.reshape(partial_reshape_dims) - for os in self.source.output_streams + self.partial_average.output_streams: - os.push(reshaped.mean(axis=self.mean_axis)) - for os in self.final_variance.output_streams: - os.push(np.real(reshaped).var(axis=self.mean_axis, ddof=1)+1j*np.imag(reshaped).var(axis=self.mean_axis, ddof=1)) # N-1 in the denominator + if config['completed_averages'] == config['num_averages']: + reshaped = config['current_avg_frame'].reshape(partial_reshape_dims) + for os in ocs['source'].output_streams + ocs['partial_average'].output_streams: + os.push(reshaped.mean(axis=config['mean_axis'])) + for os in ocs['final_variance'].output_streams: + os.push(np.real(reshaped).var(axis=config['mean_axis'], ddof=1)+1j*np.imag(reshaped).var(axis=config['mean_axis'], ddof=1)) # N-1 in the denominator # do state assignment - excited_states = (np.real(reshaped) < self.threshold.value).sum(axis=self.mean_axis) - ground_states = self.num_averages - excited_states - for os in self.final_counts.output_streams: + excited_states = (np.real(reshaped) < config['threshold']).sum(axis=config['mean_axis']) + ground_states = config['num_averages'] - excited_states + for os in ocs['final_counts'].output_streams: os.push(ground_states) os.push(excited_states) - self.sum_so_far[:] = 0.0 - self.current_avg_frame[:] = 0.0 - self.completed_averages = 0 - self.idx_frame = 0 + config['sum_so_far'][:] = 0.0 + config['current_avg_frame'][:] = 0.0 + config['completed_averages'] = 0 + config['idx_frame'] = 0 else: # Emit a partial average since we've accumulated enough data - if (time.time() - self.last_update >= self.update_interval): - for os in self.partial_average.output_streams: - os.push(self.sum_so_far/self.completed_averages) - self.last_update = time.time() + if (time.time() - config['last_update'] >= config['update_interval']): + for os in ocs['partial_average'].output_streams: + os.push(config['sum_so_far']/config['completed_averages']) + config['last_update'] = time.time() # otherwise just add it to the carry else: - self.carry = data[idx:] + config['carry'] = data[idx:] break + # raise Exception("Fake Error") diff --git a/src/auspex/filters/filter.py 
b/src/auspex/filters/filter.py index 9e0e1188e..8e69dea10 100644 --- a/src/auspex/filters/filter.py +++ b/src/auspex/filters/filter.py @@ -10,17 +10,17 @@ import os import sys +import traceback import psutil if sys.platform == 'win32' or 'NOFORKING' in os.environ: - from threading import Thread as Process from threading import Event from queue import Queue else: - from multiprocessing import Process - from multiprocessing import Event - from multiprocessing import Queue - from multiprocessing import Value, Array + from multiprocess import Event + from multiprocess import Queue + from multiprocess import Value, Array + import multiprocess as mp from setproctitle import setproctitle import cProfile @@ -31,10 +31,10 @@ import ctypes import numpy as np +import auspex.config from auspex.parameter import Parameter from auspex.stream import DataStream, InputConnector, OutputConnector from auspex.log import logger -import auspex.config class MetaFilter(type): """Meta class to bake the input/output connectors into a Filter class description @@ -59,28 +59,21 @@ def __init__(self, name, bases, dct): v.name = k self._parameters.append(v) -class Filter(Process, metaclass=MetaFilter): +class Filter(object, metaclass=MetaFilter): """Any node on the graph that takes input streams with optional output streams""" - def __init__(self, name=None, **kwargs): + def __init__(self, name=None, manager=None, **kwargs): super(Filter, self).__init__() self.filter_name = name self.input_connectors = {} self.output_connectors = {} self.parameters = {} self.qubit_name = "" + self.manager = manager - # Event for killing the filter properly - self.exit = Event() + # Event for keeping track of individual filters being done self.done = Event() - # Keep track of data throughput - self.processed = 0 - - # For objectively measuring doneness - self.finished_processing = Event() - self.finished_processing.clear() - for ic in self._input_connectors: a = InputConnector(name=ic, parent=self) a.parent = self @@ -98,8 +91,6 @@ def __init__(self, name=None, **kwargs): setattr(self, param.name, a) # For sending performance information - self.last_performance_update = datetime.datetime.now() - self.beginning = datetime.datetime.now() self.perf_queue = None def __repr__(self): @@ -118,9 +109,10 @@ def configure_with_proxy(self, proxy_obj): param.value = getattr(proxy_obj, name) else: raise ValueError(f"{proxy_obj} was expected to have parameter {name}") + raise ValueError(f"{proxy_obj} was expected to have parameter {name}") def __repr__(self): - return "<{}(name={})>".format(self.__class__.__name__, self.name) + return "<{}(name={})>".format(self.__class__.__name__, self.filter_name) def update_descriptors(self): """This method is called whenever the connectivity of the graph changes. 
This may have implications @@ -141,15 +133,26 @@ def descriptor_map(self, input_descriptors): """Return a dict of the output descriptors.""" return {'source': v for v in input_descriptors.values()} - def on_done(self): + def final_init(self): + """Any final configuration that gets run in the main thread before a process is + spawned on the class run method""" + pass + + def get_config(self): + """Return a config dictionary for this object""" + config = {} + config['name'] = str(self) + config['processed'] = 0 + return config + + @classmethod + def on_done(cls): """To be run when the done signal is received, in case additional steps are needed (such as flushing a plot or data).""" pass - def shutdown(self): - self.exit.set() - - def _parent_process_running(self): + @classmethod + def _parent_process_running(cls): try: os.kill(os.getppid(), 0) except OSError: @@ -157,112 +160,154 @@ def _parent_process_running(self): else: return True - def run(self): - self.p = psutil.Process(os.getpid()) - logger.debug(f"{self} launched with pid {os.getpid()}. ppid {os.getppid()}") - if auspex.config.profile: - if not self.filter_name: - name = "Unlabeled" + @classmethod + def spawn_fix(cls, ocs, ics): + for oc in ocs.values(): + for stream in oc.output_streams: + stream.spawn_fix() + for ic in ics.values(): + for stream in ic.input_streams: + stream.spawn_fix() + + @classmethod + def run(cls, config, exit, done, panic, ocs, ics, profile, perf_queue, output_queue): + """This is the entry function when filters are launched as Processes. + config: configuration dictionary for any necessary parameters at run-time + exit: event for killing the filter + done: event the filter uses to declare it is done + panic: event the filter + ocs: output connectors + ics: input connectors + profile: push performance information (Boolean) + perf_queue: where to push the performance information + output_queue: queue for returning key, value pairs to the main process + """ + try: + p = psutil.Process(os.getpid()) + name = config["name"] + logger.debug(f"{name} launched with pid {os.getpid()}. ppid {os.getppid()}") + if profile: + cProfile.runctx('cls.main(config, exit, done, panic, ocs, ics, perf_queue)', globals(), locals(), 'prof-%s.prof' % name) else: - name = self.filter_name - cProfile.runctx('self.main()', globals(), locals(), 'prof-%s-%s.prof' % (self.__class__.__name__, name)) - else: - self.execute_on_run() - self.main() - self.done.set() + cls.spawn_fix(ocs, ics) + cls.execute_on_run(config) + cls.main(config, exit, done, panic, ocs, ics, perf_queue) + cls.execute_after_run(config, output_queue) + done.set() + except Exception as e: + just_the_string = traceback.format_exc() + logger.warning(f"Filter {config['name']} raised exception {e} (traceback follows). 
Bailing.") + logger.warning(just_the_string) + + panic.set() + done.set() + + @classmethod + def execute_on_run(cls, config): + pass - def execute_on_run(self): + @classmethod + def execute_after_run(cls, config, output_queue): pass - def push_to_all(self, message): - for oc in self.output_connectors.values(): + @classmethod + def push_to_all(cls, ocs, message): + for oc in ocs.values(): for ost in oc.output_streams: ost.queue.put(message) if message['type'] == 'event' and message["event_type"] == "done": logger.debug(f"Closing out queue {ost.queue}") ost.queue.close() - def push_resource_usage(self): - if self.perf_queue and (datetime.datetime.now() - self.last_performance_update).seconds > 1.0: - perf_info = (str(self), datetime.datetime.now()-self.beginning, self.p.cpu_percent(), self.p.memory_info(), self.processed) - self.perf_queue.put(perf_info) - self.last_performance_update = datetime.datetime.now() + @classmethod + def push_resource_usage(cls, name, perf_queue, beginning, last_performance_update, processed): + if perf_queue and (datetime.datetime.now() - last_performance_update).seconds > 1.0: + perf_info = (name, datetime.datetime.now()-beginning, p.cpu_percent(), p.memory_info(), processed) + perf_queue.put(perf_info) - def process_message(self, msg): + @classmethod + def process_message(cls, config, msg): """To be overridden for interesting default behavior""" pass - def checkin(self): + @classmethod + def checkin(cls, config): """For any filter-specific loop needs""" pass - def main(self): + @classmethod + def process_data(cls, config, ocs, ics, message_data): + """Specific per-filter data processing""" + pass + + @classmethod + def main(cls, config, exit, done, panic, ocs, ics, perf_queue): """ Generic run method which waits on a single stream and calls `process_data` on any new_data """ - # try: + name = config["name"] + setproctitle(f"auspex filt: {name}") - logger.debug('Running "%s" run loop', self.filter_name) - setproctitle(f"python auspex filter: {self}") - input_stream = getattr(self, self._input_connectors[0]).input_streams[0] + # Assume we only have a single input stream in this base implementation + input_stream = list(ics.values())[0].input_streams[0] desc = input_stream.descriptor - stream_done = False + stream_done = False stream_points = 0 - while not self.exit.is_set():# and not self.finished_processing.is_set(): - # Try to pull all messages in the queue. queue.empty() is not reliable, so we - # ask for forgiveness rather than permission. + # For performance profiling + beginning = datetime.datetime.now() + last_performance_update = datetime.datetime.now() + + while not exit.is_set() and not panic.is_set(): messages = [] # For any filter-specific loop needs - self.checkin() + cls.checkin(config) # Check to see if the parent process still exists: - if not self._parent_process_running(): - logger.warning(f"{self} with pid {os.getpid()} could not find parent with pid {os.getppid()}. Assuming something has gone wrong. Exiting.") + if not cls._parent_process_running(): + logger.warning(f"{name} with pid {os.getpid()} could not find parent with pid {os.getppid()}. Assuming something has gone wrong. 
Exiting.") break - while not self.exit.is_set(): + while not exit.is_set(): try: messages.append(input_stream.queue.get(False)) except queue.Empty as e: time.sleep(0.002) break + + cls.push_resource_usage(name, perf_queue, beginning, last_performance_update, config['processed']) + last_performance_update = datetime.datetime.now() - self.push_resource_usage() for message in messages: message_type = message['type'] if message['type'] == 'event': - logger.debug('%s "%s" received event with type "%s"', self.__class__.__name__, message_type) + logger.debug('%s received event with type "%s"', name, message['event_type']) # Check to see if we're done if message['event_type'] == 'done': - logger.debug(f"{self} received done message!") + logger.debug(f"{name} received done message!") stream_done = True else: # Propagate along the graph - self.push_to_all(message) - self.process_message(message) + cls.push_to_all(ocs, message) + cls.process_message(config, message) elif message['type'] == 'data': - # if not hasattr(message_data, 'size'): - # message_data = np.array([message_data]) message_data = input_stream.pop() if message_data is not None: - logger.debug('%s "%s" received %d points.', self.__class__.__name__, self.filter_name, message_data.size) + logger.debug('%s received %d points.', name, message_data.size) logger.debug("Now has %d of %d points.", input_stream.points_taken.value, input_stream.num_points()) stream_points += len(message_data) - self.process_data(message_data) - self.processed += message_data.nbytes + cls.process_data(config, ocs, ics, message_data) + config['processed'] += message_data.nbytes if stream_done: - self.push_to_all({"type": "event", "event_type": "done", "data": None}) - self.done.set() + cls.push_to_all(ocs, {"type": "event", "event_type": "done", "data": None}) + done.set() break # When we've finished, either prematurely or as expected - self.on_done() + cls.on_done() - # except Exception as e: - # logger.warning(f"Filter {self} raised exception {e}. Bailing.") diff --git a/src/auspex/filters/io.py b/src/auspex/filters/io.py index fa9593652..4752153dc 100644 --- a/src/auspex/filters/io.py +++ b/src/auspex/filters/io.py @@ -17,11 +17,9 @@ from threading import Event from queue import Queue else: - import multiprocessing as mp - from multiprocessing import Process, Event - from multiprocessing import Queue - -from auspex.data_format import AuspexDataContainer + import multiprocess as mp + from multiprocess import Process, Event + from multiprocess import Queue import itertools import contextlib @@ -35,10 +33,11 @@ import cProfile from .filter import Filter +from auspex.data_format import AuspexDataContainer from auspex.parameter import Parameter, FilenameParameter, BoolParameter from auspex.stream import InputConnector, OutputConnector -from auspex.log import logger import auspex.config as config +from auspex.log import logger class WriteToFile(Filter): """Writes data to file using the Auspex container type, which is a simple directory structure @@ -65,14 +64,24 @@ def final_init(self): assert self.filename.value, "Filename never supplied to writer." assert self.groupname.value, "Groupname never supplied to writer." assert self.datasetname.value, "Dataset name never supplied to writer." 
- self.descriptor = self.sink.input_streams[0].descriptor - self.container = AuspexDataContainer(self.filename.value) - self.group = self.container.new_group(self.groupname.value) - self.mmap = self.container.new_dataset(self.groupname.value, self.datasetname.value, self.descriptor) - self.w_idx = 0 - self.points_taken = 0 + def get_config(self): + config = super().get_config() + config['dtype'] = self.descriptor.dtype + config['w_idx'] = 0 + config['points_taken'] = 0 + config['descriptor'] = self.descriptor + config['filename'] = self.filename.value + config['groupname'] = self.groupname.value + config['datasetname'] = self.datasetname.value + return config + + @classmethod + def execute_on_run(cls, config): + config['container'] = AuspexDataContainer(config['filename']) + config['group'] = config['container'].new_group(config['groupname']) + config['mmap'] = config['container'].new_dataset(config['groupname'], config['datasetname'], config['descriptor']) def get_data_while_running(self, return_queue): """Return data to the main thread or user as requested. Use a MP queue to transmit.""" @@ -82,13 +91,14 @@ def get_data_while_running(self, return_queue): def get_data(self): assert self.done.is_set(), Exception("Experiment is still running. Please use get_data_while_running") container = AuspexDataContainer(self.filename.value) - return container.open_dataset(self.groupname.value, self.datasetname.value) + return container.open_dataset(self.groupname.value, self.datasetname.value)[:2] - def process_data(self, data): + @classmethod + def process_data(cls, config, ocs, ics, data): # Write the data - self.mmap[self.w_idx:self.w_idx+data.size] = data - self.w_idx += data.size - self.points_taken = self.w_idx + config['mmap'][config['w_idx']:config['w_idx']+data.size] = data + config['w_idx'] += data.size + config['points_taken'] = config['w_idx'] class DataBuffer(Filter): """Writes data to IO.""" @@ -97,31 +107,49 @@ class DataBuffer(Filter): def __init__(self, **kwargs): super(DataBuffer, self).__init__(**kwargs) - self._final_buffer = Queue() - self._temp_buffer = Queue() + if self.manager: + self._final_buffer = self.manager.Queue() + self._temp_buffer = self.manager.Queue() + else: + self._final_buffer = Queue() + self._temp_buffer = Queue() self._get_buffer = Event() self.final_buffer = None def final_init(self): - self.w_idx = 0 - self.points_taken = 0 self.descriptor = self.sink.input_streams[0].descriptor - self.buff = np.empty(self.descriptor.expected_num_points(), dtype=self.descriptor.dtype) - - def checkin(self): - if self._get_buffer.is_set(): - self._temp_buffer.put(self.buff) - self._get_buffer.clear() - def process_data(self, data): + def get_config(self): + config = super().get_config() + config['dtype'] = self.descriptor.dtype + config['w_idx'] = 0 + config['points_taken'] = 0 + config['expected_points'] = self.descriptor.expected_num_points() + config['final_buffer'] = self._final_buffer + config['temp_buffer'] = self._temp_buffer + config['get_buffer'] = self._get_buffer + return config + + @classmethod + def execute_on_run(cls, config): + config['buff'] = np.empty(config['expected_points'], dtype=config['dtype']) + + @classmethod + def execute_after_run(cls, config, output_queue): + if mp.get_start_method() == "spawn": + output_queue.put(("_final_buffer", config['buff'])) + config['final_buffer'].put(config['buff']) + + @classmethod + def process_data(cls, config, ocs, ics, data): # Write the data - self.buff[self.w_idx:self.w_idx+data.size] = data - self.w_idx += 
data.size - self.points_taken = self.w_idx + config['buff'][config['w_idx']:config['w_idx']+data.size] = data + config['w_idx'] += data.size + config['points_taken'] = config['w_idx'] - def main(self): - super(DataBuffer, self).main() - self._final_buffer.put(self.buff) + if config['get_buffer'].is_set(): + config['temp_buffer'].put(config['buff']) + config['get_buffer'].clear() def get_data(self): if self.done.is_set(): diff --git a/src/auspex/filters/plot.py b/src/auspex/filters/plot.py index eb95a139d..8852507ba 100644 --- a/src/auspex/filters/plot.py +++ b/src/auspex/filters/plot.py @@ -24,8 +24,8 @@ import threading as mp from queue import Queue else: - import multiprocessing as mp - from multiprocessing import Queue + import multiprocess as mp + from multiprocess import Queue class Plotter(Filter): sink = InputConnector() diff --git a/src/auspex/filters/singleshot.py b/src/auspex/filters/singleshot.py index a335a4e5c..0895548ea 100644 --- a/src/auspex/filters/singleshot.py +++ b/src/auspex/filters/singleshot.py @@ -20,7 +20,7 @@ if sys.platform == 'win32' or 'NOFORKING' in os.environ: from queue import Queue else: - from multiprocessing import Queue + from multiprocess import Queue from .filter import Filter from auspex.parameter import Parameter, FloatParameter, IntParameter, BoolParameter diff --git a/src/auspex/instruments/X6.py b/src/auspex/instruments/X6.py index 736f3cf21..0772c63a4 100644 --- a/src/auspex/instruments/X6.py +++ b/src/auspex/instruments/X6.py @@ -22,7 +22,7 @@ from .instrument import Instrument, ReceiverChannel from unittest.mock import MagicMock -from multiprocessing import Value +from multiprocess import Value # win32 doesn't support MSG_WAITALL, so on windows we # need to do things a slower, less efficient way. @@ -276,37 +276,48 @@ def spew_fake_data(self, counter, ideal_data, random_mag=0.1, random_seed=12345) return total - def receive_data(self, channel, oc, exit, ready, run): + @staticmethod + def receive_data(sock, oc, exit, ready, run, panic, last_timestamp, dtype): try: - sock = self._chan_to_rsocket[channel] sock.settimeout(10) - self.last_timestamp.value = datetime.datetime.now().timestamp() + last_timestamp.value = datetime.datetime.now().timestamp() total = 0 ready.value += 1 - logger.debug(f"{self} receiver launched with pid {os.getpid()}. ppid {os.getppid()}") - while not exit.is_set(): + # logger.info(f"X6 Receiver launched with pid {os.getpid()}. ppid {os.getppid()}") + while not exit.is_set() and not panic.is_set(): # push data from a socket into an OutputConnector (oc) # wire format is just: [size, buffer...] # TODO receive 4 or 8 bytes depending on sizeof(size_t) run.wait() # Block until we are running again try: msg = sock.recv(8) - self.last_timestamp.value = datetime.datetime.now().timestamp() + last_timestamp.value = datetime.datetime.now().timestamp() except: + logger.warning("Exceeded X6 listener socket timeout") + if not exit.is_set(): + logger.warning("X6 listener timed out after 10 seconds, panic.") + panic.set() + break + + if msg == b'': + logger.info("X6 listener received blank message. This seems to happen when using multiprocessing with spawn instead of fork (i.e. 
on Windows)") continue # reinterpret as int (size_t) msg_size = struct.unpack('n', msg)[0] buf = sock_recvall(sock, msg_size) if len(buf) != msg_size: - logger.error("Channel %s socket msg shorter than expected" % channel.channel) - logger.error("Expected %s bytes, received %s bytes" % (msg_size, len(buf))) + # logger.error("Channel %s socket msg shorter than expected" % channel.channel) + logger.error("X6 listener expected %s bytes, received %s bytes" % (msg_size, len(buf))) return - data = np.frombuffer(buf, dtype=channel.dtype) + data = np.frombuffer(buf, dtype=dtype) # logger.info(f"X6 {msg_size} got {len(data)}") total += len(data) oc.push(data) + # logger.info(f"listener got {data} ---- {oc.output_streams[0].queue}") + # raise Exception("Fake Error") + # logger.info('RECEIVED %d %d', total, oc.points_taken.value) # TODO: this is suspeicious @@ -322,12 +333,18 @@ def receive_data(self, channel, oc, exit, ready, run): break # logger.info("X6 receive data exiting") except Exception as e: - logger.warning(f"{self} receiver raised exception {e}. Bailing.") + import traceback + just_the_string = traceback.format_exc() + logger.warning(f"X6 listener raised exception {e}. Bailing.") + + logger.warning(just_the_string) + logger.warning(f"X6 listener raised exception {e}. Bailing.") + panic.set() def get_buffer_for_channel(self, channel): return self._lib.transfer_stream(*channel.channel) - def wait_for_acquisition(self, dig_run, timeout=15, ocs=None, progressbars=None): + def wait_for_acquisition(self, dig_run, panic, timeout=15, ocs=None, progressbars=None): progress_updaters = {} if ocs and progressbars: @@ -355,15 +372,17 @@ def wait_for_acquisition(self, dig_run, timeout=15, ocs=None, progressbars=None) total_spewed += self.spew_fake_data(counter, self.ideal_data) else: total_spewed += self.spew_fake_data(counter, [0.0 for i in range(self.number_segments)]) - # logger.info(f"Spewed {total_spewed}") - time.sleep(0.0001) + # logger.info(f"Spewed {total_spewed}... 
{j}") + time.sleep(0.0005) self.ideal_counter += 1 # logger.info("Counter: %s", str(counter)) # logger.info('TOTAL fake data generated %d', total_spewed) if ocs: - while True: + while not panic.is_set(): total_taken = 0 + if not dig_run.is_set(): + self.last_timestamp.value = datetime.datetime.now().timestamp() for oc in ocs: total_taken += oc.points_taken.value - initial_points[oc] if progressbars: @@ -372,9 +391,13 @@ def wait_for_acquisition(self, dig_run, timeout=15, ocs=None, progressbars=None) if total_taken == total_spewed: break # logger.info('WAITING for acquisition to finish %d < %d', total_taken, total_spewed) + + if (datetime.datetime.now().timestamp() - self.last_timestamp.value) > timeout: + logger.error("Digitizer %s timed out while sending fake data.", self.name) + break time.sleep(0.025) for oc in ocs: - if progressbars: + if progressbars and (datetime.datetime.now().timestamp() - self.last_timestamp.value) > 1.0: try: progressbars[oc].next() progressbars[oc].finish() diff --git a/src/auspex/instruments/alazar.py b/src/auspex/instruments/alazar.py index ab942f4b3..5f95f0c45 100644 --- a/src/auspex/instruments/alazar.py +++ b/src/auspex/instruments/alazar.py @@ -15,7 +15,7 @@ import sys import numpy as np -from multiprocessing import Value +from multiprocess import Value from .instrument import Instrument, ReceiverChannel from auspex.log import logger diff --git a/src/auspex/qubit/qubit_exp.py b/src/auspex/qubit/qubit_exp.py index 327a5102b..e2688e46d 100644 --- a/src/auspex/qubit/qubit_exp.py +++ b/src/auspex/qubit/qubit_exp.py @@ -13,9 +13,9 @@ from threading import Thread as Process from threading import Event else: - from multiprocessing import Process - from multiprocessing import Event - from multiprocessing import Value + from multiprocess import Process + from multiprocess import Event + from multiprocess import Value from . 
import pipeline import time @@ -359,9 +359,9 @@ def instantiate_filters(self, graph): if isinstance(node, bbndb.auspex.FilterProxy): if all(name in self.measured_qubit_names for name in node.qubit_name.split('-')): #include correlators only if all participating qubits are measured - new_filt = filter_map[type(node)]() + new_filt = filter_map[type(node)](manager=self.manager) new_filt.configure_with_proxy(node) - new_filt.proxy = node + # new_filt.proxy = node self.filters.append(new_filt) self.proxy_to_filter[node] = new_filt if isinstance(node, bbndb.auspex.OutputProxy): @@ -469,10 +469,21 @@ def init_instruments(self): self.dig_run = Event() self.dig_exit = Event() for chan, dig in self.chan_to_dig.items(): - socket = dig.get_socket(chan) + sock = dig.get_socket(chan) oc = self.chan_to_oc[chan] - p = Process(target=dig.receive_data, args=(chan, oc, self.dig_exit, ready, self.dig_run)) + + # Temporarility emove the parent from the output connector sent to the receive method since + # it contains all sorts of unpickleable content + parent = oc.parent + oc.parent = None + + ts = dig.last_timestamp + p = Process(target=dig.receive_data, args=(sock, oc, self.dig_exit, ready, self.dig_run, self.filter_panic, ts, chan.dtype)) self.dig_listeners[p] = self.dig_exit + + # Restore the parent + # oc.parent = parent + assert None not in self.dig_listeners.keys() for listener in self.dig_listeners.keys(): listener.start() @@ -643,7 +654,8 @@ def run(self): # Wait for all of the acquisitions to complete timeout = 10 for dig in self.digitizers: - dig.wait_for_acquisition(self.dig_run, timeout=timeout, ocs=list(self.chan_to_oc.values()), progressbars=self.progressbars) + dig.wait_for_acquisition(self.dig_run, self.filter_panic, timeout=timeout, + ocs=list(self.chan_to_oc.values()), progressbars=self.progressbars) # Bring everything to a stop for dig in self.digitizers: diff --git a/src/auspex/stream.py b/src/auspex/stream.py index 4af5a876f..82ea2dd5c 100644 --- a/src/auspex/stream.py +++ b/src/auspex/stream.py @@ -13,9 +13,9 @@ import threading as mp from queue import Queue else: - import multiprocessing as mp - from multiprocessing import Queue -from multiprocessing import Value, RawValue, RawArray + import multiprocess as mp + from multiprocess import Queue +from multiprocess import Value, RawValue, RawArray, Array import ctypes import logging import numbers @@ -25,7 +25,6 @@ import numpy as np from functools import reduce - from auspex.log import logger def cartesian(arrays, out=None, dtype='f'): @@ -479,14 +478,14 @@ def __getitem__(self, axis_name): def _ipython_key_completions_(self): return [a.name for a in self.axes] + class DataStream(object): """A stream of data""" - def __init__(self, name=None, unit=None): + def __init__(self, name=None, unit=None, manager=None): super(DataStream, self).__init__() - self.queue = Queue() + self.manager = None self.name = name self.unit = unit - self.points_taken_lock = mp.Lock() self.points_taken = Value('i', 0) # Using shared memory since these are used in filter processes self.descriptor = None self.start_connector = None @@ -494,9 +493,25 @@ def __init__(self, name=None, unit=None): self.closed = False # Shared memory interface - self.buffer_lock = mp.Lock() - # self.buffer_size = 500000 - self.buff_idx = Value('i', 0) + self.buffer_size = None + self.init_multiprocessing_objs() + + self.re_np = None + self.im_np = None + + def init_multiprocessing_objs(self): + if self.manager: + logger.debug(f"{self.name} stream using manager") + self.queue = 
self.manager.Queue() + self.buffer_lock = self.manager.Lock() + self.points_taken_lock = self.manager.Lock() + self.buff_idx = self.manager.Value('i', 0) + else: + logger.debug(f"{self.name} stream not using manager") + self.queue = Queue() + self.buff_idx = Value('i', 0) + self.buffer_lock = mp.Lock() + self.points_taken_lock = mp.Lock() def final_init(self): self.buffer_size = self.descriptor.num_points()*self.descriptor.buffer_mult_factor @@ -506,8 +521,6 @@ def final_init(self): self.buffer_size = 50e6 self.buff_shared_re = RawArray(ctypes.c_double, int(self.buffer_size)) self.buff_shared_im = RawArray(ctypes.c_double, int(self.buffer_size)) - self.re_np = np.frombuffer(self.buff_shared_re, dtype=np.float64) - self.im_np = np.frombuffer(self.buff_shared_im, dtype=np.float64) def set_descriptor(self, descriptor): if isinstance(descriptor,DataStreamDescriptor): @@ -536,6 +549,7 @@ def percent_complete(self): def reset(self): self.descriptor.reset() + self.init_multiprocessing_objs() with self.points_taken_lock: self.points_taken.value = 0 while not self.queue.empty(): @@ -547,7 +561,17 @@ def __repr__(self): return "".format( self.name, self.percent_complete(), self.descriptor) + def spawn_fix(self): + # Make sure we have regenerated the wrappers properly. By putting this code here + # we hope to force any spawned sub-processes to properly recreate the numpy arrays + # wrapping the synchronization objects. + if self.re_np is None: + self.re_np = np.frombuffer(self.buff_shared_re, dtype=np.float64) + self.im_np = np.frombuffer(self.buff_shared_im, dtype=np.float64) + def push(self, data): + self.spawn_fix() + if self.closed: raise Exception("The queue is closed and should not be receiving any more data") with self.points_taken_lock: @@ -664,6 +688,10 @@ def __len__(self): with self.points_taken_lock: return self.points_taken.value + def spawn_fix(self): + for stream in self.output_streams: + stream.spawn_fix() + # We allow the connectors itself to posess # a descriptor, that it may pass def set_descriptor(self, descriptor): diff --git a/test/test_alazar.py b/test/test_alazar.py index dbdda61dc..cb038e43e 100644 --- a/test/test_alazar.py +++ b/test/test_alazar.py @@ -1,4 +1,4 @@ -from multiprocessing import Queue, Process, Event, Value +from multiprocess import Queue, Process, Event, Value from auspex.instruments import AlazarATS9870, AlazarChannel import time From 92c7dca036e30e60991d0bd3a4435f75bee4d8d0 Mon Sep 17 00:00:00 2001 From: Graham Rowlands Date: Mon, 12 Apr 2021 13:36:43 -0400 Subject: [PATCH 2/3] Minor :bug: fixes and verbosity improvements --- src/auspex/experiment.py | 10 +++++++--- src/auspex/filters/channelizer.py | 2 +- src/auspex/filters/elementwise.py | 2 +- src/auspex/filters/framer.py | 2 +- src/auspex/filters/plot.py | 2 +- src/auspex/filters/singleshot.py | 4 ++-- src/auspex/instruments/stanford.py | 2 +- 7 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/auspex/experiment.py b/src/auspex/experiment.py index 17d86d8e2..2f8117a45 100644 --- a/src/auspex/experiment.py +++ b/src/auspex/experiment.py @@ -10,6 +10,7 @@ import sys import uuid import json +import traceback from auspex.log import logger @@ -180,6 +181,9 @@ def __init__(self): # Should we show the dashboard? self.dashboard = False + # Profile? + self.profile = False + # Multiprocessing manager? 
self.manager = mp.Manager() @@ -814,13 +818,13 @@ def connect_to_plot_server(self): logger.info("Connection established to plot server.") self.do_plotting = True else: - raise Exception("Server returned invalid message, expected ACK.") + raise Exception("Plot server returned invalid message, expected ACK.") except: - logger.info("Could not connect to server.") + logger.info("Could not connect to plot server.") for p in self.plotters: p.do_plotting = False else: - logger.info("Plot Server did not respond.") + logger.info("Plot server did not respond.") for p in self.plotters: p.do_plotting = False diff --git a/src/auspex/filters/channelizer.py b/src/auspex/filters/channelizer.py index 528623cf2..44c37e987 100644 --- a/src/auspex/filters/channelizer.py +++ b/src/auspex/filters/channelizer.py @@ -68,7 +68,7 @@ def __init__(self, frequency=None, bandwidth=None, decimation_factor=None, self.follow_axis.value = follow_axis if follow_freq_offset: self.follow_freq_offset.value = follow_freq_offset - self.quince_parameters = [self.decimation_factor, self.frequency, self.bandwidth] + # self.quince_parameters = [self.decimation_factor, self.frequency, self.bandwidth] self._phase = 0.0 def final_init(self): diff --git a/src/auspex/filters/elementwise.py b/src/auspex/filters/elementwise.py index f21c28675..51fe0b960 100644 --- a/src/auspex/filters/elementwise.py +++ b/src/auspex/filters/elementwise.py @@ -31,7 +31,7 @@ class ElementwiseFilter(Filter): def __init__(self, filter_name=None, **kwargs): super(ElementwiseFilter, self).__init__(filter_name=filter_name, **kwargs) self.sink.max_input_streams = 100 - self.quince_parameters = [] + # self.quince_parameters = [] def operation(self): """Must be overridden with the desired mathematical function""" diff --git a/src/auspex/filters/framer.py b/src/auspex/filters/framer.py index bbfb08ee7..c02d15da0 100644 --- a/src/auspex/filters/framer.py +++ b/src/auspex/filters/framer.py @@ -32,7 +32,7 @@ def __init__(self, axis=None, **kwargs): self.sum_so_far = None self.num_averages = None - self.quince_parameters = [self.axis] + # self.quince_parameters = [self.axis] def final_init(self): descriptor_in = self.sink.descriptor diff --git a/src/auspex/filters/plot.py b/src/auspex/filters/plot.py index 8852507ba..91c020a6c 100644 --- a/src/auspex/filters/plot.py +++ b/src/auspex/filters/plot.py @@ -47,7 +47,7 @@ def __init__(self, *args, name="", plot_dims=None, plot_mode=None, **plot_args): self._final_buffer = Queue() self.final_buffer = None - self.quince_parameters = [self.plot_dims, self.plot_mode] + # self.quince_parameters = [self.plot_dims, self.plot_mode] # Unique id for plot server self.uuid = None diff --git a/src/auspex/filters/singleshot.py b/src/auspex/filters/singleshot.py index 0895548ea..6cd83ecbc 100644 --- a/src/auspex/filters/singleshot.py +++ b/src/auspex/filters/singleshot.py @@ -53,8 +53,8 @@ def __init__(self, save_kernel=False, optimal_integration_time=False, self.set_threshold.value = set_threshold self.logistic_regression.value = logistic_regression - self.quince_parameters = [self.save_kernel, self.optimal_integration_time, - self.zero_mean, self.set_threshold, self.logistic_regression] + # self.quince_parameters = [self.save_kernel, self.optimal_integration_time, + # self.zero_mean, self.set_threshold, self.logistic_regression] self.pdf_data_queue = Queue() #Output queue self.fidelity = self.source diff --git a/src/auspex/instruments/stanford.py b/src/auspex/instruments/stanford.py index 05c73a506..0f01449fe 100644 --- 
a/src/auspex/instruments/stanford.py +++ b/src/auspex/instruments/stanford.py @@ -86,7 +86,7 @@ def get_buffer(self, channel): self.interface.write("TRCB?{:d},0,{:d}".format(channel, stored_points)) #buf = self.interface.read_raw(numbytes=4) buf = self.interface.read_bytes(4*stored_points,chunk_size=4) - logger.info(f"Raw buffer is {buf} with length {len(buf)} bytes.") + # logger.info(f"Raw buffer is {buf} with length {len(buf)} bytes.") return np.frombuffer(buf, dtype=np.float32) def buffer_start(self): From 666ff6664a6569e4b7d7007c1d9b4ff1451f1209 Mon Sep 17 00:00:00 2001 From: Graham Rowlands Date: Mon, 12 Apr 2021 14:48:58 -0400 Subject: [PATCH 3/3] Setup.py typo fix --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0639b72e6..42825649e 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ "ipywidgets >= 7.0.0", "sqlalchemy >= 1.2.15", "pyserial >= 3.4", - "multiprocess == 0.70.11.1" + "multiprocess == 0.70.11.1", "setproctitle", "progress" ]
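
Note for readers of this series: the sketch below illustrates how a downstream filter is expected to look under the classmethod-based process model introduced in PATCH 1/3, where per-run state is packed into a picklable config dict by get_config() in the parent process, and the process-side hooks (execute_on_run, process_data) receive that dict together with the output/input connector maps. This is a minimal illustrative sketch, not part of the patches: the Scale class, its factor attribute, and the docstrings are hypothetical, while the hook names, signatures, and connector usage follow the API shown in filter.py, average.py, and io.py above.

    # Hypothetical example filter written against the new classmethod-based Filter API.
    # Per-run state lives in the picklable `config` dict instead of on `self`, so the
    # same filter code can run under either fork or spawn start methods.
    import numpy as np

    from auspex.filters import Filter
    from auspex.stream import InputConnector, OutputConnector

    class Scale(Filter):
        """Illustrative filter that multiplies incoming data by a constant factor."""
        sink   = InputConnector()
        source = OutputConnector()

        def __init__(self, factor=2.0, **kwargs):
            super(Scale, self).__init__(**kwargs)
            self.factor = factor  # plain attribute for the sketch; real filters may use Parameter

        def get_config(self):
            # Runs in the main process before the child is launched: pack everything
            # the child process will need into a picklable dict.
            config = super().get_config()
            config['factor'] = self.factor
            return config

        @classmethod
        def execute_on_run(cls, config):
            # Runs inside the spawned/forked process before the main loop; allocate
            # any working buffers here (nothing needed for this sketch).
            pass

        @classmethod
        def process_data(cls, config, ocs, ics, data):
            # Called for each chunk of incoming data; push results to every
            # downstream stream attached to the 'source' output connector.
            for os in ocs['source'].output_streams:
                os.push(np.asarray(data) * config['factor'])

Constructed this way, the experiment's run_sweeps loop (see experiment.py in PATCH 1/3) can launch the filter with Process(target=n.run, args=(conf, ...)) and collect any return values through the per-filter output queue, regardless of the multiprocess start method in use.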