Skip to content

Commit acf955a

Browse files
authored
Refactored retry config into _retry.py and added support for exponential backoff and Retry-After header (#871)
* Refactored retry config to `_retry.py` and added support for backoff and Retry-After * Added unit tests for `_retry.py` * Updated unit tests for HTTPX request errors * Address review comments
1 parent a4bc029 commit acf955a

File tree

4 files changed

+708
-104
lines changed

4 files changed

+708
-104
lines changed

firebase_admin/_retry.py

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
# Copyright 2025 Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Internal retry logic module
16+
17+
This module provides utilities for adding retry logic to HTTPX requests
18+
"""
19+
20+
from __future__ import annotations
21+
import copy
22+
import email.utils
23+
import random
24+
import re
25+
import time
26+
from types import CoroutineType
27+
from typing import Any, Callable, List, Optional, Tuple
28+
import logging
29+
import asyncio
30+
import httpx
31+
32+
logger = logging.getLogger(__name__)
33+
34+
35+
class HttpxRetry:
36+
"""HTTPX based retry config"""
37+
# Status codes to be used for respecting `Retry-After` header
38+
RETRY_AFTER_STATUS_CODES = frozenset([413, 429, 503])
39+
40+
# Default maximum backoff time.
41+
DEFAULT_BACKOFF_MAX = 120
42+
43+
def __init__(
44+
self,
45+
max_retries: int = 10,
46+
status_forcelist: Optional[List[int]] = None,
47+
backoff_factor: float = 0,
48+
backoff_max: float = DEFAULT_BACKOFF_MAX,
49+
backoff_jitter: float = 0,
50+
history: Optional[List[Tuple[
51+
httpx.Request,
52+
Optional[httpx.Response],
53+
Optional[Exception]
54+
]]] = None,
55+
respect_retry_after_header: bool = False,
56+
) -> None:
57+
self.retries_left = max_retries
58+
self.status_forcelist = status_forcelist
59+
self.backoff_factor = backoff_factor
60+
self.backoff_max = backoff_max
61+
self.backoff_jitter = backoff_jitter
62+
if history:
63+
self.history = history
64+
else:
65+
self.history = []
66+
self.respect_retry_after_header = respect_retry_after_header
67+
68+
def copy(self) -> HttpxRetry:
69+
"""Creates a deep copy of this instance."""
70+
return copy.deepcopy(self)
71+
72+
def is_retryable_response(self, response: httpx.Response) -> bool:
73+
"""Determine if a response implies that the request should be retried if possible."""
74+
if self.status_forcelist and response.status_code in self.status_forcelist:
75+
return True
76+
77+
has_retry_after = bool(response.headers.get("Retry-After"))
78+
if (
79+
self.respect_retry_after_header
80+
and has_retry_after
81+
and response.status_code in self.RETRY_AFTER_STATUS_CODES
82+
):
83+
return True
84+
85+
return False
86+
87+
def is_exhausted(self) -> bool:
88+
"""Determine if there are anymore more retires."""
89+
# retries_left is negative
90+
return self.retries_left < 0
91+
92+
# Identical implementation of `urllib3.Retry.parse_retry_after()`
93+
def _parse_retry_after(self, retry_after_header: str) -> float | None:
94+
"""Parses Retry-After string into a float with unit seconds."""
95+
seconds: float
96+
# Whitespace: https://tools.ietf.org/html/rfc7230#section-3.2.4
97+
if re.match(r"^\s*[0-9]+\s*$", retry_after_header):
98+
seconds = int(retry_after_header)
99+
else:
100+
retry_date_tuple = email.utils.parsedate_tz(retry_after_header)
101+
if retry_date_tuple is None:
102+
raise httpx.RemoteProtocolError(f"Invalid Retry-After header: {retry_after_header}")
103+
104+
retry_date = email.utils.mktime_tz(retry_date_tuple)
105+
seconds = retry_date - time.time()
106+
107+
seconds = max(seconds, 0)
108+
109+
return seconds
110+
111+
def get_retry_after(self, response: httpx.Response) -> float | None:
112+
"""Determine the Retry-After time needed before sending the next request."""
113+
retry_after_header = response.headers.get('Retry-After', None)
114+
if retry_after_header:
115+
# Convert retry header to a float in seconds
116+
return self._parse_retry_after(retry_after_header)
117+
return None
118+
119+
def get_backoff_time(self):
120+
"""Determine the backoff time needed before sending the next request."""
121+
# attempt_count is the number of previous request attempts
122+
attempt_count = len(self.history)
123+
# Backoff should be set to 0 until after first retry.
124+
if attempt_count <= 1:
125+
return 0
126+
backoff = self.backoff_factor * (2 ** (attempt_count-1))
127+
if self.backoff_jitter:
128+
backoff += random.random() * self.backoff_jitter
129+
return float(max(0, min(self.backoff_max, backoff)))
130+
131+
async def sleep_for_backoff(self) -> None:
132+
"""Determine and wait the backoff time needed before sending the next request."""
133+
backoff = self.get_backoff_time()
134+
logger.debug('Sleeping for backoff of %f seconds following failed request', backoff)
135+
await asyncio.sleep(backoff)
136+
137+
async def sleep(self, response: httpx.Response) -> None:
138+
"""Determine and wait the time needed before sending the next request."""
139+
if self.respect_retry_after_header:
140+
retry_after = self.get_retry_after(response)
141+
if retry_after:
142+
logger.debug(
143+
'Sleeping for Retry-After header of %f seconds following failed request',
144+
retry_after
145+
)
146+
await asyncio.sleep(retry_after)
147+
return
148+
await self.sleep_for_backoff()
149+
150+
def increment(
151+
self,
152+
request: httpx.Request,
153+
response: Optional[httpx.Response] = None,
154+
error: Optional[Exception] = None
155+
) -> None:
156+
"""Update the retry state based on request attempt."""
157+
self.retries_left -= 1
158+
self.history.append((request, response, error))
159+
160+
161+
class HttpxRetryTransport(httpx.AsyncBaseTransport):
162+
"""HTTPX transport with retry logic."""
163+
164+
DEFAULT_RETRY = HttpxRetry(max_retries=4, status_forcelist=[500, 503], backoff_factor=0.5)
165+
166+
def __init__(self, retry: HttpxRetry = DEFAULT_RETRY, **kwargs) -> None:
167+
self._retry = retry
168+
169+
transport_kwargs = kwargs.copy()
170+
transport_kwargs.update({'retries': 0, 'http2': True})
171+
# We use a full AsyncHTTPTransport under the hood that is already
172+
# set up to handle requests. We also insure that that transport's internal
173+
# retries are not allowed.
174+
self._wrapped_transport = httpx.AsyncHTTPTransport(**transport_kwargs)
175+
176+
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
177+
return await self._dispatch_with_retry(
178+
request, self._wrapped_transport.handle_async_request)
179+
180+
async def _dispatch_with_retry(
181+
self,
182+
request: httpx.Request,
183+
dispatch_method: Callable[[httpx.Request], CoroutineType[Any, Any, httpx.Response]]
184+
) -> httpx.Response:
185+
"""Sends a request with retry logic using a provided dispatch method."""
186+
# This request config is used across all requests that use this transport and therefore
187+
# needs to be copied to be used for just this request and it's retries.
188+
retry = self._retry.copy()
189+
# First request
190+
response, error = None, None
191+
192+
while not retry.is_exhausted():
193+
194+
# First retry
195+
if response:
196+
await retry.sleep(response)
197+
198+
# Need to reset here so only last attempt's error or response is saved.
199+
response, error = None, None
200+
201+
try:
202+
logger.debug('Sending request in _dispatch_with_retry(): %r', request)
203+
response = await dispatch_method(request)
204+
logger.debug('Received response: %r', response)
205+
except httpx.HTTPError as err:
206+
logger.debug('Received error: %r', err)
207+
error = err
208+
209+
if response and not retry.is_retryable_response(response):
210+
return response
211+
212+
if error:
213+
raise error
214+
215+
retry.increment(request, response, error)
216+
217+
if response:
218+
return response
219+
if error:
220+
raise error
221+
raise AssertionError('_dispatch_with_retry() ended with no response or exception')
222+
223+
async def aclose(self) -> None:
224+
await self._wrapped_transport.aclose()

firebase_admin/messaging.py

Lines changed: 9 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import json
2121
import warnings
2222
import asyncio
23+
import logging
2324
import requests
2425
import httpx
2526

@@ -38,7 +39,9 @@
3839
exceptions,
3940
App
4041
)
42+
from firebase_admin._retry import HttpxRetryTransport
4143

44+
logger = logging.getLogger(__name__)
4245

4346
_MESSAGING_ATTRIBUTE = '_messaging'
4447

@@ -376,15 +379,6 @@ def exception(self):
376379
"""A ``FirebaseError`` if an error occurs while sending the message to the FCM service."""
377380
return self._exception
378381

379-
# Auth Flow
380-
# TODO: Remove comments
381-
# The aim here is to be able to get auth credentials right before the request is sent.
382-
# This is similar to what is done in transport.requests.AuthorizedSession().
383-
# We can then pass this in at the client level.
384-
385-
# Notes:
386-
# - This implementations does not cover timeouts on requests sent to refresh credentials.
387-
# - Uses HTTP/1 and a blocking credential for refreshing.
388382
class GoogleAuthCredentialFlow(httpx.Auth):
389383
"""Google Auth Credential Auth Flow"""
390384
def __init__(self, credential: credentials.Credentials):
@@ -410,6 +404,9 @@ def auth_flow(self, request: httpx.Request):
410404
# copy original headers
411405
request.headers = _original_headers.copy()
412406
# mutates request headers
407+
logger.debug(
408+
'Refreshing credentials for request attempt %d',
409+
_credential_refresh_attempt + 1)
413410
self.apply_auth_headers(request)
414411

415412
# Continue to perform the request
@@ -420,6 +417,9 @@ def auth_flow(self, request: httpx.Request):
420417
# on refreshable status codes. Current transport.requests.AuthorizedSession()
421418
# only does this on 401 errors. We should do the same.
422419
if response.status_code in self._refresh_status_codes:
420+
logger.debug(
421+
'Request attempt %d failed due to unauthorized credentials',
422+
_credential_refresh_attempt + 1)
423423
_credential_refresh_attempt += 1
424424
else:
425425
break
@@ -670,11 +670,6 @@ def _handle_batch_error(self, error):
670670
return _gapic_utils.handle_platform_error_from_googleapiclient(
671671
error, _MessagingService._build_fcm_error_googleapiclient)
672672

673-
# TODO: Remove comments
674-
# We should be careful to clean up the httpx clients.
675-
# Since we are using an async client we must also close in async. However we can sync wrap this.
676-
# The close method is called by the app on shutdown/clean-up of each service. We don't seem to
677-
# make use of this much elsewhere.
678673
def close(self) -> None:
679674
asyncio.run(self._async_client.aclose())
680675

@@ -715,45 +710,3 @@ def _build_fcm_error(cls, error_dict) -> Optional[Callable[..., exceptions.Fireb
715710
fcm_code = detail.get('errorCode')
716711
break
717712
return _MessagingService.FCM_ERROR_TYPES.get(fcm_code) if fcm_code else None
718-
719-
720-
# TODO: Remove comments
721-
# Notes:
722-
# This implementation currently only covers basic retires for pre-defined status errors
723-
class HttpxRetryTransport(httpx.AsyncBaseTransport):
724-
"""HTTPX transport with retry logic."""
725-
# We could also support passing kwargs here
726-
def __init__(self, **kwargs) -> None:
727-
# Hardcoded settings for now
728-
self._retryable_status_codes = (500, 503,)
729-
self._max_retry_count = 4
730-
731-
# - We use a full AsyncHTTPTransport under the hood to make use of it's
732-
# fully implemented `handle_async_request()`.
733-
# - We could consider making the `HttpxRetryTransport`` class extend a
734-
# `AsyncHTTPTransport` instead and use the parent class's methods to handle
735-
# requests.
736-
# - We should also ensure that that transport's internal retry is
737-
# not enabled.
738-
transport_kwargs = kwargs.copy()
739-
transport_kwargs.update({'retries': 0, 'http2': True})
740-
self._wrapped_transport = httpx.AsyncHTTPTransport(**transport_kwargs)
741-
742-
743-
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
744-
_retry_count = 0
745-
746-
while True:
747-
# Dispatch request
748-
# Let exceptions pass through for now
749-
response = await self._wrapped_transport.handle_async_request(request)
750-
751-
# Check if request is retryable
752-
if response.status_code in self._retryable_status_codes:
753-
_retry_count += 1
754-
755-
# Return if retries exhausted
756-
if _retry_count > self._max_retry_count:
757-
return response
758-
else:
759-
return response

0 commit comments

Comments
 (0)