Skip to content

Commit 2044e69

Browse files
committed
Merge branch 'dev' into leonlu2/test_signal_wildcard
2 parents 7457a2c + 0666298 commit 2044e69

File tree

10 files changed

+192
-16
lines changed

10 files changed

+192
-16
lines changed

dev/local/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ web:
8383
@# Run the web server
8484
@docker run --rm -p 127.0.0.1:10080:80 \
8585
--env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" \
86-
--env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" \
86+
--env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" --env "LOG_DEBUG" \
8787
--network delphi-net --name delphi_web_epidata \
8888
delphi_web_epidata >$(LOG_WEB) 2>&1 &
8989

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ tenacity==7.0.0
1212
newrelic
1313
epiweeks==2.1.2
1414
typing-extensions
15+
structlog==22.1.0

src/acquisition/covidcast/logger.py

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
"""Structured logger utility for creating JSON logs in Delphi pipelines."""
22
import logging
3+
import os
34
import sys
45
import threading
56
import structlog
67

7-
88
def handle_exceptions(logger):
99
"""Handle exceptions using the provided logger."""
1010
def exception_handler(etype, value, traceback):
@@ -45,12 +45,24 @@ def get_structured_logger(name=__name__,
4545
if filename:
4646
handlers.append(logging.FileHandler(filename))
4747

48+
if "LOG_DEBUG" in os.environ:
49+
log_level = logging.DEBUG
50+
else:
51+
log_level = logging.INFO
52+
4853
logging.basicConfig(
4954
format="%(message)s",
50-
level=logging.INFO,
55+
level=log_level,
5156
handlers=handlers
5257
)
5358

59+
def add_pid(logger, method_name, event_dict):
60+
"""
61+
Add current PID to the event dict.
62+
"""
63+
event_dict["pid"] = os.getpid()
64+
return event_dict
65+
5466
# Configure structlog. This uses many of the standard suggestions from
5567
# the structlog documentation.
5668
structlog.configure(
@@ -61,6 +73,8 @@ def get_structured_logger(name=__name__,
6173
structlog.stdlib.add_logger_name,
6274
# Include log level in output.
6375
structlog.stdlib.add_log_level,
76+
# Include PID in output.
77+
add_pid,
6478
# Allow formatting into arguments e.g., logger.info("Hello, %s",
6579
# name)
6680
structlog.stdlib.PositionalArgumentsFormatter(),

src/server/_common.py

+51-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
from typing import cast
2+
import time
23

34
from flask import Flask, g, request
5+
from sqlalchemy import event
46
from sqlalchemy.engine import Connection
57
from werkzeug.local import LocalProxy
68

9+
from .utils.logger import get_structured_logger
710
from ._config import SECRET
811
from ._db import engine
9-
from ._exceptions import DatabaseErrorException
12+
from ._exceptions import DatabaseErrorException, EpiDataException
1013

1114
app = Flask("EpiData", static_url_path="")
1215
app.config["SECRET"] = SECRET
@@ -24,19 +27,53 @@ def _get_db() -> Connection:
2427
"""
2528
db: Connection = cast(Connection, LocalProxy(_get_db))
2629

30+
@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Stamp the SQLAlchemy execution context with a wall-clock start time so
    # the paired "after_cursor_execute" listener can compute elapsed time.
    context._query_start_time = time.time()
33+
34+
35+
@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Log every executed SQL statement with its elapsed wall-clock time.
    # Reads "_query_start_time" set by the before_cursor_execute listener.
    # this timing info may be suspect, at least in terms of dbms cpu time...
    # it is likely that it includes that time as well as any overhead that
    # comes from throttling or flow control on the streamed data, as well as
    # any row transform/processing time
    total_time = time.time() - context._query_start_time

    # Convert to milliseconds
    total_time *= 1000
    get_structured_logger('server_api').info("Executed SQL", statement=statement, params=parameters, elapsed_time_ms=total_time)
46+
2747

2848
@app.before_request
def before_request_execute():
    # Runs before every request: starts the request timer, logs the incoming
    # request, and (except for static /lib paths) ensures a DB connection.
    # Set timer for statement
    g._request_start_time = time.time()

    # Log statement
    get_structured_logger('server_api').info("Received API request", method=request.method, url=request.url, form_args=request.form, req_length=request.content_length, remote_addr=request.remote_addr, user_agent=request.user_agent.string)

    # Static library files don't need a database connection.
    if request.path.startswith('/lib'):
        return
    # try to get the db
    try:
        _get_db()
    except Exception as e:
        get_structured_logger('server_error').error('database connection error', exception=e)
        raise DatabaseErrorException()
3864

3965

66+
@app.after_request
def after_request_execute(response):
    # Runs after every request: logs the served request together with its
    # total elapsed time (measured from before_request_execute's timer) and
    # response metadata, then passes the response through unchanged.
    total_time = time.time() - g._request_start_time
    # Convert to milliseconds
    total_time *= 1000
    get_structured_logger('server_api').info('Served API request', method=request.method, url=request.url, form_args=request.form, req_length=request.content_length, remote_addr=request.remote_addr, user_agent=request.user_agent.string,
        values=request.values.to_dict(flat=False), blueprint=request.blueprint, endpoint=request.endpoint,
        response_status=response.status, content_length=response.calculate_content_length(), elapsed_time_ms=total_time)
    return response
75+
76+
4077
@app.teardown_appcontext
4178
def teardown_db(exception=None):
4279
# close the db connection
@@ -46,6 +83,16 @@ def teardown_db(exception=None):
4683
db.close()
4784

4885

86+
@app.errorhandler(EpiDataException)
def handle_exception(e):
    # Log error and pass through; EpiDataExceptions are HTTPExceptions which are valid WSGI responses (see https://werkzeug.palletsprojects.com/en/2.2.x/exceptions/ )
    if isinstance(e, DatabaseErrorException):
        # Server-side failure: log at error level with the traceback attached.
        get_structured_logger('server_error').error('Received DatabaseErrorException', exception=str(e), exc_info=True)
    else:
        # User-side problem: log at warning level. Use warning() — warn() is a
        # deprecated alias in the stdlib logging API.
        get_structured_logger('server_error').warning('Encountered user-side error', exception=str(e))
    return e
94+
95+
4996
def is_compatibility_mode() -> bool:
5097
"""
5198
checks whether this request is in compatibility mode

src/server/_printer.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import orjson
88

99
from ._config import MAX_RESULTS, MAX_COMPATIBILITY_RESULTS
10-
from ._common import app, is_compatibility_mode
10+
from ._common import is_compatibility_mode
11+
from .utils.logger import get_structured_logger
1112

1213

1314
def print_non_standard(data):
@@ -58,7 +59,7 @@ def gen():
5859
if r is not None:
5960
yield r
6061
except Exception as e:
61-
app.logger.exception(f"error executing: {str(e)}")
62+
get_structured_logger('server_error').error("Exception while executing printer", exception=e)
6263
self.result = -1
6364
yield self._error(e)
6465

src/server/_query.py

-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,6 @@ def run_query(p: APrinter, query_tuple: Tuple[str, Dict[str, Any]]):
241241
query, params = query_tuple
242242
# limit rows + 1 for detecting whether we would have more
243243
full_query = text(limit_query(query, p.remaining_rows + 1))
244-
app.logger.info("full_query: %s, params: %s", full_query, params)
245244
return db.execution_options(stream_results=True).execute(full_query, **params)
246245

247246

src/server/endpoints/covidcast_meta.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
from flask.json import loads
66
from sqlalchemy import text
77

8-
from .._common import db, app
8+
from .._common import db
99
from .._printer import create_printer
1010
from .._query import filter_fields
1111
from .._validate import extract_strings
12+
from ..utils.logger import get_structured_logger
1213

1314
bp = Blueprint("covidcast_meta", __name__)
1415

@@ -41,11 +42,12 @@ def fetch_data(
4142
).fetchone()
4243

4344
if not row or not row["epidata"]:
45+
get_structured_logger('server_api').warning("no data in covidcast_meta cache")
4446
return
4547

4648
age = row["age"]
4749
if age > max_age and row["epidata"]:
48-
app.logger.warning("covidcast_meta cache is stale: %d", age)
50+
get_structured_logger('server_api').warning("covidcast_meta cache is stale", cache_age=age)
4951
pass
5052

5153
epidata = loads(row["epidata"])

src/server/main.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,4 @@ def send_lib_file(path: str):
5757
app.logger.handlers = gunicorn_logger.handlers
5858
app.logger.setLevel(gunicorn_logger.level)
5959
sqlalchemy_logger = logging.getLogger("sqlalchemy")
60-
sqlalchemy_logger.handlers = gunicorn_logger.handlers
61-
sqlalchemy_logger.setLevel(gunicorn_logger.level)
60+
sqlalchemy_logger.setLevel(logging.WARN)

src/server/utils/dates.py

+10-2
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
Tuple,
66
Union
77
)
8+
from .logger import get_structured_logger
89
from datetime import date, timedelta
910
from epiweeks import Week, Year
1011
from typing_extensions import TypeAlias
11-
import logging
1212

1313
# Alias for a sequence of date ranges (int, int) or date integers
1414
TimeValues: TypeAlias = Sequence[Union[Tuple[int, int], int]]
@@ -86,16 +86,22 @@ def time_values_to_ranges(values: Optional[TimeValues]) -> Optional[TimeValues]:
8686
e.g. [20200101, 20200102, (20200101, 20200104), 20200106] -> [(20200101, 20200104), 20200106]
8787
(the first two values of the original list are merged into a single range)
8888
"""
89+
logger = get_structured_logger('server_utils')
8990
if not values or len(values) <= 1:
91+
logger.info("List of dates looks like 0-1 elements, nothing to optimize", time_values=values)
9092
return values
9193

9294
# determine whether the list is of days (YYYYMMDD) or weeks (YYYYWW) based on first element
9395
first_element = values[0][0] if isinstance(values[0], tuple) else values[0]
9496
if guess_time_value_is_day(first_element):
97+
# TODO: reduce this and other date logging to DEBUG after prod metrics gathered
98+
logger.info("Treating time value as day", time_value=first_element)
9599
return days_to_ranges(values)
96100
elif guess_time_value_is_week(first_element):
101+
logger.info("Treating time value as week", time_value=first_element)
97102
return weeks_to_ranges(values)
98103
else:
104+
logger.info("Time value unclear, not optimizing", time_value=first_element)
99105
return values
100106

101107
def days_to_ranges(values: TimeValues) -> TimeValues:
@@ -138,7 +144,9 @@ def _to_ranges(values: TimeValues, value_to_date: Callable, date_to_value: Calla
138144
else:
139145
ranges.append((date_to_value(m[0]), date_to_value(m[1])))
140146

147+
get_structured_logger('server_utils').info("Optimized list of date values", original=values, optimized=ranges, original_length=len(values), optimized_length=len(ranges))
148+
141149
return ranges
142150
except Exception as e:
143-
logging.info('bad input to date ranges', input=values, exception=e)
151+
get_structured_logger('server_utils').error('bad input to date ranges', time_values=values, exception=e)
144152
return values

src/server/utils/logger.py

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import logging
2+
import os
3+
import sys
4+
import threading
5+
import structlog
6+
7+
def handle_exceptions(logger):
    """Route uncaught exceptions — in the main thread and in worker threads — to *logger*."""

    def _log_uncaught(exc_type, exc_value, exc_traceback):
        # Emit through the structured logger rather than the default
        # stderr traceback dump.
        logger.exception(
            "Top-level exception occurred",
            exc_info=(exc_type, exc_value, exc_traceback),
        )

    def _log_uncaught_in_thread(args):
        # threading.excepthook receives a single args object carrying the
        # same three pieces of exception information.
        _log_uncaught(args.exc_type, args.exc_value, args.exc_traceback)

    sys.excepthook = _log_uncaught
    threading.excepthook = _log_uncaught_in_thread
18+
19+
20+
def _add_pid(logger, method_name, event_dict):
    """Structlog processor: add the current process ID to the event dict."""
    event_dict["pid"] = os.getpid()
    return event_dict


def get_structured_logger(name=__name__,
                          filename=None,
                          log_exceptions=True):
    """Create a new structlog logger.

    Use the logger returned from this in indicator code using the standard
    wrapper calls, e.g.:

    logger = get_structured_logger(__name__)
    logger.warning("Error", type="Signal too low").

    The output will be rendered as JSON which can easily be consumed by logs
    processors.

    See the structlog documentation for details.

    Parameters
    ---------
    name: Name to use for logger (included in log lines), __name__ from caller
        is a good choice.
    filename: An (optional) file to write log output.
    log_exceptions: If True (the default), install sys/threading excepthooks
        that route uncaught exceptions through this logger.
    """
    # Configure the underlying logging configuration
    handlers = [logging.StreamHandler()]
    if filename:
        handlers.append(logging.FileHandler(filename))

    # Setting the LOG_DEBUG environment variable (to any value) enables
    # debug-level logging; otherwise INFO and above are emitted.
    if "LOG_DEBUG" in os.environ:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO

    # NOTE: basicConfig is a no-op once the root logger has handlers, so only
    # the first call in a process actually applies this configuration.
    logging.basicConfig(
        format="%(message)s",
        level=log_level,
        handlers=handlers
    )

    # Configure structlog. This uses many of the standard suggestions from
    # the structlog documentation.
    structlog.configure(
        processors=[
            # Filter out log levels we are not tracking.
            structlog.stdlib.filter_by_level,
            # Include logger name in output.
            structlog.stdlib.add_logger_name,
            # Include log level in output.
            structlog.stdlib.add_log_level,
            # Include PID in output (module-level helper so it is not
            # re-created on every call to this function).
            _add_pid,
            # Allow formatting into arguments e.g., logger.info("Hello, %s",
            # name)
            structlog.stdlib.PositionalArgumentsFormatter(),
            # Add timestamps.
            structlog.processors.TimeStamper(fmt="iso"),
            # Match support for exception logging in the standard logger.
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            # Decode unicode characters
            structlog.processors.UnicodeDecoder(),
            # Render as JSON
            structlog.processors.JSONRenderer()
        ],
        # Use a dict class for keeping track of data.
        context_class=dict,
        # Use a standard logger for the actual log call.
        logger_factory=structlog.stdlib.LoggerFactory(),
        # Use a standard wrapper class for utilities like log.warning()
        wrapper_class=structlog.stdlib.BoundLogger,
        # Cache the logger
        cache_logger_on_first_use=True,
    )

    logger = structlog.get_logger(name)

    if log_exceptions:
        handle_exceptions(logger)

    return logger

0 commit comments

Comments
 (0)