Skip to content

[JIT]: just-in-time series computations #646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 24 commits into
base: ds/bump-pandas
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
14ac2f4
Server: add CovidcastRow helper class for testing
dshemetov Oct 7, 2022
7124021
Server: update csv_to_database to use CovidcastRow
dshemetov Oct 7, 2022
f1b361f
Server: update test_db to use CovidcastRow
dshemetov Oct 7, 2022
01a2011
Server: update test_delete_batch to use CovidcastRow
dshemetov Oct 7, 2022
0ec5b28
Server: update test_delphi_epidata to use CovidcastRow
dshemetov Oct 7, 2022
c49a64a
Server: update test_covidcast_endpoints to use CovidcastRow
dshemetov Oct 7, 2022
48825fc
Server: update test_covidcast to use CovidcastRow
dshemetov Oct 7, 2022
f6d7ae6
Server: update test_utils to use CovidcastRow
dshemetov Oct 7, 2022
eac044f
Server: update TimePair to auto-sort tuples
dshemetov Oct 7, 2022
5b998d5
Server: minor model.py data_source_by_id name update
dshemetov Oct 7, 2022
dc26e55
Server: update csv issue none handling
dshemetov Dec 1, 2022
f8de0ba
Server: add type hints to _query
dshemetov Nov 4, 2022
f264325
Acquisition: update test_csv_uploading to remove Pandas warning
dshemetov Oct 11, 2022
287af33
Server: add PANDAS_DTYPES to model.py
dshemetov Dec 5, 2022
0962cd3
Docker: add more_itertools==8.4.0 to Python and API images
dshemetov Dec 5, 2022
9fc5535
Acquisition: update database.py to use CovidcastRow
dshemetov Dec 5, 2022
0fa53d7
Docker: bump API and Python pandas to 1.5.1
dshemetov Dec 5, 2022
a1efe48
JIT: major feature commit
dshemetov Dec 5, 2022
2391d10
CI: Build a container image from this branch
korlaxxalrok Oct 11, 2022
7b0addb
JIT: Small name improvement in model.py
dshemetov Dec 8, 2022
e2a2585
JIT: reduce the iterator stack on non-derived signals
dshemetov Dec 8, 2022
4c023f3
JIT: rewrite a few iterators and reduce the iterator stack
dshemetov Dec 9, 2022
2e09038
JIT: iterator optimizations
dshemetov Dec 9, 2022
f6dfed9
Merge pull request #1049 from cmu-delphi/ds/jit-faster-base-signals
dshemetov Feb 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ jobs:
image:
needs: build
# only on the main, dev, and jit_computations branches
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
#if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev'
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/jit_computations'

runs-on: ubuntu-latest
steps:
Expand Down
8 changes: 7 additions & 1 deletion deploy.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,20 @@
"match": "^.*\\.(py)$",
"add-header-comment": true
},
{
"type": "move",
"src": "src/server/utils",
"dst": "[[package]]/server/utils/",
"match": "^.*\\.(py)$",
"add-header-comment": true
},
{
"type": "move",
"src": "src/server/endpoints/covidcast_utils",
"dst": "[[package]]/server/endpoints/covidcast_utils/",
"match": "^.*\\.(py)$",
"add-header-comment": true
},

"// acquisition - fluview",
{
"type": "move",
Expand Down
4 changes: 2 additions & 2 deletions integrations/acquisition/covidcast/test_csv_uploading.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ def test_uploading(self):
"time_value": [20200419],
"signal": [signal_name],
"direction": [None]})], axis=1).rename(columns=uploader_column_rename)
expected_values_df["missing_value"].iloc[0] = Nans.OTHER
expected_values_df["missing_sample_size"].iloc[0] = Nans.NOT_MISSING
expected_values_df.loc[0, "missing_value"] = Nans.OTHER
expected_values_df.loc[0, "missing_sample_size"] = Nans.NOT_MISSING
expected_values = expected_values_df.to_dict(orient="records")
expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}

Expand Down
15 changes: 8 additions & 7 deletions integrations/acquisition/covidcast/test_db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import unittest

from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow, DBLoadStateException

from delphi.epidata.acquisition.covidcast.database import DBLoadStateException
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
import delphi.operations.secrets as secrets


# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

Expand All @@ -31,8 +32,8 @@ def _find_matches_for_row(self, row):

def test_insert_or_update_with_nonempty_load_table(self):
# make rows
a_row = self._make_placeholder_row()[0]
another_row = self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE+1, issue=self.DEFAULT_ISSUE+1)[0]
a_row = CovidcastRow(time_value=20200202)
another_row = CovidcastRow(time_value=20200203, issue=20200203)
# insert one
self._db.insert_or_update_bulk([a_row])
# put something into the load table
Expand Down Expand Up @@ -61,7 +62,7 @@ def test_id_sync(self):
latest_view = 'epimetric_latest_v'

# add a data point
base_row, _ = self._make_placeholder_row()
base_row = CovidcastRow()
self._insert_rows([base_row])
# ensure the primary keys match in the latest and history tables
matches = self._find_matches_for_row(base_row)
Expand All @@ -71,7 +72,7 @@ def test_id_sync(self):
old_pk_id = matches[latest_view][pk_column]

# add a reissue for said data point
next_row, _ = self._make_placeholder_row()
next_row = CovidcastRow()
next_row.issue += 1
self._insert_rows([next_row])
# ensure the new keys also match
Expand Down
7 changes: 2 additions & 5 deletions integrations/acquisition/covidcast/test_delete_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
import unittest
from os import path

# third party
import mysql.connector

# first party
from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow

# py3tester coverage target (equivalent to `import *`)
__test_target__ = 'delphi.epidata.acquisition.covidcast.database'
Expand Down
110 changes: 55 additions & 55 deletions integrations/client/test_delphi_epidata.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
"""Integration tests for delphi_epidata.py."""

# standard library
import unittest
import time
from unittest.mock import patch, MagicMock
from json import JSONDecodeError
from unittest.mock import MagicMock, patch

# third party
from aiohttp.client_exceptions import ClientResponseError
import mysql.connector
# first party
import pytest
from aiohttp.client_exceptions import ClientResponseError

# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
# third party
import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
import delphi.operations.secrets as secrets
from delphi.epidata.client.delphi_epidata import Epidata
from delphi_utils import Nans


# py3tester coverage target
__test_target__ = 'delphi.epidata.client.delphi_epidata'
# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value
IGNORE_FIELDS = ["id", "direction_updated_timestamp", "value_updated_timestamp", "source", "time_type", "geo_type"]

def fake_epidata_endpoint(func):
"""This can be used as a decorator to enable a bogus Epidata endpoint to return 404 responses."""
Expand All @@ -30,9 +32,6 @@ def wrapper(*args):
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
return wrapper

# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

class DelphiEpidataPythonClientTests(CovidcastBase):
"""Tests the Python client."""

Expand All @@ -54,12 +53,12 @@ def test_covidcast(self):

# insert placeholder data: three issues of one signal, one issue of another
rows = [
self._make_placeholder_row(issue=self.DEFAULT_ISSUE + i, value=i, lag=i)[0]
CovidcastRow(issue=20200202 + i, value=i, lag=i)
for i in range(3)
]
row_latest_issue = rows[-1]
rows.append(
self._make_placeholder_row(signal="sig2")[0]
CovidcastRow(signal="sig2")
)
self._insert_rows(rows)

Expand All @@ -70,10 +69,11 @@ def test_covidcast(self):
)

expected = [
self.expected_from_row(row_latest_issue),
self.expected_from_row(rows[-1])
row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS),
rows[-1].as_dict(ignore_fields=IGNORE_FIELDS)
]

self.assertEqual(response['epidata'], expected)
# check result
self.assertEqual(response, {
'result': 1,
Expand All @@ -89,10 +89,10 @@ def test_covidcast(self):

expected = [{
rows[0].signal: [
self.expected_from_row(row_latest_issue, self.DEFAULT_MINUS + ['signal']),
row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS + ['signal']),
],
rows[-1].signal: [
self.expected_from_row(rows[-1], self.DEFAULT_MINUS + ['signal']),
rows[-1].as_dict(ignore_fields=IGNORE_FIELDS + ['signal']),
],
}]

Expand All @@ -109,12 +109,12 @@ def test_covidcast(self):
**self.params_from_row(rows[0])
)

expected = self.expected_from_row(row_latest_issue)
expected = [row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.assertEqual(response_1, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})

Expand All @@ -124,13 +124,13 @@ def test_covidcast(self):
**self.params_from_row(rows[0], as_of=rows[1].issue)
)

expected = self.expected_from_row(rows[1])
expected = [rows[1].as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.maxDiff=None
self.assertEqual(response_1a, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})

Expand All @@ -141,8 +141,8 @@ def test_covidcast(self):
)

expected = [
self.expected_from_row(rows[0]),
self.expected_from_row(rows[1])
rows[0].as_dict(ignore_fields=IGNORE_FIELDS),
rows[1].as_dict(ignore_fields=IGNORE_FIELDS)
]

# check result
Expand All @@ -158,12 +158,12 @@ def test_covidcast(self):
**self.params_from_row(rows[0], lag=2)
)

expected = self.expected_from_row(row_latest_issue)
expected = [row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.assertDictEqual(response_3, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})
with self.subTest(name='long request'):
Expand Down Expand Up @@ -223,16 +223,16 @@ def test_geo_value(self):
# insert placeholder data: three counties, three MSAs
N = 3
rows = [
self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0]
CovidcastRow(geo_type="county", geo_value=str(i)*5, value=i)
for i in range(N)
] + [
self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0]
CovidcastRow(geo_type="msa", geo_value=str(i)*5, value=i*10)
for i in range(N)
]
self._insert_rows(rows)

counties = [
self.expected_from_row(rows[i]) for i in range(N)
rows[i].as_dict(ignore_fields=IGNORE_FIELDS) for i in range(N)
]

def fetch(geo):
Expand All @@ -241,31 +241,31 @@ def fetch(geo):
)

# test fetch all
r = fetch('*')
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], counties)
request = fetch('*')
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], counties)
# test fetch a specific region
r = fetch('11111')
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1]])
request = fetch('11111')
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1]])
# test fetch of a specific but nonexistent region
r = fetch('55555')
self.assertEqual(r['message'], 'no results')
request = fetch('55555')
self.assertEqual(request['message'], 'no results')
# test fetch of multiple regions
r = fetch(['11111', '22222'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1], counties[2]])
request = fetch(['11111', '22222'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1], counties[2]])
# test fetch of multiple regions in another variant
r = fetch(['00000', '22222'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[0], counties[2]])
request = fetch(['00000', '22222'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[0], counties[2]])
# test fetch of multiple regions, one of which does not exist
r = fetch(['11111', '55555'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1]])
request = fetch(['11111', '55555'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1]])
# test fetch with an empty list of regions
r = fetch([])
self.assertEqual(r['message'], 'no results')
request = fetch([])
self.assertEqual(request['message'], 'no results')

def test_covidcast_meta(self):
"""Test that the covidcast_meta endpoint returns expected data."""
Expand All @@ -275,7 +275,7 @@ def test_covidcast_meta(self):
# 2nd issue: 1 11 21
# 3rd issue: 2 12 22
rows = [
self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE + t, issue=self.DEFAULT_ISSUE + i, value=t*10 + i)[0]
CovidcastRow(time_value=2020_02_02 + t, issue=2020_02_02 + i, value=t*10 + i)
for i in range(3) for t in range(3)
]
self._insert_rows(rows)
Expand All @@ -299,14 +299,14 @@ def test_covidcast_meta(self):
signal=rows[0].signal,
time_type=rows[0].time_type,
geo_type=rows[0].geo_type,
min_time=self.DEFAULT_TIME_VALUE,
max_time=self.DEFAULT_TIME_VALUE + 2,
min_time=2020_02_02,
max_time=2020_02_02 + 2,
num_locations=1,
min_value=2.,
mean_value=12.,
max_value=22.,
stdev_value=8.1649658, # population stdev, not sample, which is 10.
max_issue=self.DEFAULT_ISSUE + 2,
max_issue=2020_02_02 + 2,
min_lag=0,
max_lag=0, # we didn't set lag when inputting data
)
Expand All @@ -322,10 +322,10 @@ def test_async_epidata(self):
# insert placeholder data: three counties, three MSAs
N = 3
rows = [
self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0]
CovidcastRow(geo_type="county", geo_value=str(i)*5, value=i)
for i in range(N)
] + [
self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0]
CovidcastRow(geo_type="msa", geo_value=str(i)*5, value=i*10)
for i in range(N)
]
self._insert_rows(rows)
Expand Down
Loading