
Merge pull request microsoft#2124 from jenshnielsen/nested_measurements
Only create one background writer for multiple datasets
jenshnielsen authored Sep 8, 2020
2 parents 9987733 + 8580c34 commit e1c2f27
Showing 7 changed files with 765 additions and 254 deletions.


9 changes: 8 additions & 1 deletion docs/examples/DataSet/Saving_data_in_the_background.ipynb
@@ -333,6 +333,13 @@
"_ = plot_dataset(datasaver.dataset)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that if you perform more than one measurement concurrently, they must all either perform the saving in the main thread on in the background. Mixing between foreground and background is not allowed."
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -359,7 +366,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
"version": "3.7.9"
}
},
"nbformat": 4,
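To make the new rule from the notebook concrete, here is a minimal sketch of two concurrent measurements that both save in the background, using the `write_in_background` keyword of `Measurement.run` that the notebook documents. The database path, experiment name, and parameters are illustrative only:

```python
import numpy as np

from qcodes import Parameter
from qcodes.dataset.experiment_container import load_or_create_experiment
from qcodes.dataset.measurements import Measurement
from qcodes.dataset.sqlite.database import initialise_or_create_database_at

# Hypothetical database and experiment for the demo.
initialise_or_create_database_at("./background_demo.db")
load_or_create_experiment("background_demo", sample_name="no sample")

x = Parameter("x", set_cmd=None, get_cmd=None)
y = Parameter("y", set_cmd=None, get_cmd=None)

meas1 = Measurement()
meas1.register_parameter(x)
meas1.register_parameter(y, setpoints=(x,))

meas2 = Measurement()
meas2.register_parameter(x)
meas2.register_parameter(y, setpoints=(x,))

# Both measurements save in the background, so after this change they
# share a single writer thread per database file. Running one of them
# with write_in_background=False while the other runs in the background
# would raise a RuntimeError.
with meas1.run(write_in_background=True) as saver1, \
        meas2.run(write_in_background=True) as saver2:
    for xv in np.linspace(0, 1, 10):
        x(xv)
        saver1.add_result((x, xv), (y, xv ** 2))
        saver2.add_result((x, xv), (y, xv ** 3))
```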
127 changes: 97 additions & 30 deletions qcodes/dataset/data_set.py
@@ -1,10 +1,12 @@
import atexit
import functools
import importlib
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass
from queue import Empty, Queue
from threading import Thread
from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Mapping,
@@ -217,11 +219,10 @@ class _BackgroundWriter(Thread):
Write the results from the DataSet's dataqueue in a new thread
"""

def __init__(self, queue: Queue, conn: ConnectionPlus, table_name: str):
super().__init__()
def __init__(self, queue: Queue, conn: ConnectionPlus):
super().__init__(daemon=True)
self.queue = queue
self.path = conn.path_to_dbfile
self.table_name = table_name
self.keep_writing = True

def run(self) -> None:
@@ -234,13 +235,39 @@ def run(self) -> None:
if item['keys'] == 'stop':
self.keep_writing = False
self.conn.close()
elif item['keys'] == 'finalize':
_WRITERS[self.path].active_datasets.remove(item['values'])
else:
self.write_results(item['keys'], item['values'])
self.write_results(item['keys'], item['values'], item['table_name'])
self.queue.task_done()

def write_results(self, keys: Sequence[str],
values: Sequence[List[Any]]) -> None:
insert_many_values(self.conn, self.table_name, keys, values)
values: Sequence[List[Any]],
table_name: str) -> None:
insert_many_values(self.conn, table_name, keys, values)

def shutdown(self) -> None:
"""
Send a termination signal to the data writing queue, wait for the
queue to empty and the thread to join.
If the background writing thread is not alive this will do nothing.
"""
if self.is_alive():
self.queue.put({'keys': 'stop', 'values': []})
self.queue.join()
self.join()


@dataclass
class _WriterStatus:
bg_writer: Optional[_BackgroundWriter]
write_in_background: Optional[bool]
data_write_queue: Queue
active_datasets: Set[int]


_WRITERS: Dict[str, _WriterStatus] = {}


class DataSet(Sized):
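The writer thread now serves every dataset that targets the same database file: data items carry their own `table_name`, and two sentinel messages ('finalize' and 'stop') handle dataset completion and thread shutdown. A minimal, self-contained sketch of this queue protocol, with simplified stand-ins rather than the actual QCoDeS classes:

```python
from queue import Queue
from threading import Thread


def writer_loop(queue: Queue, active_datasets: set) -> None:
    # Simplified analogue of _BackgroundWriter.run: one loop drains the
    # queue shared by all datasets writing to one database file.
    while True:
        item = queue.get()
        if item['keys'] == 'stop':
            # Shut the writer down once it reaches this sentinel.
            queue.task_done()
            break
        if item['keys'] == 'finalize':
            # A dataset has finished; drop its run_id from the active set.
            active_datasets.remove(item['values'])
        else:
            # A data item: in QCoDeS this becomes insert_many_values
            # on the table named in the item.
            print(item['table_name'], dict(zip(item['keys'], item['values'][0])))
        queue.task_done()


active: set = {1}
q: Queue = Queue()
Thread(target=writer_loop, args=(q, active), daemon=True).start()

q.put({'keys': ['x', 'y'], 'values': [[0.0, 1.0]], 'table_name': 'results-1-1'})
q.put({'keys': 'finalize', 'values': 1})   # dataset with run_id 1 is done
q.put({'keys': 'stop', 'values': []})      # terminate the writer
q.join()                                   # block until everything is handled
```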
@@ -254,6 +281,7 @@ class DataSet(Sized):
'description', 'completed_timestamp_raw', 'metadata',
'dependent_parameters', 'parent_dataset_links',
'captured_run_id', 'captured_counter')
background_sleep_time = 1e-3

def __init__(self, path_to_db: str = None,
run_id: Optional[int] = None,
@@ -293,11 +321,11 @@ def __init__(self, path_to_db: str = None,
self.subscribers: Dict[str, _Subscriber] = {}
self._interdeps: InterDependencies_
self._parent_dataset_links: List[Link]
self._data_write_queue: Queue = Queue()
#: In memory representation of the data in the dataset.
self.cache: DataSetCache = DataSetCache(self)
self._results: List[Dict[str, VALUE]] = []


if run_id is not None:
if not run_exists(self.conn, run_id):
raise ValueError(f"Run with run_id {run_id} does not exist in "
@@ -340,9 +368,14 @@ def __init__(self, path_to_db: str = None,
self._metadata = get_metadata_from_run_id(self.conn, self.run_id)
self._parent_dataset_links = []

self._bg_writer = _BackgroundWriter(self._data_write_queue,
self.conn,
self.table_name)
if _WRITERS.get(self.path_to_db) is None:
queue: Queue = Queue()
ws: _WriterStatus = _WriterStatus(
bg_writer=None,
write_in_background=None,
data_write_queue=queue,
active_datasets=set())
_WRITERS[self.path_to_db] = ws

@property
def run_id(self) -> int:
@@ -486,6 +519,10 @@ def parent_dataset_links(self, links: List[Link]) -> None:

self._parent_dataset_links = links

@property
def _writer_status(self) -> _WriterStatus:
return _WRITERS[self.path_to_db]

def the_same_dataset_as(self, other: 'DataSet') -> bool:
"""
Check if two datasets correspond to the same run by comparing
@@ -708,8 +745,23 @@ def _perform_start_actions(self, start_bg_writer: bool) -> None:
pdl_str = links_to_str(self._parent_dataset_links)
update_parent_datasets(self.conn, self.run_id, pdl_str)

writer_status = self._writer_status

write_in_background_status = writer_status.write_in_background
if write_in_background_status is not None and write_in_background_status != start_bg_writer:
raise RuntimeError("All datasets written to the same database must "
"be written either in the background or in the "
"main thread. You cannot mix.")
if start_bg_writer:
self._bg_writer.start()
writer_status.write_in_background = True
if writer_status.bg_writer is None:
writer_status.bg_writer = _BackgroundWriter(writer_status.data_write_queue, self.conn)
if not writer_status.bg_writer.is_alive():
writer_status.bg_writer.start()
else:
writer_status.write_in_background = False

writer_status.active_datasets.add(self.run_id)

def mark_completed(self) -> None:
"""
@@ -728,7 +780,7 @@ def _perform_completion_actions(self) -> None:
"""
for sub in self.subscribers.values():
sub.done_callback()
self.terminate_queue()
self._ensure_dataset_written()

@deprecate(alternative='add_results')
def add_result(self, results: Mapping[str, VALUE]) -> int:
@@ -788,9 +840,12 @@ def add_results(self, results: Sequence[Mapping[str, VALUE]]) -> None:
expected_keys = frozenset.union(*[frozenset(d) for d in results])
values = [[d.get(k, None) for k in expected_keys] for d in results]

if self._bg_writer.is_alive():
item = {'keys': list(expected_keys), 'values': values}
self._data_write_queue.put(item)
writer_status = self._writer_status

if writer_status.write_in_background:
item = {'keys': list(expected_keys), 'values': values,
"table_name": self.table_name}
writer_status.data_write_queue.put(item)
else:
insert_many_values(self.conn, self.table_name, list(expected_keys),
values)
@@ -816,18 +871,27 @@ def add_result_to_queue(self,
expected_keys = frozenset.union(*[frozenset(d) for d in results])
values = [[d.get(k, None) for k in expected_keys] for d in results]

item = {'keys': list(expected_keys), 'values': values}
self._data_write_queue.put(item)
item = {'keys': list(expected_keys), 'values': values,
"table_name": self.table_name}
writer_status = self._writer_status

writer_status.data_write_queue.put(item)

def _ensure_dataset_written(self) -> None:
writer_status = self._writer_status

if writer_status.write_in_background:
writer_status.data_write_queue.put({'keys': 'finalize', 'values': self.run_id})
while self.run_id in writer_status.active_datasets:
time.sleep(self.background_sleep_time)
else:
writer_status.active_datasets.remove(self.run_id)
if len(writer_status.active_datasets) == 0:
writer_status.write_in_background = None
if writer_status.bg_writer is not None:
writer_status.bg_writer.shutdown()
writer_status.bg_writer = None

def terminate_queue(self) -> None:
"""
Send a termination signal to the data writing queue, if the
background writing thread has been started. Else do nothing.
"""
if self._bg_writer.is_alive():
self._data_write_queue.put({'keys': 'stop', 'values': []})
self._data_write_queue.join()
self._bg_writer.join()

@staticmethod
def _validate_parameters(*params: Union[str, ParamSpec, _BaseParameter]
@@ -1331,26 +1395,29 @@ def _flush_data_to_database(self, block: bool = False) -> None:
argument has no effect if not using a background thread.
"""

log.debug('Flushing to database')
writer_status = self._writer_status
if len(self._results) > 0:
try:

self.add_results(self._results)
if self._bg_writer.is_alive():
if writer_status.write_in_background:
log.debug(f"Succesfully enqueued result for write thread")
else:
log.debug(f'Successfully wrote result to disk')
self._results = []
except Exception as e:
if self._bg_writer.is_alive():
if writer_status.write_in_background:
log.warning(f"Could not enqueue result; {e}")
else:
log.warning(f'Could not commit to database; {e}')
else:
log.debug('No results to flush')

if self._bg_writer.is_alive() and block:
if writer_status.write_in_background and block:
log.debug(f"Waiting for write queue to empty.")
self._data_write_queue.join()
writer_status.data_write_queue.join()


# public api
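Taken together, the low-level lifecycle now works roughly as follows. A sketch against the diff above, using a hypothetical temporary database in place of the test fixtures shown further down:

```python
import os
import tempfile

from qcodes import new_experiment
from qcodes.dataset.data_set import DataSet
from qcodes.dataset.sqlite.database import connect

# Hypothetical throwaway database for the demo.
tmpdir = tempfile.mkdtemp()
conn = connect(os.path.join(tmpdir, "demo.db"))
new_experiment("demo", "sample", conn=conn)

ds1 = DataSet(conn=conn)
ds2 = DataSet(conn=conn)
# Both datasets resolve to the same per-database _WriterStatus entry.
assert ds1._writer_status is ds2._writer_status

ds1.mark_started(start_bg_writer=True)  # creates and starts the shared writer
ds2.mark_started(start_bg_writer=True)  # reuses it; no second thread

ds1.mark_completed()  # enqueues 'finalize'; the writer keeps serving ds2
ds2.mark_completed()  # last active dataset: the shared writer shuts down
```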
10 changes: 1 addition & 9 deletions qcodes/dataset/measurements.py
@@ -377,7 +377,6 @@ class Runner:
to the database. Additionally, it may perform experiment bootstrapping
and clean-up after a measurement.
"""
_is_entered: bool = False

def __init__(
self, enteractions: List, exitactions: List,
@@ -424,12 +423,6 @@ def __enter__(self) -> DataSaver:
# TODO: should user actions really precede the dataset?
# first do whatever bootstrapping the user specified

if Runner._is_entered:
log.warning('Nested measurements are not supported. This will '
'become an error in future releases of QCoDeS')

Runner._is_entered = True

for func, args in self.enteractions:
func(*args)

@@ -488,9 +481,8 @@ def __exit__(self,
traceback: Optional[TracebackType]
) -> None:
with DelayedKeyboardInterrupt():
self.datasaver.flush_data_to_database()
self.datasaver.flush_data_to_database(block=True)

Runner._is_entered = False
# perform the "teardown" events
for func, args in self.exitactions:
func(*args)
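With the `_is_entered` guard removed, nested measurements are supported rather than warned about; the only remaining constraint is the shared write mode enforced in data_set.py. A sketch, reusing the meas1/meas2 setup from the notebook example above:

```python
# Nested runs now work. Both datasets here save in the foreground,
# which is consistent because neither uses the background writer.
with meas1.run() as outer_saver:
    with meas2.run() as inner_saver:
        x(0.5)
        outer_saver.add_result((x, 0.5), (y, 0.25))
        inner_saver.add_result((x, 0.5), (y, 0.125))
```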
60 changes: 60 additions & 0 deletions qcodes/tests/dataset/test_concurrent_datasets.py
@@ -0,0 +1,60 @@
"""
Test that multiple datasets can coexist as expected
"""
import pytest

from qcodes import new_experiment
from qcodes.dataset.data_set import DataSet


def test_foreground_after_background_raises(empty_temp_db_connection):
new_experiment("test", "test1", conn=empty_temp_db_connection)
ds1 = DataSet(conn=empty_temp_db_connection)
ds1.mark_started(start_bg_writer=True)

ds2 = DataSet(conn=empty_temp_db_connection)
with pytest.raises(RuntimeError, match="All datasets written"):
ds2.mark_started(start_bg_writer=False)


def test_background_after_foreground_raises(empty_temp_db_connection):
new_experiment("test", "test1", conn=empty_temp_db_connection)
ds1 = DataSet(conn=empty_temp_db_connection)
ds1.mark_started(start_bg_writer=False)

ds2 = DataSet(conn=empty_temp_db_connection)
with pytest.raises(RuntimeError, match="All datasets written"):
ds2.mark_started(start_bg_writer=True)


def test_background_twice(empty_temp_db_connection):
new_experiment("test", "test1", conn=empty_temp_db_connection)
ds1 = DataSet(conn=empty_temp_db_connection)
ds1.mark_started(start_bg_writer=True)

ds2 = DataSet(conn=empty_temp_db_connection)
ds2.mark_started(start_bg_writer=True)


def test_foreground_twice(empty_temp_db_connection):
new_experiment("test", "test1", conn=empty_temp_db_connection)
ds1 = DataSet(conn=empty_temp_db_connection)
ds1.mark_started(start_bg_writer=False)

ds2 = DataSet(conn=empty_temp_db_connection)
ds2.mark_started(start_bg_writer=False)


def test_foreground_after_background_non_concurrent(empty_temp_db_connection):
new_experiment("test", "test1", conn=empty_temp_db_connection)
ds1 = DataSet(conn=empty_temp_db_connection)
ds1.mark_started(start_bg_writer=True)
ds1.mark_completed()

ds2 = DataSet(conn=empty_temp_db_connection)
ds2.mark_started(start_bg_writer=False)
ds2.mark_completed()

ds3 = DataSet(conn=empty_temp_db_connection)
ds3.mark_started(start_bg_writer=True)
ds3.mark_completed()
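These tests rely on an `empty_temp_db_connection` fixture supplied by the suite's conftest. A minimal fixture of that shape might look like the following sketch, assuming `qcodes.dataset.sqlite.database.connect`; the real fixture may differ:

```python
import os
import tempfile

import pytest

from qcodes.dataset.sqlite.database import connect


@pytest.fixture
def empty_temp_db_connection():
    # Fresh database file per test so writer state never leaks between tests.
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "temp.db")
        conn = connect(path)
        try:
            yield conn
        finally:
            conn.close()
```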
18 changes: 0 additions & 18 deletions qcodes/tests/dataset/test_measurement_context_manager.py
@@ -228,24 +228,6 @@ def test_mixing_array_and_numeric(DAC, bg_writing):
(DAC.ch2, np.array([DAC.ch2(), DAC.ch1()])))


@pytest.mark.usefixtures("experiment")
def test_nested_measurement_raises_warning(DAC, caplog):
meas1 = Measurement()
meas1.register_parameter(DAC.ch1, paramtype='numeric')
meas1.register_parameter(DAC.ch2, paramtype='array')

meas2 = Measurement()
meas2.register_parameter(DAC.ch1, paramtype='numeric')
meas2.register_parameter(DAC.ch2, paramtype='array')

with meas1.run():
with meas2.run():
pass
pass
assert "Nested measurements are not supported. This will become an " \
"error in future releases of QCoDeS" in caplog.text


def test_measurement_name_default(experiment, DAC, DMM):
fmt = experiment.format_string
exp_id = experiment.exp_id
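Since the warning test above is deleted rather than ported, an equivalent check of the new behaviour could assert that nesting simply works. A hypothetical test in the style of the deleted one, not part of this diff, with `pytest`, `Measurement`, and the `DAC` fixture assumed from the surrounding module:

```python
@pytest.mark.usefixtures("experiment")
def test_nested_measurement_allowed(DAC):
    meas1 = Measurement()
    meas1.register_parameter(DAC.ch1, paramtype='numeric')

    meas2 = Measurement()
    meas2.register_parameter(DAC.ch2, paramtype='numeric')

    # Nesting no longer warns; both runs complete and produce datasets.
    with meas1.run() as ds1, meas2.run() as ds2:
        ds1.add_result((DAC.ch1, 0.0))
        ds2.add_result((DAC.ch2, 1.0))
```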