Skip to content

Commit c016f9e

Browse files
committed
Make HttpEngine the default rather than AsyncHttpEngine
Due to buggy interaction with jupyter notebooks, which is how most of our users work with the AllenSDK, make the synchronous downloader the default for EcephysProjectCache. See GH #1585. Also resolves GH #1586 - passing 'asynchronous' to the constructors now chooses the proper stream engine.
1 parent 16ebd96 commit c016f9e

File tree

4 files changed

+104
-19
lines changed

4 files changed

+104
-19
lines changed

allensdk/brain_observatory/ecephys/ecephys_project_api/ecephys_project_lims_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ def get_natural_scene_template(self, number: int) -> Iterable[bytes]:
568568

569569
@classmethod
570570
def default(cls, lims_credentials: Optional[DbCredentials] = None,
571-
app_kwargs=None, asynchronous=True):
571+
app_kwargs=None, asynchronous=False):
572572
""" Construct a "straightforward" lims api that can fetch data from
573573
lims2.
574574

allensdk/brain_observatory/ecephys/ecephys_project_api/ecephys_project_warehouse_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ def get_unit_analysis_metrics(self, unit_ids=None, ecephys_session_ids=None, ses
307307

308308

309309
@classmethod
310-
def default(cls, asynchronous=True, **rma_kwargs):
310+
def default(cls, asynchronous=False, **rma_kwargs):
311311
_rma_kwargs = {"scheme": "http", "host": "api.brain-map.org"}
312312
_rma_kwargs.update(rma_kwargs)
313313

allensdk/brain_observatory/ecephys/ecephys_project_cache.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,22 @@ class constructors rather than to initialize this class directly.
167167
self.fetch_tries = fetch_tries
168168
self.stream_writer = (stream_writer
169169
or self.fetch_api.rma_engine.write_bytes)
170-
170+
if stream_writer is not None:
171+
self.stream_writer = stream_writer
172+
else:
173+
if hasattr(self.fetch_api, "rma_engine"): # EcephysProjectWarehouseApi # noqa
174+
self.stream_writer = self.fetch_api.rma_engine.write_bytes
175+
# TODO: Make these names consistent in the different fetch apis
176+
elif hasattr(self.fetch_api, "app_engine"): # EcephysProjectLimsApi # noqa
177+
self.stream_writer = self.fetch_api.app_engine.write_bytes
178+
else:
179+
raise ValueError(
180+
"Must either set value for `stream_writer`, or use a "
181+
"`fetch_api` with an rma_engine or app_engine attribute "
182+
"that implements `write_bytes`. See `HttpEngine` and "
183+
"`AsyncHttpEngine` from "
184+
"allensdk.brain_observatory.ecephys.ecephys_project_api."
185+
"http_engine for examples.")
171186

172187
def _get_sessions(self):
173188
path = self.get_cache_path(None, self.SESSIONS_KEY)
@@ -581,7 +596,7 @@ def _from_http_source_default(cls, fetch_api_cls, fetch_api_kwargs, **kwargs):
581596
def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
582597
scheme: Optional[str] = None,
583598
host: Optional[str] = None,
584-
asynchronous: bool = True,
599+
asynchronous: bool = False,
585600
manifest: Optional[Union[str, Path]] = None,
586601
version: Optional[str] = None,
587602
cache: bool = True,
@@ -607,7 +622,7 @@ def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
607622
value if unspecified. Will not be used unless `scheme` is
608623
also specified.
609624
asynchronous : bool
610-
Whether to fetch file asynchronously. Defaults to True.
625+
Whether to fetch file asynchronously. Defaults to False.
611626
manifest : str or Path
612627
full path at which manifest json will be stored
613628
version : str
@@ -636,7 +651,7 @@ def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
636651
def from_warehouse(cls,
637652
scheme: Optional[str] = None,
638653
host: Optional[str] = None,
639-
asynchronous: bool = True,
654+
asynchronous: bool = False,
640655
manifest: Optional[Union[str, Path]] = None,
641656
version: Optional[str] = None,
642657
cache: bool = True,
@@ -657,7 +672,7 @@ def from_warehouse(cls,
657672
value if unspecified. Will not be used unless `scheme` is also
658673
specified.
659674
asynchronous : bool
660-
Whether to fetch file asynchronously. Defaults to True.
675+
Whether to fetch file asynchronously. Defaults to False.
661676
manifest : str or Path
662677
full path at which manifest json will be stored
663678
version : str
@@ -673,7 +688,7 @@ def from_warehouse(cls,
673688
app_kwargs = {"scheme": scheme, "host": host,
674689
"asynchronous": asynchronous}
675690
else:
676-
app_kwargs = None
691+
app_kwargs = {"asynchronous": asynchronous}
677692
return cls._from_http_source_default(
678693
EcephysProjectWarehouseApi, app_kwargs, manifest=manifest,
679694
version=version, cache=cache, fetch_tries=fetch_tries

allensdk/test/brain_observatory/ecephys/test_ecephys_project_cache.py

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import allensdk.brain_observatory.ecephys.ecephys_project_cache as epc
1212
import allensdk.brain_observatory.ecephys.write_nwb.__main__ as write_nwb
1313
from allensdk.brain_observatory.ecephys.ecephys_project_api.http_engine import (
14-
write_from_stream, write_bytes_from_coroutine, AsyncHttpEngine
14+
write_from_stream, write_bytes_from_coroutine, AsyncHttpEngine, HttpEngine
1515
)
1616

1717

@@ -195,14 +195,6 @@ def tmpdir_cache(shared_tmpdir, mock_api):
195195
)
196196

197197

198-
def test_stream_writer_method_default_correct():
199-
"""Checks that the stream_writer contained in the rma engine is used
200-
when one is not supplied to the __init__ method.
201-
"""
202-
cache = epc.EcephysProjectCache(stream_writer=None)
203-
assert cache.stream_writer == cache.fetch_api.rma_engine.write_bytes
204-
205-
206198
def lazy_cache_test(cache, cache_name, api_name, expected, *args, **kwargs):
207199
obtained_one = getattr(cache, cache_name)(*args, **kwargs)
208200
obtained_two = getattr(cache, cache_name)(*args, **kwargs)
@@ -389,5 +381,83 @@ def test_from_lims_default(tmpdir_factory):
389381
cache = epc.EcephysProjectCache.from_lims(
390382
manifest=os.path.join(tmpdir, "manifest.json")
391383
)
392-
assert isinstance(cache.fetch_api.app_engine, AsyncHttpEngine)
393-
assert cache.stream_writer is epc.write_bytes_from_coroutine
384+
assert isinstance(cache.fetch_api.app_engine, HttpEngine)
385+
assert cache.stream_writer is write_from_stream
386+
assert cache.fetch_api.app_engine.scheme == "http"
387+
assert cache.fetch_api.app_engine.host == "lims2"
388+
389+
390+
def test_from_warehouse_default(tmpdir_factory):
391+
tmpdir = str(tmpdir_factory.mktemp("test_from_warehouse_default"))
392+
393+
cache = epc.EcephysProjectCache.from_warehouse(
394+
manifest=os.path.join(tmpdir, "manifest.json")
395+
)
396+
assert isinstance(cache.fetch_api.rma_engine, HttpEngine)
397+
assert cache.stream_writer is write_from_stream
398+
assert cache.fetch_api.rma_engine.scheme == "http"
399+
assert cache.fetch_api.rma_engine.host == "api.brain-map.org"
400+
401+
402+
def test_init_default(tmpdir_factory):
403+
tmpdir = str(tmpdir_factory.mktemp("test_init_default"))
404+
cache = epc.EcephysProjectCache(
405+
manifest=os.path.join(tmpdir, "manifest.json")
406+
)
407+
assert isinstance(cache.fetch_api.rma_engine, HttpEngine)
408+
assert cache.stream_writer is cache.fetch_api.rma_engine.write_bytes
409+
assert cache.fetch_api.rma_engine.scheme == "http"
410+
assert cache.fetch_api.rma_engine.host == "api.brain-map.org"
411+
412+
413+
@pytest.mark.parametrize(
414+
("cache_constructor, asynchronous, engine_attr, expected_engine,"
415+
"expected_scheme, expected_host, expected_stream_writer"), [
416+
(
417+
epc.EcephysProjectCache.from_lims, True,
418+
"app_engine", AsyncHttpEngine, "http", "lims2",
419+
write_bytes_from_coroutine
420+
),
421+
(
422+
epc.EcephysProjectCache.from_warehouse, True,
423+
"rma_engine", AsyncHttpEngine, "http", "api.brain-map.org",
424+
write_bytes_from_coroutine
425+
),
426+
(
427+
epc.EcephysProjectCache.from_warehouse, False,
428+
"rma_engine", HttpEngine, "http", "api.brain-map.org",
429+
write_from_stream
430+
),
431+
(
432+
epc.EcephysProjectCache.from_lims, False,
433+
"app_engine", HttpEngine, "http", "lims2",
434+
write_from_stream
435+
),
436+
])
437+
def test_stream_asynchronous_arg(
438+
cache_constructor, asynchronous, engine_attr, expected_engine,
439+
expected_scheme, expected_host, expected_stream_writer,
440+
tmpdir_factory):
441+
""" Ensure the proper stream engine is chosen from the `asynchronous`
442+
argument in the EcephysProjectCache constructors (using other default
443+
values)."""
444+
tmpdir = str(tmpdir_factory.mktemp("test_stream_async_args"))
445+
cache = cache_constructor(
446+
asynchronous=asynchronous,
447+
manifest=os.path.join(tmpdir, "manifest.json")
448+
)
449+
engine = getattr(cache.fetch_api, engine_attr)
450+
assert isinstance(engine, expected_engine)
451+
assert cache.stream_writer is expected_stream_writer
452+
assert engine.scheme == expected_scheme
453+
assert engine.host == expected_host
454+
455+
456+
def test_stream_writer_method_default_correct(tmpdir_factory):
457+
"""Checks that the stream_writer contained in the rma engine is used
458+
when one is not supplied to the __init__ method.
459+
"""
460+
tmpdir = str(tmpdir_factory.mktemp("test_from_warehouse_default"))
461+
manifest = os.path.join(tmpdir, "manifest.json")
462+
cache = epc.EcephysProjectCache(stream_writer=None, manifest=manifest)
463+
assert cache.stream_writer == cache.fetch_api.rma_engine.write_bytes

0 commit comments

Comments
 (0)