Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"ipywidgets >= 7.0.0",
"sqlalchemy >= 1.2.15",
"pyserial >= 3.4",
"multiprocess == 0.70.11.1",
"setproctitle",
"progress"
]
Expand Down
114 changes: 78 additions & 36 deletions src/auspex/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,26 @@
import sys
import uuid
import json
import traceback

if sys.platform == 'win32' or 'NOFORKING' in os.environ:
from queue import Queue as Queue
from threading import Event
from threading import Thread as Process
from threading import Thread as Thread
else:
from multiprocessing import Queue as Queue
from multiprocessing import Event
from multiprocessing import Process
from threading import Thread as Thread
from auspex.log import logger

# if sys.platform == 'win32' or 'NOFORKING' in os.environ:
# logger.info("Using threads")
# from queue import Queue as Queue
# from threading import Event
# from threading import Thread as Process
# from threading import Thread as Thread
# else:

from multiprocess import Queue as Queue
from multiprocess import Event
from multiprocess import Process
import multiprocess as mp
from threading import Thread as Thread
import multiprocess.context as ctx
# ctx._force_start_method('fork')
logger.info(f"Using pathos multiprocess with {mp.get_start_method()}")

from IPython.core.getipython import get_ipython
in_notebook = False
Expand Down Expand Up @@ -53,7 +62,7 @@
from auspex.sweep import Sweeper
from auspex.stream import DataStream, DataAxis, SweepAxis, DataStreamDescriptor, InputConnector, OutputConnector
from auspex.filters import Plotter, MeshPlotter, ManualPlotter, WriteToFile, DataBuffer, Filter
from auspex.log import logger

import auspex.config
from auspex.config import isnotebook

Expand Down Expand Up @@ -85,9 +94,10 @@ def update_filename(filename, add_date=True):
return "{}-{:04d}".format(basename,i)

class ExperimentGraph(object):
def __init__(self, edges):
def __init__(self, edges, manager):
self.dag = None
self.edges = []
self.manager = manager
self.create_graph(edges)

def dfs_edges(self):
Expand All @@ -110,7 +120,8 @@ def create_graph(self, edges):
dag = nx.DiGraph()
self.edges = []
for edge in edges:
obj = DataStream(name="{}_TO_{}".format(edge[0].name, edge[1].name))
logger.debug(f"{edge[0].name}_TO_{edge[1].name}: {self.manager}")
obj = DataStream(name="{}_TO_{}".format(edge[0].name, edge[1].name), manager=self.manager)
edge[0].add_output_stream(obj)
edge[1].add_input_stream(obj)
self.edges.append(obj)
Expand Down Expand Up @@ -170,6 +181,12 @@ def __init__(self):
# Should we show the dashboard?
self.dashboard = False

# Profile?
self.profile = False

# Multiprocessing manager?
self.manager = mp.Manager()

# Create and use plots?
self.do_plotting = False

Expand Down Expand Up @@ -246,6 +263,12 @@ def __init__(self):
# Run the stream init
self.init_streams()

# Event for filters to register a panic
self.filter_panic = Event()

# Event to tell all filters to exit
self.filter_exit = Event()

def set_graph(self, edges):
unique_nodes = []
for eb, ee in edges:
Expand All @@ -254,7 +277,7 @@ def set_graph(self, edges):
if ee.parent not in unique_nodes:
unique_nodes.append(ee.parent)
self.nodes = unique_nodes
self.graph = ExperimentGraph(edges)
self.graph = ExperimentGraph(edges, manager=self.manager)

def init_streams(self):
"""Establish the base descriptors for any internal data streams and connectors."""
Expand Down Expand Up @@ -372,6 +395,12 @@ def sweep(self):
# Run the procedure
self.run()

# Check for any filter panics
if self.filter_panic.is_set():
logger.warning("Panic detected in listeners/filters. Exiting.")
self.filter_exit.set()
break

# See if the axes want to extend themselves. They will push updates
# directly to the output_connecters as messages that will be passed
# through the filter pipeline.
Expand Down Expand Up @@ -573,23 +602,15 @@ def run_sweeps(self):
#initialize instruments
self.init_instruments()

# def catch_ctrl_c(signum, frame):
# import ipdb; ipdb.set_trace();
# logger.info("Caught SIGINT. Shutting down.")
# self.declare_done() # Ask nicely

# self.shutdown()
# raise NameError("Shutting down.")
# sys.exit(0)

# signal.signal(signal.SIGINT, catch_ctrl_c)

# We want to wait for the sweep method above,
# not the experiment's run method, so replace this
# in the list of tasks.
self.other_nodes = self.nodes[:]
self.other_nodes.extend(self.extra_plotters)
self.other_nodes.remove(self)
self.other_node_processes = {}
self.other_node_configs = {}
self.other_node_outputs = {}

# If we are launching the process dashboard,
# setup the bokeh server and establish a queue for
Expand All @@ -601,7 +622,15 @@ def run_sweeps(self):

# Start the filter processes
for n in self.other_nodes:
n.start()
conf = n.get_config()
oq = Queue()
p = Process(target=n.run, args=(conf, self.filter_exit, n.done, self.filter_panic,
n.output_connectors, n.input_connectors,
self.profile, n.perf_queue, oq))
p.start()
self.other_node_configs[n] = conf
self.other_node_processes[n] = p
self.other_node_outputs[n] = oq

# Run the main experiment loop
self.sweep()
Expand All @@ -611,7 +640,7 @@ def run_sweeps(self):
if callback:
callback(plot)

# Wait for the
# Wait for the filters to finish.
time.sleep(0.1)
times = {n: 0 for n in self.other_nodes}
dones = {n: n.done.is_set() for n in self.other_nodes}
Expand All @@ -630,7 +659,19 @@ def run_sweeps(self):
n.final_buffer = n._final_buffer.get()

for n in self.other_nodes:
n.join()
p = self.other_node_processes[n]
conf = self.other_node_configs[n]
oq = self.other_node_outputs[n]

while True:
try:
# Get the Output Queue
param, v = oq.get(False)
setattr(n, param, v)
except:
break

p.join()

for buff in self.buffers:
buff.output_data, buff.descriptor = buff.get_data()
Expand All @@ -640,9 +681,11 @@ def run_sweeps(self):
self.perf_thread.join()

except KeyboardInterrupt as e:
print("Caught KeyboardInterrupt, terminating.")
#self.shutdown() #Commented out, since this is already in the finally.
#sys.exit(0)
logger.warning("Caught KeyboardInterrupt, shutting down.")
except Exception as e:
just_the_string = traceback.format_exc()
logger.warning(f"Experiment raised exception {e} (traceback follows). Bailing.")
logger.warning(just_the_string)
finally:
self.shutdown()

Expand All @@ -655,15 +698,14 @@ def stop_manual_plotters(self):
mp.stop()

def shutdown(self):
logger.debug("Shutting down instruments")
# This includes stopping the flow of data, and must be done before terminating nodes
self.shutdown_instruments()

try:
ct_max = 5 #arbitrary waiting 10 s before giving up
for ct in range(ct_max):
if hasattr(self, 'other_nodes'):
if any([n.is_alive() for n in self.other_nodes]):
if any([p.is_alive() for p in self.other_node_processes.values()]):
logger.warning("Filter pipeline appears stuck. Use keyboard interrupt to terminate.")
time.sleep(2.0)
else:
Expand Down Expand Up @@ -776,13 +818,13 @@ def connect_to_plot_server(self):
logger.info("Connection established to plot server.")
self.do_plotting = True
else:
raise Exception("Server returned invalid message, expected ACK.")
raise Exception("Plot server returned invalid message, expected ACK.")
except:
logger.info("Could not connect to server.")
logger.info("Could not connect to plot server.")
for p in self.plotters:
p.do_plotting = False
else:
logger.info("Plot Server did not respond.")
logger.info("Plot server did not respond.")
for p in self.plotters:
p.do_plotting = False

Expand Down
Loading