Skip to content

Commit c0d5d40

Browse files
authored
Merge pull request #1589 from AllenInstitute/GH-1569/bugfix/ecephys-image-download
Bugfix: Asynchronous Http Engine initialization in EcephysProjectCache
2 parents 818b883 + c016f9e commit c0d5d40

File tree

5 files changed

+176
-18
lines changed

5 files changed

+176
-18
lines changed

allensdk/brain_observatory/ecephys/ecephys_project_api/ecephys_project_lims_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ def get_natural_scene_template(self, number: int) -> Iterable[bytes]:
568568

569569
@classmethod
570570
def default(cls, lims_credentials: Optional[DbCredentials] = None,
571-
app_kwargs=None, asynchronous=True):
571+
app_kwargs=None, asynchronous=False):
572572
""" Construct a "straightforward" lims api that can fetch data from
573573
lims2.
574574

allensdk/brain_observatory/ecephys/ecephys_project_api/ecephys_project_warehouse_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ def get_unit_analysis_metrics(self, unit_ids=None, ecephys_session_ids=None, ses
307307

308308

309309
@classmethod
310-
def default(cls, asynchronous=True, **rma_kwargs):
310+
def default(cls, asynchronous=False, **rma_kwargs):
311311
_rma_kwargs = {"scheme": "http", "host": "api.brain-map.org"}
312312
_rma_kwargs.update(rma_kwargs)
313313

allensdk/brain_observatory/ecephys/ecephys_project_api/http_engine.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ def stream(self, route):
7979
if elapsed > self.timeout:
8080
raise requests.Timeout(f"Download took {elapsed} seconds, but timeout was set to {self.timeout}")
8181

82+
@staticmethod
83+
def write_bytes(path: str, stream: Iterable[bytes]):
84+
write_from_stream(path, stream)
85+
8286

8387
AsyncStreamCallbackType = Callable[[AsyncIterator[bytes]], Awaitable[None]]
8488

@@ -166,7 +170,13 @@ def __del__(self):
166170
nest_asyncio.apply()
167171
loop = asyncio.get_event_loop()
168172
loop.run_until_complete(self.session.close())
169-
173+
174+
@staticmethod
175+
def write_bytes(
176+
path: str,
177+
coroutine: Callable[[AsyncStreamCallbackType], Awaitable[None]]):
178+
write_bytes_from_coroutine(path, coroutine)
179+
170180

171181
def write_bytes_from_coroutine(
172182
path: str,
@@ -217,7 +227,6 @@ def write_from_stream(path: str, stream: Iterable[bytes]):
217227
iterable yielding bytes to be written
218228
219229
"""
220-
221230
with open(path, "wb") as fil:
222231
for chunk in stream:
223232
fil.write(chunk)

allensdk/brain_observatory/ecephys/ecephys_project_cache.py

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def __init__(
7474
self,
7575
fetch_api: EcephysProjectApi = EcephysProjectWarehouseApi.default(),
7676
fetch_tries: int = 2,
77-
stream_writer: Callable = write_from_stream,
77+
stream_writer: Optional[Callable] = None,
7878
manifest: Optional[Union[str, Path]] = None,
7979
version: Optional[str] = None,
8080
cache: bool = True):
@@ -83,6 +83,9 @@ def __init__(
8383
summaries of sessionwise data and provides tools for downloading detailed
8484
sessionwise data (such as spike times).
8585
86+
To ensure correct configuration, it is recommended to use one of the
87+
class constructors rather than to initialize this class directly.
88+
8689
Parameters
8790
==========
8891
fetch_api :
@@ -100,6 +103,14 @@ def __init__(
100103
fetch_tries : int
101104
Maximum number of times to attempt a download before giving up and
102105
raising an exception. Note that this is total tries, not retries
106+
stream_writer: Callable
107+
The method used to write from stream. Depends on whether the
108+
engine is synchronous or asynchronous. If not set, will use the
109+
`write_bytes` method native to the `fetch_api`'s `rma_engine` (or `app_engine`).
110+
If the method is incompatible with the `fetch_api`'s `rma_engine`,
111+
errors will likely occur. For this reason it is recommended
112+
to leave this field unspecified, or to use one of the class
113+
constructors.
103114
manifest : str or Path
104115
full path at which manifest json will be stored (default =
105116
"ecephys_project_manifest.json" in the local directory.)
@@ -108,6 +119,43 @@ def __init__(
108119
recorded in the file at manifest, an error will be raised.
109120
cache: bool
110121
Whether to write to the cache (default=True)
122+
123+
Notes
124+
=====
125+
It is highly recommended to construct an instance of this class
126+
using one of the following constructor methods:
127+
128+
from_warehouse(scheme: Optional[str] = None,
129+
host: Optional[str] = None,
130+
asynchronous: bool = False,
131+
manifest: Optional[Union[str, Path]] = None,
132+
version: Optional[str] = None,
133+
cache: bool = True,
134+
fetch_tries: int = 2)
135+
Create an instance of EcephysProjectCache with an
136+
EcephysProjectWarehouseApi. Retrieves released data stored
137+
in the warehouse. Suitable for all users downloading
138+
published Allen Institute data.
139+
from_lims(lims_credentials: Optional[DbCredentials] = None,
140+
scheme: Optional[str] = None,
141+
host: Optional[str] = None,
142+
asynchronous: bool = False,
143+
manifest: Optional[Union[str, Path]] = None,
144+
version: Optional[str] = None,
145+
cache: bool = True,
146+
fetch_tries: int = 2)
147+
Create an instance of EcephysProjectCache with an
148+
EcephysProjectLimsApi. Retrieves bleeding-edge data stored
149+
locally on Allen Institute servers. Suitable for internal
150+
users on-site at the Allen Institute or using the corporate
151+
vpn. Requires Allen Institute database credentials.
152+
fixed(manifest: Optional[Union[str, Path]] = None,
153+
version: Optional[str] = None)
154+
Create an instance of EcephysProjectCache that will only
155+
use locally stored data, downloaded previously from LIMS
156+
or warehouse using the EcephysProjectCache.
157+
Suitable for users who want to analyze a fixed dataset they
158+
have previously downloaded using EcephysProjectCache.
111159
"""
112160
manifest_ = manifest or "ecephys_project_manifest.json"
113161
version_ = version or self.MANIFEST_VERSION
@@ -117,7 +165,24 @@ def __init__(
117165
cache=cache)
118166
self.fetch_api = fetch_api
119167
self.fetch_tries = fetch_tries
120-
self.stream_writer = stream_writer
168+
self.stream_writer = (stream_writer
169+
or self.fetch_api.rma_engine.write_bytes)
170+
if stream_writer is not None:
171+
self.stream_writer = stream_writer
172+
else:
173+
if hasattr(self.fetch_api, "rma_engine"): # EcephysProjectWarehouseApi # noqa
174+
self.stream_writer = self.fetch_api.rma_engine.write_bytes
175+
# TODO: Make these names consistent in the different fetch apis
176+
elif hasattr(self.fetch_api, "app_engine"): # EcephysProjectLimsApi # noqa
177+
self.stream_writer = self.fetch_api.app_engine.write_bytes
178+
else:
179+
raise ValueError(
180+
"Must either set value for `stream_writer`, or use a "
181+
"`fetch_api` with an rma_engine or app_engine attribute "
182+
"that implements `write_bytes`. See `HttpEngine` and "
183+
"`AsyncHttpEngine` from "
184+
"allensdk.brain_observatory.ecephys.ecephys_project_api."
185+
"http_engine for examples.")
121186

122187
def _get_sessions(self):
123188
path = self.get_cache_path(None, self.SESSIONS_KEY)
@@ -531,8 +596,8 @@ def _from_http_source_default(cls, fetch_api_cls, fetch_api_kwargs, **kwargs):
531596
def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
532597
scheme: Optional[str] = None,
533598
host: Optional[str] = None,
534-
asynchronous: bool = True,
535-
manifest: Optional[str] = None,
599+
asynchronous: bool = False,
600+
manifest: Optional[Union[str, Path]] = None,
536601
version: Optional[str] = None,
537602
cache: bool = True,
538603
fetch_tries: int = 2):
@@ -557,7 +622,7 @@ def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
557622
value if unspecified. Will not be used unless `scheme` is
558623
also specified.
559624
asynchronous : bool
560-
Whether to fetch file asynchronously. Defaults to True.
625+
Whether to fetch file asynchronously. Defaults to False.
561626
manifest : str or Path
562627
full path at which manifest json will be stored
563628
version : str
@@ -586,7 +651,7 @@ def from_lims(cls, lims_credentials: Optional[DbCredentials] = None,
586651
def from_warehouse(cls,
587652
scheme: Optional[str] = None,
588653
host: Optional[str] = None,
589-
asynchronous: bool = True,
654+
asynchronous: bool = False,
590655
manifest: Optional[Union[str, Path]] = None,
591656
version: Optional[str] = None,
592657
cache: bool = True,
@@ -607,7 +672,7 @@ def from_warehouse(cls,
607672
value if unspecified. Will not be used unless `scheme` is also
608673
specified.
609674
asynchronous : bool
610-
Whether to fetch file asynchronously. Defaults to True.
675+
Whether to fetch file asynchronously. Defaults to False.
611676
manifest : str or Path
612677
full path at which manifest json will be stored
613678
version : str
@@ -623,14 +688,15 @@ def from_warehouse(cls,
623688
app_kwargs = {"scheme": scheme, "host": host,
624689
"asynchronous": asynchronous}
625690
else:
626-
app_kwargs = None
691+
app_kwargs = {"asynchronous": asynchronous}
627692
return cls._from_http_source_default(
628693
EcephysProjectWarehouseApi, app_kwargs, manifest=manifest,
629694
version=version, cache=cache, fetch_tries=fetch_tries
630695
)
631696

632697
@classmethod
633-
def fixed(cls, manifest=None, version=None):
698+
def fixed(cls, manifest: Optional[Union[str, Path]] = None,
699+
version: Optional[str] = None):
634700
"""
635701
Creates an EcephysProjectCache that refuses to fetch any data
636702
- only the existing local cache is accessible. Useful if you
@@ -701,5 +767,4 @@ def read_nwb(path):
701767
reader = pynwb.NWBHDF5IO(str(path), 'r')
702768
nwbfile = reader.read()
703769
nwbfile.identifier # if the file is corrupt, make sure an exception gets raised during read
704-
return nwbfile
705-
770+
return nwbfile

allensdk/test/brain_observatory/ecephys/test_ecephys_project_cache.py

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import allensdk.brain_observatory.ecephys.ecephys_project_cache as epc
1212
import allensdk.brain_observatory.ecephys.write_nwb.__main__ as write_nwb
1313
from allensdk.brain_observatory.ecephys.ecephys_project_api.http_engine import (
14-
write_bytes_from_coroutine, AsyncHttpEngine
14+
write_from_stream, write_bytes_from_coroutine, AsyncHttpEngine, HttpEngine
1515
)
1616

1717

@@ -109,12 +109,18 @@ def shared_tmpdir(tmpdir_factory):
109109
return str(tmpdir_factory.mktemp('test_ecephys_project_cache'))
110110

111111

112+
class MockEngine:
113+
def __init__(self):
114+
self.write_bytes = write_from_stream
115+
116+
112117
@pytest.fixture
113118
def mock_api(shared_tmpdir, raw_sessions, units, channels, raw_probes, analysis_metrics):
114119
class MockApi:
115120

116121
def __init__(self, **kwargs):
117122
self.accesses = collections.defaultdict(lambda: 1)
123+
self.rma_engine = MockEngine()
118124

119125
def __getattr__(self, name):
120126
self.accesses[name] += 1
@@ -375,5 +381,83 @@ def test_from_lims_default(tmpdir_factory):
375381
cache = epc.EcephysProjectCache.from_lims(
376382
manifest=os.path.join(tmpdir, "manifest.json")
377383
)
378-
assert isinstance(cache.fetch_api.app_engine, AsyncHttpEngine)
379-
assert cache.stream_writer is epc.write_bytes_from_coroutine
384+
assert isinstance(cache.fetch_api.app_engine, HttpEngine)
385+
assert cache.stream_writer is write_from_stream
386+
assert cache.fetch_api.app_engine.scheme == "http"
387+
assert cache.fetch_api.app_engine.host == "lims2"
388+
389+
390+
def test_from_warehouse_default(tmpdir_factory):
391+
tmpdir = str(tmpdir_factory.mktemp("test_from_warehouse_default"))
392+
393+
cache = epc.EcephysProjectCache.from_warehouse(
394+
manifest=os.path.join(tmpdir, "manifest.json")
395+
)
396+
assert isinstance(cache.fetch_api.rma_engine, HttpEngine)
397+
assert cache.stream_writer is write_from_stream
398+
assert cache.fetch_api.rma_engine.scheme == "http"
399+
assert cache.fetch_api.rma_engine.host == "api.brain-map.org"
400+
401+
402+
def test_init_default(tmpdir_factory):
403+
tmpdir = str(tmpdir_factory.mktemp("test_init_default"))
404+
cache = epc.EcephysProjectCache(
405+
manifest=os.path.join(tmpdir, "manifest.json")
406+
)
407+
assert isinstance(cache.fetch_api.rma_engine, HttpEngine)
408+
assert cache.stream_writer is cache.fetch_api.rma_engine.write_bytes
409+
assert cache.fetch_api.rma_engine.scheme == "http"
410+
assert cache.fetch_api.rma_engine.host == "api.brain-map.org"
411+
412+
413+
@pytest.mark.parametrize(
414+
("cache_constructor, asynchronous, engine_attr, expected_engine,"
415+
"expected_scheme, expected_host, expected_stream_writer"), [
416+
(
417+
epc.EcephysProjectCache.from_lims, True,
418+
"app_engine", AsyncHttpEngine, "http", "lims2",
419+
write_bytes_from_coroutine
420+
),
421+
(
422+
epc.EcephysProjectCache.from_warehouse, True,
423+
"rma_engine", AsyncHttpEngine, "http", "api.brain-map.org",
424+
write_bytes_from_coroutine
425+
),
426+
(
427+
epc.EcephysProjectCache.from_warehouse, False,
428+
"rma_engine", HttpEngine, "http", "api.brain-map.org",
429+
write_from_stream
430+
),
431+
(
432+
epc.EcephysProjectCache.from_lims, False,
433+
"app_engine", HttpEngine, "http", "lims2",
434+
write_from_stream
435+
),
436+
])
437+
def test_stream_asynchronous_arg(
438+
cache_constructor, asynchronous, engine_attr, expected_engine,
439+
expected_scheme, expected_host, expected_stream_writer,
440+
tmpdir_factory):
441+
""" Ensure the proper stream engine is chosen from the `asynchronous`
442+
argument in the EcephysProjectCache constructors (using other default
443+
values)."""
444+
tmpdir = str(tmpdir_factory.mktemp("test_stream_async_args"))
445+
cache = cache_constructor(
446+
asynchronous=asynchronous,
447+
manifest=os.path.join(tmpdir, "manifest.json")
448+
)
449+
engine = getattr(cache.fetch_api, engine_attr)
450+
assert isinstance(engine, expected_engine)
451+
assert cache.stream_writer is expected_stream_writer
452+
assert engine.scheme == expected_scheme
453+
assert engine.host == expected_host
454+
455+
456+
def test_stream_writer_method_default_correct(tmpdir_factory):
457+
"""Checks that the stream_writer contained in the rma engine is used
458+
when one is not supplied to the __init__ method.
459+
"""
460+
tmpdir = str(tmpdir_factory.mktemp("test_from_warehouse_default"))
461+
manifest = os.path.join(tmpdir, "manifest.json")
462+
cache = epc.EcephysProjectCache(stream_writer=None, manifest=manifest)
463+
assert cache.stream_writer == cache.fetch_api.rma_engine.write_bytes

0 commit comments

Comments
 (0)