Skip to content

Commit f7fc080

Browse files
authored
Merge pull request #432 from cmu-delphi/async-limit
Have async raise 4xx/5xx status codes and add connection limit
2 parents 0e59e6b + df2b662 commit f7fc080

File tree

2 files changed

+35
-11
lines changed

2 files changed

+35
-11
lines changed

integrations/client/test_delphi_epidata.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
from unittest.mock import patch, MagicMock
66

77
# third party
8+
from aiohttp.client_exceptions import ClientResponseError
89
import mysql.connector
10+
import pytest
911

1012
# first party
1113
from delphi.epidata.client.delphi_epidata import Epidata
@@ -15,6 +17,14 @@
1517
# py3tester coverage target
1618
__test_target__ = 'delphi.epidata.client.delphi_epidata'
1719

20+
def fake_epidata_endpoint(func):
21+
"""This can be used as a decorator to enable a bogus Epidata endpoint to return 404 responses."""
22+
def wrapper(*args):
23+
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/fake_api.php'
24+
func(*args)
25+
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
26+
return wrapper
27+
1828

1929
class DelphiEpidataPythonClientTests(unittest.TestCase):
2030
"""Tests the Python client."""
@@ -509,10 +519,25 @@ def test_async_epidata(self):
509519
'geo_value': '00000',
510520
'time_values': '20200414'
511521
}
512-
], batch_size=10)
513-
responses = [i[0] for i in test_output]*12
522+
]*12, batch_size=10)
523+
responses = [i[0] for i in test_output]
514524
# check response is same as standard covidcast call, using 24 calls to test batch sizing
515525
self.assertEqual(responses,
516526
[Epidata.covidcast('src', 'sig', 'day', 'county', 20200414, '11111'),
517527
Epidata.covidcast('src', 'sig', 'day', 'county', 20200414, '00000')]*12
518528
)
529+
530+
@fake_epidata_endpoint
531+
def test_async_epidata_fail(self):
532+
with pytest.raises(ClientResponseError, match="404, message='Not Found'"):
533+
Epidata.async_epidata([
534+
{
535+
'source': 'covidcast',
536+
'data_source': 'src',
537+
'signals': 'sig',
538+
'time_type': 'day',
539+
'geo_type': 'county',
540+
'geo_value': '11111',
541+
'time_values': '20200414'
542+
}
543+
])

src/client/delphi_epidata.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import asyncio
1414
import warnings
1515

16-
from aiohttp import ClientSession
16+
from aiohttp import ClientSession, TCPConnector
1717
from pkg_resources import get_distribution, DistributionNotFound
1818

1919
# Obtain package version for the user-agent. Uses the installed version by
@@ -709,27 +709,26 @@ def covidcast_nowcast(
709709
return Epidata._request(params)
710710

711711
@staticmethod
712-
def async_epidata(param_list, batch_size=100):
712+
def async_epidata(param_list, batch_size=50):
713713
"""Make asynchronous Epidata calls for a list of parameters."""
714714
async def async_get(params, session):
715715
"""Helper function to make Epidata GET requests."""
716716
async with session.get(Epidata.BASE_URL, params=params) as response:
717+
response.raise_for_status()
717718
return await response.json(), params
718719

719720
async def async_make_calls(param_combos):
720721
"""Helper function to asynchronously make and aggregate Epidata GET requests."""
721722
tasks = []
722-
async with ClientSession() as session:
723+
connector = TCPConnector(limit=batch_size)
724+
async with ClientSession(connector=connector) as session:
723725
for param in param_combos:
724726
task = asyncio.ensure_future(async_get(param, session))
725727
tasks.append(task)
726728
responses = await asyncio.gather(*tasks)
727729
return responses
728730

729-
batches = [param_list[i:i+batch_size] for i in range(0, len(param_list), batch_size)]
730-
responses = []
731-
for batch in batches:
732-
loop = asyncio.get_event_loop()
733-
future = asyncio.ensure_future(async_make_calls(batch))
734-
responses += loop.run_until_complete(future)
731+
loop = asyncio.get_event_loop()
732+
future = asyncio.ensure_future(async_make_calls(param_list))
733+
responses = loop.run_until_complete(future)
735734
return responses

0 commit comments

Comments
 (0)